本文是「Mini Distributed Storage」项目系列的第二篇。第一阶段实现了基于 LSM-Tree 的单机 KV 存储引擎(MemTable / SSTable / WAL / Compaction),本文聚焦第二阶段:在单机引擎之上构建完整的分布式架构。
代码在 GitHub:Remedios11/Mini-HDFS-Ceph: 分布式存储系统
一、整体架构
┌─────────────────────────────────────────────┐
│ StorageClient │
│ PutFile / GetFile / DeleteFile │
└──────────────────┬──────────────────────────┘
│ TCP(长度前缀帧)
┌──────────▼──────────┐
│ NameNode │
│ MetadataStore │ ← 文件路径 → BlockID 列表
│ BlockManager │ ← Block 分配(轮询)
│ DataNodeManager │ ← 心跳 / 状态机
│ EditLog │ ← 操作日志持久化
│ FaultDetector │ ← 副本监控 + 自动修复
└──────────┬──────────┘
│ 注册 / 心跳 / BlockReport
┌───────────┼───────────┐
▼ ▼ ▼
DataNode1 DataNode2 DataNode3
BlockStore BlockStore BlockStore
(CRC32校验) (原子写入) (Pipeline转发)
整个系统分为四个部分实现:
- epoll Reactor网络层+ThreadPool
- NameNode:元数据+心跳+BlockReport
- DataNode:块存储+Pipeline3副本写入
- 容错:副本监控+自动修复+一致性校验
二、网络层设计
为什么选epoll+Reactor?
分布式系统对网络层的要求是高并发、低延迟。传统的 one-thread-per-connection 模型在连接数上千时线程切换开销极大。
Reactor 模型的思路:用少量线程监听大量连接,有事件才处理。
┌─────────────────────────────────────────┐
│ EventLoop │
│ epoll_wait() ──► 就绪事件队列 │
│ │ │
│ Channel::HandleEvent() │
│ ├── 可读 → TcpConnection::HandleRead │
│ └── 可写 → TcpConnection::HandleWrite │
└────────────────────┬────────────────────┘
│ 耗时任务投递
┌──────▼──────┐
│ ThreadPool │ (4线程)
│ 处理业务逻辑 │
└─────────────┘
粘包处理:长度前缀帧
TCP 是字节流协议,不保留消息边界。我们用4字节大端长度前缀解决粘包:
┌────────────┬─────────────────────────┐
│ 4 bytes │ N bytes │
│ msg_len │ protobuf payload │
└────────────┴─────────────────────────┘
// 发送:可靠写(处理内核缓冲区满导致的短写) bool SendMsg(int fd, const std::string& data) { uint32_t len = htonl((uint32_t)data.size()); if (!WriteAll(fd, &len, 4)) return false; return WriteAll(fd, data.data(), data.size()); } // WriteAll 循环写直到全部发出 bool WriteAll(int fd, const void* buf, size_t n) { const char* p = static_cast<const char*>(buf); while (n > 0) { ssize_t w = write(fd, p, n); if (w <= 0) return false; p += w; n -= w; } return true; }
踩坑记录:最初直接用write(fd, data, size)一次发送 4MB 数据,发现大文件写入必定失败。原因是内核 socket 缓冲区默认只有 128KB,write()会短写。必须用循环写保证全部数据发出。
序列化:Protobuf
所有消息用 Protobuf 定义,方便扩展和跨语言:
message NameNodeRequest {
enum OpType {
CREATE_FILE = 0;
ALLOCATE_BLOCK = 1;
GET_FILE_BLOCKS = 2;
HEARTBEAT = 6;
BLOCK_REPORT = 8;
}
OpType op = 1;
uint64 request_id = 2;
// ... 各操作的子消息
}
三、NameNode 设计
NameNode是整个系统的「大脑」,负责三件事:
- 元数据管理:文件路径 → Block 列表 → DataNode 位置
- Block 分配:客户端写文件时,决定数据存到哪些 DataNode
- DataNode 管理:通过心跳感知节点存活
元数据结构
struct FileInfo { FilePath path; int64_t size; std::vector<BlockID> blocks; // 有序的 Block 列表 }; struct BlockInfo { BlockID block_id; int64_t size; std::vector<DataNodeID> locations; // 存放该 Block 的 DN 地址 };两张哈希表,读写用shared_mutex保护(读多写少场景下比普通 mutex 性能好):
std::unordered_map<FilePath, FileInfo> files_; std::unordered_map<BlockID, BlockInfo> blocks_; mutable std::shared_mutex mutex_;Block 分配:轮询选 DataNode
分配Block时从存活DN里选min(alive, 3)个,用轮询避免热点:
BlockInfo BlockManager::AllocateBlock(const std::vector<DataNodeInfo>& alive) { BlockID bid = next_block_id_.fetch_add(1); // 原子自增 int replicas = std::min((int)alive.size(), kReplicationFactor); // 轮询选节点 int start = round_robin_index_.load() % alive.size(); std::vector<DataNodeID> selected; for (int i = 0; i < replicas; i++) selected.push_back(alive[(start + i) % alive.size()]); round_robin_index_.store((start + replicas) % alive.size()); return BlockInfo{bid, 0, selected}; }心跳与状态机
DataNode每 3 秒发一次心跳,NameNode维护三态状态机:
ALIVE ──(10s无心跳)──► SUSPECT ──(30s无心跳)──► DEAD
▲ │
└──────────────(收到心跳)──────────────────────┘
void DataNodeManager::CheckDataNodeHealth() { int64_t now = NowMs(); for (auto& [id, info] : nodes_) { int64_t elapsed = now - info.last_heartbeat; if (elapsed > kHeartbeatTimeoutSec * 1000) info.status = DataNodeStatus::DEAD; else if (elapsed > kHeartbeatTimeoutSec * 1000 / 3) info.status = DataNodeStatus::SUSPECT; } }EditLog:操作日志持久化
NameNode重启后需要恢复内存状态。所有写操作先追加到EditLog,启动时回放:
EditLog 文件格式:
┌──────────┬──────────────────┬──────────┬──────────────────┐
│ 4B len_1 │ protobuf op_1 │ 4B len_2 │ protobuf op_2 │
└──────────┴──────────────────┴──────────┴──────────────────┘
四、DataNode + Pipeline 写入
BlockStore:原子写入 + CRC32 校验
每个 Block 写入分两步,保证原子性:
1. 数据写入 tmp/block_000001.dat.tmp
2. 计算 CRC32,写入 blocks/block_000001.meta
3. rename(tmp → blocks/block_000001.dat)
← 原子操作
rename()是原子的系统调用,保证即使写到一半断电,磁盘上不会出现半截文件。
bool BlockStore::WriteBlock(BlockID id, const std::string& data) { // Step 1: 写临时文件 std::ofstream f(BlockTmpPath(id), std::ios::binary); f.write(data.data(), data.size()); // Step 2: 写 meta(含 CRC32) BlockMeta meta; meta.crc32 = crc32(0, (const Bytef*)data.data(), data.size()); WriteMeta(id, meta); // Step 3: 原子 rename return std::rename(BlockTmpPath(id).c_str(), BlockDataPath(id).c_str()) == 0; }Pipeline 副本写入
3副本写入用 Pipeline 模式,不是 NameNode 逐一通知,而是 DataNode 链式转发:
Client ──► DN1 ──► DN2 ──► DN3
│ │ │
写盘 写盘 写盘
│ │ │
ACK◄────ACK◄────ACK
客户端只需连接 DN1,DN1 写完后自动转发给 DN2,DN2 再转发给 DN3,ACK 逐级返回。
WriteBlockResponse PipelineHandler::HandleWrite(const WriteBlockRequest& req) { // 1. 本节点先写盘 block_store_->WriteBlock(req.block_id(), req.data()); // 2. 如果 pipeline 里还有下一个节点,转发 if (req.pipeline_size() > 0) { WriteBlockRequest forward = req; forward.mutable_pipeline()->DeleteSubrange(0, 1); // 去掉第一个 ForwardToNext(req.pipeline(0), forward); } return success_response; }Pipeline 的优势:3副本写入的延迟只比单副本多了两跳网络 RTT,而不是串行的 3倍。
五、容错机制
设计思路
故障容错的核心是回答两个问题:
- 谁坏了?→ DataNode 心跳超时判 DEAD
- 怎么修?→ 找存活副本,复制到新 DataNode
整体由FaultDetector统一调度,每隔固定时间跑一轮:
FaultDetector::RunOnce()
│
├── Step 1: CheckDataNodeHealth() // 更新 DN 状态
│
├── Step 2: ReplicationMonitor // 扫描副本不足的 Block
│ ScanUnderReplicated()
│
├── Step 3: BlockReplicator // 修复副本
│ RepairUnderReplicated()
│ ├── FetchBlock(source_dn) // 从存活 DN 读数据
│ ├── PushBlock(target_dn) // 写入新 DN
│ └── UpdateBlockLocation() // 更新元数据
│
└── Step 4: ConsistencyChecker // CRC 一致性校验
ScanAllBlocks()
ReplicationMonitor:副本健康扫描
std::vector<UnderReplicatedBlock> ReplicationMonitor::ScanUnderReplicated() { auto alive_set = GetAliveSet(); // 存活 DN 的 ID 集合 for (auto& file : metadata_->ListFiles("")) { for (BlockID bid : file.blocks) { auto block = metadata_->GetBlock(bid); int alive_count = 0; for (auto& dn : block->locations) if (alive_set.count(dn)) alive_count++; if (alive_count < kReplicationFactor) result.push_back({bid, alive_count, ...}); } } return result; }扫描结果示例(1个 DataNode 宕机后):
[ReplicationMonitor] Under-replicated block 1: 2/3 replicas
[BlockReplicator] Replicating block 1 from 127.0.0.1:9002 to 127.0.0.1:9004
[BlockReplicator] Block 1 replicated successfully
集群健康状态报告
struct ClusterHealth { int total_blocks; // 全部 Block 数量 int healthy_blocks; // 副本数 == 3 int under_replicated; // 副本数 < 3(需要修复) int lost_blocks; // 副本数 == 0(数据丢失) int alive_datanodes; int dead_datanodes; };六、关于面试
Q:3副本是怎么保证一致性的?
A:写入时用 Pipeline 模式,所有副本都写成功才返回 ACK 给客户端。如果某个 DataNode 写失败,客户端会收到失败响应,不会出现部分副本有数据的情况。长期的副本修复由 FaultDetector 定期扫描处理。
Q:NameNode 单点故障怎么办?
A:第二阶段还没实现,这是第三阶段要做的 Raft 协议。目前的设计有 EditLog 持久化,重启能恢复元数据,但宕机期间服务不可用。
Q:epoll 和 select 的区别?
A:select 每次调用都要把全部 fd 从用户态拷到内核态,复杂度 O(n),且最多监听 1024 个 fd。epoll 用红黑树维护监听集合,只返回就绪的 fd,复杂度 O(1),没有 fd 数量限制。
七、总结收获
- epoll + Reactor:从看懂 muduo 的代码到自己写一个简化版
- Protobuf:定义 proto → 生成代码 → 序列化传输的完整流程
- Pipeline 写入:理解了 GFS 论文里副本写入设计的精妙之处
- 故障检测:心跳超时 + 状态机 + 自动修复,和 HDFS 的设计思路高度一致
八、挫折
我真想聊聊心跳失败这个问题,卡死我了,总结了几个原因
1. 大数据块短写(最核心的 Bug)
现象:小文件写入成功,大文件(>128KB)写入必定失败,心跳/注册等小消息完全正常。
根本原因:Linux 内核 socket 发送缓冲区默认约 128KB,单次write()超过缓冲区大小时会短写,只发出一部分数据,剩余数据被丢弃。接收方读到长度前缀说要 4MB,但实际只收到 128KB,卡死等待剩余数据。
// ❌ 错误写法:一次 write 可能只发出部分数据 write(fd, data.data(), data.size()); // ✅ 正确写法:循环写直到全部发出 bool WriteAll(int fd, const void* buf, size_t n) { const char* p = static_cast<const char*>(buf); while (n > 0) { ssize_t w = write(fd, p, n); if (w <= 0) return false; p += w; n -= w; } return true; }2. DataNode 未注册就发心跳
现象:心跳返回失败,日志显示DataNode not registered。
原因:DataNode 启动顺序错误,没等注册成功就开始发心跳。NameNode 收到心跳但找不到对应节点记录,直接拒绝。
// ❌ 错误:先启动心跳线程 heartbeat_thread_ = std::thread([this] { HeartbeatLoop(); }); RegisterSelf(); // 注册还没完成,心跳已经发出去了 // ✅ 正确:注册成功后再启动心跳 if (!RegisterSelf()) return false; SendBlockReport(); heartbeat_thread_ = std::thread([this] { HeartbeatLoop(); });3. 短读导致消息解析失败
现象:偶发性消息解析错误,Protobuf 报ParseFromString失败。
原因:和短写对称,read()也可能短读。读到长度前缀说有 N 字节,但一次read()只返回了部分数据,剩余数据在下一次read()才到。
// ❌ 错误:一次 read 可能只读到部分数据 read(fd, &buf[0], len); // ✅ 正确:循环读直到读满 bool ReadAll(int fd, void* buf, size_t n) { char* p = static_cast<char*>(buf); while (n > 0) { ssize_t r = read(fd, p, n); if (r <= 0) return false; p += r; n -= r; } return true; }4. NameNode 未启动完成就连接
现象:DataNode 启动时注册失败,connect()返回Connection refused。
原因:NameNode 的Start()是异步的,listen()在后台线程里执行,主线程还没来得及listen()就返回了,DataNode 立刻去连接,端口还没开。
// ✅ 修复:Start() 后等一段时间再连接 namenode.Start(); std::this_thread::sleep_for(std::chrono::milliseconds(300)); // 等 NN 就绪 dn1->Start();5. DataNode 心跳间隔设置不合理
现象:NameNode 把存活的 DataNode 误判为 DEAD,触发不必要的副本修复。
原因:心跳间隔(3s)和超时阈值(30s)之间的比例设置不合理,或者心跳线程被其他任务阻塞(如发送大 BlockReport),导致心跳延迟超过阈值。
// 合理的参数配置 constexpr int kHeartbeatIntervalSec = 3; // 心跳间隔 constexpr int kHeartbeatTimeoutSec = 30; // 超时阈值(至少是间隔的 5-10 倍) // ✅ 心跳线程用小粒度睡眠,方便响应 stop 信号 void HeartbeatLoop() { while (!stop_.load()) { for (int i = 0; i < 30 && !stop_.load(); i++) std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!stop_.load()) SendHeartbeat(); } }6. 端口冲突导致注册失败
现象:第二次运行测试时 DataNode 启动失败,bind failed: Address already in use。
原因:上次测试进程没有完全退出,端口仍被占用。或者没有设置SO_REUSEADDR。
6. 端口冲突导致注册失败 现象:第二次运行测试时 DataNode 启动失败,bind failed: Address already in use。 原因:上次测试进程没有完全退出,端口仍被占用。或者没有设置 SO_REUSEADDR。7. Pipeline 转发地址解析错误
现象:Pipeline 第一个节点写入成功,但转发到下一个节点失败,心跳正常但 Block 副本数只有 1。
原因:Pipeline 地址格式为host:port,解析时用find(':')而不是rfind(':'),如果 host 是 IPv6 地址(含多个冒号)会解析错误。
// ❌ 用 find 在 IPv6 地址下会出错 size_t colon = addr.find(':'); // ✅ 用 rfind 取最后一个冒号,host:port 格式下永远正确 size_t colon = addr.rfind(':'); std::string host = addr.substr(0, colon); int port = std::stoi(addr.substr(colon + 1));8. 连接超时未设置
现象:某个 DataNode 宕机后,向它发送心跳/读取 Block 的请求永久阻塞,拖死整个检测流程。
原因:connect()和read()默认没有超时,连接到一个不可达的地址会一直等待。
// ✅ 所有连接都设置超时 struct timeval tv{10, 0}; // 10秒超时 setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));