news 2026/2/25 18:12:42

Flink SQL Materialized Table 语句CREATE / ALTER / DROP介绍

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL Materialized Table 语句CREATE / ALTER / DROP介绍

1. Flink 目前支持的 Materialized Table 语句

1.1CREATE MATERIALIZED TABLE:创建物化表(定义查询 + 刷新策略)
1.2ALTER MATERIALIZED TABLE:管理物化表(暂停/恢复/手动刷新/改查询)
1.3DROP MATERIALIZED TABLE:删除物化表(先删刷新管道再删元数据)

2. CREATE MATERIALIZED TABLE:创建物化表

2.1 语法总览

2.1.1 完整语法:

CREATEMATERIALIZEDTABLE[catalog_name.][db_name.]table_name[([<table_constraint>])][COMMENTtable_comment][PARTITIONEDBY(partition_column_name1,partition_column_name2,...)][WITH(key1=val1,key2=val2,...)][FRESHNESS=INTERVAL'<num>'{SECOND[S]|MINUTE[S]|HOUR[S]|DAY[S]}][REFRESH_MODE={ CONTINUOUS|FULL}]AS<select_statement>

2.1.2 核心理解:

  • AS <select_statement>:决定“算什么”(物化结果来自这条查询)
  • FRESHNESS / REFRESH_MODE:决定“怎么刷、多频繁”(自动生成刷新 Pipeline)
  • Schema:由查询自动推导(你不能显式写列定义)

2.2 PRIMARY KEY:可选主键约束

2.2.1 语法:

<table_constraint>:[CONSTRAINTconstraint_name]PRIMARYKEY(column_name,...)NOTENFORCED

2.2.2 关键点:

  • PRIMARY KEY 用于“逻辑唯一标识每行”
  • 主键列必须非 NULL
  • NOT ENFORCED表示 Flink 不强制校验唯一性(通常依赖外部存储/语义保证)

2.3 PARTITIONED BY:可选分区键(强约束)

2.3.1 语法:

PARTITIONEDBY(partition_column_name1,partition_column_name2,...)

2.3.2 强约束(非常重要):

  • 分区列必须包含在物化表的查询输出中(即必须出现在AS SELECT的 select 列里)

2.3.3 示例:按ds分区创建物化表

CREATEMATERIALIZEDTABLEmy_materialized_table PARTITIONEDBY(ds)FRESHNESS=INTERVAL'1'HOURASSELECTdsFROM...;

2.3.4 直观收益:

  • 若物化表 sink 是 filesystem,会按分区创建目录结构
  • FULL 模式下结合date-formatter可以做到“只刷最新分区”,成本更低

2.4 WITH Options:表属性 / Connector 参数 / 分区时间格式映射

2.4.1 用途:

  • 指定物化表属性(含 connector options)
  • 指定分区字段的时间格式选项:partition.fields.<field>.date-formatter

2.4.2 示例:按ds分区,并指定ds的时间格式为yyyy-MM-dd

CREATEMATERIALIZEDTABLEmy_materialized_table PARTITIONEDBY(ds)WITH('format'='json','partition.fields.ds.date-formatter'='yyyy-MM-dd')FRESHNESS=INTERVAL'1'HOURASSELECTds,...FROM...;

2.4.3 机制说明:

  • 在 FULL 模式下,每次调度触发都会把“调度时间”转换成ds分区值
  • 例如调度时间2024-01-01 00:00:00→ 刷新分区ds = '2024-01-01'

2.4.4 注意事项(官方限制)

  • partition.fields.#.date-formatter只在 FULL 模式生效
  • 该配置里的字段必须是string 类型分区字段

2.5 FRESHNESS:数据新鲜度(可选,但极关键)

2.5.1 定义与语法

2.5.1.1 定义:

  • FRESHNESS 定义物化表允许落后基础表更新的最大时间(目标值,非强保证)

2.5.1.2 语法:

FRESHNESS=INTERVAL'<num>'{SECOND|MINUTE|HOUR|DAY}

2.5.2 参数规则与合法性

2.5.2.1 规则:

  • <num>必须是正整数
  • 不支持MONTH / YEAR
  • FULL 模式下<num>还要满足“可转 cron/公约数”要求(下文有支持列表)

2.5.2.2 典型非法例子:

