news 2026/6/10 1:11:22

Flink CLI 从提交作业到 Savepoint/Checkpoint、再到 YARN/K8S 与 PyFlink

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink CLI 从提交作业到 Savepoint/Checkpoint、再到 YARN/K8S 与 PyFlink

1. CLI 的工作方式:它连接谁?

bin/flink会连接到 Flink 配置文件里指定的 JobManager(或你在命令里用--jobmanager host:port指定的 JM)。
前提:必须有一个可用的 Flink 部署(本地、YARN、K8S、Standalone Session 等)。

2. 提交作业(run):最常用的入口

2.1 提交一个 JAR(推荐加 --detached)

./bin/flink run\--detached\./examples/streaming/StateMachineExample.jar
  • --detached:提交完就返回,不会一直挂在终端等作业结束
  • 输出里会给出JobID(后续 list/savepoint/stop/cancel 都靠它)

把 JobID 存变量,后续更方便:

exportJOB_ID="cca7bc1061d61cf15238e92312c2fc20"

2.2 用 -D 传递配置(发布时非常关键)

run支持-D传递额外配置,例如:

./bin/flink run --detached\-Dpipeline.max-parallelism=120\./your-job.jar

这个能力对Application Mode特别重要:你可以不改flink-conf.yaml,直接在提交时把内存、并发、checkpoint 等配置传进集群。

注意:提交到已存在的 Session 集群时,一般只支持执行相关参数(execution config)生效,别指望所有参数都能“改动集群级别行为”。

3. 监控作业(list):查运行中/排队中

./bin/flink list

它会列出:

  • Running/Restarting Jobs(运行中/重启中)
  • Scheduled Jobs(已提交但尚未启动)

实战习惯:提交后第一件事 list 一下,确认 Job 状态不是立刻 FAILED/RESTARTING。

4. Savepoint:可控的“状态快照”,用于迁移/升级/回滚

4.1 创建 Savepoint

./bin/flink savepoint\$JOB_ID\/tmp/flink-savepoints
  • savepoint 目录可选:如果execution.checkpointing.savepoint-dir没配置,就必须在命令里带上
  • 成功后会返回一个 savepoint 路径(后续--fromSavepoint用它)

4.2 Savepoint 触发超时怎么办?用 detached

状态大时,客户端等待 savepoint 完成可能超时(TimeoutException)。解决方式是“触发后立刻返回”:

./bin/flink savepoint\$JOB_ID\/tmp/flink-savepoints\-detached

这会返回一个 triggerId,之后可以通过 REST API 查询该 trigger 的完成状态(CLI 文档也建议这么做)。

4.3 Dispose Savepoint:删除 savepoint 数据与元信息

./bin/flink savepoint\--dispose\/tmp/flink-savepoints/savepoint-xxx\$JOB_ID

注意一个坑:如果你的状态里有自定义 state/自定义类(尤其 RocksDB state),dispose 时可能需要提供原作业 jar,否则会 ClassNotFound:

./bin/flink savepoint\--dispose<savepointPath>\--jarfile<jarFile>

5. 手动触发 Checkpoint:更偏“运维诊断/临时保底”

./bin/flink checkpoint$JOB_ID

如果你的作业默认跑的是 incremental checkpoint,但你想强制做一次 full checkpoint:

./bin/flink checkpoint$JOB_ID--full

Checkpoint 和 Savepoint 的关键差异(实战理解版):

  • Checkpoint:系统为容错自动做(也可手动触发),更偏“持续容错”
  • Savepoint:人为控制,用于“迁移/升级/回滚/停止再启动”

6. 停作业:stop vs cancel(一个优雅,一个粗暴)

6.1 stop:优雅停止并创建最终 Savepoint(强烈推荐用于可恢复停机)

./bin/flink stop\--savepointPath /tmp/flink-savepoints\$JOB_ID

stop 的语义是“从 source 到 sink”平滑停:

  • 让 source 发最后一次 barrier,生成 savepoint
  • savepoint 成功后,source 调用 cancel() 结束

如果你要“彻底停机并清空事件时间相关的等待”,可以加--drain

./bin/flink stop\--savepointPath /tmp/flink-savepoints\--drain\$JOB_ID

--drain会发送 MAX_WATERMARK,触发 event-time timer(比如窗口)把“该出结果的都出完”。
注意:想将来从 savepoint 恢复继续跑,通常不要 drain,否则可能引入恢复后的语义问题。

6.2 cancel:直接取消(不保证状态一致性/不做最终保存)

./bin/flink cancel$JOB_ID

文档里提到--withSavepoint在 cancel 时顺便做 savepoint 这个功能已 deprecated:生产建议用 stop 来做“取消 + 最终 savepoint”。

7. 从 Savepoint 启动作业:升级/迁移的核心套路

./bin/flink run\--detached\--fromSavepoint /tmp/flink-savepoints/savepoint-xxx\./your-job.jar

如果你的新版本作业删掉了某些算子,导致 savepoint 里有“无法恢复的状态”,但你仍想启动,可以加:

./bin/flink run\--fromSavepoint<savepointPath>\--allowNonRestoredState\...

这是“兼容演进”常用开关,但它也意味着你明确接受丢弃某些旧状态。

8. CLI Actions 速查表(你每天会用到的)

  • run:提交并运行作业(JAR/PyFlink)
  • info:打印优化后的执行图(排查 SQL/Plan 很有用)
  • list:列出运行/排队作业
  • savepoint:触发/清理 savepoint
  • checkpoint:手动触发 checkpoint(含 full)
  • stop:优雅停止并生成最终 savepoint
  • cancel:直接取消

