news 2026/1/25 13:23:26

Flink MongoDB SQL Connector Scan/Lookup/Sink 全打通,Upsert、分片集群与缓存一篇讲透

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink MongoDB SQL Connector Scan/Lookup/Sink 全打通,Upsert、分片集群与缓存一篇讲透

1、版本与依赖现状:Flink 2.2 暂无可用 Connector

官方文档目前明确提示:Flink 2.2 还没有可用的 MongoDB connector。 (Apache Nightlies)
同时,MongoDB connector不在 Flink 二进制发行包里,跑集群需要你自己把 connector jar 带上(放lib/或打成 uber-jar)。 (Apache Nightlies)

如果你在稳定版本线上落地(例如 Flink 1.20),Maven 中央仓库能找到对应依赖,例如:

  • DataStream 连接器:org.apache.flink:flink-connector-mongodb(例如2.0.0-1.20) (Maven Repository)
  • SQL 连接器:org.apache.flink:flink-sql-connector-mongodb(例如2.0.0-1.20) (Maven Repository)

2、三种能力先记住一句话:主键决定 Sink 模式

MongoDB SQL Connector 的写入有两种模式:

  • DDL定义了 PRIMARY KEY:Sink 进入Upsert 模式,可以接收 UPDATE/DELETE 的 changelog
  • DDL不定义 PRIMARY KEY:只能Append 模式,只允许 INSERT(追加写) (Apache Nightlies)

这点决定了你后面到底能不能用INSERT INTO ... SELECT ...去承接一个会产生更新/撤回的 Flink SQL 结果(比如聚合、Join、TopN、维表更新等)。

3、快速开始:建表、写入、查询、维表 Join 一次跑通

3.1 注册 MongoDB 表(推荐 Upsert:显式主键)

CREATETABLEMyUserTable(_id STRING,name STRING,ageINT,statusBOOLEAN,PRIMARYKEY(_id)NOTENFORCED)WITH('connector'='mongodb','uri'='mongodb://user:password@127.0.0.1:27017','database'='my_db','collection'='users');

NOT ENFORCED的含义是:Flink 不帮你在运行时强制校验唯一性,但会把这个主键用于 changelog/upsert 语义与下游映射。

3.2 写入 MongoDB(从别的表 T 写入)

INSERTINTOMyUserTableSELECT_id,name,age,statusFROMT;

3.3 扫描读取 MongoDB(有界 Scan)

SELECT_id,name,age,statusFROMMyUserTable;

3.4 作为维表做 temporal join(Lookup Source,同步)

SELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable._id;

以上 DDL 与用法是官方文档给出的标准示例。 (Apache Nightlies)

4、主键与 _id 映射机制:为什么 Upsert 天生“幂等”

MongoDB 里_id是集合的保留主键(唯一且不可变),MongoDB connector 会用你在 DDL 定义的主键字段来生成 MongoDB 文档_id,从而实现幂等 Upsert:

  • 主键只有 1 列:直接把该字段转成 BSON 值作为_id
  • 主键多列:把多列按 DDL 顺序组合成一个 BSON document 作为_id,形如:_id: {f1: v1, f2: v2}(Apache Nightlies)

注意两个很容易踩的点:

1)如果 DDL 里有_id字段,但主键却不是_id,会产生歧义(到底谁是 MongoDB 的_id)。官方建议:要么把_id作为主键,要么改名避免冲突。 (Apache Nightlies)

2)MongoDB 的_id不能是 Array,且如果_id是嵌套子字段,子字段名不能以$开头。 (Apache Nightlies)

4.1 为什么 Upsert 更适合 Flink 容错恢复

Flink 发生失败恢复后,会从最近一次成功 checkpoint 之后重放一段数据(at-least-once 常态),这会带来“重放写入”。Upsert 写入因为_id稳定,可以天然幂等,避免重复数据与唯一键冲突。 (Apache Nightlies)

另外一个隐藏坑:INSERT OVERWRITE写 MongoDB 时会强制使用 Upsert;如果你没在 DDL 定义主键,会被拒绝。 (Apache Nightlies)

5、分片集群 Sharded Collection:Upsert 必须带 shard key,Flink 用 PARTITIONED BY 来声明

MongoDB 对分片集合的更新/Upsert 有额外约束:Upsert 的 filter 必须包含 shard key。因此在 Flink SQL 建 sink 表时,需要用PARTITIONED BY声明 shard key,让 connector 在运行时把 shard key 写进 filter。 (Apache Nightlies)

示例(官方思路):

