news 2026/6/9 21:24:38

Kafka笔记

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka笔记

Apache Kafka 是一个强大的分布式流处理平台,适用于大规模数据处理和实时分析。它的高吞吐量、低延迟、可扩展性和容错性使其成为现代数据架构中的重要组件。无论是用于消息队列、日志聚合还是流式处理,Kafka 都提供了高效、可靠的解决方案。

一、核心特性

  1. 高吞吐量

    Kafka 能够处理高吞吐量的数据,支持每秒数百万条消息的读写,适用于大规模数据处理场景。
  2. 低延迟

    Kafka 的设计确保了低延迟的消息传递,通常在毫秒级别,适合对实时性要求较高的应用。
  3. 可扩展性

    Kafka 是一个分布式系统,可以轻松扩展到多个服务器,通过增加更多的 broker 来提高系统的处理能力。
  4. 持久化存储

    Kafka 将消息持久化存储在磁盘上,支持数据的可靠存储和故障恢复。
  5. 容错性

    Kafka 支持副本机制,确保数据的高可用性和容错性。即使部分节点故障,数据也不会丢失。
  6. 消息持久化和顺序保证

    Kafka 保证消息在分区内的顺序,并且可以配置消息的持久化策略,确保数据不会因为系统故障而丢失。

二、主要组件

  1. Broker

    Kafka 集群由多个 broker 组成,每个 broker 是一个 Kafka 服务器实例,负责存储和管理消息。
  2. Topic

    Topic 是 Kafka 中消息的分类,生产者将消息发送到特定的 topic,消费者从 topic 中读取消息。
  3. Partition

    为了提高可扩展性,每个 topic 可以被划分为多个分区(Partition),每个分区是一个有序的消息队列。
  4. Producer

    生产者是向 Kafka 发送消息的应用程序,负责将数据写入指定的 topic。
  5. Consumer

    消费者是从 Kafka 读取消息的应用程序,负责从 topic 中读取数据。
  6. Consumer Group

    消费者组是一组消费者实例,它们共同消费一个 topic 的消息,确保每个消息只被组内的一个消费者处理。

三、使用场景

  1. 消息队列

    Kafka 可以作为高性能的消息队列使用,支持高吞吐量的消息传递和复杂的消费模式。
  2. 日志聚合

    Kafka 常用于收集和聚合系统日志,将日志数据集中存储和分析。
  3. 流式处理

    Kafka 与流处理框架(如 Apache Flink、Apache Spark Streaming)集成,支持实时数据处理和分析。
  4. 事件源

    Kafka 可以作为事件源系统的核心组件,支持事件驱动的架构。
  5. 微服务通信

    Kafka 用于微服务之间的异步通信,支持服务间的解耦和高可用性。

四、架构

Kafka 的架构基于分布式系统设计,具有以下特点:

  1. 分布式存储

    消息分布在多个 broker 上,通过分区和副本机制提高系统的可扩展性和容错性。
  2. 高可用性

    Kafka 支持副本机制,确保数据的高可用性。即使部分 broker 故障,系统仍然可以正常运行。
  3. 水平扩展

    Kafka 集群可以通过增加更多的 broker 来水平扩展,提高系统的处理能力。
  4. 消息持久化

    Kafka 将消息持久化存储在磁盘上,支持数据的可靠存储和故障恢复。

五、单元测试

生产

KafkaproducerTest.java

@BeforeEach注解表示该方法在每个测试方法执行之前都会被调用

@AfterEach注解表示该方法在每个测试方法执行之后都会被调用。

//包声明 package org.javaup; //导包 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Properties; public class kafkaproducertest {//类声明 private KafkaProducer<String, String> producer; @BeforeEach public void setUp() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.128:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); } @AfterEach public void tearDown() { producer.close(); } @Test public void testSend() { String topic = "test-topic"; String key = "test-key"; String value = "你好吗,朋友"; producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { if (exception == null) { System.out.println("Message sent successfully: " + metadata.topic() + " " + metadata.partition() + " " + metadata.offset()); } else { exception.printStackTrace(); } }); // 等待消息发送完成 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
  • 这个方法用于初始化 Kafka 生产者实例:

    • 创建一个Properties对象来存储 Kafka 生产者的配置。

    • 设置BOOTSTRAP_SERVERS_CONFIG,指定 Kafka broker 的地址。

    • 设置KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG,指定键和值的序列化器。

    • 使用这些配置创建一个KafkaProducer实例。

