news 2026/3/5 3:44:11

Flink 2.2 从本地 Standalone 到 Docker/Kubernetes,把 Hive 批流打通,并在 SQL 里接入 OpenAI 推理

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink 2.2 从本地 Standalone 到 Docker/Kubernetes,把 Hive 批流打通,并在 SQL 里接入 OpenAI 推理

1. 先把“部署积木”和两种模式讲清楚

一个 Flink 集群永远绕不开这些角色:

  • Client:把你的程序/SQL 编译成 JobGraph 并提交
  • JobManager:调度与协调
  • TaskManager:真正跑算子的执行进程
  • 可选外部组件:HA(ZooKeeper 或 K8s HA)、文件系统(checkpoint/savepoint)、指标系统等

而部署时最关键的分歧点是两种运行模式

  • Session Mode:先起一个长驻集群,多个作业共享资源(省集群启动成本,但隔离性弱)
  • Application Mode:一个应用一个集群,JM 上直接执行 main()(隔离更好,应用级生命周期更清晰)

这两个模式在 Standalone、Docker、K8s 上都成立,只是启动方式不同。

2. Java 版本怎么选:别让“能跑”变成“踩坑”

Flink 的 Java 支持大体是:

  • Java 11:1.10 起支持
  • Java 17:Flink 2.0 起默认推荐(官方镜像也默认)
  • Java 21:2.0 起实验性支持

从 Java 16 开始的JDK 模块化(Project Jigsaw)会影响反射(比如 Kryo 序列化 UDF / 数据类型),如果你的 UDF/类型触碰到 JDK 内部类,可能需要在env.java.opts.all里追加--add-opens/--add-exports,注意“不要删默认配置,只能追加”。

另外,文档里也明确提示:Hive connector / HBase 1.x connector 在 Java 11/17/21 下属于“未测试特性”。建议生产上:要么严格做压测验证,要么把 Hive 相关链路尽量走“你能控制的版本组合”。

3. Standalone:最快跑起来的方式(也最“原始”)

Standalone 的定位很直白:在操作系统上起进程,资源回收、失败拉起主要靠你自己。

3.1 Session Mode(最常见本地方式)

# 启动./bin/start-cluster.sh# 提交示例作业./bin/flink run ./examples/streaming/TopSpeedWindowing.jar# 停止./bin/stop-cluster.sh

默认 Web UI:http://localhost:8081

3.2 Application Mode(把应用“塞进”JM)

核心脚本:bin/standalone-job.sh

常用姿势 1:把 jar 放进lib/,JM 启动时直接识别 classpath:

cp./examples/streaming/TopSpeedWindowing.jar lib/ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing# 需要再起 TM./bin/taskmanager.sh start

常用姿势 2:用--jars让 Flink 拉取/挂载制品(适合制品统一管理)。

3.3 Standalone HA(ZooKeeper)

你需要:

  • high-availability.type: zookeeper
  • high-availability.zookeeper.quorum
  • high-availability.storageDir(通常是 HDFS/S3 等)
  • conf/masters配多个 JM(含 web ui 端口),实现 standby

3.4 调试与日志

  • 本地日志目录:logs/
  • 需要更细:把conf/log4j.properties里 rootLogger 提到 DEBUG

4. Docker:把 Standalone 装进容器,环境立刻可复制

4.1 Session Cluster(最短路径)

核心点:先建 network,让 JM/TM 能互相解析;并设置jobmanager.rpc.address

FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"dockernetwork create flink-networkdockerrun --rm --name=jobmanager --network flink-network -p8081:8081\--envFLINK_PROPERTIES="${FLINK_PROPERTIES}"flink:2.2.0-scala_2.12 jobmanagerdockerrun --rm --name=taskmanager --network flink-network\--envFLINK_PROPERTIES="${FLINK_PROPERTIES}"flink:2.2.0-scala_2.12 taskmanager

之后你用本机 Flink 分发包提交 job 即可。

4.2 Docker Compose(推荐)

Compose 的价值:配置、扩缩容、依赖关系一次性固化。你还可以加一个sql-client容器,把 SQL 提交也容器化。

重要提醒:如果要用 Kafka/Hive 等 connector,要把 connector jar 放进镜像/opt/flink/lib(通常自建镜像最稳)。文档也提示:SQL 里的ADD JAR对“宿主机文件”并不好使,因为容器看到的是 overlay fs。