FRESHNESS=INTERVAL'-1'SECOND-- 负数FRESHNESS=INTERVAL'0'SECOND-- 0FRESHNESS=INTERVAL'1'MONTH-- 不支持FRESHNESS=INTERVAL'1'YEAR-- 不支持

2.5.3 FRESHNESS 与刷新模式/刷新频率的关系

2.5.3.1 关系总结:

  • Freshness 会参与推断 Refresh Mode(CONTINUOUS 或 FULL)

  • Freshness 会决定刷新频率

    • CONTINUOUS:Freshness → streaming job 的 checkpoint interval
    • FULL:Freshness → workflow 的调度周期(cron)

2.5.3.2 示例(假设materialized-table.refresh-mode.freshness-threshold = 30 minutes

  • FRESHNESS = INTERVAL '1' SECOND→ Streaming Job,checkpoint=1s
  • FRESHNESS = INTERVAL '1' MINUTE→ Streaming Job,checkpoint=1m
  • FRESHNESS = INTERVAL '1' HOUR→ Scheduled Workflow,周期=1h
  • FRESHNESS = INTERVAL '1' DAY→ Scheduled Workflow,周期=1d

2.5.4 默认 FRESHNESS(当你省略时)

2.5.4.1 省略FRESHNESS会使用系统默认值:

  • CONTINUOUS:materialized-table.default-freshness.continuous(默认 3 分钟)
  • FULL:materialized-table.default-freshness.full(默认 1 小时)

2.5.4.2 示例:省略 freshness(默认 CONTINUOUS 3 分钟)

CREATEMATERIALIZEDTABLEmy_materialized_tableASSELECT*FROMsource_table;

2.5.4.3 示例:省略 freshness,但显式指定 FULL(默认 1 小时)

CREATEMATERIALIZEDTABLEmy_materialized_table_full REFRESH_MODE=FULLASSELECT*FROMsource_table;

2.5.5 FULL 模式 freshness 的 cron 支持范围(必须牢记)

由于 FULL 模式需要把 freshness 翻译成 cron,当前仅支持以下间隔:

2.5.5.1 Second 支持:1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30
2.5.5.2 Minute 支持:1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30
2.5.5.3 Hour 支持:1, 2, 3, 4, 6, 8, 12
2.5.5.4 Day 支持:1

2.5.5.5 FULL 模式常见非法例子(不在支持列表或不满足约束):

FRESHNESS=INTERVAL'60'SECONDFRESHNESS=INTERVAL'5'HOUR

2.6 REFRESH_MODE:显式指定刷新模式(优先级最高)

2.6.1 语法:

REFRESH_MODE={ CONTINUOUS|FULL}

2.6.2 行为:

  • 一旦显式指定,优先于系统基于 freshness 的推断

2.6.3 示例:freshness 1 小时,但强制跑 CONTINUOUS(checkpoint=1h)

CREATEMATERIALIZEDTABLEmy_materialized_table FRESHNESS=INTERVAL'1'HOURREFRESH_MODE=CONTINUOUSASSELECT...;

2.6.4 示例:freshness 10 分钟,但强制跑 FULL(调度周期=10m)

CREATEMATERIALIZEDTABLEmy_materialized_table FRESHNESS=INTERVAL'10'MINUTEREFRESH_MODE=FULLASSELECT...;

2.7 AS <select_statement>:定义物化查询

2.7.1 说明:

  • 物化表的数据来自AS <select_statement>
  • 上游可以是:物化表 / 普通表 / 视图
  • select_statement支持所有 Flink SQL Queries

2.7.2 示例:

CREATEMATERIALIZEDTABLEmy_materialized_table FRESHNESS=INTERVAL'10'SECONDASSELECT*FROMkafka_catalog.db1.kafka_table;

2.8 CREATE 的限制

2.8.1 不支持显式指定列定义(列名/类型从查询自动推导)
2.8.2 不支持在查询中引用临时表、临时视图、临时函数

3. ALTER MATERIALIZED TABLE:管理与演进物化表

3.1 语法总览

ALTERMATERIALIZEDTABLE[catalog_name.][db_name.]table_name SUSPEND|RESUME[WITH(key1=val1,key2=val2,...)]|REFRESH[PARTITIONpartition_spec]|AS<select_statement>

3.2 SUSPEND:暂停后台刷新管道

3.2.1 语法:

ALTERMATERIALIZEDTABLEmy_materialized_table SUSPEND;

3.2.2 关键注意:

  • 如果物化表是 CONTINUOUS 模式,默认使用STOP WITH SAVEPOINT暂停作业
  • 因此你需要先设置 savepoint 保存路径:
SET'execution.checkpointing.savepoint-dir'='hdfs://savepoint_path';ALTERMATERIALIZEDTABLEmy_materialized_table SUSPEND;

3.3 RESUME:恢复刷新(支持动态选项,但不持久化)

3.3.1 基本恢复:

ALTERMATERIALIZEDTABLEmy_materialized_table RESUME;

3.3.2 带动态参数(仅对当前刷新 pipeline 生效,不会写回元数据):

ALTERMATERIALIZEDTABLEmy_materialized_table RESUMEWITH('sink.parallelism'='10');

3.4 REFRESH:手动触发刷新(会启动 Batch Job)

3.4.1 刷新整表:

ALTERMATERIALIZEDTABLEmy_materialized_table REFRESH;

3.4.2 刷新指定分区:

ALTERMATERIALIZEDTABLEmy_materialized_table REFRESHPARTITION(ds='2024-06-28');

3.4.3 注意:

  • REFRESH 会启动一个Flink Batch Job来刷新数据

3.5 ALTER … AS:修改查询定义(并触发 schema 演进)

3.5.1 用途:

  • 修改物化表的 query definition
  • 系统会先基于新 query 推导 schema 并进行 schema evolution,然后用新 query 刷新数据
  • 默认不影响历史数据(尤其是 FULL 模式的历史分区)

3.5.2 FULL 模式下的行为

3.5.2.1 流程:

  • 更新 schema + query definition

  • 下次刷新触发时:

    • 若分区表且正确设置partition.fields.#.date-formatter→ 只刷新最新分区
    • 否则 → 整表覆盖刷新

3.5.3 CONTINUOUS 模式下的行为(风险点)

3.5.3.1 流程:

  • 暂停当前运行的刷新 job
  • 更新 schema + query definition
  • 启动新的刷新 job

3.5.3.2 风险:

  • 新 job不会恢复旧 job 的 state
  • 可能造成短暂的数据重复或数据丢失
  • 数据源起始 offset 由 connector 默认实现或 query 中的动态 hint 决定

3.5.4 Schema 演进限制(当前唯一支持的方式)

  • 仅支持:在原 schema 末尾新增“可为 NULL”的列

3.5.5 示例:在末尾新增一个可空列avg_amount

ALTERMATERIALIZEDTABLEmy_materialized_tableASSELECTuser_id,COUNT(*)ASevent_count,SUM(amount)AStotal_amount,AVG(amount)ASavg_amountFROMkafka_catalog.db1.eventsWHEREevent_type='purchase'GROUPBYuser_id;

4. DROP MATERIALIZED TABLE:删除物化表

4.1 语法

DROPMATERIALIZEDTABLE[IFEXISTS][catalog_name.][database_name.]table_name;

4.2 行为说明

4.2.1 先删除后台刷新 pipeline
4.2.2 再从 Catalog 删除物化表元数据

4.3 示例

DROPMATERIALIZEDTABLEIFEXISTSmy_materialized_table;

5. 典型创建示例(连续刷新 vs 定时刷新)

5.1 示例一:FRESHNESS=10 秒 → 推断 CONTINUOUS(Streaming 增量刷新)

CREATEMATERIALIZEDTABLEmy_materialized_table_continuous PARTITIONEDBY(ds)WITH('format'='debezium-json','partition.fields.ds.date-formatter'='yyyy-MM-dd')FRESHNESS=INTERVAL'10'SECONDASSELECTk.ds,k.user_id,COUNT(*)ASevent_count,SUM(k.amount)AStotal_amount,MAX(u.age)ASmax_ageFROMkafka_catalog.db1.kafka_table kJOINuser_catalog.db1.user_table uONk.user_id=u.user_idWHEREk.event_type='purchase'GROUPBYk.ds,k.user_id;

注意:date-formatter 仅 FULL 生效;这里写了也不会影响 CONTINUOUS 的分区覆盖逻辑,但不影响语法演示。

5.2 示例二:FRESHNESS=1 小时 → 推断 FULL(定时批刷新,覆盖写入)

CREATEMATERIALIZEDTABLEmy_materialized_table_full PARTITIONEDBY(ds)WITH('format'='json','partition.fields.ds.date-formatter'='yyyy-MM-dd')FRESHNESS=INTERVAL'1'HOURASSELECTp.ds,p.product_id,p.product_name,AVG(s.sale_price)ASavg_sale_price,SUM(s.quantity)AStotal_quantityFROMpaimon_catalog.db1.product_table pLEFTJOINpaimon_catalog.db1.sales_table sONp.product_id=s.product_idWHEREp.category='electronics'GROUPBYp.ds,p.product_id,p.product_name;

6. 生产落地避坑清单(强烈建议收藏)

6.1 分区字段一定要在AS SELECT输出中,否则建表会失败或逻辑不成立
6.2 FULL 模式想“只刷最新分区”,必须同时满足:

  • PARTITIONED BY (ds)
  • WITH ('partition.fields.ds.date-formatter'='yyyy-MM-dd')
  • ds为 string 类型分区字段
    6.3 FULL 模式 freshness 不是随便写:只支持指定的秒/分/时/天间隔(否则无法转 cron)
    6.4 CONTINUOUS 模式 freshness 越小 checkpoint 越频繁,可能明显影响性能
    6.5ALTER ... AS在 CONTINUOUS 模式会重启作业且不继承 state:
  • 可能短暂重复/丢失
  • 上线修改建议配合灰度、或先切 FULL 过渡
    6.6 schema evolution 目前只支持“末尾追加可空列”,别指望随意改列类型/顺序/删除列
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/24 14:19:10

WeiboImageReverse:一键追溯微博图片原始发布者的终极解决方案

在社交媒体时代&#xff0c;你是否经常遇到这样的困扰&#xff1a;看到一张精彩的微博图片&#xff0c;却找不到原始发布者&#xff1f;发现有人盗用你的原创图片&#xff0c;却无法快速锁定侵权者&#xff1f;这些问题现在有了完美的解决方案。WeiboImageReverse是一个专为微博…

作者头像 李华
网站建设 2026/2/13 16:09:40

DeTikZify:3步将手绘草图秒变专业LaTeX图表

DeTikZify&#xff1a;3步将手绘草图秒变专业LaTeX图表 【免费下载链接】DeTikZify Synthesizing Graphics Programs for Scientific Figures and Sketches with TikZ 项目地址: https://gitcode.com/gh_mirrors/de/DeTikZify 还在为科研绘图耗费大量时间&#xff1f;De…

作者头像 李华
网站建设 2026/2/24 19:08:56

利用SMBus实现电池电量监测:完整示例

如何用SMBus精准读取电池电量&#xff1f;从协议到代码的实战解析你有没有遇到过这样的情况&#xff1a;手机显示还有30%电量&#xff0c;刚拿起打个电话&#xff0c;突然“啪”一下关机了&#xff1f;或者在开发一款便携设备时&#xff0c;发现电压法估算的剩余电量总是在负载…

作者头像 李华
网站建设 2026/2/25 11:17:08

KeymouseGo:彻底告别重复工作的智能自动化神器

KeymouseGo&#xff1a;彻底告别重复工作的智能自动化神器 【免费下载链接】KeymouseGo 类似按键精灵的鼠标键盘录制和自动化操作 模拟点击和键入 | automate mouse clicks and keyboard input 项目地址: https://gitcode.com/gh_mirrors/ke/KeymouseGo 还在为每天重复的…

作者头像 李华
网站建设 2026/2/25 17:02:53

如何快速掌握通达信数据解析:量化投资高效数据处理的完整指南

如何快速掌握通达信数据解析&#xff1a;量化投资高效数据处理的完整指南 【免费下载链接】mootdx 通达信数据读取的一个简便使用封装 项目地址: https://gitcode.com/GitHub_Trending/mo/mootdx 在量化投资领域&#xff0c;数据获取与处理往往是策略实现的第一道门槛。…

作者头像 李华
网站建设 2026/2/22 19:40:16

KeymouseGo终极教程:免费自动化工具完整使用指南

还在为重复性的鼠标键盘操作烦恼吗&#xff1f;KeymouseGo这款开源免费的自动化工具&#xff0c;正是你需要的解决方案&#xff01;它能够记录并回放你的所有操作&#xff0c;让你从单调的工作中彻底解放出来。&#x1f680; 【免费下载链接】KeymouseGo 类似按键精灵的鼠标键盘…

作者头像 李华