一、Flink SQL 概述
Flink SQL 是 Apache Flink 提供的流批统一的计算引擎,支持通过标准 SQL 语句进行数据处理。SQL 经过解析、优化后生成高效的物理执行计划,运行在 Flink 集群上。
二、Flink SQL 工作流程与内部优化
2.1 SQL 执行流程
text
SQL Query → Abstract Syntax Tree (AST) → Optimized Physical Plan → Execution
2.2 核心内部优化
| 优化策略 | 说明 |
|---|---|
| Expression Reduce | 常量表达式预计算,如1+2+t1.value优化为3+t1.value |
| Projection Push Down | 只读取查询所需的字段,减少 I/O 和网络传输 |
| Sub-Plan Reuse | 复用相同子查询的执行结果(需开启配置) |
三、Flink SQL 最佳实践
3.1 子图复用 (Sub-Plan Reuse)
通过复用相同的数据源子查询,避免重复计算:
sql
-- 优化前 INSERT INTO sink1 SELECT * FROM t1 WHERE a > 10; INSERT INTO sink2 SELECT * FROM t1 WHERE a > 10 AND b < 100; -- 优化后(复用 v1) CREATE TEMPORARY VIEW v1 AS SELECT * FROM t1 WHERE a > 10; INSERT INTO sink1 SELECT * FROM v1; INSERT INTO sink2 SELECT * FROM v1 WHERE b < 100;
开启配置:
yaml
table.optimizer.reuse-sub-plan-enabled: true table.optimizer.reuse-source-enabled: true
3.2 MiniBatch 优化
MiniBatch 将微批数据聚合后再处理,显著提升吞吐量,适用于聚合类操作。
yaml
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.exec.mini-batch.size: 10
3.3 两阶段聚合 (Two-Phase Aggregate)
在 MiniBatch 基础上增加本地预聚合,减少 Shuffle 数据量。
yaml
table.optimizer.agg-phase-strategy: TWO_PHASE # 或 AUTO
3.4 Distinct Aggregation 优化
拆分 distinct 聚合为 partial + final 两个阶段:
yaml
table.optimizer.distinct-agg.split.enabled: true table.optimizer.distinct-agg.split.bucket-num: 1024 table.optimizer.incremental-agg-enabled: true # 默认 true
完整配置示例:
yaml
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.optimizer.agg-phase-strategy: TWO_PHASE table.optimizer.distinct-agg.split.enabled: true table.optimizer.distinct-agg.split.bucket-num: 1024
3.5 CASE WHEN → FILTER 语法改写
推荐使用FILTER替代CASE WHEN进行条件聚合,优化器可生成更高效的计划。
sql
-- 推荐 SELECT SUM(amount) FILTER (WHERE type = 'A') AS sum_a FROM orders; -- 不推荐 SELECT SUM(CASE WHEN type = 'A' THEN amount END) AS sum_a FROM orders;
3.6 Regular Join 状态优化
三种状态存储策略对比:
| 场景 | 状态结构 | 存储效率 |
|---|---|---|
| Join Key 包含主键 | Map<PK, Tuple2<Row, Int>> | 最高 |
| Join Input 有主键 | Tuple2<Row, Int> | 中等 |
| 无主键 | Map<Row, Tuple2<Int, Int>> | 最低 |
优化建议:
Join Key 尽量包含主键
Join 前只保留必要字段
3.7 Lookup Join 优化(1.16+)
异步模式
sql
-- 同步(默认) SELECT /*+ LOOKUP('table='my_table2', 'async='false') */ * FROM t1 JOIN my_table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.a = t2.c; -- 异步(提升吞吐) SELECT /*+ LOOKUP('table='my_table2', 'async='true') */ * FROM t1 JOIN my_table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.a = t2.c;异步输出模式
sql
-- Ordered(保序) /*+ LOOKUP('table='my_table2', 'async='true', 'output-mode='ordered') */ -- Unordered(更高吞吐) /*+ LOOKUP('table='my_table2', 'async='true', 'output-mode='allow_unordered') */缓存策略(1.17+)
| 缓存类型 | 适用场景 | 配置 |
|---|---|---|
| FULL | 小数据集 | 'lookup.cache' = 'FULL' |
| PARTIAL | 大数据集 | 'lookup.cache' = 'PARTIAL' |
| NONE | 实时性要求高 | 'lookup.cache' = 'NONE' |
3.8 TopN 优化策略
| 策略 | 适用条件 | 状态存储 |
|---|---|---|
| AppendRank | 输入仅 insert 变更 | 按 key 存储 TopN 记录 |
| UpdateFastRank | Upsert key 含 partition key;order-by 字段单调且方向相反 | Map<orderKey, Record> |
| RetractRank | 通用场景(回撤流) | 存储全部输入数据 |
3.9 自定义 Connector 接口实现
实现以下接口可提升 Connector 性能:
| 接口 | 收益 |
|---|---|
SupportsFilterPushDown | 减少扫描 I/O |
SupportsProjectionPushDown | 减少无效字段读取 |
SupportsPartitionPushDown | 静态分区裁剪 |
SupportsDynamicFiltering | 动态分区裁剪 |
SupportsLimitPushDown | 减少扫描 I/O |
SupportsAggregatePushDown | 减少 I/O,输出数据量更小 |
SupportsStatisticReport | 生成更优执行计划 |
四、Flink SQL 开发建议
4.1 维表 Join 个数不超过 5 个
过多维表会导致 TM Heap 压力大,频繁 GC,作业性能下降。
4.2 减少嵌套层级
嵌套层级越多,回撤流数据量越大,建议控制在合理范围内。
4.3 提前过滤数据
在 Aggregate、Join 前进行数据过滤,可减少 Shuffle 数据量和网络 I/O。
sql
-- 优化前:shuffle 后过滤 SELECT * FROM A JOIN B ON A.id = B.id WHERE A.userid > 10; -- 优化后:shuffle 前过滤 SELECT * FROM (SELECT * FROM A WHERE userid > 10) A JOIN B ON A.id = B.id;
4.4 优先使用 LIKE 替代正则表达式
正则表达式(REGEXP、REGEXP_EXTRACT、REGEXP_REPLACE)性能开销约为加减乘除的百倍,极端情况可能无限循环。
4.5 UDF 嵌套不超过 6 个
多层 UDF 嵌套可能导致生成的代码超过 64KB,引发编译错误。
4.6 合理设置并行度
Source 并行度:推荐由上游组件推断(Kafka 分区数 / HDFS Block 数)
过大并行度:资源浪费
过小并行度:处理缓慢
4.7 非状态计算资源优化
对于 Filter、Union All、Lookup Join 等无状态操作,重点调优Heap Size和Network。
4.8 状态计算资源优化
对于 Join、Window、Group By 等有状态操作,重点调优:
vCore:适当增加 CPU 资源
Managed Memory:用于状态后端管理
off-Heap / Overhead:适当提高(如 5G)
五、Flink 1.15+ 特性增强
5.1 表级 TTL(Join 场景)
不同表可设置不同 TTL,减少状态冗余:
sql
INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info AS t JOIN user_score /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ AS d ON t.user_id = d.user_id;5.2 JTL (Join-To-Live)
根据关联次数决定数据是否过期,减少状态压力(仅支持 Join/Inner Join,不与 TTL/广播同时使用):
sql
/*+ OPTIONS('eliminate-state.left.threshold'='10', 'eliminate-state.right.threshold'='10') */5.3 窗口优化:Pane 存储结构
传统方式:数据分配到多个窗口(如 102s 的数据分配到 4 个窗口),存在冗余存储
Pane 优化:数据仅存储在一个 Pane 中,触发计算时从 Pane 中提取对应窗口数据,内存只存一份
5.4 DISTRIBUTEBY Hint
按指定字段分区,解决数据仅需分区的场景:
sql
SELECT /*+ DISTRIBUTEBY(id) */ id, name FROM t1; SELECT /*+ DISTRIBUTEBY(id, name) */ id, name FROM t1;
5.5 BROADCAST Hint
大小表 Join 时,将小表广播到所有 Join Task,大表通过 Rebalance 打散:
sql
SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1;
5.6 Kafka Source 限流
sql
CREATE TABLE KafkaSource (...) WITH ( 'subtask.scan.records-per-second.limit' = '1000' );
5.7 Source 独立并发设置
sql
CREATE TABLE KafkaSource (...) WITH ( 'source.parallelism' = '2' );
5.8 其他性能增强
| 特性 | 说明 |
|---|---|
| Lookup 算子复用 | 多 Sink 场景复用同一 Lookup Join 算子 |
| UDF 重用 | 多次执行仅复制第一次结果 |
| JSON_VALUE 优化 | 解析同一 JSON item 的多个字段时复用解析结果 |
六、总结速查表
| 场景 | 推荐配置/方案 |
|---|---|
| 聚合类作业 | MiniBatch + 两阶段聚合 + Distinct 拆分 |
| 多表 Join | 提前过滤 + 保留必要字段 + 表级 TTL |
| 维表关联 | ≤5 张 + Lookup Cache + 异步模式 |
| TopN | 根据输入类型选择合适的 Rank 策略 |
| 资源调优 | 无状态→Heap/Network;有状态→vCore/Managed Memory |
| 数据倾斜 | BROADCAST Hint + DISTRIBUTEBY |
| 限流 | Kafkasubtask.scan.records-per-second.limit |