4.3 插件、配置与 jemalloc

  • FLINK_PROPERTIES环境变量覆盖 config
  • ENABLE_BUILT_IN_PLUGINS启用内置插件(例如 S3)
  • jemalloc 默认启用;如果想回退 glibc,可DISABLE_JEMALLOC=true(遇到 savepoint/checkpoint 内存碎片问题时有意义)

5. Kubernetes(Standalone on K8s):把“进程”变成“资源对象”

这条路是:用 K8s 的 Deployment/Service/ConfigMap 把 Standalone 集群拼出来。文档也建议新用户优先考虑Native KubernetesFlink Kubernetes Operator,因为生命周期管理更舒服,但 Standalone on K8s 依然很适合“先跑通/先迁移”。

5.1 准备工作:集群要能用

  • kubectl get nodes确认 kubelet 就绪
  • 本地可用 minikube
    minikube 有个常见坑:需要把docker0设成 promisc,否则 Flink 组件可能“引用不到自己”。 (Apache Nightlies)

5.2 Session 集群(最典型):3 个组件

一个 Session 集群至少包含:

  • JobManager 的 Deployment
  • TaskManager 的 Deployment(一个池子)
  • 暴露 JM REST/UI 的 Service (Apache Nightlies)

你按顺序创建(配置与 service → deployments),再 port-forward 就能进 UI、提交作业。 (Apache Nightlies)

访问方式也很灵活:

  • kubectl port-forward(最常用)
  • kubectl proxy(走 apiserver 代理)
  • NodePort(把 REST 端口暴露为节点端口) (Apache Nightlies)

5.3 Application 集群:一个应用一个集群

Application 模式下,JM 往往用Job(或特定资源定义)启动,TM 仍然是 Deployment。作业制品一般三种方式提供:

  • 挂载 volume 到/opt/flink/usrlib
  • 自建镜像把 jar bake 进去
  • --jars指向 DFS/HTTP(S)

5.4 Reactive 模式:让并行度跟着资源走

Reactive Mode 的核心就是在 config 里启用scheduler-mode: reactive,然后通过扩缩 TaskManager 副本数触发作业自动调并行度;也可以结合 HPA 做自动伸缩。 (Apache Nightlies)

5.5 HA:用 Kubernetes HA Services(更像云原生的 HA)

要点:

  • high-availability.type: kubernetes
  • high-availability.storageDir(仍然需要外部持久化存储)
  • JobManager Pod 往往需要用Pod IP作为 rpc address
  • 需要带权限的ServiceAccount(能创建/修改/删除 ConfigMaps) (Apache Nightlies)

更快恢复:把 JM replicas 设大于 1,起 standby。

5.6 让恢复更快:本地恢复 + StatefulSet(见第 6 节)

当你把 TM 做成 StatefulSet,并给它挂 PV,配合 deterministic 的taskmanager.resource-id,可以做到“Pod 重启后仍能用同一块盘做本地恢复”,恢复速度差别非常明显。

6. Working Directory:Flink 的“可恢复本地工作台”

Working Directory(FLIP-198)可以理解为:JM/TM 的本地持久工作目录,用来存储“能在进程重启后复用的东西”。

目录结构:

  • JM:<base>/jm_<JM_RESOURCE_ID>
  • TM:<base>/tm_<TM_RESOURCE_ID>(Apache Nightlies)

配置项:

  • process.working-dir(通用 base)
  • process.jobmanager.working-dir/process.taskmanager.working-dir(分别指定)
  • jobmanager.resource-id/taskmanager.resource-id(不指定就随机)

会放哪些东西?

  • BlobServer/BlobCache 的 blobs
  • 启用state.backend.local-recovery时的本地 state
  • RocksDB 工作目录等 (Apache Nightlies)

本地恢复跨重启(FLIP-201)的关键条件

要实现“TM 挂了重启还能本地恢复”,必须同时满足:

  • state.backend.local-recovery: true
  • taskmanager.resource-id必须确定(不能每次随机)
  • 进程重启后还能访问同一块 working dir 所在 volume (Apache Nightlies)

在 K8s 上最顺手的组合就是:StatefulSet + PV +taskmanager.resource-id = PodName

