news 2026/4/23 17:31:59

PyFlink DataStream 程序骨架、常用 Source/Sink、状态(State)、与 Table/SQL 互转一篇搞定

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink DataStream 程序骨架、常用 Source/Sink、状态(State)、与 Table/SQL 互转一篇搞定

1. PyFlink DataStream 作业的通用结构

一个标准的 DataStream 作业,通常有 5 步:

1)创建StreamExecutionEnvironment
2)创建 Source 得到初始DataStream
3)编排 transformation(map/filter/key_by/reduce/process…)
4)绑定 Sink(print / file / kafka / jdbc / table sink …)
5)执行提交:env.execute(job_name)

下面这个示例就是典型骨架:NumberSequenceSource -> key_by -> stateful map -> FileSink -> execute

frompyflink.commonimportWatermarkStrategy,Rowfrompyflink.common.serializationimportEncoderfrompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.file_systemimportFileSink,OutputFileConfigfrompyflink.datastream.connectors.number_seqimportNumberSequenceSourcefrompyflink.datastream.functionsimportRuntimeContext,MapFunctionfrompyflink.datastream.stateimportValueStateDescriptorclassMyMapFunction(MapFunction):defopen(self,runtime_context:RuntimeContext):state_desc=ValueStateDescriptor('cnt',Types.PICKLED_BYTE_ARRAY())self.cnt_state=runtime_context.get_state(state_desc)defmap(self,value):cnt=self.cnt_state.value()ifcntisNoneorcnt<2:self.cnt_state.update(1ifcntisNoneelsecnt+1)returnvalue[0],value[1]+1else:returnvalue[0],value[1]defstate_access_demo():env=StreamExecutionEnvironment.get_execution_environment()seq_num_source=NumberSequenceSource(1,10000)ds=env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())ds=ds.map(lambdaa:Row(a%4,1),output_type=Types.ROW([Types.LONG(),Types.LONG()]))\.key_by(lambdaa:a[0])\.map(MyMapFunction(),output_type=Types.TUPLE([Types.LONG(),Types.LONG()]))output_path='/opt/output/'file_sink=FileSink \.for_row_format(output_path,Encoder.simple_string_encoder())\.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())\.build()ds.sink_to(file_sink)env.execute('state_access_demo')if__name__=='__main__':state_access_demo()

你只要记住:DataStream 的“执行入口”是env.execute(),没有它就不会真正跑起来。

2. 创建 StreamExecutionEnvironment

这是 DataStream 的“运行上下文”,负责决定并行度、运行模式、Checkpoint 等(这里先聚焦入门):

frompyflink.datastreamimportStreamExecutionEnvironment env=StreamExecutionEnvironment.get_execution_environment()

3. 创建 DataStream:三类常见方式

3.1 从 Python list/collection 创建(最适合本地调试)

frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironment env=StreamExecutionEnvironment.get_execution_environment()ds=env.from_collection(collection=[(1,'aaa|bb'),(2,'bb|a'),(3,'aaa|a')],type_info=Types.ROW([Types.INT(),Types.STRING()]))

注意:type_info不传的话,默认会变成Types.PICKLED_BYTE_ARRAY(),后续很多算子/互转会不够“类型明确”,建议新手阶段显式写出来。

3.2 用 DataStream connectors 创建

这里分两种 API:

3.2.1add_source:老式 Source(当前 PyFlink 限制比较多)

Kafka 示例(注意:当前仅支持FlinkKafkaConsumer作为 DataStream source connector):

frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.kafkaimportFlinkKafkaConsumerfrompyflink.datastream.formats.jsonimportJsonRowDeserializationSchema env=StreamExecutionEnvironment.get_execution_environment()# 用 fat jar 避免依赖问题(官方建议用 flink-sql-connector-kafka)env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")deserialization_schema=JsonRowDeserializationSchema.builder()\.type_info(type_info=Types.ROW([Types.INT(),Types.STRING()]))\.build()kafka_consumer=FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers':'localhost:9092','group.id':'test_group'})ds=env.add_source(kafka_consumer)

重要限制点:

  • add_source创建的 DataStream 只能在 streaming 执行模式跑
  • 当前add_source仅支持FlinkKafkaConsumer
