news 2026/4/15 20:53:57

Flink Procedures 用 SQL 的 `CALL` 跑 Flink Job(实现、类型推断、命名参数、Catalog 集成一篇搞懂)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink Procedures 用 SQL 的 `CALL` 跑 Flink Job(实现、类型推断、命名参数、Catalog 集成一篇搞懂)

1. Procedures 是什么,适合做什么

Procedure 可以理解为:SQL 世界里的“存储过程”,但执行体可以启动 Flink 作业

典型用途

  • 管理类:生成测试数据、重建/维护某些资源、触发后台作业
  • 数据操作类:一键跑一个数据准备/清洗/校验 Job,并把结果以表的形式返回
  • 平台化:把一堆“运维脚本/管理逻辑”收敛到 Catalog 中,让用户统一用 SQLCALL调用

2. 实现规则:必须实现Procedure接口 + 定义call(...)

2.1 类要求

  • 实现org.apache.flink.table.procedures.Procedure
  • 类必须public、非抽象、全局可访问
  • 不能是匿名类、非 static 内部类

2.2call(...)方法要求(最关键)

接口本身不定义方法,你需要自己定义名为call的方法:

硬性规则

  • call必须public

  • 第一个参数必须是ProcedureContext

    • context.getExecutionEnvironment()拿到StreamExecutionEnvironment
  • 返回类型必须是数组int[]String[]Row[]Long[]

而且 JVM 普通重载规则都适用

  • 支持重载:call(ctx, int)/call(ctx, String)
  • 支持 varargs:call(ctx, Integer...)
  • 支持继承入参:call(ctx, Object)

如果你用 Scala

  • varargs 需要加scala.annotation.varargs
  • 建议用装箱类型(java.lang.Integer而不是Int)以支持 NULL

3. 一个最小可用的 Procedure 示例:生成序列

下面这个示例展示了:Procedure 拿到StreamExecutionEnvironment,用fromSequence跑一个小 Job,然后把结果收集为数组返回。

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.util.CloseableIterator;publicclassGenerateSequenceProcedureimplementsProcedure{publiclong[]call(ProcedureContextcontext,intn)throwsException{returngenerate(context.getExecutionEnvironment(),n);}publiclong[]call(ProcedureContextcontext,Stringn)throwsException{returngenerate(context.getExecutionEnvironment(),Integer.parseInt(n));}privatelong[]generate(StreamExecutionEnvironmentenv,intn)throwsException{long[]sequenceN=newlong[n];inti=0;try(CloseableIterator<Long>it=env.fromSequence(0,n-1).executeAndCollect()){while(it.hasNext()){sequenceN[i++]=it.next();}}returnsequenceN;}}

要点

  • call可以重载
  • context.getExecutionEnvironment()获取执行环境
  • 结果必须是数组

4. 类型推断 Type Inference:为什么你有时必须加注解

Flink Table/SQL 是强类型生态,因此 Procedure 的参数/返回值需要映射成 Flink DataType。

