news 2026/5/8 1:48:01

PyFlink Connectors 如何在 Python 作业里正确使用 Kafka/JSON 等连接器(JAR 依赖、DDL 建表、pipeline.jars、内置 Source/Sink、

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink Connectors 如何在 Python 作业里正确使用 Kafka/JSON 等连接器(JAR 依赖、DDL 建表、pipeline.jars、内置 Source/Sink、

1. PyFlink 为什么要手动指定 Connector/Format JAR?

因为:

  • Flink 核心运行时在 JVM 上
  • connector(如 kafka)和 format(如 json)都是 JVM 侧实现
  • Python 代码只是驱动 Table/SQL 的规划与提交

所以你需要通过pipeline.jars指定依赖(多个 jar 用;分隔):

table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

实战建议:

  • connector jar 和 format jar 都要带上(例如 Kafka + JSON)
  • 路径用file:///这种绝对 URI,避免分布式环境找不到文件
  • 生产上更推荐把 jar 放到统一位置(Flink lib 或制品仓)并在提交时声明依赖,pipeline.jars适合快速验证与 demo

2. 在 PyFlink Table API 中,推荐用 DDL 定义 Source/Sink

PyFlink 的 Table API 使用 connector 最推荐的方式是:DDL + execute_sql()
理由很简单:DDL 更直观、更可复制、也最接近线上 SQL Gateway/SQL Client 的使用方式。

2.1 Kafka Source/Sink + JSON Format(最小可用示例)

source_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table").wait()

关键点拆解:

  • execute_sql()注册表(source/sink)
  • sql_query()产出一个 Table
  • execute_insert()触发写入(并提交作业)
  • .wait()在本地/mini cluster 场景常用,用于等待作业执行(远程集群通常不建议一直 wait)

3. 完整可运行的 Python 结构(把 jar、DDL、DML 串起来)

你给的完整示例结构非常标准,我建议你在博客里也用这种方式组织代码:

frompyflink.tableimportTableEnvironment,EnvironmentSettingsdeflog_processing():env_settings=EnvironmentSettings.in_streaming_mode()t_env=TableEnvironment.create(env_settings)# 1) 指定 connector & format jarst_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")# 2) DDL: source/sinksource_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)# 3) DML: query + insertt_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table")\.wait()if__name__=='__main__':log_processing()

4. PyFlink 里“内置”的 Sources/Sinks:不用额外 jar 也能跑

除了 Kafka 这类外部 connector,Flink 也提供了一些“开箱即用”的数据源/数据汇,特别适合本地调试与单测。

4.1 from/to Pandas(非常适合快速验证)

frompyflink.table.expressionsimportcolimportpandasaspdimportnumpyasnp pdf=pd.DataFrame(np.random.rand(1000,2))table=t_env.from_pandas(pdf,["a","b"]).filter(col('a')>0.5)pdf2=table.to_pandas()

注意:to_pandas()会把结果收集到客户端内存,生产慎用,建议先limit()

4.2 from_elements():用 Python 集合直接造表

frompyflink.tableimportDataTypes# 自动推断table_env.from_elements([(1,'Hi'),(2,'Hello')])# 指定字段名table_env.from_elements([(1,'Hi'),(2,'Hello')],['a','b'])# 指定 schema(更稳)table_env.from_elements([(1,'Hi'),(2,'Hello')],DataTypes.ROW([DataTypes.FIELD("a",DataTypes.INT()),DataTypes.FIELD("b",DataTypes.STRING())]))

这类内置 source 对写教程、做 POC、复现 bug 特别省事。

5. 自定义 Sources & Sinks:Python 不能直接写,需 Java/Scala 实现

文档明确说明了现阶段的边界:

  • 自定义 source/sink 需要 Java/Scala 实现
  • Python 侧可以通过实现 TableFactory(也是 Java/Scala)让它能被 DDL 发现并使用

也就是说:你可以用 PyFlink 写作业逻辑,但 connector 生态仍然是 JVM 的。

如果你后面要写“自定义 connector”系列博客,可以按这个路线写:

  • 先用 Java 写 DynamicTableSourceFactory / DynamicTableSinkFactory(SPI 注册)
  • 再在 PyFlink 里通过 DDL'connector'='xxx'直接使用

6. 常见踩坑清单(PyFlink Connector 场景高频问题)

  • 只加了 connector jar,没加 format jar:DDL 里用了'format'='json',但没带 json format 的 jar,会在运行期报找不到 format factory
  • pipeline.jars 路径不可达:本地 file 路径对集群 TaskManager 不可见,必须用集群可访问路径或随 job 提交
  • 用 DDL 建表但没触发执行:Table/SQL 是惰性执行,必须execute_insert()execute_sql(INSERT ...)才会提交作业
  • wait() 用错场景:本地调试很方便;远程集群提交通常希望异步返回,避免客户端阻塞
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/29 22:21:32

怎样用3种新方法将Phaser游戏搬上桌面?

怎样用3种新方法将Phaser游戏搬上桌面? 【免费下载链接】phaser Phaser is a fun, free and fast 2D game framework for making HTML5 games for desktop and mobile web browsers, supporting Canvas and WebGL rendering. 项目地址: https://gitcode.com/gh_mi…

作者头像 李华
网站建设 2026/5/4 4:47:37

教学实践:如何在计算机课程中使用Llama Factory开展大模型实验

教学实践:如何在计算机课程中使用Llama Factory开展大模型实验 大模型技术正在改变计算机教育的面貌,但对于大学讲师来说,如何让学生在设备性能参差不齐的情况下统一参与实践环节是个难题。本文将介绍如何利用Llama Factory这一开源工具&…

作者头像 李华
网站建设 2026/5/7 0:06:11

5分钟快速验证:使用Registry-1.docker.io部署微服务原型

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个快速原型开发工具,允许用户通过简单表单定义微服务的基本参数(如编程语言、框架、依赖项等),自动生成:1) 微服务代码骨架;2…

作者头像 李华
网站建设 2026/4/21 14:33:00

Wan FusionX:6步打造影院级AI视频的终极指南

Wan FusionX:6步打造影院级AI视频的终极指南 【免费下载链接】Wan2.1-FLF2V-14B-720P-diffusers 项目地址: https://ai.gitcode.com/hf_mirrors/Wan-AI/Wan2.1-FLF2V-14B-720P-diffusers 在数字内容创作领域,AI视频生成技术正以前所未有的速度改…

作者头像 李华
网站建设 2026/5/6 11:18:28

AI教育玩具:基于Llama Factory开发儿童编程学习助手

AI教育玩具:基于Llama Factory开发儿童编程学习助手 作为一名教育科技创业者,你是否想过将AI大模型的能力融入儿童编程学习产品中?但面对复杂的模型控制和调整界面,非技术团队成员的参与往往成为难题。本文将介绍如何利用Llama Fa…

作者头像 李华
网站建设 2026/4/29 0:55:12

零基础入门:5分钟学会太阳能电池基础分类

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个交互式学习应用,向初学者介绍太阳能电池的基本分类。应用需要包含不同类型太阳能电池(单晶硅、多晶硅、薄膜等)的图片展示、简要特性说…

作者头像 李华