CREATETABLEMySinkTable(_idBIGINT,shardKey0 STRING,shardKey1 STRING,statusSTRING,PRIMARYKEY(_id)NOTENFORCED)PARTITIONEDBY(shardKey0,shardKey1)WITH('connector'='mongodb','uri'='mongodb://user:password@127.0.0.1:27017','database'='my_db','collection'='users');INSERTINTOMySinkTableSELECT_id,shardKey0,shardKey1,statusFROMT;

还支持静态/动态分区写入:

-- 静态分区INSERTINTOMySinkTablePARTITION(shardKey0='value0',shardKey1='value1')SELECT1,'INIT';-- shardKey0 静态 + shardKey1 动态INSERTINTOMySinkTablePARTITION(shardKey0='value0')SELECT1,'value1','INIT';

重要限制(非常关键):虽然 MongoDB 4.2+ shard key 可变,但在 Flink SQL 的 upsert 场景下,如果 shard key 发生更新,connector 拿不到“旧 shard key”去构造 filter,可能导致重复记录风险,因此仍建议 shard key 保持不可变。 (Apache Nightlies)

6、Scan Source 并行加速:4 种分区策略怎么选

MongoDB 批量扫描(Scan Source)支持并行分区读取,核心配置是:

  • scan.partition.strategy:single / sample / split-vector / sharded / default
  • scan.partition.size:分区粒度(默认 64mb)
  • scan.fetch-size:每次 round-trip 拉多少文档(默认 2048)
  • scan.cursor.no-timeout:是否禁用 10 分钟 cursor 超时(默认 true,但超过 30 分钟处理一个 batch 仍可能导致 session 过期) (Apache Nightlies)

策略选择建议(按文档特性翻译成工程话):

  • single:全表一个分区,最稳但最慢(小表/测试)
  • sample:抽样生成分区,速度快但可能不均匀(数据分布很怪时慎用)
  • split-vector:非分片集合首选,分区快且更均匀,但需要 splitVector 权限
  • sharded:分片集合首选,直接用config.chunks的 chunk 范围做分区,快且均匀,但需要读 config 库权限
  • default:分片集合走 sharded,否则走 split-vector (Apache Nightlies)

一个常用的“批量导入/离线回灌”模板:

CREATETABLEmongo_scan_users(_id STRING,name STRING,ageINT,statusBOOLEAN,PRIMARYKEY(_id)NOTENFORCED)WITH('connector'='mongodb','uri'='mongodb://user:password@127.0.0.1:27017','database'='my_db','collection'='users','scan.partition.strategy'='default','scan.partition.size'='64mb','scan.fetch-size'='2048');

7、Lookup 维表缓存:PARTIAL 缓存让 temporal join 性能起飞,但要权衡新鲜度

MongoDB connector 的 Lookup Source 目前只支持**同步(sync)**模式。为了降低维表 join 对 MongoDB 的压力,提供了lookup.cache = PARTIAL的“进程内缓存”(每个 TaskManager 一份)。 (Apache Nightlies)

关键配置项:

  • lookup.cache = PARTIAL:开启缓存
  • lookup.partial-cache.max-rows:最大缓存行数,超出会淘汰最旧的
  • lookup.partial-cache.expire-after-write:写入后 TTL
  • lookup.partial-cache.expire-after-access:访问后 TTL
  • lookup.partial-cache.caching-missing-key:是否缓存“查不到”的空结果(默认 true)
  • lookup.max-retries/lookup.retry.interval:失败重试 (Apache Nightlies)

典型的维表 join DDL(带缓存):

CREATETABLEdim_users(_id STRING,name STRING,ageINT,statusBOOLEAN,PRIMARYKEY(_id)NOTENFORCED)WITH('connector'='mongodb','uri'='mongodb://user:password@127.0.0.1:27017','database'='my_db','collection'='users','lookup.cache'='PARTIAL','lookup.partial-cache.max-rows'='200000','lookup.partial-cache.expire-after-write'='10 min','lookup.partial-cache.caching-missing-key'='true','lookup.max-retries'='3','lookup.retry.interval'='1 s');

权衡点务必写在博客里:缓存可能不是最新数据,TTL 越短越“新鲜”,但数据库压力越大;TTL 越长吞吐越高,但维表可能“旧”。 (Apache Nightlies)

8、Filter Pushdown:把能下推的过滤交给 MongoDB

MongoDB connector 支持将部分 Flink SQL 过滤条件下推到 MongoDB,从而减少网络 IO 与 Flink 端处理量。你可以用:

  • filter.handling.policy = always | never(默认 always) (Apache Nightlies)

