news 2026/3/18 19:05:00

Flink Table/SQL 自定义 Connector从 DDL 元数据到运行时 Source/Sink(含 Socket 全栈例子拆解)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink Table/SQL 自定义 Connector从 DDL 元数据到运行时 Source/Sink(含 Socket 全栈例子拆解)

1. 先搞清楚:Connector 在 Flink 里是怎么“从声明走到运行”的

Flink Table/SQL 是声明式的。你写的 DDL、WITH 参数,不会直接触碰外部系统,而是走三段式翻译链路:元数据 → 规划 → 运行。(nightlies.apache.org)

1.1 Metadata:DDL 只是更新 CatalogTable

执行CREATE TABLE后,通常只是在 Catalog 里多了一条表元数据(CatalogTable)。外部系统里的真实表/Topic/文件并不会因此被创建或修改(取决于具体 Catalog 实现)。(nightlies.apache.org)

1.2 Planning:CatalogTable → DynamicTableSource/DynamicTableSink

优化器在生成执行计划时,会把 CatalogTable 解析成:

  • DynamicTableSource:用于SELECT的读
  • DynamicTableSink:用于INSERT INTO的写

这一步由 Factory 完成:

  • DynamicTableSourceFactory
  • DynamicTableSinkFactory

Factory 的典型职责是:校验 WITH 参数、配置 Format、实例化 Source/Sink,并暴露能力接口(Projection/Filter/Limit PushDown 等)给优化器做进一步改写。(nightlies.apache.org)

1.3 Runtime:拿到 RuntimeProvider,生成真正跑在集群上的实现

规划完成后,Source/Sink 会产出 runtime provider(如 ScanRuntimeProvider / SinkRuntimeProvider),底层最终落到 Flink 核心 connector 接口的运行时实现(例如 Source/Sink V2、或某些 legacy provider)。(nightlies.apache.org)

你可以把整个链路记成一句话:

DDL 写的是“配置”,Factory 负责“翻译”,RuntimeProvider 才是“真正在 TaskManager 上跑的代码”。

2. Flink 1.16 之后的一个坑:自定义 Connector 的 ClassLoader 必须用“用户类加载器”

从 Flink 1.16 开始,TableEnvironment 引入了 user class loader 来统一 SQL Client / SQL Gateway / Table 程序的类加载行为。自定义 connector 里如果还用Thread.currentThread().getContextClassLoader()去加载用户 jar(ADD JAR 或 CREATE FUNCTION USING JAR),就可能出现ClassNotFoundException。正确方式是从DynamicTableFactory.Context拿 user class loader。(nightlies.apache.org)

实战建议:

  • 任何需要反射加载用户类、反序列化用户对象的地方,都优先使用context.getClassLoader()(或对应可访问到的 user class loader)。
  • 如果你在 SQL Client 用-j加载 connector jar 还能遇到类加载问题,优先检查是不是错误使用了 TCCL(线程上下文类加载器)。(Apache Issues)

3. 项目依赖与打包:thin jar + uber jar 的正确姿势

3.1 依赖怎么加

开发自定义 connector / format,通常只需要 table-common 这类“扩展点依赖”。例如(以 2.2.0 为例):(nightlies.apache.org)

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>2.2.0</version><scope>provided</scope></dependency>

如果你要“桥接 DataStream API”(把 DataStream connector 适配到 Table API),再加:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>2.2.0</version><scope>provided</scope></dependency>

3.2 打包建议

  • 发布给用户使用时,建议同时提供:

    • thin jar:只包含你的 connector 代码
    • uber jar:包含第三方依赖(但不要把 Flink table 相关依赖一起打进去,避免冲突)
  • 不要在生产代码依赖flink-table-planner_2.12之类 planner 内部实现:Flink 1.15 引入 planner-loader 后,应用 classpath 不再直接可见 planner 内部类。(nightlies.apache.org)

4. Extension Points 全景:你能扩展哪些接口

4.1 Factory:用 SPI 让 Flink 发现你的 connector

实现 Factory 后,需要注册到 SPI 文件:

META-INF/services/org.apache.flink.table.factories.Factory

