news 2026/3/11 1:15:16

Flink源码阅读:如何生成StreamGraph

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:如何生成StreamGraph

Flink 中有四种执行图,分别是 StreamGraph、JobGraph、ExecutionGraph 和 Physical Graph。今天我们来看下我们编写的 Flink 程序代码是如何生成 StreamGraph 的。

在开始读代码之前,我们先来简单介绍一下四种图之间的关系和区别。

StreamGraph 是根据用户用 Stream API 编写的代码生成的图,用来表示整个程序的拓扑结构。

JobGraph 是由 StreamGraph 生成的,它在 StreamGraph 的基础上,对链化了部分算子,将其合并成为一个节点,减少数据在节点之间传输时序列化和反序列化这些消耗。

ExecutionGraph 是由 JobGraph 生成的,它的主要特点是并行,将多并发的节点拆分。

PhysicalGraph 是 ExecutionGraph 实际部署后的图,它并不是一种数据结构。

StreamExecutionEnvironment

OK,了解了 Flink 四种执行图之后,我们就正式开始源码探索了。首先从 StreamExecutionEnvironment 入手,在编写 Flink 程序时,它是必不可少的一个类。它提供了一系列方法来配置流处理程序的执行环境(如并行度、Checkpoint 配置、时间属性等)。

本文我们主要关注 StreamGraph 的生成,首先是数据流的入口,即 Source 节点。在 StreamExecutionEnvironment 中有 addSource 和 fromSource 等方法,它们用来定义从哪个数据源读取数据,然后返回一个 DataStreamSource (继承自 DataStream),得到 DataStream 之后,它会在各个算子之间流转,最终到 Sink 端输出。

我们从 addSource 方法入手,addSource 方法中主要做了三件事:

1、处理数据类型,优先使用用户执行的数据类型,也可以自动推断

2、闭包清理,使用户传入的 function 能被序列化并发布到分布式环境执行

3、创建 DataStreamSource 并返回

private<OUT>DataStreamSource<OUT>addSource(finalSourceFunction<OUT>function,finalStringsourceName,@NullablefinalTypeInformation<OUT>typeInfo,finalBoundednessboundedness){checkNotNull(function);checkNotNull(sourceName);checkNotNull(boundedness);TypeInformation<OUT>resolvedTypeInfo=getTypeInfo(function,sourceName,SourceFunction.class,typeInfo);booleanisParallel=functioninstanceofParallelSourceFunction;clean(function);finalStreamSource<OUT,?>sourceOperator=newStreamSource<>(function);returnnewDataStreamSource<>(this,resolvedTypeInfo,sourceOperator,isParallel,sourceName,boundedness);}

现在我们有了 DataStream 了,那如何知道后续要进行哪些转换逻辑呢?答案在 transformations 这个变量中,它保存了后续所有的转换。

protectedfinalList<Transformation<?>>transformations=newArrayList<>();

Transformation

我们来看 Transformation 是如何生成和描述 DataStream 的转换流程的。以最常见的 map 方法为例。

public<R>SingleOutputStreamOperator<R>map(MapFunction<T,R>mapper,TypeInformation<R>outputType){returntransform("Map",outputType,newStreamMap<>(clean(mapper)));}

它调用了 transform 方法,transform 又调用了 doTransform 方法。

