1. Connector 定位与核心特性
FileSystem SQL Connector 提供对 Flink FileSystem 抽象支持的文件系统访问能力(本地、HDFS、S3 等)。
关键点:
- connector 本身内置在 Flink,无需额外依赖(Flink distribution 的
/lib里就有对应 jar) - 必须指定
format,否则无法读写行数据(CSV/JSON/Avro/Parquet/ORC…) - path 指向目录而不是文件:新 connector 的行为与老 filesystem connector 很不同,你声明的 path 下不会出现“你能直接预期的文件名”,而是会按 rolling 与 checkpoint 生成 part 文件
2. 快速上手:定义一个分区文件表
一个典型的分区文件表(Hive 风格目录):
CREATETABLEMyUserTable(user_id STRING,order_amountDOUBLE,dt STRING,`hour`STRING)PARTITIONEDBY(dt,`hour`)WITH('connector'='filesystem','path'='file:///path/to/whatever','format'='parquet','partition.default-name'='__DEFAULT_PARTITION__');PARTITIONED BY的字段会变成目录层级,比如dt=2020-05-20/hour=12/part-...partition.default-name用来处理动态分区列为 null/空字符串的情况(否则容易写出“脏目录”)
2.1 Hive 风格分区目录长什么样
Flink 的分区推断遵循标准 Hive 目录格式,类似:
path/ └── dt=2019-08-25 └── hour=11 ├── part-0.parquet ├── part-1.parquet分区不需要预注册,Flink 会根据目录结构自动发现与推断。
3. Source:一次性扫描 vs 目录持续监听
默认情况下,filesystem source 是bounded:扫描一次path就结束。
如果你希望像 tail 一样持续消费目录中新出现的文件,开启目录监听:
WITH('connector'='filesystem','path'='...','format'='json','source.monitor-interval'='10 s')source.monitor-interval> 0 才会持续监听- 每个文件以path 唯一标识,只处理一次
- 已处理文件集合会写入 state,并随 checkpoint/savepoint 持久化
实战建议:
- 监听间隔越短,发现越快,但 listing/遍历开销越大(对象存储更明显)
- S3/OSS/HDFS 下建议先用 30s~1min 起步,再根据延迟要求调整
4. Metadata:把文件信息当列用(定位问题神器)
filesystem connector 支持把文件元信息暴露成只读列:
file.path:全路径file.name:文件名file.size:字节数file.modification-time:修改时间(TIMESTAMP_LTZ)
示例:
CREATETABLEMyUserTableWithFilepath(user_id STRING,order_amountDOUBLE,`file.path`STRINGNOTNULLMETADATA)WITH('connector'='filesystem','path'='file:///path/to/whatever','format'='json');排查重复数据、脏文件、延迟文件时,file.path非常好用。
5. Sink:Streaming 写文件的关键机制
5.1 Row 格式 vs Bulk 格式
- Row-encoded:CSV、JSON(行写入)
- Bulk-encoded:Parquet、ORC、Avro(批量写入)
Bulk 格式通常配合 checkpoint 才会“可见”,这是很多人第一次落 parquet 时遇到的“为什么目录里没文件”的根源之一。
5.2 Rolling Policy:控制 part 文件的滚动
每个分区目录下,至少会有每个 sink subtask 一个 part 文件(只要该 subtask 收到过该分区数据)。滚动策略决定何时关闭当前文件、开启新文件:
sink.rolling-policy.file-size(默认 128MB)sink.rolling-policy.rollover-interval(默认 30min)sink.rolling-policy.check-interval(默认 1min)
实战建议:
- 想减少小文件:增大
file-size,适当增大rollover-interval - 想尽快落盘可见(特别是 CSV/JSON):配合缩短 checkpoint interval + 缩短 rollover/interval
文档强调:
- 对 bulk formats(parquet/orc/avro):rolling policy + checkpoint interval 一起决定 part 文件的大小与数量,且 pending 文件通常在下一次 checkpoint后变成 finished
- 对 row formats(csv/json):可以通过 rolling policy 更快看到文件;对 avro/orc 这类 bulk,更多依赖 checkpoint
6. File Compaction:小文件合并(但有代价)
filesystem sink 支持自动 compaction:
auto-compaction:是否开启(默认 false)compaction.file-size:目标文件大小(默认等于 rolling file size)
机制特点:
- 先写临时文件(checkpoint 前不可见)
- checkpoint 完成后把该 checkpoint 产生的临时文件合并成更大的文件
- 只合并同一个 checkpoint 内的文件
也就是说:checkpoint 越频繁,至少会生成“checkpoint 数量”级别的文件批次
注意代价(文档点名的坑):
- 文件可见性 = checkpoint interval + compaction time(你会觉得“落盘变慢”)
- compaction 太慢会 backpressure,拉长 checkpoint 时间
上线建议:
- 对象存储 + 高并发写场景,小文件问题突出时可以开
- 但要监控 checkpoint duration / backpressure,一旦 compaction 扛不住,作业会抖
7. Partition Commit:让下游知道“这个分区写完了”
写完分区后,常见需求:
- 写
_SUCCESS文件(下游批任务/调度系统用) - Hive metastore 添加分区(filesystem 本身不需要,但 hive table 需要)
filesystem sink 的 Partition Commit 由触发器(Trigger)+策略(Policy)组成。
重要限制:Partition Commit 只在动态分区写入时生效(dynamic partition inserting)。
7.1 Trigger:process-time vs partition-time
配置项:
sink.partition-commit.trigger:process-time(默认)或partition-timesink.partition-commit.delay:延迟(按分区粒度设置,小时分区通常 1h,天分区通常 1d)sink.partition-commit.watermark-time-zone:仅 partition-time 有效,默认 UTC
两种触发方式怎么选:
1)process-time(默认)
按“系统时间超过分区创建时间 + delay”触发
优点:不需要 watermark、通用
缺点:数据延迟/故障恢复可能导致“分区提前提交”
2)partition-time(更准确)
从分区值提取时间,等 watermark 超过 “分区时间 + delay” 才提交
优点:尽量保证分区数据完整再提交
前提:必须有 watermark,且能从分区字段提取时间
如果你希望“只要有数据就让下游立刻看到”(不保证完整):
- trigger=process-time(默认)
- delay=0s(默认)
- 可能会多次提交同一分区(晚到数据触发再次提交)
如果你希望“尽量完整再提交”(强烈推荐用于小时/天分区):
- trigger=partition-time
- delay=1h(小时分区)或 1d(天分区)
- 正确配置 watermark-time-zone(尤其 TIMESTAMP_LTZ)
7.2 Watermark 时区大坑:TIMESTAMP_LTZ 必配
文档明确提醒:如果 watermark 定义在TIMESTAMP_LTZ列上,并用 partition-time commit,则必须设置sink.partition-commit.watermark-time-zone为 session 时区,否则可能晚几个小时才提交。
示例(你文档里给的 Asia/Shanghai):
'sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.watermark-time-zone'='Asia/Shanghai','sink.partition-commit.policy.kind'='success-file'7.3 Partition Time Extractor:从分区值解析时间
默认 extractor 通过partition.time-extractor.timestamp-pattern拼一个时间字符串,再按timestamp-formatter解析:
partition.time-extractor.timestamp-pattern:例如'$dt $hour:00:00'partition.time-extractor.timestamp-formatter:例如yyyy-MM-dd HH:mm:ss
也支持自定义类实现PartitionTimeExtractor(适用于复杂目录/特殊字段)。
7.4 Policy:怎么“提交分区”
核心配置:
sink.partition-commit.policy.kind:success-file:写_SUCCESS文件metastore:写 metastore(主要 Hive 表)- 支持组合:
metastore,success-file custom:自定义实现PartitionCommitPolicy
_SUCCESS文件名可配:sink.partition-commit.success-file.name(默认_SUCCESS)
8. sink.shuffle-by-partition:减少小文件但可能数据倾斜
文档提供了一个很实用的开关:
sink.shuffle-by-partition.enable:是否按动态分区字段 shuffle(默认 false)
效果:
开启后,数据会按分区字段重分布,同一分区更集中到更少的 subtask
→ 分区内文件数量显著下降
风险:分区热点会造成 data skew(某些分区特别热时,某个 subtask 压力爆炸)
上线建议:
- 分区分布均匀时可以开
- 热点明显时慎用,或配合热点拆分(更细粒度分区)、上游 key 打散等手段
9. sink.parallelism:只支持 INSERT-ONLY 上游
filesystem sink 支持通过表选项设置并行度:
sink.parallelism
但文档强调限制:
- 仅当上游 changelog mode 是 INSERT-ONLY 才允许配置
- 否则会直接抛异常
所以如果你上游是更新流(聚合、join),不要指望随便改 sink 并行度来控文件数,要从语义和方案上调整(例如落 changelog 到别处、或者先转 append-only)。
10. 完整示例:Kafka → 文件系统(Parquet 分区)→ 批读回
10.1 Kafka 源表(带 watermark)
CREATETABLEkafka_table(user_id STRING,order_amountDOUBLE,log_tsTIMESTAMP(3),WATERMARKFORlog_tsASlog_ts-INTERVAL'5'SECOND)WITH(...);10.2 文件系统 sink 表(按 dt/hour 分区 + success-file 提交)
CREATETABLEfs_table(user_id STRING,order_amountDOUBLE,dt STRING,`hour`STRING)PARTITIONEDBY(dt,`hour`)WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file');10.3 流式写入
INSERTINTOfs_tableSELECTuser_id,order_amount,DATE_FORMAT(log_ts,'yyyy-MM-dd'),DATE_FORMAT(log_ts,'HH')FROMkafka_table;10.4 批查询读取(分区裁剪)
SELECT*FROMfs_tableWHEREdt='2020-05-20'AND`hour`='12';10.5 TIMESTAMP_LTZ + partition-time commit 的正确写法
当 watermark 在 TIMESTAMP_LTZ 上:
CREATETABLEkafka_table(user_id STRING,order_amountDOUBLE,tsBIGINT,ts_ltzASTO_TIMESTAMP_LTZ(ts,3),WATERMARKFORts_ltzASts_ltz-INTERVAL'5'SECOND)WITH(...);CREATETABLEfs_table(user_id STRING,order_amountDOUBLE,dt STRING,`hour`STRING)PARTITIONEDBY(dt,`hour`)WITH('connector'='filesystem','path'='...','format'='parquet','partition.time-extractor.timestamp-pattern'='$dt $hour:00:00','sink.partition-commit.delay'='1 h','sink.partition-commit.trigger'='partition-time','sink.partition-commit.watermark-time-zone'='Asia/Shanghai','sink.partition-commit.policy.kind'='success-file');INSERTINTOfs_tableSELECTuser_id,order_amount,DATE_FORMAT(ts_ltz,'yyyy-MM-dd'),DATE_FORMAT(ts_ltz,'HH')FROMkafka_table;11. 上线前的避坑清单
path 必须是目录,不是文件;不要期待固定文件名
bulk 格式文件可见性高度依赖 checkpoint:看不到文件先检查 checkpoint 是否在跑
小文件治理优先顺序:
- 合理 rolling(size/interval)
- 合理 checkpoint interval
- 必要时开启 shuffle-by-partition(注意倾斜)
- 仍不行再考虑 auto-compaction(注意可见性与 backpressure)
Partition Commit:
- 要“尽量完整再提交”用 partition-time(必须 watermark + time extractor)
- watermark 在 TIMESTAMP_LTZ 上时必须配 watermark-time-zone,否则提交延迟“神秘晚几小时”
想改 sink.parallelism:先确认上游是 INSERT-ONLY