上一篇:【第45篇】Kafka日志存储源码解析(四)——FileMessageSet与ByteBufferMessageSet
下一篇:【第47篇】Kafka延迟操作(DelayedOperation)源码解析——优雅处理等待响应
摘要
如果说Log是"日志段的管理者",那么LogManager就是"整个Kafka日志存储的总管家"。它负责加载所有分区日志、调度日志清理/压缩任务、协调日志刷写、处理日志恢复。本文深入解析LogManager的架构设计、核心方法、以及它如何像"交响乐指挥"一样协调Log、LogSegment、OffsetIndex等组件高效工作。
一、LogManager在Kafka存储架构中的位置
1.1 从"快递分拣中心"理解LogManager
┌─────────────────────────────────────────────────────────────┐ │ Kafka 存储架构全景图 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ │ │ │ LogManager │ ← 本篇主角 │ │ │ (大管家) │ │ │ └──────┬──────┘ │ │ │ │ │ ┌─────────────┼─────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Log实例1 │ │ Log实例2 │ │ Log实例3 │ ← 每个分区一个Log│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │LogSegment│ │LogSegment│ │LogSegment│ ← 每个Log有多个│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │OffsetIndex│ │OffsetIndex│ │OffsetIndex│ ← 每个Segment有│ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ 核心职责:加载/恢复/清理/压缩/刷写 │ │ │ └─────────────────────────────────────────────────────────────┘LogManager就像"快递分拣中心的总经理":
- 早上开门:加载所有分区日志(Log.load())
- 日常运营:调度清理线程(CleanerThread)、压缩线程(CompressorThread)
- 确保不丢件:定期刷写日志到磁盘(flush())
- 晚上盘点:关闭所有日志,释放资源(shutdown())
二、LogManager的架构设计
2.1 核心字段一览
// kafka/log/LogManager.scalaclassLogManager(logDirs:Seq[File],// 日志存储目录(可配置多个,提高并行度)topicConfigs:Map[String,LogConfig],// 每个Topic的配置defaultConfig:LogConfig,// 默认配置cleanerConfig:CleanerConfig,// 日志压缩器配置ioThreads:Int,// IO线程数(用于日志刷写)heartbeat:Long,// 心跳间隔(用于检测日志目录健康状态)valrecoveryPoint:Long=0// 恢复点(崩溃恢复时用))extendsLoggingwithBrokerRole{// 核心数据结构1:所有分区日志(ConcurrentHashMap,支持并发访问)privatevallogs=newPool[TopicPartition,Log]()// 核心数据结构2:日志清理调度器(定时清理过期日志)privatevallogCleaner:LogCleaner=if(cleanerConfig.enableCleaner){newLogCleaner(cleanerConfig,logs)}else{null}// 核心数据结构3:日志压缩调度器(定时压缩日志段)privatevallogCompressor:LogCompressor=newLogCompressor(logs,defaultConfig)// 核心数据结构4:定时任务调度器(ScheduledExecutorService)privatevalscheduler:ScheduledExecutorService=Executors.newScheduledThreadPool(ioThreads)}2.2 启动流程:LogManager.startup()
┌─────────────────────────────────────────────────────────────┐ │ LogManager.startup() 启动流程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 1: 加载所有日志目录(log.dirs) │ │ │ │ │ │ │ for (logDir <- logDirs) { │ │ │ │ val dir = new File(logDir) │ │ │ │ if (!dir.exists()) dir.mkdirs() │ │ │ │ │ │ │ │ // 加载该目录下所有分区日志 │ │ │ │ loadLogsInDir(dir) │ │ │ │ } │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 2: 恢复未刷写的日志(崩溃恢复) │ │ │ │ │ │ │ for (log <- logs.values()) { │ │ │ │ log.recover(recoveryPoint) │ │ │ │ } │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 3: 启动日志清理器(LogCleaner) │ │ │ │ │ │ │ if (logCleaner != null) { │ │ │ │ logCleaner.start() │ │ │ │ } │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 4: 启动定时任务(日志刷写/清理) │ │ │ │ │ │ │ scheduler.scheduleAtFixedRate( │ │ │ │ () => flushDirtyLogs(), │ │ │ │ heartbeat, heartbeat, MILLISECONDS) │ │ │ │ │ │ │ │ scheduler.scheduleAtFixedRate( │ │ │ │ () => cleanupExpiredLogs(), │ │ │ │ heartbeat, heartbeat, MILLISECONDS) │ │ │ └────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘三、核心功能一:日志加载与恢复
3.1 日志加载:loadLogsInDir()
// kafka/log/LogManager.scaladefloadLogsInDir(logDir:File):Unit={// 1. 列出目录下所有分区目录(命名格式:topic-uuid)valtopicPartitions=logDir.listFiles().filter(_.isDirectory())for(dir<-topicPartitions){try{// 2. 解析目录名,获取Topic和Partition编号valtopicPartition=parseTopicPartition(dir.getName())// 3. 创建Log实例(核心!)vallog=Log(dir=dir,config=topicConfigs.getOrElse(topicPartition.topic,defaultConfig),scheduler=scheduler,time=time)// 4. 加入内存中的logs池logs.put(topicPartition,log)}catch{casee:Exception=>error(s"Failed to load log$dir",e)}}}3.2 崩溃恢复:Log.recover()
┌─────────────────────────────────────────────────────────────┐ │ 日志恢复流程(崩溃后重启) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 场景:Kafka宕机,Page Cache中有未刷写的数据 │ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 1: 读取恢复点(recoveryPoint) │ │ │ │ │ │ │ 从上次成功刷写的位置开始恢复 │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 2: 遍历所有日志段(LogSegment) │ │ │ │ │ │ │ for (segment <- log.segments) { │ │ │ │ // 验证每个消息的CRC32校验和 │ │ │ │ segment.recover() │ │ │ │ } │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 3: 截断非法数据(CRC校验失败) │ │ │ │ │ │ │ 将最后一个合法offset之后的数据全部删除 │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 4: 更新恢复点(recoveryPoint) │ │ │ │ │ │ │ 记录本次成功恢复的位置 │ │ │ └────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘四、核心功能二:日志清理策略
Kafka支持两种日志清理策略:删除(Delete)和压缩(Compact)。
4.1 删除策略:DefaultLogCleaner
┌─────────────────────────────────────────────────────────────┐ │ 日志删除策略(基于时间/大小) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 触发条件(满足任一即删除): │ │ │ │ 1. 时间策略:log.retention.hours=168(7天) │ │ → 删除修改时间超过7天的日志段 │ │ │ │ 2. 大小策略:log.retention.bytes=1073741824(1GB) │ │ → 从最老的日志段开始删除,直到总大小<1GB │ │ │ │ 删除流程: │ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 1: 计算需要删除的日志段 │ │ │ │ │ │ │ val deletableSegments = log.segments.filter { │ │ │ │ segment => │ │ │ │ // 时间策略 │ │ │ │ (now - segment.lastModified) > retentionMs │ │ │ │ // 大小策略 │ │ │ │ || (totalSize - segment.size) > retentionBytes │ │ │ │ } │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 2: 异步删除(避免阻塞IO线程) │ │ │ │ │ │ │ scheduler.schedule( │ │ │ │ () => deletableSegments.foreach(_.delete()) │ │ │ │ ) │ │ │ └────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘4.2 压缩策略:LogCompactor
┌─────────────────────────────────────────────────────────────┐ │ 日志压缩策略(保留最新K/V) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 核心思想:对于每个Key,只保留最新的一条消息 │ │ │ │ 压缩前: 压缩后: │ │ ┌──────┬──────┬──────┬──────┐ ┌──────┐ │ │ │ Key1 │ Key2 │ Key1 │ Key3 │ │ Key2 │ │ │ │ Val1 │ Val2 │ Val3 │ Val4 │ → │ Key1 │ │ │ └──────┴──────┴──────┴──────┘ │ Key3 │ │ │ ↑ 同一Key的最新值 └──────┘ │ │ │ │ 压缩流程: │ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 1: 构建Key映射表(Map[Key, Offset])│ │ │ │ │ │ │ 遍历所有日志段,记录每个Key的最新offset│ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 2: 重写日志段(只保留最新K/V) │ │ │ │ │ │ │ 创建新的日志段,写入去重后的消息 │ │ │ └────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────┐ │ │ │ Step 3: 替换旧日志段(原子操作) │ │ │ │ │ │ │ 用新日志段替换旧的,释放磁盘空间 │ │ │ └────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘压缩策略的适用场景:
- ✅Kafka Connect:保留最新的Connector配置
- ✅事件溯源(Event Sourcing):只关心实体最新状态
- ❌日志收集:不需要保留Key历史
五、核心功能三:日志刷写机制
5.1 为什么要刷写(Flush)?
┌─────────────────────────────────────────────────────────────┐ │ 刷写:平衡性能与可靠性 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 不刷写(依赖Page Cache): │ │ ┌──────┐ ┌──────┐ │ │ │ 磁盘 │ ◄── Page Cache ◄──│ Kafka │ │ │ └──────┘ └──────┘ │ │ ↑ 宕机时Page Cache数据丢失! │ │ │ │ 同步刷写(每次写入都fsync): │ │ ┌──────┐ ┌──────┐ │ │ │ 磁盘 │ ◄── fsync ◄──│ Kafka │ │ │ └──────┘ └──────┘ │ │ ↑ 性能极差!(磁盘IO成为瓶颈) │ │ │ │ Kafka的方案:定时刷写(平衡性能与可靠性) │ │ ┌──────┐ ┌──────┐ │ │ │ 磁盘 │ ◄── fsync ◄──│ Kafka │ │ │ └──────┘ (定时) └──────┘ │ │ ↑ 定期刷写,既保证性能,又避免大量数据丢失 │ │ │ └─────────────────────────────────────────────────────────────┘5.2 LogManager.flushDirtyLogs()
// kafka/log/LogManager.scaladefflushDirtyLogs():Unit={// 遍历所有分区日志for((topicPartition,log)<-logs){try{// 获取该分区最后一个刷写点valtimeSinceLastFlush=time.milliseconds()-log.lastFlushTime// 判断是否需要刷写(log.flush.interval.ms)if(timeSinceLastFlush>log.config.flushInterval){// 关键:只刷写"脏数据"(Page Cache中有但磁盘没有的数据)valflushedMessages=log.flush()info(s"Flushed$flushedMessagesmessages for$topicPartition")}}catch{casee:Exception=>error(s"Failed to flush log$topicPartition",e)}}}六、性能调优:LogManager相关参数
| 参数 | 默认值 | 推荐值 | 作用 |
|---|---|---|---|
log.retention.hours | 168 (7天) | 24-72 | 日志保留时间(根据磁盘容量调整) |
log.retention.bytes | -1 (无限) | 1073741824 (1GB) | 日志保留大小(防止磁盘写满) |
log.segment.bytes | 1073741824 (1GB) | 268435456 (256MB) | 日志段大小(调小可加快压缩/清理速度) |
log.flush.interval.ms | Long.MaxValue (不刷写) | 60000 (1分钟) | 刷写间隔(根据可靠性要求调整) |
num.io.threads | 8 | 16-32 | IO线程数(磁盘IO密集型场景调大) |
log.cleaner.enable | false | true | 是否启用日志压缩(事件溯源场景开启) |
log.cleaner.threads | 1 | 2-4 | 压缩线程数(压缩密集型场景调大) |
七、本篇小结
今日收获:
- LogManager是"大管家":负责加载/恢复/清理/压缩/刷写所有分区日志
- 两种清理策略:删除(基于时间/大小)和压缩(保留最新K/V)
- 崩溃恢复机制:通过recoveryPoint + CRC校验,确保数据不丢失
- 定时刷写:平衡性能与可靠性,避免Page Cache数据丢失
下篇预告:
文章047将深入DelayedOperation——Kafka的"延迟操作 scheduler"。当你发送消息时设置了acks=all,服务端必须等待所有副本拉取到数据后才能返回成功,这个"等待"就是用DelayedOperation实现的!
上一篇:【第45篇】Kafka日志存储源码解析(四)——FileMessageSet与ByteBufferMessageSet
下一篇:【第47篇】Kafka延迟操作(DelayedOperation)源码解析——优雅处理等待响应