支持的过滤映射包括:=, <>, >, >=, <, <=, IS NULL, IS NOT NULL, OR, AND等,对应 MongoDB 的$eq/$ne/$gt/$gte/$lt/$lte/$or/$and。 (Apache Nightlies)

建议:
如果你的 MongoDB 集群压力大、你又担心下推导致执行计划不可控,可以临时设成never来做对比压测与排障。

9、Sink 写入:批量 flush、重试、并行度与交付语义

MongoDB sink 常用参数如下:

  • sink.buffer-flush.max-rows(默认 1000):每批最多缓存多少行
  • sink.buffer-flush.interval(默认 1s):定时 flush
  • sink.max-retries(默认 3)/sink.retry.interval(默认 1s):写失败重试
  • sink.parallelism:sink 并行度(默认跟上游链)
  • sink.delivery-guarantee = none | at-least-once目前不支持 exactly-once(Apache Nightlies)

实战建议(写入侧):

  • 你追求低延迟:调小interval(例如 200ms~500ms),适当减小max-rows
  • 你追求高吞吐:调大max-rows(例如 5k~20k),interval可保持 1s~5s
  • 强烈建议配合 Flink checkpoint 使用(否则 at-least-once 缓冲/恢复行为不可控),并尽量选择 Upsert 模式提升幂等性

10、数据类型映射:BSON 到 Flink SQL 的常用对照

MongoDB 本质是 BSON 类型系统,connector 给出了典型映射:

  • ObjectId → STRING
  • DateTime → TIMESTAMP_LTZ(3)
  • Object → ROW
  • Array → ARRAY
  • Decimal128 → DECIMAL
    以及一些特殊 BSON 类型会用 Extended JSON 映射到 STRING(如 Regex、Symbol、DbPointer 等)。 (Apache Nightlies)

实战建议:
如果你希望 Flink 侧 schema 稳定,尽量避免在同一个字段上混用 BSON 类型(例如同字段既可能是 String 又可能是 Object),否则映射与序列化会很难控。

11、一份“生产落地清单”

  1. 先确认版本:Flink 2.2 当前仍提示无可用 connector(别在依赖上空耗)。 (Apache Nightlies)
  2. 写入推荐 Upsert:定义 PRIMARY KEY,让_id稳定,失败重放也幂等。 (Apache Nightlies)
  3. 分片集合 Upsert:必须把 shard key 用PARTITIONED BY声明,并保证 shard key 不变。 (Apache Nightlies)
  4. Scan 并行读取:优先default,需要权限时提前申请 splitVector / config.chunks 读取权限。 (Apache Nightlies)
  5. 维表 join:开启lookup.cache=PARTIAL,用 TTL 在“新鲜度 vs 吞吐”之间找到平衡。 (Apache Nightlies)
  6. 打包部署:connector 不在二进制发行包里,集群必须能加载到 jar。 (Apache Nightlies)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/24 14:39:55

【小程序毕设全套源码+文档】基于微信小程序的剧本杀游玩一体化平台设计与实现(丰富项目+远程调试+讲解+定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/1/19 11:20:44

【小程序毕设源码分享】基于springboot+微信小程序的办公用品管理系统小程序的设计与实现(程序+文档+代码讲解+一条龙定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/1/24 1:21:04

【小程序毕设源码分享】基于springboot+微信小程序的剧本杀游玩一体化平台的设计与实现(程序+文档+代码讲解+一条龙定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/1/23 13:03:57

【小程序毕设源码分享】基于springboot+微信小程序的农产品管理与销售APP的设计与实现(程序+文档+代码讲解+一条龙定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/1/19 11:19:12

reinterpret_cast 有哪些注意事项?

一、reinterpret_cast 核心注意事项&#xff08;必记&#xff09; 1. 仅用于 “底层二进制重解释”&#xff0c;绝不做逻辑上的类型转换 reinterpret_cast不会对数据做任何格式转换&#xff0c;只是告诉编译器 “把这块内存当成另一种类型看待”&#xff0c;逻辑上的类型转换…

作者头像 李华
网站建设 2026/1/23 13:51:44

激光雷达十年演进

下面这份内容&#xff0c;不是“激光雷达从 64 线到 128 线、从机械到固态”的产品演进史&#xff0c;也不是“激光雷达会不会被视觉取代”的路线争论&#xff0c;而是站在 “激光雷达作为自动驾驶系统中最接近‘物理真相’、却最容易被误用为‘绝对真理’的感知基石”高度&…

作者头像 李华