protected<R>SingleOutputStreamOperator<R>doTransform(StringoperatorName,TypeInformation<R>outTypeInfo,StreamOperatorFactory<R>operatorFactory){// read the output type of the input Transform to coax out errors about MissingTypeInfotransformation.getOutputType();OneInputTransformation<T,R>resultTransform=newOneInputTransformation<>(this.transformation,operatorName,operatorFactory,outTypeInfo,environment.getParallelism(),false);@SuppressWarnings({"unchecked","rawtypes"})SingleOutputStreamOperator<R>returnStream=newSingleOutputStreamOperator(environment,resultTransform);getExecutionEnvironment().addOperator(resultTransform);returnreturnStream;}

在 doTransform 方法中,就是创建 Transformation 和 SingleOutputStreamOperator(DataStream 的一个子类),然后调用 addOperator 方法将 transform 存到 StreamExecutionEnviroment 中的 transformations 变量中。

每个 Transformation 都有 id、name、parallelism 和 slotSharingGroup 等信息。其子类也记录有输入信息,如 OneInputTransformation 和 TwoInputTransformation。

StreamOperator

我们在调用 map 方法时,会传入一个自定义的处理函数,它也会保存在 Transformation 中。在 Flink 中定义了 StreamOperator 方法来抽象这类处理函数。在 map 方法中,它将我们传入的函数转成了 StreamMap,它继承了 AbstractUdfStreamOperator,同时实现了 OneInputStreamOperator 接口。

StreamOperator 定义了对算子生命周期管理的函数。

voidopen()throwsException;voidfinish()throwsException;voidclose()throwsException;OperatorSnapshotFuturessnapshotState(longcheckpointId,longtimestamp,CheckpointOptionscheckpointOptions,CheckpointStreamFactorystorageLocation)throwsException;voidinitializeState(StreamTaskStateInitializerstreamTaskStateManager)throwsException;

OneInputStreamOperator 是 StreamOperator 的子接口。在其基础上增加了对具体元素的处理,主要是对 key 的提取。

defaultvoidsetKeyContextElement(StreamRecord<IN>record)throwsException{setKeyContextElement1(record);}

AbstractUdfStreamOperator 则是提供了对自定义函数生命周期管理的实现。

@Overridepublicvoidopen()throwsException{super.open();FunctionUtils.openFunction(userFunction,DefaultOpenContext.INSTANCE);}@Overridepublicvoidfinish()throwsException{super.finish();if(userFunctioninstanceofSinkFunction){((SinkFunction<?>)userFunction).finish();}}@Overridepublicvoidclose()throwsException{super.close();FunctionUtils.closeFunction(userFunction);}

到这里,我们就知道了 Flink 中 DataStream 是如何转换的。处理逻辑保存在 Transformation 中。下面我们来看一组 Transformation 是如何生成 StreamGraph 的。

StreamGraph

生成 StreamGraph 的入口在org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#generateStreamGraph

在 generate 方法中,会遍历所有 Transformation 并调用 transform 方法。在调用节点的 transform 方法之前,会先确保它的输入节点都已经转换成功。

目前定义了以下 Transformation:

static{@SuppressWarnings("rawtypes")Map<Class<?extendsTransformation>,TransformationTranslator<?,?extendsTransformation>>tmp=newHashMap<>();tmp.put(OneInputTransformation.class,newOneInputTransformationTranslator<>());tmp.put(TwoInputTransformation.class,newTwoInputTransformationTranslator<>());tmp.put(MultipleInputTransformation.class,newMultiInputTransformationTranslator<>());tmp.put(KeyedMultipleInputTransformation.class,newMultiInputTransformationTranslator<>());tmp.put(SourceTransformation.class,newSourceTransformationTranslator<>());tmp.put(SinkTransformation.class,newSinkTransformationTranslator<>());tmp.put(GlobalCommitterTransform.class,newGlobalCommitterTransformationTranslator<>());tmp.put(LegacySinkTransformation.class,newLegacySinkTransformationTranslator<>());tmp.put(LegacySourceTransformation.class,newLegacySourceTransformationTranslator<>());tmp.put(UnionTransformation.class,newUnionTransformationTranslator<>());tmp.put(StubTransformation.class,newStubTransformationTranslator<>());tmp.put(PartitionTransformation.class,newPartitionTransformationTranslator<>());tmp.put(SideOutputTransformation.class,newSideOutputTransformationTranslator<>());tmp.put(ReduceTransformation.class,newReduceTransformationTranslator<>());tmp.put(TimestampsAndWatermarksTransformation.class,newTimestampsAndWatermarksTransformationTranslator<>());tmp.put(BroadcastStateTransformation.class,newBroadcastStateTransformationTranslator<>());tmp.put(KeyedBroadcastStateTransformation.class,newKeyedBroadcastStateTransformationTranslator<>());tmp.put(CacheTransformation.class,newCacheTransformationTranslator<>());translatorMap=Collections.unmodifiableMap(tmp);}

Flink 会根据不同的 Transformation 类调用其 translateInternal 方法。在 translateInternal 方法中就会去添加节点和边。

streamGraph.addOperator(transformationId,slotSharingGroup,transformation.getCoLocationGroupKey(),operatorFactory,inputType,transformation.getOutputType(),transformation.getName());for(IntegerinputId:context.getStreamNodeIds(parentTransformations.get(0))){streamGraph.addEdge(inputId,transformationId,0);}

在 addOperator 方法中,它通过调用 addNode 来创建 StreamNode。

protectedStreamNodeaddNode(IntegervertexID,@NullableStringslotSharingGroup,@NullableStringcoLocationGroup,Class<?extendsTaskInvokable>vertexClass,@NullableStreamOperatorFactory<?>operatorFactory,StringoperatorName){if(streamNodes.containsKey(vertexID)){thrownewRuntimeException("Duplicate vertexID "+vertexID);}StreamNodevertex=newStreamNode(vertexID,slotSharingGroup,coLocationGroup,operatorFactory,operatorName,vertexClass);streamNodes.put(vertexID,vertex);isEmpty=false;returnvertex;}

在 addEdgeInternal 方法中,对于 sideOutput 和 partition 这类虚拟节点,会先解析出原始节点,再建立实际的边。

privatevoidaddEdgeInternal(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitioner<?>partitioner,List<String>outputNames,OutputTagoutputTag,StreamExchangeModeexchangeMode,IntermediateDataSetIDintermediateDataSetId){if(virtualSideOutputNodes.containsKey(upStreamVertexID)){intvirtualId=upStreamVertexID;upStreamVertexID=virtualSideOutputNodes.get(virtualId).f0;if(outputTag==null){outputTag=virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,null,outputTag,exchangeMode,intermediateDataSetId);}elseif(virtualPartitionNodes.containsKey(upStreamVertexID)){intvirtualId=upStreamVertexID;upStreamVertexID=virtualPartitionNodes.get(virtualId).f0;if(partitioner==null){partitioner=virtualPartitionNodes.get(virtualId).f1;}exchangeMode=virtualPartitionNodes.get(virtualId).f2;addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputNames,outputTag,exchangeMode,intermediateDataSetId);}else{createActualEdge(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputTag,exchangeMode,intermediateDataSetId);}}

