文章目录
- 核心流程
- 详细代码分析
- 1. Socket 创建入口
- 2. IO 线程选择
- 3. IO 线程选择逻辑
- 4. Session 创建与绑定
- 5. 连接建立时的 IO 线程绑定
- 6. Session 与 IO 线程关联
- 完整绑定流程
- 技术要点
- 1. IO 线程选择策略
- 2. 绑定机制
- 3. 线程安全
- 总结
核心流程
用户创建 socket 到绑定 IO 线程的完整流程:
- 用户调用:
zmq_socket(ctx, type) - 内部创建:
ctx->create_socket(type) - IO 线程选择:
choose_io_thread(affinity) - Session 创建:与 IO 线程关联
- 绑定完成:socket 成功绑定到 IO 线程
详细代码分析
1. Socket 创建入口
用户代码:
void*socket=zmq_socket(ctx,ZMQ_REP);内部实现:
void*zmq_socket(void*ctx_,inttype_){zmq::ctx_t*ctx=static_cast<zmq::ctx_t*>(ctx_);zmq::socket_base_t*s=ctx->create_socket(type_);// ...returns;}2. IO 线程选择
ctx_t::create_socket:
zmq::socket_base_t*zmq::ctx_t::create_socket(inttype_){// ...if(unlikely(_starting)){if(!start())returnNULL;}// ...// 选择 IO 线程io_thread_t*io_thread=choose_io_thread(0);if(!io_thread){errno=EMTHREAD;returnNULL;}// ...socket_base_t*s=socket_base_t::create(type_,this,slot,sid);// ...}3. IO 线程选择逻辑
ctx_t::choose_io_thread:
zmq::io_thread_t*zmq::ctx_t::choose_io_thread(uint64_taffinity_){// 遍历 IO 线程,选择负载最小的intmin_load=-1;io_thread_t*selected_thread=NULL;for(io_threads_t::size_type i=0;i!=_io_threads.size();i++){if(!affinity_||(affinity_&(1ULL<<i))){intload=_io_threads[i]->get_load();if(min_load==-1||load<min_load){min_load=load;selected_thread=_io_threads[i];}}}returnselected_thread;}4. Session 创建与绑定
socket_base_t::create:
zmq::socket_base_t*zmq::socket_base_t::create(inttype_,ctx_t*parent_,uint32_ttid_,intsid_){// 创建具体类型的 socketsocket_base_t*s=NULL;switch(type_){caseZMQ_REP:s=new(std::nothrow)rep_t(parent_,tid_,sid_);break;// ... 其他类型}// ...returns;}5. 连接建立时的 IO 线程绑定
当 socket 绑定或连接时:
intzmq::socket_base_t::bind(constchar*endpoint_uri_){// ...if(protocol==protocol_name::tcp){// 选择 IO 线程io_thread_t*io_thread=choose_io_thread(options.affinity);if(!io_thread){errno=EMTHREAD;return-1;}// 创建 sessionsession_base_t*session=session_base_t::create(io_thread,true,this,options,paddr);// ...}// ...}6. Session 与 IO 线程关联
session_base_t 构造:
zmq::session_base_t::session_base_t(zmq::io_thread_t*io_thread_,boolactive_,zmq::socket_base_t*socket_,constoptions_t&options_,address_t*addr_):own_t(io_thread_,options_),io_object_t(io_thread_),_active(active_),_pipe(NULL),_zap_pipe(NULL),_incomplete_in(false),_pending(false),_engine(NULL),_socket(socket_),_io_thread(io_thread_),_has_linger_timer(false),_addr(addr_){// session 现在与 IO 线程关联}完整绑定流程
┌─────────────────────────────────────────────────────────────────────┐ │ Socket 绑定 IO 线程流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 用户创建 socket │ │ └─> zmq_socket(ctx, type) │ │ └─> ctx->create_socket(type) │ │ └─> choose_io_thread(affinity) │ │ └─> 选择负载最小的 IO 线程 │ │ └─> socket_base_t::create() │ │ └─> 创建具体类型的 socket │ │ │ │ 2. Socket 绑定或连接 │ │ └─> zmq_bind() / zmq_connect() │ │ └─> socket_base_t::bind() / connect() │ │ └─> choose_io_thread(options.affinity) │ │ └─> session_base_t::create() │ │ └─> 与 IO 线程关联 │ │ └─> attach_pipe() │ │ │ │ 3. 引擎创建与注册 │ │ └─> 创建 stream_engine │ │ └─> engine->plug(io_thread, session) │ │ └─> io_object_t::plug(io_thread) │ │ └─> add_fd(_s) 注册到 poller │ │ │ └─────────────────────────────────────────────────────────────────────┘技术要点
1. IO 线程选择策略
- 负载均衡:选择负载最小的 IO 线程
- 亲和性:支持通过
ZMQ_AFFINITY指定 IO 线程 - 默认策略:轮询或基于负载
2. 绑定机制
- Session 作为桥梁:连接 socket 和 IO 线程
- Engine 注册:网络引擎注册到 IO 线程的 poller
- Pipe 通信:socket 和 session 通过 pipe 通信
3. 线程安全
- 命令传递:通过 mailbox 传递命令
- 无锁设计:使用 ypipe 进行线程间通信
- 事件驱动:IO 线程通过事件循环处理
总结
Socket 绑定到 IO 线程的过程:
- 创建阶段:选择 IO 线程并创建 socket
- 连接阶段:创建 session 并与 IO 线程关联
- 运行阶段:引擎注册到 IO 线程的 poller
这种设计使得 ZeroMQ 能够:
- 高效处理:IO 线程专门处理网络 I/O
- 灵活扩展:通过增加 IO 线程数提高性能
- 负载均衡:自动分配任务到负载较轻的线程
这是 ZeroMQ 高性能网络通信的重要基础!