Flink 会按两个维度匹配“唯一 Factory”:

  • factoryIdentifier()(对应 WITH 里的'connector' = 'xxx'
  • 你实现的 base class(SourceFactory 或 SinkFactory)

这也是为什么 connector 的 WITH 里connector=socket必须和factoryIdentifier()对得上。(nightlies.apache.org)

4.2 Source 三种形态:Scan / Lookup / VectorSearch

Flink 的动态表读侧分三类(可以同时实现多种,planner 根据查询选择用哪种):(nightlies.apache.org)

1)ScanTableSource
全表扫描,可做 bounded/unbounded,也能做 CDC changelog(insert/update/delete)。需要声明getChangelogMode()告诉 planner 你会产出哪些 RowKind。

2)LookupTableSource
按 key 查维表(TableFunction / AsyncTableFunction),目前只支持 insert-only 语义。

3)VectorSearchTableSource
按向量相似度检索 topK(同样是 TableFunction / AsyncTableFunction),语义也是 insert-only,且匹配不是等值而是相似度。Flink 2.2 的 release notes 里也明确提到 Table/SQL 增强了 VECTOR_SEARCH 支持方向。(nightlies.apache.org)

4.3 Source Abilities:把过滤、投影、limit 下推到数据源

如果你希望性能更好,强烈建议实现部分 abilities,让 planner 在 planning 阶段就把计算下推到外部系统附近,例如:

  • SupportsFilterPushDown
  • SupportsProjectionPushDown
  • SupportsLimitPushDown
  • SupportsPartitionPushDown
  • SupportsReadingMetadata
  • SupportsWatermarkPushDown/SupportsSourceWatermark

这些接口只对 ScanTableSource 生效(Lookup/VectorSearch 不支持)。(nightlies.apache.org)

4.4 Sink:写侧同样支持 Changelog + Abilities

写侧DynamicTableSink接收 changelog(insert/update/delete)能力取决于你声明的ChangelogMode。还能扩展:

  • SupportsOverwrite
  • SupportsPartitioning
  • SupportsWritingMetadata
  • SupportsRowLevelDelete/Update
  • SupportsStaging(CTAS/RTAS 原子语义)

运行时 provider 推荐SinkV2Provider,数据结构同样是RowData。(nightlies.apache.org)

5. Format 体系:Connector 和 Format 是两套 SPI,可以独立复用

很多 connector 并不直接解析 bytes/JSON/CSV,而是把“编解码”交给 format 插件(同样用 SPI 发现)。Kafka 就是典型:通过value.format找到DeserializationFormatFactory,最终拿到DecodingFormat<DeserializationSchema>。(nightlies.apache.org)

你实现 format 的价值在于:同一个 format 可以给多个 connector 复用,不要把解析逻辑写死在 connector 里。

6. 全栈例子拆解:Socket Connector + Changelog CSV Format(最小可跑通模型)

这个例子非常适合当你写“第一个自定义 connector”时的模板:connector 负责建 Source,format 负责把 bytes 解成 RowData,并且支持 changelog 语义(INSERT/DELETE)。(nightlies.apache.org)

6.1 用户侧 DDL 长什么样

CREATETABLEUserScores(name STRING,scoreINT)WITH('connector'='socket','hostname'='localhost','port'='9999','byte-delimiter'='10','format'='changelog-csv','changelog-csv.column-delimiter'='|');

然后直接聚合:

SELECTname,SUM(score)FROMUserScoresGROUPBYname;

6.2 Factory 层要做的关键动作

以 SourceFactory 为例,你需要做 4 件事:

1)声明 required/optional options
2)用FactoryUtil.createTableFactoryHelper做参数校验
3)用 helper 发现 decoding format
4)从 schema 推导 producedDataType(排除 computed columns)

核心点:FactoryUtil 会帮你处理changelog-csv.xxx这种带前缀的 format option 映射,非常省心。(nightlies.apache.org)

6.3 Planning 层 Source:输出 RowData,返回 RuntimeProvider

ScanTableSource.getScanRuntimeProvider()里组装运行时对象:

  • decodingFormat.createRuntimeDecoder(...)拿到DeserializationSchema<RowData>
  • 把它塞进运行时SourceFunction(示例里是 socket 读取)
  • 返回 provider(示例使用 SourceFunctionProvider)

这里最容易踩的坑是:运行时必须产出RowData,如果你内部用 POJO/Row,需要走DataStructureConverter转换。(nightlies.apache.org)

