news 2026/5/12 5:02:10

Flink实时数据写入Redis实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink实时数据写入Redis实战

这段代码展示了如何使用 Flink 将数据流写入 Redis,核心是通过RedisSinkRedisMapper实现数据的映射和存储。通过这种方式,可以方便地将实时处理的结果存储到 Redis 中,供后续查询或分析使用。

这段代码的主要功能是将用户点击事件流写入 Redis 的哈希表中,方便后续查询和分析。

  • 适用场景:
    • 实时记录用户点击行为。
    • 将 Flink 处理后的结果存储到 Redis 中,供其他系统使用。

代码结构

  • 包名:package sink
    表示这个类属于sink包。
  • 导入的依赖:
    • org.apache.flink.streaming.api.scala._: Flink 的 Scala API。
    • org.apache.flink.streaming.connectors.redis.RedisSink: Flink 提供的 Redis Sink 连接器。
    • org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig: Redis 连接池的配置类。
    • org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}: Redis 映射相关的类。
    • source.ClickSource: 自定义的数据源,用于生成模拟的点击事件流。
package sink import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} import source.ClickSource /** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: sink * @author: 赵嘉盟-HONOR * @data: 2023-11-20 14:53 * @DESCRIPTION * */ object sinkToRedis { def main(args: Array[String]): Unit = { val env=StreamExecutionEnvironment.getExecutionEnvironment val data = env.addSource(new ClickSource) val conf=new FlinkJedisPoolConfig.Builder().setHost("").build() data.addSink(new RedisSink[source.Event](conf,new RedisMapper[source.Event] { override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET,"click") override def getKeyFromData(t: source.Event): String = t.user override def getValueFromData(t: source.Event): String = t.url })) env.execute("sinkRedis") } }
代码解释:
(1) 创建 Flink 执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
  • 获取 Flink 的流处理执行环境。
(2) 添加数据源
val data = env.addSource(new ClickSource)
  • 使用自定义的ClickSource作为数据源,生成一个数据流data
    ClickSource可能是一个模拟用户点击事件的数据源,生成Event类型的数据。
(3) 配置 Redis 连接池
val conf = new FlinkJedisPoolConfig.Builder().setHost("").build()
  • 创建一个 Redis 连接池配置FlinkJedisPoolConfig
    这里setHost("")需要填写 Redis 服务器的主机地址(例如"localhost""127.0.0.1")。
(4) 添加 Redis Sink
data.addSink(new RedisSink[source.Event](conf, new RedisMapper[source.Event] { override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "click") override def getKeyFromData(t: source.Event): String = t.user override def getValueFromData(t: source.Event): String = t.url }))
  • 将数据流data写入 Redis。
  • RedisSink:
    • 第一个参数是 Redis 连接池配置conf
    • 第二个参数是一个RedisMapper的实现,用于定义如何将数据映射到 Redis。
  • RedisMapper:
    • getCommandDescription: 定义 Redis 命令和键名。
      这里使用HSET命令,将数据写入 Redis 的哈希表"click"
    • getKeyFromData: 定义哈希表中的字段(field),这里使用Eventuser字段。
    • getValueFromData: 定义哈希表中的值(value),这里使用Eventurl字段。
(5) 启动 Flink 任务
env.execute("sinkRedis")
  • 启动 Flink 任务,任务名称为"sinkRedis"

基于scala使用flink将读取到的数据写入到Redis

getCommandDescription第一个参数为写入方式,第二个参数为Hset的键

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/12 4:59:36

Obsidian智能管家:基于规则引擎的笔记库自动化运维实践

1. 项目概述:一个为Obsidian而生的智能管家如果你和我一样,是个重度Obsidian用户,那你一定经历过这样的时刻:笔记库越来越大,文件散落在各个角落,标签和链接关系变得错综复杂,想要找一个特定的笔…

作者头像 李华
网站建设 2026/5/12 4:59:32

Java项目脚手架工具:一键生成现代化开发环境配置

1. 项目概述:一个为Java开发者“减负”的脚手架工具如果你是一名Java开发者,或者你的日常工作离不开Maven、Spring Boot、Quarkus这些技术栈,那么下面这个场景你一定不陌生:老板或客户突然丢过来一个新项目需求,你摩拳…

作者头像 李华
网站建设 2026/5/12 4:59:11

Real-ESRGAN-ncnn-vulkan性能优化秘籍:多GPU并行与内存管理技巧

Real-ESRGAN-ncnn-vulkan性能优化秘籍:多GPU并行与内存管理技巧 【免费下载链接】Real-ESRGAN-ncnn-vulkan NCNN implementation of Real-ESRGAN. Real-ESRGAN aims at developing Practical Algorithms for General Image Restoration. 项目地址: https://gitcod…

作者头像 李华
网站建设 2026/5/12 4:54:51

Radon实战指南:在CI/CD中集成Python代码质量检查的完整教程

Radon实战指南:在CI/CD中集成Python代码质量检查的完整教程 【免费下载链接】radon Various code metrics for Python code 项目地址: https://gitcode.com/gh_mirrors/rad/radon Radon是一个强大的Python代码质量分析工具,能够帮助开发者自动检测…

作者头像 李华
网站建设 2026/5/12 4:54:31

Ninja框架入门指南:10分钟快速搭建你的第一个Java Web应用

Ninja框架入门指南:10分钟快速搭建你的第一个Java Web应用 【免费下载链接】ninja Ninja is a full stack web framework for Java. Rock solid, fast and super productive. 项目地址: https://gitcode.com/gh_mirrors/ninj/ninja 如果你正在寻找一个快速、…

作者头像 李华