TDengine 数据订阅架构设计与最佳实践
一、设计理念
TDengine 数据订阅(TMQ)是一个高性能、低延迟、高可靠的实时数据流处理系统,核心设计理念是:基于 WAL 的事件流存储 + Push-Pull 混合消费模式 + 自动负载均衡。
核心设计目标
- 实时性:毫秒级数据推送延迟
- 可靠性:至少一次(At-Least-Once)消费保证
- 高性能:直接从 WAL 读取,零拷贝传输
- 易用性:兼容 Kafka API,降低学习成本
二、整体架构
┌────────────────────────────────────────────────────────┐ │ 数据订阅系统架构 │ ├────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ 生产者 │ │ 生产者 │ │ │ │ (写入应用) │ │ (流计算) │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ ↓ 写入 ↓ 写入 │ │ ┌─────────────────────────────────────────┐ │ │ │ vnode (数据节点) │ │ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │ │ │ WAL 1 │ │ WAL 2 │ │ WAL 3 │ │ │ │ │ │ (队列) │ | (队列) │ │ (队列) │ │ │ │ │ └────────┘ └────────┘ └────────┘ │ │ │ │ ↓ ↓ ↓ │ │ │ │ [索引] + [保留策略] + [持久化存储] │ │ │ └─────────────────────────────────────────┘ │ │ ↑ │ │ │ 订阅/消费 │ │ ┌──────┴──────────────────────────────┐ │ │ │ mnode (元数据管理) │ │ │ │ - 主题管理 │ │ │ │ - 消费组管理 │ │ │ │ - Rebalance 调度 │ │ │ │ - 心跳检测 │ │ │ └─────────────────────────────────────┘ │ │ ↑ ↑ ↑ │ │ │ │ │ │ │ ┌──────┴──┐ ┌────┴────┐ ┌──┴──────┐ │ │ │Consumer1│ │Consumer2│ │Consumer3│ │ │ │ (消费组) │ │ (消费组) │ │ (独立) │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └────────────────────────────────────────────────────────┘三、核心组件详解
3.1 主题 (Topic)
设计特点:基于 WAL 的持久化事件流
主题类型及用途: 1. 数据库订阅 (Database Topic) CREATE TOPIC topic_db AS DATABASE db_name; 用途: 数据库级别的全量复制和迁移 2. 超级表订阅 (Super Table Topic) CREATE TOPIC topic_stable AS STABLE stable_name; 用途: 超级表及其所有子表的数据订阅 3. 查询订阅 (Query Topic) ⭐ 核心优势 CREATE TOPIC topic_query AS SELECT ts, temperature, location FROM sensors WHERE temperature > 30; 用途: 实时数据过滤和预处理 优势: - 服务端完成过滤,减少网络传输 90%+ - 无需在消费端重复计算 - 支持复杂 SQL 表达式WAL 作为消息队列:
┌──────────────────────────────────────────────┐ │ WAL 文件结构 (消息队列) │ ├──────────────────────────────────────────────┤ │ Version 1: CREATE TABLE sensor_001 ... │ │ Version 2: INSERT sensor_001 VALUES (...) │ │ Version 3: INSERT sensor_001 VALUES (...) │ │ Version 4: ALTER TABLE sensor_001 ... │ │ Version 5: INSERT sensor_001 VALUES (...) │ │ Version 6: INSERT sensor_002 VALUES (...) │ │ ... │ │ Version N: INSERT sensor_100 VALUES (...) │ └──────────────────────────────────────────────┘ ↑ └─ 消费进度 (Offset = Version) 特性: ✓ 顺序写入,性能高 ✓ 自动创建索引,快速随机访问 ✓ 可配置保留时间和大小 ✓ 支持多个消费组独立消费3.2 消费者 (Consumer)
Push-Pull 混合模式(核心创新):
// 消费模式切换逻辑消费流程:1.有大量未消费数据 → Pull 模式 Consumer → vnode:"拉取数据"vnode → Consumer:"返回数据批次"Consumer → vnode:"继续拉取"↓ 优势:批量传输,高吞吐2.无待消费数据 → Push 模式 Consumer → vnode:"注册 handle"vnode:(等待新数据写入)新数据写入 → vnode 主动推送给 Consumer ↓ 优势:低延迟,<10ms消费者状态机:
┌─────────────────────────────────────────────┐ │ 消费者状态转换 │ ├─────────────────────────────────────────────┤ │ │ │ [创建] → [Rebalancing] │ │ ↓ │ │ (等待 vnode 分配) │ │ ↓ │ │ [Ready] ←──────┐ │ │ ↓ │ │ │ (正常消费) │ │ │ ↓ │ │ │ ┌────────┴────────┐│ │ │ ↓ ↓│ │ │ [订阅变更] [新增消费者] │ │ ↓ ↓ │ │ [Rebalancing] ←────────┘ │ │ ↓ │ │ (Rebalance 完成) │ │ ↓ │ │ [Ready] │ │ │ │ [心跳丢失 12s+] → [Clear] → [删除] │ │ [主动退出] → [Clear] → [删除] │ └─────────────────────────────────────────────┘3.3 消费组 (Consumer Group)
自动负载均衡机制:
示例: 主题数据分布在 4 个 vnode 场景1: 1 个消费者 ┌─────────────────────────────────┐ │ Consumer1 │ │ ├─ vnode1 │ │ ├─ vnode2 │ │ ├─ vnode3 │ │ └─ vnode4 │ └─────────────────────────────────┘ 场景2: 2 个消费者 ┌─────────────┐ ┌─────────────┐ │ Consumer1 │ │ Consumer2 │ │ ├─ vnode1 │ │ ├─ vnode3 │ │ └─ vnode2 │ │ └─ vnode4 │ └─────────────┘ └─────────────┘ 场景3: 3 个消费者 ┌──────────┐ ┌──────────┐ ┌──────────┐ │Consumer1 │ │Consumer2 │ │Consumer3 │ │├─ vnode1 │ │├─ vnode2 │ │├─ vnode3 │ │ │ │ │ │└─ vnode4 │ └──────────┘ └──────────┘ └──────────┘ 场景4: 5 个消费者 ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │Consumer1 │ │Consumer2 │ │Consumer3 │ │Consumer4 │ │Consumer5 │ │├─ vnode1 │ │├─ vnode2 │ │├─ vnode3 │ │├─ vnode4 │ │(空闲) │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ 规则: - 消费者数 ≤ vnode 数: 均匀分配 - 消费者数 > vnode 数: 多余消费者空闲3.4 Rebalance 机制
触发条件:
- 新消费者加入消费组
- 消费者退出或故障
- 订阅主题变更
- 心跳丢失超时
Rebalance 流程:
第1步: 检测触发条件 mnode 定时器(2s)检测消费者状态 ↓ 发现需要 rebalance 第2步: 标记消费者状态 将相关消费者状态设为 "Rebalancing" ↓ 消费者暂停数据消费 第3步: 重新分配 vnode 根据当前活跃消费者数量 ↓ 使用均匀分配算法 ↓ 生成新的 vnode 分配表 第4步: 通知消费者 mnode 更新分配信息 ↓ 消费者定期查询获取新分配 ↓ 消费者状态变为 "Ready" 第5步: 恢复消费 消费者从上次提交的 offset 继续 ↓ 或根据配置从 earliest/latest 开始 耗时: 通常 < 5s3.5 消费进度管理
Offset 存储与提交:
-- 自动提交配置CREATETOPIC topic1ASSELECT*FROMsensors;-- 消费者配置{"enable.auto.commit":"true",-- 启用自动提交"auto.commit.interval.ms":"5000"-- 5秒提交一次}-- 手动提交tmq_consumer_poll(consumer,1000);-- 拉取数据// ... 处理数据 ...tmq_commit_sync(consumer,msg);-- 同步提交// 或tmq_commit_async(consumer,msg,cb);-- 异步提交Offset 语义:
┌────────────────────────────────────────────┐ │ Offset 在 vnode 中的位置 │ ├────────────────────────────────────────────┤ │ WAL Version 1: [已消费] ← offset=1 │ │ WAL Version 2: [已消费] ← offset=2 │ │ WAL Version 3: [已消费] ← offset=3 │ │ WAL Version 4: [待消费] ← 下次从这开始 │ │ WAL Version 5: [待消费] │ │ ... │ └────────────────────────────────────────────┘ 语义: - offset = N 表示版本 N 已消费 - 下次消费从版本 N+1 开始 - 类似 Kafka 的 offset 概念四、数据流详解
4.1 订阅流程
应用程序启动订阅: Step 1: 创建主题 CREATE TOPIC topic_sensors AS SELECT * FROM sensors WHERE temperature > 30; Step 2: 创建消费者 consumer = tmq_consumer_new(conf); 配置: - group.id: "group_1" - client.id: "consumer_1" - auto.offset.reset: "earliest" Step 3: 订阅主题 tmq_subscribe(consumer, ["topic_sensors"]); ↓ Consumer 发送订阅请求到 mnode ↓ mnode 标记 Consumer 状态为 "Rebalancing" Step 4: 等待 Rebalance Consumer 定期查询 mnode ↓ mnode 执行 rebalance ↓ mnode 分配 vnode 给 Consumer ↓ Consumer 获取 vnode 列表和 offset ↓ Consumer 状态变为 "Ready" Step 5: 开始消费 Consumer 向各 vnode 发送消费请求 ↓ vnode 返回数据 ↓ Consumer 处理数据并提交 offset4.2 消费流程
// 消费循环伪代码while(running){// 1. Poll 数据 (内部自动处理 Push/Pull)msg=tmq_consumer_poll(consumer,timeout);if(msg==NULL){// 超时,继续等待continue;}// 2. 处理数据process_message(msg);// 3. 提交 offset (手动模式)if(manual_commit){tmq_commit_sync(consumer,msg);}// 4. 释放消息tmq_free_result(msg);}消费数据的完整流程:
Consumer 端: poll() 调用 ↓ 检查是否有缓存数据 ↓ 向 vnode 发送消费请求 ↓ (等待响应或推送) vnode 端: 收到消费请求 ↓ 检查 WAL 中是否有新数据 ↓ ┌─ 有大量数据: Pull 模式 │ 读取数据批次 → 返回给 Consumer │ └─ 无数据: Push 模式 注册 Consumer handle ↓ (等待新数据写入) ↓ 新数据写入时主动推送 Consumer 端: 收到数据 ↓ 解析数据块 ↓ 应用查询过滤(如果是 Query Topic) ↓ 返回给应用程序 时延对比: - Pull 模式: 50-200ms (批量高吞吐) - Push 模式: < 10ms (实时低延迟)五、最佳实践
5.1 ✅ 推荐的使用方法
1. 使用查询订阅减少网络传输
-- ❌ 差: 订阅全部数据,消费端过滤CREATETOPIC topic_allASDATABASEsensor_db;消费端代码:formsginconsumer:ifmsg.temperature>30:-- 客户端过滤process(msg)问题:-传输100%数据-消费端 CPU 占用高-网络带宽浪费-- ✅ 好: 服务端过滤,只传输需要的数据CREATETOPIC topic_filteredASSELECTts,temperature,device_idFROMsensorsWHEREtemperature>30;消费端代码:formsginconsumer: process(msg)-- 直接处理,无需过滤优势:-传输量减少90%+-消费端处理简单-网络带宽节省2. 合理设置消费组和消费者数量
# ✅ 好: 消费者数量 ≤ vnode 数量# 假设主题数据分布在 4 个 vnode# 场景1: 实时性要求高consumer_count=4# 每个 vnode 一个消费者→ 并行度最高,延迟最低# 场景2: 资源有限consumer_count=2# 两个消费者分担→ 平衡资源和性能# ❌ 差: 消费者过多consumer_count=10# 6 个消费者空闲→ 资源浪费,无性能提升3. 选择合适的 Offset 重置策略
// ✅ 好: 根据业务需求选择tmq_conf_t*conf=tmq_conf_new();// 场景1: 数据分析,需要完整历史tmq_conf_set(conf,"auto.offset.reset","earliest");→ 从最早数据开始消费// 场景2: 实时告警,只关注最新tmq_conf_set(conf,"auto.offset.reset","latest");→ 只消费新产生的数据// 场景3: 断点续传tmq_conf_set(conf,"enable.auto.commit","true");tmq_conf_set(conf,"auto.commit.interval.ms","5000");→ 自动提交 offset,重启后继续4. 合理设置 WAL 保留策略
-- ✅ 好: 根据消费延迟设置保留时间CREATEDATABASEsensor_db WAL_RETENTION_PERIOD7;-- 保留 7 天-- WAL_RETENTION_SIZE 1024; -- 保留 1GB使用场景:1.实时消费: 保留时间=最大可容忍延迟2.批量消费: 保留时间=批次周期+容错时间3.数据重放: 根据业务需求设置 计算公式: 保留时间 ≥ 最大消费延迟 ×2示例:-消费者每小时处理一次 → 保留48小时-实时消费(秒级)→ 保留24小时(容错)5. 批量消费提高吞吐量
// ✅ 好: 批量拉取和处理tmq_conf_set(conf,"msg.with.table.name","true");while(running){// 一次拉取多条消息msg=tmq_consumer_poll(consumer,1000);while(msg){// 批量处理intnumRows=0;void*data=tmq_get_raw_block(msg,&numRows);// 批量插入目标库或批量计算batch_process(data,numRows);msg=tmq_consumer_poll(consumer,0);// 立即尝试获取下一批}// 批量提交 offsettmq_commit_sync(consumer,NULL);}性能提升:-单条处理:10,000条/秒-批量处理:100,000+条/秒6. 使用异步提交提高性能
// ✅ 好: 异步提交 offsetvoidcommit_cb(tmq_t*tmq,int32_tcode,void*param){if(code!=0){log_error("Commit failed: %s",tmq_err2str(code));// 处理提交失败}}while(running){msg=tmq_consumer_poll(consumer,1000);process_message(msg);// 异步提交,不阻塞消费循环tmq_commit_async(consumer,msg,commit_cb,NULL);}性能对比:-同步提交:每次 commit 阻塞5-10ms-异步提交:无阻塞,吞吐量提升50%+7. 监控消费滞后
-- ✅ 好: 定期检查消费进度-- 查询消费者信息SELECT*FROMinformation_schema.ins_consumers;-- 查询消费组订阅信息SELECT*FROMinformation_schema.ins_subscriptions;-- 计算消费滞后lag=当前 WAL 版本-已提交offset告警阈值:-lag<1000: 正常-lag1000-10000: 警告-lag>10000: 严重,需扩容消费者5.2 ❌ 要避免的使用误区
1. 避免频繁创建/销毁消费者
// ❌ 差: 每次消费都创建新消费者while(true){tmq_t*consumer=tmq_consumer_new(conf);tmq_subscribe(consumer,topics);msg=tmq_consumer_poll(consumer,1000);process(msg);tmq_consumer_close(consumer);// 销毁sleep(1);}问题:-频繁触发 rebalance-消费进度丢失-性能极差// ✅ 好: 长连接消费tmq_t*consumer=tmq_consumer_new(conf);tmq_subscribe(consumer,topics);while(running){msg=tmq_consumer_poll(consumer,1000);process(msg);}tmq_consumer_close(consumer);2. 避免不提交 Offset
// ❌ 差: 从不提交 offsettmq_conf_set(conf,"enable.auto.commit","false");while(running){msg=tmq_consumer_poll(consumer,1000);process(msg);// 没有 commit!}问题:-消费者重启后从头消费-重复处理数据-业务逻辑错误// ✅ 好: 启用自动提交或手动提交tmq_conf_set(conf,"enable.auto.commit","true");tmq_conf_set(conf,"auto.commit.interval.ms","5000");3. 避免单消费者订阅过多主题
// ❌ 差: 单消费者订阅大量主题tmq_list_t*topics=tmq_list_new();for(inti=0;i<100;i++){tmq_list_append(topics,topic_names[i]);}tmq_subscribe(consumer,topics);问题:-Rebalance 时间长-消费延迟高-内存占用大// ✅ 好: 按业务逻辑分组// 消费者1: 订阅温度相关主题tmq_subscribe(consumer1,["topic_temp_*"]);// 消费者2: 订阅湿度相关主题tmq_subscribe(consumer2,["topic_hum_*"]);4. 避免在消费循环中执行耗时操作
// ❌ 差: 消费循环中执行数据库写入while(running){msg=tmq_consumer_poll(consumer,1000);// 同步写入数据库,阻塞 100msinsert_to_database(msg);}问题:-消费速度慢-无法触发 Push 模式-消费滞后严重// ✅ 好: 异步处理或批量处理queue=Queue();// 消费线程while(running){msg=tmq_consumer_poll(consumer,100);queue.put(msg);// 快速入队}// 处理线程while(running){batch=queue.get_batch(100);batch_insert_to_database(batch);}5. 避免忽略 Rebalance 期间的状态
// ❌ 差: 不处理 rebalancewhile(running){msg=tmq_consumer_poll(consumer,1000);if(msg==NULL){continue;// 可能正在 rebalance}process(msg);}问题:-Rebalance 期间误判为无数据-无法感知消费者状态变化// ✅ 好: 检查错误码while(running){msg=tmq_consumer_poll(consumer,1000);if(msg==NULL){interr=tmq_get_err(consumer);if(err==TMQ_ERR_REBALANCING){log_info("Rebalancing...");continue;}}process(msg);}6. 避免 WAL 保留时间过短
-- ❌ 差: WAL 保留时间太短CREATEDATABASEsensor_db WAL_RETENTION_PERIOD1;-- 只保留 1 天问题:-消费者故障超过1天后数据丢失-无法重新消费历史数据-- ✅ 好: 根据业务需求设置CREATEDATABASEsensor_db WAL_RETENTION_PERIOD7;-- 保留 7 天考虑因素:1.最大可容忍的消费延迟2.数据重放需求3.存储成本六、性能优化建议
6.1 写入端优化
-- 1. 批量写入INSERTINTOsensor_001VALUES(now,25.5),(now+1s,25.6),(now+2s,25.7),...-- 批量插入 1000 条-- 2. 合理设置 WAL 参数ALTERDATABASEsensor_db WAL_LEVEL1-- 1=写入即返回, 2=fsync后返回WAL_FSYNC_PERIOD3000;-- 3秒fsync一次性能对比:-WAL_LEVEL=2: 强一致,10000写入/秒-WAL_LEVEL=1: 最终一致,100000+写入/秒6.2 消费端优化
// 1. 增加 Poll 超时时间(批量拉取)tmq_consumer_poll(consumer,5000);// 5秒超时// 2. 启用消息压缩tmq_conf_set(conf,"msg.enable.compression","true");// 3. 调整批量大小tmq_conf_set(conf,"fetch.max.messages","1000");// 4. 多线程处理for(inti=0;i<thread_count;i++){pthread_create(&threads[i],NULL,consume_thread,consumer);}6.3 集群配置优化
-- 1. 增加 vnode 数量(提高并行度)CREATEDATABASEsensor_db VGROUPS16;-- 16个vnode,支持16个并行消费者-- 2. 配置多副本(高可用)CREATEDATABASEsensor_db REPLICA3;-- 3副本,容忍2个节点故障七、性能数据
7.1 延迟对比
| 场景 | TDengine TMQ | Kafka |
|---|---|---|
| 实时推送 (Push) | < 10ms | 50-100ms |
| 批量拉取 (Pull) | 50-200ms | 50-200ms |
| 端到端延迟 | < 100ms | 200-500ms |
7.2 吞吐量对比
| 优化方法 | 吞吐量 |
|---|---|
| 单条消费 | 10,000 条/秒 |
| 批量消费 | 100,000 条/秒 |
| 多消费者并行 | 1,000,000+ 条/秒 |
| 查询订阅(服务端过滤) | 减少传输 90%+ |
7.3 资源占用
消费者内存占用: 10-50 MB 消费者 CPU 占用: < 5% WAL 索引开销: < 1% 原始数据大小八、总结
TDengine 数据订阅核心优势
- ✅基于 WAL 的高性能队列:顺序写入,零拷贝读取
- ✅Push-Pull 混合模式:实时推送 + 批量拉取,兼顾延迟和吞吐
- ✅查询订阅:服务端过滤,减少传输 90%+
- ✅自动 Rebalance:消费者增删自动负载均衡
- ✅至少一次语义:Offset 管理,保证数据不丢失
- ✅多消费组隔离:支持不同消费场景独立消费
最佳实践要点
推荐:
- ✅ 使用查询订阅减少网络传输
- ✅ 消费者数量 ≤ vnode 数量
- ✅ 批量消费提高吞吐量
- ✅ 异步提交 offset
- ✅ 合理设置 WAL 保留时间
- ✅ 监控消费滞后
避免:
- ❌ 频繁创建/销毁消费者
- ❌ 不提交 offset
- ❌ 单消费者订阅过多主题
- ❌ 消费循环中执行耗时操作
- ❌ WAL 保留时间过短
- ❌ 忽略 rebalance 状态
适用场景
- 实时数据同步:集群间数据复制
- 实时告警:异常数据实时推送
- 实时 ETL:数据清洗和转换
- 实时分析:流式计算输入源
- 数据分发:一份数据多个下游消费
TDengine 数据订阅通过创新的 Push-Pull 混合模式和基于 WAL 的队列设计,实现了毫秒级延迟 + 百万级吞吐的性能表现,同时提供了 Kafka 兼容的 API 和更强大的查询订阅功能,是物联网和时序数据场景的理想选择。
关于 TDengine
TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。