最后根据两个物理节点创建 StreamEdge 进行连接。

privatevoidcreateActualEdge(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitioner<?>partitioner,OutputTagoutputTag,StreamExchangeModeexchangeMode,IntermediateDataSetIDintermediateDataSetId){StreamNodeupstreamNode=getStreamNode(upStreamVertexID);StreamNodedownstreamNode=getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if(partitioner==null&&upstreamNode.getParallelism()==downstreamNode.getParallelism()){partitioner=dynamic?newForwardForUnspecifiedPartitioner<>():newForwardPartitioner<>();}elseif(partitioner==null){partitioner=newRebalancePartitioner<Object>();}if(partitionerinstanceofForwardPartitioner){if(upstreamNode.getParallelism()!=downstreamNode.getParallelism()){if(partitionerinstanceofForwardForConsecutiveHashPartitioner){partitioner=((ForwardForConsecutiveHashPartitioner<?>)partitioner).getHashPartitioner();}else{thrownewUnsupportedOperationException("Forward partitioning does not allow "+"change of parallelism. Upstream operation: "+upstreamNode+" parallelism: "+upstreamNode.getParallelism()+", downstream operation: "+downstreamNode+" parallelism: "+downstreamNode.getParallelism()+" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}}if(exchangeMode==null){exchangeMode=StreamExchangeMode.UNDEFINED;}/** * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link * StreamEdge}. */intuniqueId=getStreamEdges(upstreamNode.getId(),downstreamNode.getId()).size();StreamEdgeedge=newStreamEdge(upstreamNode,downstreamNode,typeNumber,partitioner,outputTag,exchangeMode,uniqueId,intermediateDataSetId);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}

通过 StreamNode 和 StreamEdge,就可以得到所有的节点和边,也就是我们的 StreamGraph 就创建完成了。

总结

本文先介绍了 Flink 的四种执行图以及它们之间的关系。接着又通过源码探索了 StreamGraph 的生成逻辑,Flink 将处理 逻辑保存在 Transformation 中,又由 Transformation 生成了 StreamGraph。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/3 8:56:56

YOLOv8+PyQt5西红柿成熟度检测(可以重新训练,yolov8模型,从图像、视频和摄像头三种路径识别检测,包含登陆页面、注册页面和检测页面)

https://www.bilibili.com/video/BV1sr421j7w4/?spm_id_from333.999.0.0 资源包含可视化的西红柿成熟度检测系统&#xff0c;基于最新的YOLOv8训练的西红柿成熟度检测模型&#xff0c;和基于PyQt5制作的可视化西红柿成熟度检测系统&#xff0c;包含登陆页面、注册页面和检测页…

作者头像 李华
网站建设 2026/3/6 2:11:19

当AI开始“说人话“:微软VibeVoice如何让机器300毫秒内开口

你有没有想过&#xff0c;为什么Siri、小爱同学们总要"思考"一会儿才开口&#xff1f;而人类对话时&#xff0c;几乎是无缝衔接的。今天&#xff0c;我们要聊的这个项目&#xff0c;正在改变这一切。 一、从"哑巴AI"到"话痨机器人"的进化史 还记…

作者头像 李华
网站建设 2026/3/9 16:59:21

汇编语言全接触-26.启动画面

上一章我们学习了位图的使用.在这一章我们要用上帝赋予我们的创造力来融会贯通上一章我们学到的知识.那就是研究如何用位图来创建启动画面. 你可以在这里下载示范: the example. 理论首先,我们先要搞清楚什么是启动画面.举个简单的例子:我们启动某些作的专业一点的程序时(比如N…

作者头像 李华
网站建设 2026/3/10 4:41:05

随机抽奖算法实现与对比:聚焦洗牌算法(Fisher-Yates)

期末课程设计中&#xff0c;我和团队成员共同完成了 “随机抽奖算法实现与比较” 的课题。本次设计的核心目标是模拟实际抽奖场景&#xff0c;从指定号码范围&#xff08;min_num 到 max_num&#xff09;中抽取 k 个不重复的中奖号码&#xff0c;并通过实现四种不同算法&#x…

作者头像 李华
网站建设 2026/3/10 6:43:05

【Hadoop+Spark+python毕设】物联网网络安全威胁数据分析系统、计算机毕业设计、包括数据爬取、数据分析、数据可视化、Hadoop、实战教学

&#x1f393; 作者&#xff1a;计算机毕设小月哥 | 软件开发专家 &#x1f5a5;️ 简介&#xff1a;8年计算机软件程序开发经验。精通Java、Python、微信小程序、安卓、大数据、PHP、.NET|C#、Golang等技术栈。 &#x1f6e0;️ 专业服务 &#x1f6e0;️ 需求定制化开发源码提…

作者头像 李华
网站建设 2026/3/8 16:32:28

Springboot连锁药店进销存业务系统98i85(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。

系统程序文件列表项目功能&#xff1a;员工,供应商,药品信息,药品采购,进货出库,药品销售,退货入库,药品报损,药品销毁开题报告内容基于SpringBoot的连锁药店进销存业务系统开题报告一、选题背景与意义1.1 行业现状与痛点随着医疗行业的快速发展和人们对健康需求的日益增加&…

作者头像 李华