news 2026/3/30 17:11:51

DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

1、它到底做了什么

  • Source 并行运行:有多少个 source 并发子任务,就把Long的序列切成多少段(sub-sequence)
  • 你提供一个GeneratorFunction<Long, OUT>:把输入的 index(Long)映射成任意事件类型
  • 每个 subtask 内部有序,但全局顺序取决于并行度(parallelism)

一句话:Flink 负责发 index,你负责把 index 变成事件。

2、最小可跑示例:生成 0~999 的字符串

importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDataGenDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GeneratorFunction<Long,String>generator=index->"Number: "+index;longnumberOfRecords=1000;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,numberOfRecords,Types.STRING);DataStreamSource<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");stream.print();env.execute("datagen-demo");}}

要点:

  • 并行度为 1 时输出是严格"Number: 0""Number: 999"顺序
  • 并行度 > 1 时:每个 subtask 内部仍然按序,但不同 subtask 的结果交织输出

3、限速:控制总吞吐(全局每秒不超过 N 条)

importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.connector.datagen.source.RateLimiterStrategy;GeneratorFunction<Long,String>generator=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),// 全部 source subtasks 加起来 <= 100 条/sTypes.STRING);

适用场景:

  • 你想模拟“上游流量”但又不想把本机打爆
  • 做算子性能对比、Backpressure 观察、checkpoint 行为观察

4、有界/无界:它“永远是 bounded”,但可以“看起来无界”

  • 语义上永远是 bounded(理论上会结束)
  • numberOfRecords = Long.MAX_VALUE基本等同“不会结束”(实践上像 unbounded)

建议:

  • 要跑有限数据:考虑 BATCH mode,更贴近离线回放
  • 要模拟持续输入:用Long.MAX_VALUE+ rate limit

5、容错语义:at-least-once / end-to-end exactly-once 能不能保证?

可以,但有个硬条件:

  • GeneratorFunction必须对输入 index 完全确定性
    也就是:同一个 index 永远生成同样的输出。

反例(会破坏确定性):

  • random()System.currentTimeMillis()、读外部可变配置、读网络请求结果

正确做法:

  • 用 index 推导数据(例如 hash(index) 生成用户、金额、状态)
  • 或者用固定 seed 的伪随机:new Random(index)(每个 index 固定)

6、Watermark:也可以在 Source 侧发“确定性水位线”

默认例子用noWatermarks(),但你完全可以:

  • 在生成事件里带 eventTime
  • 配合自定义WatermarkStrategy生成 deterministic watermarks
    适合做 event-time 窗口、乱序、迟到数据的测试演示。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/28 6:51:05

你以为日期选择器很简单?我刚进团队就被它狠狠干了一周

我最近加入了一个新团队。那种“成熟到可怕”的 Design System 团队&#xff1a;Figma 命名规矩、代码语义清晰、会议都有议程——你甚至能在日历里看到“讨论结束时间”。 但我第一次见识到他们的“当下大麻烦”&#xff0c;不是在什么战情室&#xff0c;也不是在发布事故复盘…

作者头像 李华
网站建设 2026/3/27 11:08:42

漫谈人机协同中的人机功能分配

在人机协同的分工逻辑中&#xff0c;“人杂机复”与“人道机术”是两种互补且有深度的视角&#xff0c;分别从任务属性和职能定位两个维度&#xff0c;揭示了人类与机器在协同中的核心优势与边界。两者结合&#xff0c;为人机协同的高效实现提供了完整的理论框架。一、基于任务…

作者头像 李华
网站建设 2026/3/22 4:04:54

美国战争部AI加速战略的核心就是人机环境系统智能

美国战争部近期启动的AI加速战略&#xff0c;表面上是推动军事AI技术的快速部署与领先&#xff0c;但其深层逻辑可归结为以“人机环境系统智能”为核心&#xff0c;通过重构人&#xff08;军事人员&#xff09;、机&#xff08;AI技术&#xff09;、环境&#xff08;任务场景&a…

作者头像 李华
网站建设 2026/3/25 20:40:28

2026黄金戒指怎么选?推荐这7款:款式多样,佩戴舒适!

"2026年黄金戒指怎么选&#xff1f;从材质、款式到舒适度&#xff0c;精选7大品牌推荐。足金999保值又时尚&#xff0c;活口设计贴合手指&#xff0c;简约百搭或精致雕花&#xff0c;周六福是正品吗总有一款让你爱不释手。"话说回来&#xff0c;想在2026年挑个合适的…

作者头像 李华
网站建设 2026/3/28 7:01:34

HoRain云--JavaScript Switch语句详解与最佳实践

&#x1f3ac; HoRain云小助手&#xff1a;个人主页 &#x1f525; 个人专栏: 《Linux 系列教程》《c语言教程》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 推荐 前些天发现了一个超棒的服务器购买网站&#xff0c;性价比超高&#xff0c;大内存超划算&#xff01;…

作者头像 李华
网站建设 2026/3/27 17:35:14

自建软件仓库

一&#xff0c;目的&#xff1a;为了把网络软件仓库中的软件下载下来后分享给本地主机&#xff0c;或者直接把网上下载下来的rpm加入到软件仓库中&#xff0c;更高效便捷。搭建一个本地 HTTP 软件仓库&#xff0c;把 Docker 相关的 RPM 包下载下来存到仓库里&#xff0c;这样局…

作者头像 李华