文章目录
- 核心实现
- 1. Signaler 实现
- 2. Socket Poller 与 Signaler
- 3. 信号与 Poll 的配合
- 详细流程
- 1. 信号发送流程
- 2. 信号接收流程
- 技术要点
- 1. 跨平台实现
- 2. 线程安全
- 3. 高效处理
- 代码示例
- 总结
先看一段用户层代码,
// 创建线程安全的 socketvoid*socket=zmq_socket(ctx,ZMQ_PUB);// PUB 是线程安全的zmq_bind(socket,"tcp://*:5555");// 创建 pollerzmq_pollitem_t items[]={{socket,0,ZMQ_POLLOUT,0}// 监听可写事件};// 轮询while(1){intrc=zmq_poll(items,1,1000);if(rc==-1)break;if(items[0].revents&ZMQ_POLLOUT){// 可以发送消息zmq_send(socket,"Hello",5,0);}}此处的socket是封装后的socket,并非我们linux编程时的套接字,它怎么能被zmq_poll所管理的。我们印象中,poll管理的必须是文件描述符,自然的,socket内部借助了文件描述符的数据可读来实现通知用户消息可读,而这其中,linux的管道最适合,这个通知机制被zmq封装成了signaler。
核心实现
signaler用户通知事件发生,为何能让其被poll管理,unix平台上使用管道。
ZeroMQ 的信号(signaler)通过文件描述符与 poll 系统调用配合工作,具体实现如下:
1. Signaler 实现
signaler_t是一个跨平台的信号机制:
- Unix:使用 pipe 实现
- Windows:使用 event object 实现
关键方法:
get_fd():获取信号的文件描述符send():发送信号recv():接收信号
2. Socket Poller 与 Signaler
socket_poller_t在处理线程安全的 socket 时使用 signaler:
intzmq::socket_poller_t::add(socket_base_t*socket_,void*user_data_,shortevents_){// 处理线程安全 socketif(is_thread_safe(*socket_)){if(_signaler==NULL){_signaler=new(std::nothrow)signaler_t();// ...}socket_->add_signaler(_signaler);}// ...}3. 信号与 Poll 的配合
poll 处理信号的流程:
添加信号文件描述符:
if(_use_signaler){item_nbr=1;_pollfds[0].fd=_signaler->get_fd();_pollfds[0].events=POLLIN;}等待信号:
// 调用 poll()constintrc=poll(_pollfds,_pollset_size,timeout);处理信号:
// 检测 signaler 事件if(_use_signaler&&_pollfds[0].revents&POLLIN)_signaler->recv();// 接收信号检查 socket 事件:
// 检查所有 socket 的事件constintfound=check_events(events_,n_events_);
详细流程
1. 信号发送流程
┌─────────────────────────────────────────────────────────────────────┐ │ 信号发送流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 消息到达 socket │ │ └─> socket::read_activated() │ │ └─> 消息进入接收队列 │ │ └─> signaler->send() 发送信号 │ │ │ │ 2. Signaler 处理 │ │ └─> signaler::send() │ │ └─> Unix: write(pipe[1], ...) │ │ └─> Windows: SetEvent(...) │ │ │ └─────────────────────────────────────────────────────────────────────┘2. 信号接收流程
┌─────────────────────────────────────────────────────────────────────┐ │ 信号接收流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. zmq_poll 等待 │ │ └─> socket_poller_t::wait() │ │ └─> poll(_pollfds, ...) │ │ └─> 检测到 signaler 的文件描述符可读 │ │ │ │ 2. 处理信号 │ │ └─> signaler->recv() │ │ └─> Unix: read(pipe[0], ...) │ │ └─> Windows: WaitForSingleObject(...) │ │ │ │ 3. 检查事件 │ │ └─> check_events() │ │ └─> 对每个 socket 调用 getsockopt(ZMQ_EVENTS) │ │ └─> 检查是否有可读事件 │ │ │ └─────────────────────────────────────────────────────────────────────┘技术要点
1. 跨平台实现
Signaler 在不同平台的实现:
| 平台 | 实现方式 | 发送操作 | 接收操作 |
|---|---|---|---|
| Unix | pipe | write(pipe[1], ...) | read(pipe[0], ...) |
| Windows | event | SetEvent(...) | WaitForSingleObject(...) |
2. 线程安全
信号机制确保线程安全:
- 线程安全的 socket 使用 signaler 进行通知
- 非线程安全的 socket 直接使用文件描述符
- 信号机制避免了线程竞争
3. 高效处理
信号处理的高效性:
- 信号只作为通知,不传递数据
- 避免了频繁的轮询
- 减少了系统调用的开销
代码示例
使用 signaler 的典型场景:
// 创建线程安全的 socketvoid*socket=zmq_socket(ctx,ZMQ_PUB);// PUB 是线程安全的zmq_bind(socket,"tcp://*:5555");// 创建 pollerzmq_pollitem_t items[]={{socket,0,ZMQ_POLLOUT,0}// 监听可写事件};// 轮询while(1){intrc=zmq_poll(items,1,1000);if(rc==-1)break;if(items[0].revents&ZMQ_POLLOUT){// 可以发送消息zmq_send(socket,"Hello",5,0);}}总结
ZeroMQ 信号与 Poll 的配合机制:
信号发送:
- 当 socket 状态变化时(如收到消息),通过 signaler 发送信号
信号检测:
- zmq_poll 将 signaler 的文件描述符加入 poll 集合
- 使用系统的 poll/select 等调用等待事件
信号处理:
- 检测到 signaler 事件后,调用 signaler->recv() 清除信号
- 然后检查所有 socket 的事件状态
事件通知:
- 将有事件的 socket 通知给用户
这种设计使得 ZeroMQ 能够高效、线程安全地处理多个 socket 的事件,是其高性能的重要基础。