Flink 会通过反射自动推断(Automatic Type Inference),但在以下情况经常需要你“补注解”

  • 小数精度/scale(DECIMAL)
  • 嵌套 Row 类型(ROW<…>)
  • RAW/自定义序列化对象
  • 一个call想吃多种输入类型、但希望统一输出类型(用@ProcedureHint

4.1@DataTypeHint:给参数或返回值补充类型信息

注意:call的返回值必须是T[],如果你给返回值加@DataTypeHint,其实标注的是数组元素类型 T

importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.InputGroup;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;importjava.math.BigDecimal;importjava.nio.ByteBuffer;importjava.time.Instant;publicclassOverloadedProcedureimplementsProcedure{publicLong[]call(ProcedureContextcontext,longa,longb){returnnewLong[]{a+b};}public@DataTypeHint("DECIMAL(12, 3)")BigDecimal[]call(ProcedureContextcontext,doublea,doubleb){returnnewBigDecimal[]{BigDecimal.valueOf(a+b)};}@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")publicRow[]call(ProcedureContextcontext,inti){returnnewRow[]{Row.of(String.valueOf(i),Instant.ofEpochSecond(i))};}@DataTypeHint(value="RAW",bridgedTo=ByteBuffer.class)publicByteBuffer[]call(ProcedureContextcontext,@DataTypeHint(inputGroup=InputGroup.ANY)Objecto){returnnewByteBuffer[]{MyUtils.serializeToByteBuffer(o)};}}

4.2@ProcedureHint:把“输入类型 -> 输出类型”的映射说清楚

适合场景

  • 一个call想统一处理多输入类型(例如Object...
  • 多个重载call的输出类型一致,想全局声明一次
importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.ProcedureHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;@ProcedureHint(output=@DataTypeHint("ROW<s STRING, i INT>"))publicclassSumProcedureimplementsProcedure{publicRow[]call(ProcedureContextcontext,inta,intb){returnnewRow[]{Row.of("Sum",a+b)};}publicRow[]call(ProcedureContextcontext){returnnewRow[]{Row.of("Empty args",-1)};}}

更极端的用法:完全用 hint 决定类型,call只要 JVM 能调用即可。

5. 命名参数 Named Parameters:让 CALL 更可读,也能省参数

调用 procedure 时可以用“命名参数”,好处

  • 不怕参数顺序写错
  • 可选参数可省略(默认补null
  • 可读性强(适合平台化)

通过@ArgumentHint标注参数名、类型、是否可选。

参数级别标注示例

importorg.apache.flink.table.annotation.ArgumentHint;importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;publicclassNamedParameterProcedureimplementsProcedure{public@DataTypeHint("INT")Integer[]call(ProcedureContextcontext,@ArgumentHint(name="a",isOptional=true)Integera,@ArgumentHint(name="b")Integerb){returnnewInteger[]{a+(b==null?0:b)};}}

重要限制

  • 命名参数仅在没有重载、没有可变参数(varargs)时生效
  • @ArgumentHint已包含@DataTypeHint,在某些组合场景下不能混用(按文档要求)

6. 把 Procedure 放进 Catalog:getProcedure+listProcedures

Procedure 必须存在于 Catalog 才能被CALL

你需要在 Catalog 中实现:

  • Catalog.getProcedure(ObjectPath procedurePath):返回 procedure 实例
  • Catalog.listProcedures(String dbName):列出该库下有哪些 procedure

示例(内存 catalog 内置 procedure)

importorg.apache.flink.table.catalog.GenericInMemoryCatalog;importorg.apache.flink.table.catalog.ObjectPath;importorg.apache.flink.table.catalog.exceptions.CatalogException;importorg.apache.flink.table.catalog.exceptions.DatabaseNotExistException;importorg.apache.flink.table.catalog.exceptions.ProcedureNotExistException;importorg.apache.flink.table.procedures.Procedure;importjava.util.*;importjava.util.stream.Collectors;publicclassCatalogWithBuiltInProcedureextendsGenericInMemoryCatalog{privatestaticfinalMap<ObjectPath,Procedure>PROCEDURE_MAP=newHashMap<>();static{PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"),newGenerateSequenceProcedure());}publicCatalogWithBuiltInProcedure(Stringname){super(name);}@OverridepublicList<String>listProcedures(StringdbName)throwsDatabaseNotExistException,CatalogException{if(!databaseExists(dbName)){thrownewDatabaseNotExistException(getName(),dbName);}returnPROCEDURE_MAP.keySet().stream().filter(p->p.getDatabaseName().equals(dbName)).map(ObjectPath::getObjectName).collect(Collectors.toList());}@OverridepublicProceduregetProcedure(ObjectPathprocedurePath)throwsProcedureNotExistException,CatalogException{Procedurep=PROCEDURE_MAP.get(procedurePath);if(p!=null){returnp;}thrownewProcedureNotExistException(getName(),procedurePath);}}

7. SQL 调用:CALL catalog.db.proc(args...)

注册 Catalog 后就能调用:

TableEnvironmenttEnv=TableEnvironment.create(...);tEnv.registerCatalog("my_catalog",newCatalogWithBuiltInProcedure("my_catalog"));// 调用tEnv.executeSql("CALL my_catalog.`system`.generate_n(5)");

SQL 侧一般就是

  • CALL my_catalog.\system.generate_n(5)
  • 或者用命名参数(如果你的 procedure 支持且没有重载/varargs)

8. 实战建议:什么时候用 Procedure,什么时候别用

推荐用 Procedure

  • 平台里做“管理命令”:一键生成数据、触发离线/流式任务、数据质量检查
  • 把复杂逻辑隐藏在 Procedure 里,让用户只写CALL ...

不太推荐(或要谨慎)

  • 你只是想做查询内的行级/集合级变换:那是 UDF/PTF 的领域
  • Procedure 内部启动长周期作业时,要考虑资源、权限、隔离和可观测性(日志/指标/审计)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/10 7:11:34

【前端学习AI】大模型调用实战

本地部署&#xff1a;基于Ollama调用开源大模型 Ollama 是轻量级本地大模型运行框架&#xff0c;无需依赖云端服务&#xff0c;可快速部署通义千问、Llama 等开源大模型&#xff0c;特别适合无网络环境或隐私敏感场景。 步骤1&#xff1a;安装Ollama 从官方网站下载并安装&a…

作者头像 李华
网站建设 2026/4/15 13:36:39

LeetCode 3075.幸福值最大化的选择方案:排序

【LetMeFly】3075.幸福值最大化的选择方案&#xff1a;排序 力扣题目链接&#xff1a;https://leetcode.cn/problems/maximize-happiness-of-selected-children/ 给你一个长度为 n 的数组 happiness &#xff0c;以及一个 正整数 k 。 n 个孩子站成一队&#xff0c;其中第 i…

作者头像 李华
网站建设 2026/4/13 9:51:23

Open-AutoGLM 2.0实战指南:从零到部署的完整路径,节省200+开发工时

第一章&#xff1a;Open-AutoGLM 2.0实战指南&#xff1a;从零到部署的完整路径&#xff0c;节省200开发工时 环境准备与依赖安装 在开始使用 Open-AutoGLM 2.0 前&#xff0c;确保系统已安装 Python 3.9 及 pip 包管理工具。推荐使用虚拟环境以隔离项目依赖。 创建虚拟环境&…

作者头像 李华
网站建设 2026/4/15 19:27:54

(独家解读)智谱Open-AutoGLM论文中的7个创新点,99%的人还没注意到

第一章&#xff1a;智谱Open-AutoGLM论文的核心贡献概述智谱AI发布的Open-AutoGLM论文提出了一种面向中文场景自动化的大型语言模型&#xff08;LLM&#xff09;应用框架&#xff0c;旨在降低大模型在实际任务中的使用门槛。该框架通过引入任务感知的提示工程与自动化微调机制&…

作者头像 李华
网站建设 2026/4/15 18:38:18

16、Windows Azure 存储客户端开发与认证详解

Windows Azure 存储客户端开发与认证详解 在使用 Windows Azure 存储服务时,理解如何通过 REST API 进行操作以及如何构建一个简单的存储客户端是非常重要的。下面将详细介绍相关的关键概念和操作步骤。 1. 基本概念 URL :URL 用于标识你想要获取的资源。在 Windows Azur…

作者头像 李华
网站建设 2026/4/15 18:37:24

18、Windows Azure Blob 存储服务全解析

Windows Azure Blob 存储服务全解析 在云计算时代,存储服务是至关重要的基础设施之一。Windows Azure Blob 存储服务提供了强大且灵活的存储解决方案,下面将详细介绍其定价、数据模型、使用注意事项、API 及客户端库的使用,以及容器的相关操作。 1. 定价策略 Windows Azu…

作者头像 李华