3.2.2from_source:统一 Source(新一点,但当前支持面也有限)
frompyflink.common.typeinfoimportTypesfrompyflink.common.watermark_strategyimportWatermarkStrategyfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.number_seqimportNumberSequenceSource env=StreamExecutionEnvironment.get_execution_environment()seq_num_source=NumberSequenceSource(1,1000)ds=env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())

重要限制点:

  • 当前from_source只支持NumberSequenceSourceFileSource
  • 但它既能跑 batch 也能跑 streaming(比 add_source 更灵活)

3.3 直接复用 Table/SQL connectors 来创建 DataStream(非常实用)

思路是:Table API 的 connector 生态更完整(datagen、kafka、jdbc、filesystem…),所以你可以先用 DDL 创建 Table,再转成 DataStream:

frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.tableimportStreamTableEnvironment env=StreamExecutionEnvironment.get_execution_environment()t_env=StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql(""" CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)ds=t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(),Types.STRING()]))

要点:

  • 创建StreamTableEnvironment时必须把env传进去
  • “Table & SQL connector 能在 PyFlink Table 用”,也意味着“你可以间接在 DataStream 里用”

4. DataStream Transformations:从 map 开始串出拓扑

最简单的 transformation:

ds=ds.map(lambdaa:a+1)

真实项目通常会组合:

  • map / flat_map / filter
  • key_by分组
  • reduce聚合
  • 更复杂的process / window / state(后面进阶)

官方示例里key_by -> reduce的 wordcount 风格非常典型:

defsplit(line):yieldfromline.split()ds=ds.flat_map(split)\.map(lambdai:(i,1),output_type=Types.TUPLE([Types.STRING(),Types.INT()]))\.key_by(lambdai:i[0])\.reduce(lambdai,j:(i[0],i[1]+j[1]))

5. DataStream 与 Table 的互转:很多“工程化能力”靠它

互转能力能让你在一个作业里同时拿到两边的优势:

  • DataStream:更灵活的状态、时间控制
  • Table/SQL:更强的 connector 生态、更强的优化器、更好表达的关系计算

5.1 DataStream -> Table

table=t_env.from_data_stream(ds,'a, b, c')

5.2 Table -> DataStream

ds_append=t_env.to_append_stream(table,Types.ROW([Types.INT(),Types.STRING()]))ds_retract=t_env.to_retract_stream(table,Types.ROW([Types.INT(),Types.STRING()]))

理解建议:

  • append_stream适合 insert-only(只增不改)
  • retract_stream适合有更新/撤回语义的表(例如聚合结果会更新)

6. 输出结果:print、collect、DataStream sink、Table sink 四种常见方式

6.1 直接打印到标准输出

ds.print()

6.2 拉到客户端(调试用,注意内存)

withds.execute_and_collect()asresults:forrinresults:print(r)

建议:结果量大时不要这么干,客户端内存会爆。

6.3 输出到 DataStream sink connector

6.3.1add_sink(当前限制:仅支持 FlinkKafkaProducer / JdbcSink,且只能 streaming 模式)

Kafka sink 示例:

frompyflink.common.typeinfoimportTypesfrompyflink.datastream.connectors.kafkaimportFlinkKafkaProducerfrompyflink.datastream.formats.jsonimportJsonRowSerializationSchema serialization_schema=JsonRowSerializationSchema.builder().with_type_info(type_info=Types.ROW([Types.INT(),Types.STRING()])).build()kafka_producer=FlinkKafkaProducer(topic='test_sink_topic',serialization_schema=serialization_schema,producer_config={'bootstrap.servers':'localhost:9092','group.id':'test_group'})ds.add_sink(kafka_producer)
6.3.2sink_to(统一 sink,目前只支持 FileSink,但可 batch/stream)
frompyflink.datastream.connectors.file_systemimportFileSink,OutputFileConfigfrompyflink.common.serializationimportEncoder output_path='/opt/output/'file_sink=FileSink \.for_row_format(output_path,Encoder.simple_string_encoder())\.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())\.build()ds.sink_to(file_sink)

6.4 输出到 Table/SQL sink connector(工程里经常用)

思路:先 DataStream -> Table,然后execute_insert()写入 Table sink(print、kafka、jdbc…都可以)

frompyflink.tableimportStreamTableEnvironmentfrompyflink.datastreamimportStreamExecutionEnvironment env=StreamExecutionEnvironment.get_execution_environment()t_env=StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql(""" CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( 'connector' = 'print' ) """)table=t_env.from_data_stream(ds)table.execute_insert("my_sink")

注意点:

  • ds 的输出类型必须是“复合类型”(Row/Tuple 等),才能顺利转成 Table

7. 作业提交:什么时候用 env.execute(),什么时候用 t_env.execute()?

常规 DataStream 作业:最后env.execute(job_name)

如果你走的是 “DataStream -> Table -> Table sink connector” 的链路,有时需要使用 TableEnvironment 的执行入口:

t_env.execute()

实践建议:

  • 纯 DataStream sink:用env.execute()
  • 以 Table 的插入为主要执行驱动(多 sink、statementSet、复杂 Table 管道):按 Table 侧的执行模型来(你看到的execute_insert()/t_env.execute()这套)

8. 一句话总结:PyFlink DataStream 入门路线怎么走最快

1)先用from_collection+print()跑通本地链路
2)再用NumberSequenceSource/FileSourcefrom_source
3)再把 Table/SQL connector 拉进来做“生产级接入”(datagen/kafka/jdbc/filesystem)
4)开始引入key_by + state,把状态算子写熟
5)最后再上窗口、水位线、容错、端到端语义

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 15:44:41

基于Python的商场停车管理系统的设计与实现_szvoh5b2

前言基于Python的商场停车管理系统是一个集车位管理、车辆识别、计费收费、数据统计与用户服务于一体的智能化平台&#xff0c;通过物联网、计算机视觉与数据库技术&#xff0c;实现停车场的高效运营与用户体验优化。一、项目介绍 开发语言&#xff1a;Python python框架&#…

作者头像 李华
网站建设 2026/4/20 22:46:35

springboot文理医院预约挂号系统(11672)

有需要的同学&#xff0c;源代码和配套文档领取&#xff0c;加文章最下方的名片哦 一、项目演示 项目演示视频 二、资料介绍 完整源代码&#xff08;前后端源代码SQL脚本&#xff09;配套文档&#xff08;LWPPT开题报告&#xff09;远程调试控屏包运行 三、技术介绍 Java…

作者头像 李华
网站建设 2026/4/21 2:01:51

Nodejs+vue大学生二手电子数码产品交易平台设计与实现 _39qu9

文章目录系统设计背景技术架构核心功能模块安全与风控措施创新点与价值--nodejs技术栈--结论源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;系统设计背景 随着高校学生电子设备更新速度加快&#xff0c;二手电子数码产品交易需求显著增…

作者头像 李华
网站建设 2026/4/23 17:21:13

宠物行为AI实时分析,兽医误诊率砍半

&#x1f4dd; 博客主页&#xff1a;Jax的CSDN主页 宠物行为AI实时分析&#xff1a;兽医误诊率砍半的创新实践目录宠物行为AI实时分析&#xff1a;兽医误诊率砍半的创新实践 引言&#xff1a;宠物行为诊断的隐性危机 一、技术原理&#xff1a;LLM如何破解行为诊断困局 核心技术…

作者头像 李华
网站建设 2026/4/21 12:24:23

输入聚食的人数,预算和口味偏好,自动推荐附近的餐厅,还能生成聚餐菜单。

智能聚餐规划系统1. 项目概述实际应用场景在朋友、同事或家庭聚会时&#xff0c;组织一次成功的聚餐需要考虑多个因素&#xff1a;人数、预算、不同人的口味偏好、餐厅选择、菜品搭配等。传统方式需要人工查询、比较、协调&#xff0c;过程繁琐且容易遗漏重要信息。痛点分析- 信…

作者头像 李华
网站建设 2026/4/21 10:55:34

AI健身动作识别:预置运动实体模型,APP快速集成

AI健身动作识别&#xff1a;预置运动实体模型&#xff0c;APP快速集成 引言&#xff1a;让健身APP拥有专业教练的眼睛 想象一下这样的场景&#xff1a;用户在家跟着健身APP做深蹲时&#xff0c;手机摄像头能像专业教练一样实时指出"膝盖内扣了""下蹲幅度不够&…

作者头像 李华