帮助命令:

./bin/flink --help ./bin/flink<action>--help

9. 选择部署目标:–target 一把梭(Session / Application)

--target会覆盖execution.target的配置。

常见组合:

YARN:

./bin/flink run --target yarn-session... ./bin/flink run --target yarn-application...

Kubernetes:

./bin/flink run --target kubernetes-session... ./bin/flink run --target kubernetes-application...

Standalone:

./bin/flink run --targetlocal... ./bin/flink run --target remote...

理解建议:

  • session:提交到已存在集群(共享 JM/TM)
  • application:提交时起一个专属集群(更适合隔离、参数化、CI/CD)

10. PyFlink 提交:不用 jar,但要管 Python 环境与依赖

10.1 基础运行

./bin/flink run --python examples/python/table/word_count.py

先确认 Python 版本 ≥ 3.9:

python --version

10.2 带依赖文件(–pyFiles)

./bin/flink run\--python your_job.py\--pyFiles file:///user.txt,hdfs:///path/username.txt

--pyFiles会加到 PYTHONPATH(客户端与远端 python worker 都能用)。

10.3 Python 里引用 Java UDF 或外部 connector(–jarfile)

./bin/flink run\--python your_job.py\--jarfile your-udf-or-connector.jar

10.4 用模块方式提交(–pyModule)

./bin/flink run\--pyModule word_count\--pyFiles examples/python/table

10.5 YARN application 模式跑 PyFlink(典型生产形态)

你可以通过-D把 JM/TM 内存、应用名、ship-files 等都带上,还能指定 venv、python 可执行文件:

./bin/flink run -t yarn-application\-Djobmanager.memory.process.size=1024m\-Dtaskmanager.memory.process.size=1024m\-Dyarn.application.name=<ApplicationName>\-Dyarn.ship-files=/path/to/shipfiles\-pyarch shipfiles/venv.zip\-pyclientexec venv.zip/venv/bin/python3\-pyexec venv.zip/venv/bin/python3\-pyfs shipfiles\-pym word_count

一个现实限制:-pyarch通过 blob server 分发,单个归档文件大小上限 2GB,超过要放到分布式文件系统再引用。

10.6 PyFlink 相关参数速记

  • --python/-py:入口脚本
  • --pyModule/-pym:入口模块(通常配合--pyFiles
  • --pyFiles/-pyfs:代码/资源文件(zip/whl/目录都行)
  • --pyArchives/-pyarch:归档(比如 venv、数据包)
  • --pyClientExecutable:提交端 python
  • --pyExecutable:远端 worker python
  • --pyRequirements:requirements.txt + 可选离线包目录
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/8 13:49:24

50系显卡安装pytorch

参考链接&#xff1a;深入解析&#xff1a;RTX5060 Ti显卡安装cuda版本PyTorch踩坑记录 - yxysuanfa - 博客园 5060 显卡安装cuda版本PyTorch踩坑记录 问题如下&#xff1a; TX5060 Ti显卡安装cuda版本PyTorch踩坑记录 问题如下&#xff1a; NVIDIA GeForce RTX 5060 Ti wit…

作者头像 李华
网站建设 2026/6/5 19:43:14

《突破边界!Power BI在大数据网络分析中的应用》

突破边界&#xff01;Power BI在大数据网络分析中的应用 一、引言&#xff1a;当网络数据爆炸遇到传统分析的瓶颈 1.1 网络数据的“野蛮生长” 如今&#xff0c;我们正处于一个万物互联的时代&#xff1a;企业有数千台网络设备&#xff08;路由器、交换机、防火墙&#xff0…

作者头像 李华
网站建设 2026/6/9 18:55:11

巴菲特的创新能力评估:分布式创新网络的价值创造

巴菲特的创新能力评估&#xff1a;分布式创新网络的价值创造 关键词&#xff1a;巴菲特、创新能力评估、分布式创新网络、价值创造、投资策略 摘要&#xff1a;本文旨在深入探讨巴菲特的创新能力&#xff0c;并结合分布式创新网络的概念&#xff0c;分析其在价值创造方面的作用…

作者头像 李华
网站建设 2026/6/9 1:35:22

esm.sh路径遍历漏洞深度解析:CVE-2026-23644技术细节与修复方案

CVE-2026-23644&#xff1a;esm-dev esm.sh中的路径遍历漏洞 威胁概况 严重性&#xff1a;高 类型&#xff1a;漏洞 CVE编号&#xff1a;CVE-2026-23644 漏洞描述 esm.sh是一个用于Web开发的无构建内容分发网络(CDN)。在Go伪版本0.0.0-20260116051925-c62ab83c589e之前&…

作者头像 李华
网站建设 2026/6/5 19:32:13

C++中的策略模式变体

1、非修改序列算法 这些算法不会改变它们所操作的容器中的元素。 1.1 find 和 find_if find(begin, end, value)&#xff1a;查找第一个等于 value 的元素&#xff0c;返回迭代器&#xff08;未找到返回 end&#xff09;。find_if(begin, end, predicate)&#xff1a;查找第…

作者头像 李华
网站建设 2026/6/5 19:36:04

Pcdmis海克斯康三坐标脱机软件2013至2021 CAD++全功能 远程包安装

Pcdmis海克斯康三坐标脱机软件2013至2021 CAD全功能 远程包安装&#xff0c;送学习资料搞三坐标测量的兄弟们应该对Pcdmis不陌生&#xff0c;特别是2013到2021这几个版本&#xff0c;CAD模块简直是逆向工程的神器。今天咱们就唠唠怎么用脚本整活儿&#xff0c;配合远程包安装搞…

作者头像 李华