7. Hive Read & Write:用 HiveCatalog 把批流读写统一起来

用 HiveCatalog 后,Flink 可以:

  • 批模式:读“提交那一刻”的表快照(bounded)
  • 流模式:持续监控分区/文件增量读取(unbounded)

7.1 Streaming 读 Hive 的关键参数(你最容易踩的地方)

  • streaming-source.enable:打开流式读
    注意:每个 partition/file 必须“原子写入”,否则会读到不完整数据

  • 分区表:

    • 可以监控新分区并增量读
    • streaming-source.partition.include = latest可用于“只追最新分区”的时间维表场景(配合 temporal join)
  • 非分区表:监控目录新文件增量读
    要求:新文件必须原子写入目标目录

性能与稳定性提示:

  • 分区太多会导致扫描开销大(监控策略是扫目录/文件)
  • streaming 读 Hive 表在 Flink DDL 中不支持 watermark 语法,因此不能直接用于窗口算子

7.2 读 Hive View 的限制

  • 必须把 HiveCatalog 设为 current catalog 才能查 view
  • view 的 SQL 要兼容 Flink 语法(关键词/字面量差异常见)

7.3 读性能:向量化、并行度推断、split 调优

  • ORC/Parquet + 无复杂类型 → 可向量化读取(默认开)

  • Source 并行度可按文件/split 动态推断

  • split 调优:

    • table.exec.hive.split-max-size(默认 128MB)
    • table.exec.hive.file-open-cost(默认 4MB,小文件多时很关键)
    • 分区太多导致 size 统计慢:用table.exec.hive.calculate-partition-size.thread-num提速(当前仅 ORC 生效)

7.4 写 Hive:Batch vs Streaming

Batch:

  • INSERT INTO追加
  • INSERT OVERWRITE覆盖(表或分区)
  • 只有作业结束才“可见”

Streaming:

  • 持续写入并增量提交,让数据逐步可见
  • 不支持 streaming overwrite
  • 常见做法:Kafka → Hive 分区表 + partition-commit

典型示例(按 dt/hr 分区提交):

SETtable.sql-dialect=hive;CREATETABLEhive_table(user_id STRING,order_amountDOUBLE)PARTITIONEDBY(dt STRING,hr STRING)STOREDASparquet TBLPROPERTIES('partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file');SETtable.sql-dialect=default;CREATETABLEkafka_table(user_id STRING,order_amountDOUBLE,log_tsTIMESTAMP(3),WATERMARKFORlog_tsASlog_ts-INTERVAL'5'SECOND)WITH(...);INSERTINTOTABLEhive_tableSELECTuser_id,order_amount,DATE_FORMAT(log_ts,'yyyy-MM-dd'),DATE_FORMAT(log_ts,'HH')FROMkafka_table;

如果用的是TIMESTAMP_LTZ并且按 partition-time 提交,记得配sink.partition-commit.watermark-time-zone(否则可能延后几个小时才提交)。

S3 上 Exactly-once:

  • 默认只支持 rename committer(S3 不友好)
  • 可以把table.exec.hive.fallback-mapred-writer=false,让 sink 用 Flink native writer(仅 parquet/orc)

动态分区写入:

  • 默认会按动态分区列额外排序,减少 writer 数量,避免 OOM
  • 如果关掉排序(table.exec.hive.sink.sort-by-dynamic-partition.enable=false),要警惕“同一节点分区过多”导致 OOM
  • 批模式下可以用DISTRIBUTED BY/SORTED BY辅助

8. Hive Functions:HiveModule + 原生聚合加速

8.1 HiveModule:把 Hive 内置函数直接带进 Flink

Stringname="myhive";Stringversion="2.3.4";tableEnv.loadModule(name,newHiveModule(version));

注意:旧版 Hive 某些内置函数有线程安全问题,生产建议自行打补丁。

8.2 原生 Hive 聚合函数(hash agg 更快)

如果 HiveModule 优先级高于 CoreModule,Flink 会先用 Hive 内置函数。问题是 Hive 内置聚合在 Flink 里通常只能走 sort-based aggregation。
从 Flink 1.17 起引入了native hive aggregation(hash-based),目前支持sum/count/avg/min/max,通过:

  • table.exec.hive.native-agg-function.enabled = true

