1. PyFlink DataStream 作业的通用结构
一个标准的 DataStream 作业,通常有 5 步:
1)创建StreamExecutionEnvironment
2)创建 Source 得到初始DataStream
3)编排 transformation(map/filter/key_by/reduce/process…)
4)绑定 Sink(print / file / kafka / jdbc / table sink …)
5)执行提交:env.execute(job_name)
下面这个示例就是典型骨架:NumberSequenceSource -> key_by -> stateful map -> FileSink -> execute
frompyflink.commonimportWatermarkStrategy,Rowfrompyflink.common.serializationimportEncoderfrompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.file_systemimportFileSink,OutputFileConfigfrompyflink.datastream.connectors.number_seqimportNumberSequenceSourcefrompyflink.datastream.functionsimportRuntimeContext,MapFunctionfrompyflink.datastream.stateimportValueStateDescriptorclassMyMapFunction(MapFunction):defopen(self,runtime_context:RuntimeContext):state_desc=ValueStateDescriptor('cnt',Types.PICKLED_BYTE_ARRAY())self.cnt_state=runtime_context.get_state(state_desc)defmap(self,value):cnt=self.cnt_state.value()ifcntisNoneorcnt<2:self.cnt_state.update(1ifcntisNoneelsecnt+1)returnvalue[0],value[1]+1else:returnvalue[0],value[1]defstate_access_demo():env=StreamExecutionEnvironment.get_execution_environment()seq_num_source=NumberSequenceSource(1,10000)ds=env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())ds=ds.map(lambdaa:Row(a%4,1),output_type=Types.ROW([Types.LONG(),Types.LONG()]))\.key_by(lambdaa:a[0])\.map(MyMapFunction(),output_type=Types.TUPLE([Types.LONG(),Types.LONG()]))output_path='/opt/output/'file_sink=FileSink \.for_row_format(output_path,Encoder.simple_string_encoder())\.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())\.build()ds.sink_to(file_sink)env.execute('state_access_demo')if__name__=='__main__':state_access_demo()你只要记住:DataStream 的“执行入口”是env.execute(),没有它就不会真正跑起来。
2. 创建 StreamExecutionEnvironment
这是 DataStream 的“运行上下文”,负责决定并行度、运行模式、Checkpoint 等(这里先聚焦入门):
frompyflink.datastreamimportStreamExecutionEnvironment env=StreamExecutionEnvironment.get_execution_environment()3. 创建 DataStream:三类常见方式
3.1 从 Python list/collection 创建(最适合本地调试)
frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironment env=StreamExecutionEnvironment.get_execution_environment()ds=env.from_collection(collection=[(1,'aaa|bb'),(2,'bb|a'),(3,'aaa|a')],type_info=Types.ROW([Types.INT(),Types.STRING()]))注意:type_info不传的话,默认会变成Types.PICKLED_BYTE_ARRAY(),后续很多算子/互转会不够“类型明确”,建议新手阶段显式写出来。
3.2 用 DataStream connectors 创建
这里分两种 API:
3.2.1add_source:老式 Source(当前 PyFlink 限制比较多)
Kafka 示例(注意:当前仅支持FlinkKafkaConsumer作为 DataStream source connector):
frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.kafkaimportFlinkKafkaConsumerfrompyflink.datastream.formats.jsonimportJsonRowDeserializationSchema env=StreamExecutionEnvironment.get_execution_environment()# 用 fat jar 避免依赖问题(官方建议用 flink-sql-connector-kafka)env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")deserialization_schema=JsonRowDeserializationSchema.builder()\.type_info(type_info=Types.ROW([Types.INT(),Types.STRING()]))\.build()kafka_consumer=FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers':'localhost:9092','group.id':'test_group'})ds=env.add_source(kafka_consumer)重要限制点:
add_source创建的 DataStream 只能在 streaming 执行模式跑- 当前
add_source仅支持FlinkKafkaConsumer
3.2.2from_source:统一 Source(新一点,但当前支持面也有限)
frompyflink.common.typeinfoimportTypesfrompyflink.common.watermark_strategyimportWatermarkStrategyfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.number_seqimportNumberSequenceSource env=StreamExecutionEnvironment.get_execution_environment()seq_num_source=NumberSequenceSource(1,1000)ds=env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())重要限制点:
- 当前
from_source只支持NumberSequenceSource和FileSource - 但它既能跑 batch 也能跑 streaming(比 add_source 更灵活)
3.3 直接复用 Table/SQL connectors 来创建 DataStream(非常实用)
思路是:Table API 的 connector 生态更完整(datagen、kafka、jdbc、filesystem…),所以你可以先用 DDL 创建 Table,再转成 DataStream:
frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.tableimportStreamTableEnvironment env=StreamExecutionEnvironment.get_execution_environment()t_env=StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql(""" CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)ds=t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(),Types.STRING()]))要点:
- 创建
StreamTableEnvironment时必须把env传进去 - “Table & SQL connector 能在 PyFlink Table 用”,也意味着“你可以间接在 DataStream 里用”
4. DataStream Transformations:从 map 开始串出拓扑
最简单的 transformation:
ds=ds.map(lambdaa:a+1)真实项目通常会组合:
map / flat_map / filterkey_by分组reduce聚合- 更复杂的
process / window / state(后面进阶)
官方示例里key_by -> reduce的 wordcount 风格非常典型:
defsplit(line):yieldfromline.split()ds=ds.flat_map(split)\.map(lambdai:(i,1),output_type=Types.TUPLE([Types.STRING(),Types.INT()]))\.key_by(lambdai:i[0])\.reduce(lambdai,j:(i[0],i[1]+j[1]))5. DataStream 与 Table 的互转:很多“工程化能力”靠它
互转能力能让你在一个作业里同时拿到两边的优势:
- DataStream:更灵活的状态、时间控制
- Table/SQL:更强的 connector 生态、更强的优化器、更好表达的关系计算
5.1 DataStream -> Table
table=t_env.from_data_stream(ds,'a, b, c')5.2 Table -> DataStream
ds_append=t_env.to_append_stream(table,Types.ROW([Types.INT(),Types.STRING()]))ds_retract=t_env.to_retract_stream(table,Types.ROW([Types.INT(),Types.STRING()]))理解建议:
append_stream适合 insert-only(只增不改)retract_stream适合有更新/撤回语义的表(例如聚合结果会更新)
6. 输出结果:print、collect、DataStream sink、Table sink 四种常见方式
6.1 直接打印到标准输出
ds.print()6.2 拉到客户端(调试用,注意内存)
withds.execute_and_collect()asresults:forrinresults:print(r)建议:结果量大时不要这么干,客户端内存会爆。
6.3 输出到 DataStream sink connector
6.3.1add_sink(当前限制:仅支持 FlinkKafkaProducer / JdbcSink,且只能 streaming 模式)
Kafka sink 示例:
frompyflink.common.typeinfoimportTypesfrompyflink.datastream.connectors.kafkaimportFlinkKafkaProducerfrompyflink.datastream.formats.jsonimportJsonRowSerializationSchema serialization_schema=JsonRowSerializationSchema.builder().with_type_info(type_info=Types.ROW([Types.INT(),Types.STRING()])).build()kafka_producer=FlinkKafkaProducer(topic='test_sink_topic',serialization_schema=serialization_schema,producer_config={'bootstrap.servers':'localhost:9092','group.id':'test_group'})ds.add_sink(kafka_producer)6.3.2sink_to(统一 sink,目前只支持 FileSink,但可 batch/stream)
frompyflink.datastream.connectors.file_systemimportFileSink,OutputFileConfigfrompyflink.common.serializationimportEncoder output_path='/opt/output/'file_sink=FileSink \.for_row_format(output_path,Encoder.simple_string_encoder())\.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())\.build()ds.sink_to(file_sink)6.4 输出到 Table/SQL sink connector(工程里经常用)
思路:先 DataStream -> Table,然后execute_insert()写入 Table sink(print、kafka、jdbc…都可以)
frompyflink.tableimportStreamTableEnvironmentfrompyflink.datastreamimportStreamExecutionEnvironment env=StreamExecutionEnvironment.get_execution_environment()t_env=StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql(""" CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( 'connector' = 'print' ) """)table=t_env.from_data_stream(ds)table.execute_insert("my_sink")注意点:
- ds 的输出类型必须是“复合类型”(Row/Tuple 等),才能顺利转成 Table
7. 作业提交:什么时候用 env.execute(),什么时候用 t_env.execute()?
常规 DataStream 作业:最后env.execute(job_name)。
如果你走的是 “DataStream -> Table -> Table sink connector” 的链路,有时需要使用 TableEnvironment 的执行入口:
t_env.execute()实践建议:
- 纯 DataStream sink:用
env.execute() - 以 Table 的插入为主要执行驱动(多 sink、statementSet、复杂 Table 管道):按 Table 侧的执行模型来(你看到的
execute_insert()/t_env.execute()这套)
8. 一句话总结:PyFlink DataStream 入门路线怎么走最快
1)先用from_collection+print()跑通本地链路
2)再用NumberSequenceSource/FileSource练from_source
3)再把 Table/SQL connector 拉进来做“生产级接入”(datagen/kafka/jdbc/filesystem)
4)开始引入key_by + state,把状态算子写熟
5)最后再上窗口、水位线、容错、端到端语义