@Test public void testSend() { String topic = "test-topic"; String key = "test-key"; String value = "你好吗,朋友"; producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { if (exception == null) { System.out.println("Message sent successfully: " + metadata.topic() + " " + metadata.partition() + " " + metadata.offset()); } else { exception.printStackTrace(); } }); // 等待消息发送完成 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
  • @Test注解表示这是一个测试方法。

  • 这个方法用于测试 Kafka 生产者发送消息的功能:

    • 定义要发送的消息的主题、键和值。

    • 使用producer.send方法发送消息,并提供一个回调函数来处理发送结果:

      • 如果消息发送成功,打印成功信息。

      • 如果发送失败,打印异常信息。

    • 使用Thread.sleep等待一段时间,确保消息发送完成

消费

KafkaconsumerTest.java

package org.javaup; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class kafkaconsumer { private KafkaConsumer<String, String> consumer; @BeforeEach public void setUp() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.128:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); } @AfterEach public void tearDown() { consumer.close(); } @Test public void testReceive() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("Received message: " + record.value()); } } } }
结果:

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 19:50:32

RK3588平台arm64异常处理机制全面讲解:异常向量表与模式切换

RK3588平台arm64异常处理机制实战解析&#xff1a;从向量表到模式切换你有没有遇到过这样的场景&#xff1f;系统突然“啪”地一下死机&#xff0c;串口输出一串看不懂的寄存器值&#xff0c;其中ELR_EL1、ESR_EL1跳来跳去——这时候&#xff0c;如果你不懂arm64的异常处理机制…

作者头像 李华
网站建设 2026/6/9 19:49:03

如何用CosyVoice3实现高精度声音克隆?支持多语言与情感控制

如何用 CosyVoice3 实现高精度声音克隆&#xff1f;支持多语言与情感控制 在虚拟主播一夜爆红、AI配音走进短视频创作的今天&#xff0c;人们不再满足于“能说话”的语音合成系统。真正打动用户的&#xff0c;是那句“听起来像你”的声音——带有熟悉的语调、情绪起伏&#xf…

作者头像 李华
网站建设 2026/6/7 6:26:20

投稿不踩坑!IEEE Publication Recommender —— 工程领域研究者的选刊神器

对于工程学及相关领域的研究者来说&#xff0c;“论文写好后投哪本期刊 / 哪个会议” 常常是令人头疼的难题&#xff1a;投错期刊可能遭遇 “desk rejection”&#xff0c;浪费时间不说还打击信心&#xff1b;错过会议截稿日期又得等下一届 —— 而 IEEE Publication Recommend…

作者头像 李华
网站建设 2026/6/9 19:51:13

CosyVoice3支持语音风格迁移稳定性吗?长时间运行压力测试

CosyVoice3 的语音风格迁移稳定性与长期运行表现深度解析 在智能语音内容爆发式增长的今天&#xff0c;用户对语音合成&#xff08;TTS&#xff09;系统的要求早已超越“能说话”的基础功能。无论是虚拟主播、有声书生成&#xff0c;还是多语言客服系统&#xff0c;都要求模型…

作者头像 李华
网站建设 2026/6/9 19:50:13

解决语音合成卡顿问题:CosyVoice3重启机制与资源释放技巧

解决语音合成卡顿问题&#xff1a;CosyVoice3重启机制与资源释放技巧 在当前AIGC应用快速落地的背景下&#xff0c;语音合成技术正从实验室走向千行百业。阿里开源的 CosyVoice3 凭借“3秒极速复刻”和“自然语言控制发音风格”两大亮点&#xff0c;迅速成为声音克隆领域的热门…

作者头像 李华
网站建设 2026/6/9 19:45:27

CosyVoice3能否克隆婴儿名字呼唤声?育儿场景语音助手

CosyVoice3能否克隆婴儿名字呼唤声&#xff1f;育儿场景语音助手 在智能音箱、早教机、儿童陪伴机器人日益普及的今天&#xff0c;一个看似微小却真实存在的问题逐渐浮现&#xff1a;为什么这些设备说话总是“冷冰冰”的&#xff1f; 孩子可以接受陌生的声音讲故事&#xff0…

作者头像 李华