能明显提升聚合性能。

限制也要认清:

  • 能力与 Hive 不完全对齐(部分类型不支持)
  • SqlClient 里目前不能 per-job 开,只能 module 级先开再 load(未来会修)

8.3 复用 Hive UDF(UDF/GenericUDF/UDTF/UDAF…)

Flink 会在 plan/execute 时自动把 Hive 的 UDF 映射成 Flink 对应的 Function 类型。前提条件:

  • 当前 catalog 是 backed by Hive Metastore(包含该函数)
  • 包含 UDF 的 jar 在 Flink classpath 中

9. Flink SQL 调 OpenAI:把推理接进数据管道

你给的 “OpenAI Model Function” 属于典型的“边处理边推理”能力,适合:

  • 文本分类(情感、主题、风险标签)
  • 抽取结构化字段(输出 json)
  • Embedding 向量化(召回/聚类/相似度)

9.1 Chat Completions 示例:情感分类

CREATEMODEL ai_analyze_sentiment INPUT(`input`STRING)OUTPUT(`content`STRING)WITH('provider'='openai','endpoint'='https://api.openai.com/v1/chat/completions','api-key'='<YOUR KEY>','model'='gpt-3.5-turbo','system-prompt'='Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.');INSERTINTOprint_sinkSELECTid,movie_name,contentaspredicit_label,actual_labelFROMML_PREDICT(TABLEmovie_comment,MODEL ai_analyze_sentiment,DESCRIPTOR(user_comment));

9.2 生产上你必须配置的三类选项

  • 成本与上下文控制:

    • max-context-size+context-overflow-action(截断/跳过/记录日志)
  • 稳定性:

    • error-handling-strategy(RETRY/FAILOVER/IGNORE)
    • retry-num+retry-fallback-strategy
  • 输出结构:

    • response-format = 'json_object'(做结构化更稳)

如果你把IGNORE打开,还可以把失败信息通过 metadata 列带回流里(error-string/http-status-code/http-headers-map),做“坏样本旁路 + 可观测”。

10. 一份很实用的落地 Checklist

  • 部署模式选型:多作业共享资源 → Session;强隔离/一应用一集群 → Application

  • Java:优先 Java 17;涉及 Hive connector 必做压测;模块化--add-opens只追加不删

  • Hive Streaming 读:保证文件/分区原子可见;分区过多注意 metastore 压力与扫描开销

  • Hive 维表 Join:

    • “最新分区维表” →streaming-source.partition.include=latest+ 合理 monitor-interval
    • “整表缓存维表” → lookup join cache TTL,确保 TM slot 内存装得下
  • Working Directory + Local Recovery:

    • 本地盘要稳定可复用(K8s 用 PV)
    • taskmanager.resource-id必须确定(StatefulSet 用 PodName 最顺)
  • K8s HA:high-availability.type=kubernetes+ storageDir + ServiceAccount 权限

  • 推理(OpenAI):强烈建议先在离线/低流量链路压测吞吐与失败策略;把失败信息回写流里,别直接“静默丢”

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/28 9:00:49

计算机Java毕设实战-基于springboot的小区居民社区健康管理系统社区健康管理系统【完整源码+LW+部署说明+演示视频,全bao一条龙等】

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/3/4 18:10:50

亲测好用10个AI论文软件,助继续教育学生轻松写论文!

亲测好用10个AI论文软件&#xff0c;助继续教育学生轻松写论文&#xff01; AI 工具如何让论文写作更高效 在当今快节奏的学习环境中&#xff0c;继续教育学生面临着论文写作的多重挑战。无论是选题、大纲搭建&#xff0c;还是初稿撰写和降重处理&#xff0c;都需要耗费大量时…

作者头像 李华
网站建设 2026/3/4 22:42:56

8768756

867238

作者头像 李华
网站建设 2026/3/3 10:47:06

计算机Java毕设实战-基于springboot的无人机销售系统的设计与实现【完整源码+LW+部署说明+演示视频,全bao一条龙等】

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/3/1 16:55:16

Java毕设项目推荐-基于 SpringBoot 的社区智慧养老监护管理平台系统设计与实现基于springboot的社区独居老人健康管理系统【附源码+文档,调试定制服务】

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华