news 2026/4/26 0:59:08

zmq源码分析之IO线程绑定时机

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
zmq源码分析之IO线程绑定时机

文章目录

    • 核心流程
    • 详细代码分析
      • 1. Socket 创建入口
      • 2. IO 线程选择
      • 3. IO 线程选择逻辑
      • 4. Session 创建与绑定
      • 5. 连接建立时的 IO 线程绑定
      • 6. Session 与 IO 线程关联
    • 完整绑定流程
    • 技术要点
      • 1. IO 线程选择策略
      • 2. 绑定机制
      • 3. 线程安全
    • 总结

核心流程

用户创建 socket 到绑定 IO 线程的完整流程

  1. 用户调用zmq_socket(ctx, type)
  2. 内部创建ctx->create_socket(type)
  3. IO 线程选择choose_io_thread(affinity)
  4. Session 创建:与 IO 线程关联
  5. 绑定完成:socket 成功绑定到 IO 线程

详细代码分析

1. Socket 创建入口

用户代码

void*socket=zmq_socket(ctx,ZMQ_REP);

内部实现

void*zmq_socket(void*ctx_,inttype_){zmq::ctx_t*ctx=static_cast<zmq::ctx_t*>(ctx_);zmq::socket_base_t*s=ctx->create_socket(type_);// ...returns;}

2. IO 线程选择

ctx_t::create_socket

zmq::socket_base_t*zmq::ctx_t::create_socket(inttype_){// ...if(unlikely(_starting)){if(!start())returnNULL;}// ...// 选择 IO 线程io_thread_t*io_thread=choose_io_thread(0);if(!io_thread){errno=EMTHREAD;returnNULL;}// ...socket_base_t*s=socket_base_t::create(type_,this,slot,sid);// ...}

3. IO 线程选择逻辑

ctx_t::choose_io_thread

zmq::io_thread_t*zmq::ctx_t::choose_io_thread(uint64_taffinity_){// 遍历 IO 线程,选择负载最小的intmin_load=-1;io_thread_t*selected_thread=NULL;for(io_threads_t::size_type i=0;i!=_io_threads.size();i++){if(!affinity_||(affinity_&(1ULL<<i))){intload=_io_threads[i]->get_load();if(min_load==-1||load<min_load){min_load=load;selected_thread=_io_threads[i];}}}returnselected_thread;}

4. Session 创建与绑定

socket_base_t::create

zmq::socket_base_t*zmq::socket_base_t::create(inttype_,ctx_t*parent_,uint32_ttid_,intsid_){// 创建具体类型的 socketsocket_base_t*s=NULL;switch(type_){caseZMQ_REP:s=new(std::nothrow)rep_t(parent_,tid_,sid_);break;// ... 其他类型}// ...returns;}

5. 连接建立时的 IO 线程绑定

当 socket 绑定或连接时

intzmq::socket_base_t::bind(constchar*endpoint_uri_){// ...if(protocol==protocol_name::tcp){// 选择 IO 线程io_thread_t*io_thread=choose_io_thread(options.affinity);if(!io_thread){errno=EMTHREAD;return-1;}// 创建 sessionsession_base_t*session=session_base_t::create(io_thread,true,this,options,paddr);// ...}// ...}

6. Session 与 IO 线程关联

session_base_t 构造

zmq::session_base_t::session_base_t(zmq::io_thread_t*io_thread_,boolactive_,zmq::socket_base_t*socket_,constoptions_t&options_,address_t*addr_):own_t(io_thread_,options_),io_object_t(io_thread_),_active(active_),_pipe(NULL),_zap_pipe(NULL),_incomplete_in(false),_pending(false),_engine(NULL),_socket(socket_),_io_thread(io_thread_),_has_linger_timer(false),_addr(addr_){// session 现在与 IO 线程关联}

完整绑定流程

┌─────────────────────────────────────────────────────────────────────┐ │ Socket 绑定 IO 线程流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 用户创建 socket │ │ └─> zmq_socket(ctx, type) │ │ └─> ctx->create_socket(type) │ │ └─> choose_io_thread(affinity) │ │ └─> 选择负载最小的 IO 线程 │ │ └─> socket_base_t::create() │ │ └─> 创建具体类型的 socket │ │ │ │ 2. Socket 绑定或连接 │ │ └─> zmq_bind() / zmq_connect() │ │ └─> socket_base_t::bind() / connect() │ │ └─> choose_io_thread(options.affinity) │ │ └─> session_base_t::create() │ │ └─> 与 IO 线程关联 │ │ └─> attach_pipe() │ │ │ │ 3. 引擎创建与注册 │ │ └─> 创建 stream_engine │ │ └─> engine->plug(io_thread, session) │ │ └─> io_object_t::plug(io_thread) │ │ └─> add_fd(_s) 注册到 poller │ │ │ └─────────────────────────────────────────────────────────────────────┘

技术要点

1. IO 线程选择策略

  • 负载均衡:选择负载最小的 IO 线程
  • 亲和性:支持通过ZMQ_AFFINITY指定 IO 线程
  • 默认策略:轮询或基于负载

2. 绑定机制

  • Session 作为桥梁:连接 socket 和 IO 线程
  • Engine 注册:网络引擎注册到 IO 线程的 poller
  • Pipe 通信:socket 和 session 通过 pipe 通信

3. 线程安全

  • 命令传递:通过 mailbox 传递命令
  • 无锁设计:使用 ypipe 进行线程间通信
  • 事件驱动:IO 线程通过事件循环处理

总结

Socket 绑定到 IO 线程的过程

  1. 创建阶段:选择 IO 线程并创建 socket
  2. 连接阶段:创建 session 并与 IO 线程关联
  3. 运行阶段:引擎注册到 IO 线程的 poller

这种设计使得 ZeroMQ 能够:

  • 高效处理:IO 线程专门处理网络 I/O
  • 灵活扩展:通过增加 IO 线程数提高性能
  • 负载均衡:自动分配任务到负载较轻的线程

这是 ZeroMQ 高性能网络通信的重要基础!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/26 0:59:07

zmq源码分析之poller和signaler如何建立联动实现用户层通知

文章目录核心实现1. Signaler 实现2. Socket Poller 与 Signaler3. 信号与 Poll 的配合详细流程1. 信号发送流程2. 信号接收流程技术要点1. 跨平台实现2. 线程安全3. 高效处理代码示例总结先看一段用户层代码&#xff0c; // 创建线程安全的 socket void *socket zmq_socket(…

作者头像 李华
网站建设 2026/4/26 0:56:48

如何利用curatedMetagenomicData快速获取标准化人类微生物组数据

如何利用curatedMetagenomicData快速获取标准化人类微生物组数据 【免费下载链接】curatedMetagenomicData Curated Metagenomic Data of the Human Microbiome 项目地址: https://gitcode.com/gh_mirrors/cu/curatedMetagenomicData curatedMetagenomicData 是一个革命…

作者头像 李华
网站建设 2026/4/26 0:45:22

CSS 网格元素

CSS 网格元素 引言 随着互联网技术的发展,网页设计越来越注重用户体验和视觉效果。CSS网格布局(Grid Layout)作为一种先进的布局技术,能够帮助开发者更高效地创建复杂的网页布局。本文将详细介绍CSS网格元素的基本概念、使用方法以及在实际应用中的优势。 一、CSS网格元…

作者头像 李华
网站建设 2026/4/26 0:26:31

TestDisk PhotoRec数据恢复完整指南:5步高效找回丢失分区与文件

TestDisk & PhotoRec数据恢复完整指南&#xff1a;5步高效找回丢失分区与文件 【免费下载链接】testdisk TestDisk & PhotoRec 项目地址: https://gitcode.com/gh_mirrors/te/testdisk 数据丢失是每个计算机用户都可能面临的噩梦场景。当重要文件被误删、分区意…

作者头像 李华
网站建设 2026/4/26 0:25:30

一体化项目管理工具有哪些?6款热门方案对比与分析

本文将深入对比6款企业项目管理平台与方案&#xff1a;PingCode、Worktile、Jira Confluence、ClickUp、monday.com、Asana。一、企业在一体化平台与多工具拼接之间&#xff0c;真正要比较的是什么很多团队在选型时&#xff0c;容易把问题理解成“谁功能更多”。但从企业视角看…

作者头像 李华