6.4 Format 层:声明 ChangelogMode,决定 planner 认不认 update/delete

format 的getChangelogMode()是灵魂:

  • 你声明INSERT/DELETE,planner 才允许你在 SQL 上构建“更新视图”并正确传播变更语义
  • 示例里用首列INSERT|.../DELETE|...来决定 RowKind

这也是“自定义 format”比“在 source 里硬 parse”更优雅的原因:语义清晰,planner 可感知。(nightlies.apache.org)

7. 一些生产级建议:别只跑通 demo,要能跑稳

1)能力接口尽量实现
至少 Filter/Projection pushdown,能让你的 connector 从“能用”变成“好用”。

2)支持并行度配置
Factory 里支持scan.parallelism/sink.parallelism并传给实现了ParallelismProvider的 provider,避免用户只能靠全局并行度。(nightlies.apache.org)

3)谨慎使用 legacy SourceFunction
Flink 趋势是 Source/Sink V2。你可以先用 demo 跑通,但生产 connector 更建议对齐新接口演进方向。(nightlies.apache.org)

4)类加载器用对
尤其在 SQL Gateway/SQL Client 场景,避免 TCCL。Flink 也提供了 classloading 调试文档,遇到冲突可以按它的方法排查。(nightlies.apache.org)

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/13 3:43:09

如何高效管理Minecraft数据:NBTExplorer终极应用指南

如何高效管理Minecraft数据&#xff1a;NBTExplorer终极应用指南 【免费下载链接】NBTExplorer A graphical NBT editor for all Minecraft NBT data sources 项目地址: https://gitcode.com/gh_mirrors/nb/NBTExplorer 还在为复杂的Minecraft数据编辑而头疼吗&#xff…

作者头像 李华
网站建设 2026/3/16 15:16:57

PUBG罗技鼠标宏:从新手到高手的自动化射击体验

PUBG罗技鼠标宏&#xff1a;从新手到高手的自动化射击体验 【免费下载链接】logitech-pubg PUBG no recoil script for Logitech gaming mouse / 绝地求生 罗技 鼠标宏 项目地址: https://gitcode.com/gh_mirrors/lo/logitech-pubg 还记得第一次玩PUBG时&#xff0c;看着…

作者头像 李华
网站建设 2026/3/17 8:17:01

围棋AI分析工具LizzieYzy:从入门到精通的完整使用指南

围棋AI分析工具LizzieYzy&#xff1a;从入门到精通的完整使用指南 【免费下载链接】lizzieyzy LizzieYzy - GUI for Game of Go 项目地址: https://gitcode.com/gh_mirrors/li/lizzieyzy 围棋AI分析工具LizzieYzy是一款功能强大的开源软件&#xff0c;专为围棋爱好者和学…

作者头像 李华
网站建设 2026/3/13 7:45:03

高效罗技PUBG压枪配置完全指南

高效罗技PUBG压枪配置完全指南 【免费下载链接】logitech-pubg PUBG no recoil script for Logitech gaming mouse / 绝地求生 罗技 鼠标宏 项目地址: https://gitcode.com/gh_mirrors/lo/logitech-pubg 绝地求生罗技鼠标宏配置是一款专为提升射击稳定性而设计的智能辅助…

作者头像 李华
网站建设 2026/3/13 3:27:38

如何快速掌握BAAI bge-large-zh-v1.5:中文文本嵌入的完整指南

如何快速掌握BAAI bge-large-zh-v1.5&#xff1a;中文文本嵌入的完整指南 【免费下载链接】bge-large-zh-v1.5 项目地址: https://ai.gitcode.com/hf_mirrors/ai-gitcode/bge-large-zh-v1.5 在当今信息爆炸的时代&#xff0c;BAAI bge-large-zh-v1.5作为顶尖的中文文本…

作者头像 李华
网站建设 2026/3/17 8:02:37

深蓝词库转换工具:输入法数据迁移完整指南

深蓝词库转换工具&#xff1a;输入法数据迁移完整指南 【免费下载链接】imewlconverter ”深蓝词库转换“ 一款开源免费的输入法词库转换程序 项目地址: https://gitcode.com/gh_mirrors/im/imewlconverter 还在为更换输入法时词库数据丢失而烦恼吗&#xff1f;深蓝词库…

作者头像 李华