一、内容简介
使用裸epoll,有事件状态机的复杂切换逻辑难以维护、业务逻辑和网络io重度耦合难以迭代的问题。
在服务器开发时使用Reactor封装好epoll,可在简化事件管理的同时将将业务逻辑与网络io层解耦。本文将以实现http服务器为载体,介绍reactor。
二、Reactor是什么?有什么用?
Reactor是什么?一种处理高并发网络请求的设计模式,采用了同步非阻塞的方式处理io。本质上,Reactor将对io的处理转化为对就绪事件的处理。
Reactor模式将事件监听分发与事件处理进行解耦
回忆一下:一个典型的服务端网络io的流程如下:
1.socket
2.bind
3.listen
4.accept
5.read、recv
6.write、send
7.循环处理5、6,直到read、recv返回0
8.close
服务器无法事先知道客户端什么时候与来建立连接,与其阻塞主线程等待连接,Reactor模式的解决办法是:
- 1.注册事件:通过epoll_ctl()先注册到io多路复用当中
- 2.事件监听与分发:通过epoll_wait()当客户端实际上连接进来的时候抛出I/O事件
- 3.事件处理:执行回调函数(callback),处理io事件,不同的io事件与回调函数对应如下
I/O —> event —> callback
listenfd —> EPOLLIN —> accept_cb
clientfd —> EPOLLIN —> recv_cb
clientfd —> EPOLLOUT —> send_cb
2.1 连接conn的底层数据结构
conn这个结构体是我们所需要管理的来自客户端的连接,当我们建立起一个连接的时候,应当知道它的文件描述符fd、所使用的回调函数、以及用于接受和发送数据的缓冲器buffer。并且通过state的置位与否来确定buffer是否读取完毕
typedefint(*CALLBACK)(intfd);structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;CALLBACK send_callback;union{CALLBACK recv_callback;CALLBACK accept_callback;}r_action;intstatus;};2.2 Reactor是如何封装epoll的?
在set_event中封装epoll_ctl函数,实现对epoll事件的添加和修改。event_register调用set_event来注册事件。
intset_event(intfd,intevent,intflag){if(flag){structepoll_eventev;ev.events=event;ev.data.fd=fd;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);}else{structepoll_eventev;ev.events=event;ev.data.fd=fd;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);}}intevent_register(intfd,intevent){conn_list[fd].fd=fd;conn_list[fd].r_action.recv_callback=recv_cb;conn_list[fd].send_callback=send_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength=0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength=0;set_event(fd,event,1);}2.3 回调函数
回调函数是Reactor实现的核心部分,是实现业务逻辑和网络io解耦和简单化事件管理的关键。
accept_cb调用accept,为接收到的事件分配一个客户端文件描述符,并且注册EPOLLIN事件。
recv_cb调用recv将将数据从客户端接收到buffer中,然后调用业务层逻辑(http_request)。完成后,调用set_event将事件event改为EPOLLOUT
send_cb调用send,由于tcp分包特性,数据不完整时使用status来确认是否继续从缓冲区读取。调用http_response进业务层逻辑。完成后,调用set_event将event改为EPOLLIN
intaccept_cb(intfd){structsockaddr_inclientaddr;socklen_tlen=sizeof(clientaddr);intclientfd=accept(fd,(structsockaddr*)&clientaddr,&len);printf("accept finshed: %d\n",clientfd);event_register(clientfd,EPOLLIN);return0;}intrecv_cb(intfd){intcount=recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count==0){// disconnect,recv返回0是断开printf("client disconnect: %d\n",fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);//待完善return0;}elseif(count<0){printf("count: %d, errno: %d, %s\n",count,errno,strerror(errno));close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);return0;}conn_list[fd].rlength=count;set_event(fd,EPOLLOUT,0);http_request(&conn_list[fd]);//业务逻辑}intsend_cb(intfd){http_response(&conn_list[fd]);intcount=0;set_event(fd,EPOLLOUT,0);}elseif(conn_list[fd].status==2){set_event(fd,EPOLLOUT,0);}elseif(conn_list[fd].status==0){if(conn_list[fd].wlength!=0){count=send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);}set_event(fd,EPOLLIN,0);}//set_event(fd, EPOLLOUT, 0);returncount;}2.4 主循环(mainloop)
当io事件批量发生,epoll_wait将返回对应的就绪事件集。然后调用对应的回调函数处理事件。
intmain(){unsignedshortport=2000;intsockfd=init_server(port);epfd=epoll_create(1);conn_list[sockfd].fd=sockfd;conn_list[sockfd].r_action.recv_callback=accept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]={0};intnready=epoll_wait(epfd,events,1024,-1);inti=0;for(i=0;i<nready;i++){intconnfd=events[i].data.fd;if(events[i].events&EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events&EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}}业务层逻辑:http请求和回应
通过reactor封装后,业务逻辑不需要关心网络io层是如何使用epoll来管理事件、收发数据的。
以接收数据为例,struct conn当中的rbuffer,是一个“取餐窗口”,我们的reactor使用recv_cb调用recv将来自客户端的数据发送到rbuffer当中,而我们在业务层视角里,只需要知道rbuffer当中有我们需要的数据即可。
inthttp_request(structconn*c){printf("request: %s\n",c->rbuffer);memset(c->wbuffer,0,BUFFER_LENGTH);c->wlength=0;c->status=0;}inthttp_response(structconn*c){intfilefd=open("c1000k.png",O_RDONLY);structstatstat_buf;fstat(filefd,&stat_buf);if(c->status==0){c->wlength=sprintf(c->wbuffer,"HTTP/1.1 200 OK\r\n""Content-Type: image/png\r\n""Accept-Ranges: bytes\r\n""Content-Length: %ld\r\n""Date: Tue, 30 Apr 2024 13:16:46 GMT\r\n\r\n",stat_buf.st_size);}elseif(c->status==1){intret=sendfile(c->fd,filefd,NULL,stat_buf.st_size);if(ret==-1){printf("error: %d\n",errno);}c->status=2;}elseif(c->status==2){c->wlength=0;memset(c->wbuffer,0,BUFFER_LENGTH);}close(filefd);returnc->wlength;}