news 2026/4/18 8:32:23

PyFlink JAR、Python 包、requirements、虚拟环境、模型文件,远程集群怎么一次搞定?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink JAR、Python 包、requirements、虚拟环境、模型文件,远程集群怎么一次搞定?

1. 先记住一条总原则:混用 DataStream + Table 时,用 DataStream API 配依赖

文档强调了一句非常关键的话:

如果一个 Job 里混用了 Python DataStream API 和 Python Table API,建议通过 DataStream API去指定依赖,这样两边都能生效。

也就是:

  • 纯 Table:table_env.get_config()/table_env.add_python_*
  • 混用:优先StreamExecutionEnvironmentadd_jars / add_python_file / set_python_requirements / add_python_archive / set_python_executable

2. JAR 依赖:pipeline.jars vs pipeline.classpaths vs add_jars vs add_classpaths

2.1 Table API 方式

A)pipeline.jars:上传到集群(最常用)

  • 只能file://本地路径
  • 会把 JAR上传到集群
table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

Windows 示例(注意还是 file:///):

table_env.get_config().set("pipeline.jars","file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/json.jar")

B)pipeline.classpaths:不上传,只加到 classpath(要求集群也能访问同路径)

  • 你必须保证 client、cluster 都能访问这些 URL(比如共享盘、同目录、分发好了)
table_env.get_config().set("pipeline.classpaths","file:///opt/flink/jars/connector.jar;file:///opt/flink/jars/json.jar")

一句话:

  • 你不想折腾分发:用pipeline.jars
  • 你已经把 jar 管理好并且集群路径一致:用pipeline.classpaths

2.2 DataStream API 方式(混用场景首选)

A)add_jars(...):上传到集群

env.add_jars("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")

B)add_classpaths(...):加到 client + cluster classpath(同样要求可达)

env.add_classpaths("file:///opt/flink/jars/connector1.jar","file:///opt/flink/jars/connector2.jar")

2.3 提交参数--jarfile的限制

  • 只支持一个 jar,所以多个依赖通常要求你自己打fat jar / uber jar

3. Python 依赖:三种层级(文件/目录、requirements、归档环境)

3.1 python.files / add_python_file:带“代码/包”到 PYTHONPATH

适合:

  • 你的 UDF 写在my_udf.py
  • 或者你有一坨自研包目录my_pkg/
  • 或者你打好了*.whl / *.egg(zip 本质)

Table API:

table_env.add_python_file("/path/to/my_udf.py")table_env.add_python_file("/path/to/my_pkg/")# 目录也可以

DataStream API:

env.add_python_file("/path/to/my_udf.py")env.add_python_file("/path/to/my_pkg/")

等价的还有:

  • 配置python.files
  • 提交参数-pyfs / --pyFiles

关键点:这些会被加到Python UDF worker 的 PYTHONPATH

3.2 requirements.txt / set_python_requirements:让集群 pip 安装第三方依赖

适合:

  • numpy/pandas/requests/sklearn 这种 pip 依赖
  • 你希望 Flink 在 worker 上自动安装

Table API:

table_env.set_python_requirements(requirements_file_path="/path/to/requirements.txt",requirements_cache_dir="cached_dir"# 可选)

DataStream API:

env.set_python_requirements(requirements_file_path="/path/to/requirements.txt",requirements_cache_dir="cached_dir")

离线安装(集群没网)怎么做?

文档给了关键命令:

pip download -d cached_dir -r requirements.txt --no-binary :all:

然后把这个cached_dir作为requirements_cache_dir传进去,Flink 会上传它用于离线安装。

硬要求(很容易忽视):

  • pip >= 20.3
  • setuptools >= 37.0.0
  • cached_dir 里的包必须匹配集群平台与 Python 版本(比如 manylinux、glibc、cp310/cp311)

等价的还有:

  • 配置python.requirements
  • 提交参数-pyreq / --pyRequirements

3.3 python.archives / add_python_archive:带“环境/数据/模型文件”并自动解压

适合:

  • 你要带模型、词典、数据文件
  • 你要带一个完整虚拟环境(venv/conda 打包)

Table API:

table_env.add_python_archive("/path/to/py_env.zip","myenv")

DataStream API:

env.add_python_archive("/path/to/py_env.zip","myenv")

在 UDF 里访问(相对路径):

defmy_udf():withopen("myenv/py_env/data/data.txt")asf:...

如果没写 target_dir:

table_env.add_python_archive("/path/to/py_env.zip")# UDF 内访问:open("py_env.zip/py_env/data/data.txt")

支持格式:

  • zip 系(zip/jar/whl/egg…)
  • tar 系(tar/tar.gz/tgz…)

注意:如果 archive 里是虚拟环境,一定要和集群平台一致。

4. Python 解释器:worker 端与 client 端是两回事

这是很多人线上翻车的根本原因:
client 侧需要 Python 来解析/编译 UDF;cluster 侧 worker 需要 Python 来执行 UDF。

4.1 worker 端 Python:python.executable / set_python_executable

Table API:

table_env.get_config().set_python_executable("/path/to/python")

DataStream API:

env.set_python_executable("/path/to/python")

解释器放在 archive 里(推荐“自带环境”打法)

env.add_python_archive("/path/to/py_env.zip","venv")env.set_python_executable("venv/py_env/bin/python")

注意:如果指向 archive 内路径,用相对路径,别写绝对路径。

等价方式:

  • 配置python.executable
  • 提交参数-pyexec / --pyExecutable

4.2 client 端 Python:python.client.executable / --pyClientExecutable

client 端用于“编译阶段解析 Python UDF”。你可以:

  • 直接激活你本地的 venv:source my_env/bin/activate
  • 或通过配置/参数指定:python.client.executable/-pyclientexec/PYFLINK_CLIENT_EXECUTABLE

5. 在 Java/SQL 里用 Python UDF:依赖还是走 Python 那套配置

你可以在 Java Table API 或纯 SQL 里注册 Python UDF,例如:

tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");

但 Python 依赖依然要通过这些配置/参数提供:

  • python.files
  • python.archives
  • python.requirements
  • python.executable
  • python.client.executable

6. 一套工程化推荐组合(拿去就能用)

场景A:集群能上网(最省事)

  • JAR:pipeline.jarsenv.add_jars
  • Python:set_python_requirements(requirements.txt)
  • 自研代码:add_python_file(my_udf.py / my_pkg/)
  • 模型文件:add_python_archive(model.zip, "model")

场景B:集群没网(企业内网最常见)

  • requirements 离线缓存:pip download -d cached_dir -r requirements.txt
  • 代码依赖:add_python_file(...)
  • 模型/数据:add_python_archive(...)
  • 如果集群 Python 环境不可控:直接把 venv 打包进 archive,再set_python_executable("venv/.../python")

场景C:你要“零环境依赖”的可移植作业(最稳)

  • 把 venv 打包成 zip(或 conda pack)
  • add_python_archive(venv.zip, "venv")
  • set_python_executable("venv/.../python")
  • 第三方包不再走 requirements(除非你愿意让它再装一遍)

7. 最常见的坑清单(提前避雷)

  • 混用 Table + DataStream,你只在 Table 侧配了依赖,DataStream 侧 UDF 找不到包
    → 混用就统一用StreamExecutionEnvironment

  • pipeline.classpaths指向本地路径,集群节点根本访问不到
    → 不确定就用pipeline.jars / add_jars上传

  • 离线 cached_dir 的 wheel/源码包与集群平台/Python 版本不匹配
    → 必须按集群环境构建缓存(Linux x86_64 + cp310/cp311)

  • worker 端 Python 和 client 端 Python 版本不一致,Arrow/Pandas 相关依赖经常爆炸
    → 统一版本,或者用 archive 自带解释器

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/17 13:53:48

基于Python的商场停车管理系统的设计与实现_szvoh5b2

前言基于Python的商场停车管理系统是一个集车位管理、车辆识别、计费收费、数据统计与用户服务于一体的智能化平台,通过物联网、计算机视觉与数据库技术,实现停车场的高效运营与用户体验优化。一、项目介绍 开发语言:Python python框架&#…

作者头像 李华
网站建设 2026/4/17 23:45:23

springboot文理医院预约挂号系统(11672)

有需要的同学,源代码和配套文档领取,加文章最下方的名片哦 一、项目演示 项目演示视频 二、资料介绍 完整源代码(前后端源代码SQL脚本)配套文档(LWPPT开题报告)远程调试控屏包运行 三、技术介绍 Java…

作者头像 李华
网站建设 2026/4/18 3:01:59

Nodejs+vue大学生二手电子数码产品交易平台设计与实现 _39qu9

文章目录系统设计背景技术架构核心功能模块安全与风控措施创新点与价值--nodejs技术栈--结论源码文档获取/同行可拿货,招校园代理 :文章底部获取博主联系方式!系统设计背景 随着高校学生电子设备更新速度加快,二手电子数码产品交易需求显著增…

作者头像 李华
网站建设 2026/4/16 15:49:21

宠物行为AI实时分析,兽医误诊率砍半

📝 博客主页:Jax的CSDN主页 宠物行为AI实时分析:兽医误诊率砍半的创新实践目录宠物行为AI实时分析:兽医误诊率砍半的创新实践 引言:宠物行为诊断的隐性危机 一、技术原理:LLM如何破解行为诊断困局 核心技术…

作者头像 李华
网站建设 2026/4/17 15:08:58

输入聚食的人数,预算和口味偏好,自动推荐附近的餐厅,还能生成聚餐菜单。

智能聚餐规划系统1. 项目概述实际应用场景在朋友、同事或家庭聚会时,组织一次成功的聚餐需要考虑多个因素:人数、预算、不同人的口味偏好、餐厅选择、菜品搭配等。传统方式需要人工查询、比较、协调,过程繁琐且容易遗漏重要信息。痛点分析- 信…

作者头像 李华
网站建设 2026/4/15 9:02:33

AI健身动作识别:预置运动实体模型,APP快速集成

AI健身动作识别:预置运动实体模型,APP快速集成 引言:让健身APP拥有专业教练的眼睛 想象一下这样的场景:用户在家跟着健身APP做深蹲时,手机摄像头能像专业教练一样实时指出"膝盖内扣了""下蹲幅度不够&…

作者头像 李华