Learning Flink from Scratch: The Ultimate Guide to Data Output (Sink)

In a complete real-time data processing pipeline, data output (the sink) is the final key stage: it delivers the processed results to external systems for downstream use. Flink ships with a rich set of sink connectors that can write data to Kafka, Elasticsearch, file systems, databases, and many other targets. This article digs into the core concepts, configuration, and best practices of Flink data output, and builds a complete sink example on top of Flink 1.20.1.

I. Overview of Flink Sinks

1. What Is a Sink

A sink is the end of a Flink data processing pipeline: it is responsible for writing the computed results to external storage systems or downstream processing systems. In Flink's programming model, a sink is attached to a DataStream as the terminal operation of the DataStream API; it consumes the stream and writes its records to the configured external system.
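For orientation, here is a minimal sketch (the class name and element values are illustrative, not from any specific connector) of the two ways a sink is attached to a stream: the legacy addSink(), which takes a SinkFunction, and the newer sinkTo(), which takes a Sink (V2) implementation such as the connectors used throughout this article:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class SinkAttachSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromData("a", "b", "c");

        // Legacy style: addSink(...) accepts a SinkFunction implementation
        stream.addSink(new PrintSinkFunction<>());

        // Unified style: sinkTo(...) accepts a Sink (V2) implementation,
        // e.g. the KafkaSink or FileSink shown later in this article
        // stream.sinkTo(someSinkV2);

        env.execute("Sink Attach Sketch");
    }
}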

2. Types of Sinks

Flink's sink connectors can be grouped into the following categories:

Built-in sinks: debugging outputs such as print() and printToErr()

File system sinks: writing to the local file system, HDFS, and other file systems

Message queue sinks: Kafka, RabbitMQ, and similar systems

Database sinks: JDBC-accessible databases, Elasticsearch, and so on

Custom sinks: user-defined output logic implemented against the SinkFunction interface or the newer Sink (V2) API

3. Output Semantic Guarantees

Flink offers three levels of output semantics for sinks:

At-most-once: records may be lost, but are never duplicated

At-least-once: records are never lost, but may be duplicated

Exactly-once: records are neither lost nor duplicated

These guarantees are closely tied to Flink's checkpoint mechanism, which we discuss in detail later.

II. Environment Setup and Dependency Configuration

1. Versions

Flink: 1.20.1

JDK: 17+

Gradle: 8.3+

External systems: Kafka 3.4.0, Elasticsearch 7.17.0, MySQL 8.0

2. Core Dependencies

dependencies {
    // Flink core dependencies
    implementation 'org.apache.flink:flink-core:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java:1.20.1'
    implementation 'org.apache.flink:flink-clients:1.20.1'

    // Kafka connector
    implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'

    // Elasticsearch connector
    implementation 'org.apache.flink:flink-connector-elasticsearch7:3.1.0-1.20'

    // JDBC connector
    implementation 'org.apache.flink:flink-connector-jdbc:3.3.0-1.20'
    implementation 'mysql:mysql-connector-java:8.0.33'

    // FileSystem connector
    implementation 'org.apache.flink:flink-connector-files:1.20.1'
}

III. Basic Sink Operations

1. Built-in Debugging Sinks

Flink provides several built-in sinks that are handy during development and debugging:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BasicSinkDemo {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a data source
        DataStream<String> stream = env.fromElements("Hello", "Flink", "Sink");

        // Print to standard output
        stream.print("StandardOutput");

        // Print to standard error
        stream.printToErr("ErrorOutput");

        // Execute the job
        env.execute("Basic Sink Demo");
    }
}

2. File System Sink

Flink can write data to the local file system, HDFS, and other file systems. The example below writes to the local file system. Note that in a long-running streaming job the FileSink finalizes in-progress part files on checkpoints, so checkpointing should be enabled in production:

package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class FileSystemSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = env.fromData("Hello", "Flink", "FileSystem", "Sink");

        // Roll part files every 15 minutes, after 5 minutes of inactivity, or at 64 MB
        RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(15))
                .withInactivityInterval(Duration.ofMinutes(5))
                .withMaxPartSize(MemorySize.ofMebiBytes(64))
                .build();

        // Build the file system sink
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("file:///tmp/flink-output"), new SimpleStringEncoder<String>())
                .withRollingPolicy(rollingPolicy)
                .build();

        // Attach the sink
        stream.sinkTo(sink);

        env.execute("File System Sink Demo");
    }
}
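Building on the example above, file output is often partitioned by time. The following sketch is a variation under assumed paths and an assumed hourly bucket format (these names are not from the original example): it uses a DateTimeBucketAssigner to create one bucket directory per hour and an OutputFileConfig to control part-file names, with checkpointing enabled so part files get finalized:

package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class PartitionedFileSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FileSink finalizes part files on checkpoints in streaming mode
        env.enableCheckpointing(5000);

        DataStream<String> stream = env.fromData("Hello", "Flink", "FileSystem", "Sink");

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("file:///tmp/flink-output-partitioned"), new SimpleStringEncoder<String>())
                // One sub-directory (bucket) per hour, e.g. /2025-09-29--10/
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                // Control part-file naming
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("logs")
                        .withPartSuffix(".txt")
                        .build())
                .build();

        stream.sinkTo(sink);

        env.execute("Partitioned File Sink Sketch");
    }
}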

IV. Advanced Sink Connectors

1. Kafka Sink

Kafka is one of the most widely used message queues in real-time data processing, and Flink ships with a full-featured Kafka sink:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class KafkaSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing (a prerequisite for exactly-once delivery)
        env.enableCheckpointing(5000);

        DataStream<String> stream = env.fromElements("Hello Kafka", "Flink to Kafka", "Data Pipeline");

        // Kafka producer configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Build the Kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(props)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        // Attach the sink
        stream.sinkTo(sink);

        env.execute("Kafka Sink Demo");
    }
}

The messages written to the Kafka topic can then be inspected with a Kafka consumer (screenshot omitted).

2. Elasticsearch Sink

Elasticsearch is a real-time distributed search and analytics engine, well suited to storing and querying data processed by Flink:

package com.cn.daimajiangxin.flink.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Map;

public class ElasticsearchSinkDemo {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        DataStream<String> stream = env.fromData(
                "{\"id\":\"1\",\"name\":\"Flink\",\"category\":\"framework\"}",
                "{\"id\":\"2\",\"name\":\"Elasticsearch\",\"category\":\"database\"}");

        // Configure the Elasticsearch node
        HttpHost httpHost = new HttpHost("localhost", 9200, "http");

        // Build the Elasticsearch sink
        ElasticsearchSink<String> sink = new Elasticsearch7SinkBuilder<String>()
                .setBulkFlushMaxActions(10)          // max buffered actions per bulk request
                .setBulkFlushInterval(5000)          // bulk flush interval in milliseconds
                .setHosts(httpHost)
                .setConnectionRequestTimeout(60000)  // connection request timeout (ms)
                .setConnectionTimeout(60000)         // connection timeout (ms)
                .setSocketTimeout(60000)             // socket timeout (ms)
                .setEmitter((element, context, indexer) -> {
                    try {
                        Map<String, Object> json = objectMapper.readValue(element, Map.class);
                        IndexRequest request = Requests.indexRequest()
                                .index("flink_documents")
                                .id((String) json.get("id"))
                                .source(json);
                        indexer.add(request);
                    } catch (Exception e) {
                        // Handle records that cannot be parsed as JSON
                        System.err.println("Failed to parse JSON: " + element);
                    }
                })
                .build();

        // Attach the sink
        stream.sinkTo(sink);

        env.execute("Elasticsearch Sink Demo");
    }
}

You can then query the written documents with an HTTP tool such as Postman (screenshot omitted).

3. JDBC Sink

The JDBC sink writes data to any relational database reachable via JDBC:

package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class JdbcSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        // Simulated user data
        List<User> userList = Arrays.asList(
                new User(1, "Alice", 25, "alice"),
                new User(2, "Bob", 30, "bob"),
                new User(3, "Charlie", 35, "charlie"));
        DataStream<User> userStream = env.fromData(userList);

        JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();

        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("username")
                .withPassword("password")
                .build();

        String insertSql = "INSERT INTO user (id, name, age, user_name) VALUES (?, ?, ?, ?)";

        JdbcStatementBuilder<User> statementBuilder = (statement, user) -> {
            statement.setInt(1, user.getId());
            statement.setString(2, user.getName());
            statement.setInt(3, user.getAge());
            statement.setString(4, user.getUserName());
        };

        // Build the JDBC sink (at-least-once semantics)
        JdbcSink<User> jdbcSink = new Jdbc().<User>sinkBuilder()
                .withQueryStatement(new SimpleJdbcQueryStatement<User>(insertSql, statementBuilder))
                .withExecutionOptions(jdbcExecutionOptions)
                .buildAtLeastOnce(connectionOptions);

        // Attach the sink
        userStream.sinkTo(jdbcSink);

        env.execute("JDBC Sink Demo");
    }

    // User entity class
    public static class User {

        private int id;
        private String name;
        private String userName;
        private int age;

        public User(int id, String name, int age, String userName) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.userName = userName;
        }

        public int getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }

        public String getUserName() {
            return userName;
        }
    }
}

Log in to a MySQL client to verify that the rows were inserted (screenshot omitted).

V. Reliability Guarantees for Sinks

1. Checkpoints and Savepoints

Flink's checkpoint mechanism is the foundation of exactly-once semantics. With checkpointing enabled, Flink periodically persists the job's state to durable storage. If the job fails, Flink restores it from the most recent checkpoint, ensuring that no data is lost.

// Checkpoint configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with a 5000 ms interval
env.enableCheckpointing(5000);

// Use EXACTLY_ONCE checkpointing mode (the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(60000);

// Maximum number of concurrent checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Externalized checkpoints: retain the checkpoint when the job is cancelled or fails
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2. Transactions and Two-Phase Commit

For external systems that support transactions, Flink uses a two-phase commit (2PC) protocol to achieve exactly-once semantics:

Phase one (pre-commit): Flink writes the data to the external system's pending, uncommitted area

Phase two (commit): once all operators have completed the pre-commit, Flink tells the external system to commit the data

This mechanism ensures that data is neither written twice nor lost, even when the job fails and recovers.
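To make this concrete, the sketch below shows how the Kafka sink participates in two-phase commit when configured for exactly-once delivery (the topic name and transactional id prefix are illustrative assumptions, not from the earlier example). Downstream consumers should read with isolation.level=read_committed, and the producer transaction timeout must not exceed the broker's transaction.max.timeout.ms:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceKafkaSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing drives the commit phase of the two-phase commit protocol
        env.enableCheckpointing(5000);

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Pre-commit inside a Kafka transaction, commit once the checkpoint completes
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Each exactly-once sink needs a unique transactional id prefix
                .setTransactionalIdPrefix("flink-exactly-once-demo")
                .build();

        env.fromData("Hello", "Exactly", "Once").sinkTo(sink);

        env.execute("Exactly-Once Kafka Sink Sketch");
    }
}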

3. Guarantee Levels of Different Sinks

Different sink connectors support different levels of guarantees:

Exactly-once: Kafka, Elasticsearch (where the version supports it), file systems (write-ahead-log mode)

At-least-once: JDBC, Redis, RabbitMQ

At-most-once: simple, stateless outputs

VI. Implementing a Custom Sink

When Flink's built-in sink connectors do not meet your needs, you can implement a custom sink. Traditionally this meant implementing the SinkFunction interface; the example below uses the newer unified Sink (V2) API instead:

package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;

public class CustomSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = env.fromElements("Custom", "Sink", "Example");

        // Attach the custom sink
        stream.sinkTo(new CustomSink());

        env.execute("Custom Sink Demo");
    }

    // Custom sink implemented with the new unified Sink API
    public static class CustomSink implements Sink<String> {

        @Override
        public SinkWriter<String> createWriter(InitContext context) {
            return new CustomSinkWriter();
        }

        // The SinkWriter performs the actual writes
        private static class CustomSinkWriter implements SinkWriter<String> {

            // Initialize resources
            public CustomSinkWriter() {
                // Open connections, clients, and other resources here
                System.out.println("CustomSink initialized");
            }

            // Called for every element
            @Override
            public void write(String value, Context context) throws IOException, InterruptedException {
                // The actual write logic
                System.out.println("Writing to custom sink: " + value);
            }

            // Flush buffered data
            @Override
            public void flush(boolean endOfInput) {
                // Flush logic, if any
            }

            // Release resources
            @Override
            public void close() throws Exception {
                // Close connections, clients, and other resources here
                System.out.println("CustomSink closed");
            }
        }
    }
}


VII. Hands-On Case Study: A Real-Time Data Processing Pipeline

We will now build a complete real-time processing pipeline that reads data from Kafka, transforms it, and writes the results to multiple target systems:

1. System Architecture

Kafka Source -> Flink Processing -> Multiple Sinks
                                      |-> Kafka Sink
                                      |-> Elasticsearch Sink
                                      |-> JDBC Sink
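In code, the fan-out is simply a matter of attaching several sinks to the same DataStream; each attachment becomes an independent output branch of the job graph. Here is a minimal, runnable sketch of the pattern, using print() as a stand-in for the real sinks built in the earlier sections:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FanOutSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> processed = env.fromData("a", "b", "c");

        // Each call below creates its own output branch; in the real pipeline
        // these would be sinkTo(kafkaSink), sinkTo(elasticsearchSink), sinkTo(jdbcSink)
        processed.print("to-kafka");
        processed.print("to-elasticsearch");
        processed.print("to-mysql");

        env.execute("Fan-Out Sketch");
    }
}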

2. Data Model

We use a log data model. The LogEntry class represents a single log record:

package com.cn.daimajiangxin.flink.sink;

public class LogEntry {

    private String timestamp;
    private String logLevel;
    private String source;
    private String message;

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getLogLevel() {
        return logLevel;
    }

    public void setLogLevel(String logLevel) {
        this.logLevel = logLevel;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return String.format("LogEntry{timestamp='%s', logLevel='%s', source='%s', message='%s'}",
                timestamp, logLevel, source, message);
    }
}

We also define a statistics entity class, LogStats, which holds per-source log counts:

package com.cn.daimajiangxin.flink.sink;

public class LogStats {

    private String source;
    private long count;

    public LogStats() {
    }

    public LogStats(String source, long count) {
        this.source = source;
        this.count = count;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return String.format("LogStats{source='%s', count=%d}", source, count);
    }
}

3. Complete Implementation

package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.sql.PreparedStatement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MultiSinkPipeline {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment and enable checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        // 2. Create the Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("logs-input-topic")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 3. Read and parse the data
        DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse the log records
        DataStream<LogEntry> logStream = kafkaStream
                .map(line -> {
