>

引言:前面一章简单介绍了关于epoll 的使用方式,这一章介绍一下一个简单的反应堆模型,没有实现超时机制的管理。最主要的是要介绍一下关于异步事件反应堆的设计方式。

反应堆的模型图在上一张可以看到,但是那张图是盗来的,展示的是 twisted 的反应堆。今天给不熟悉这个部分的朋友介绍一下基于 epoll 的反应堆,过程类似于 libevent。

反应堆可以提供几个操作:

(0)创建一个反应堆:

1
mc_event_base_t * mc_base_new(void) ;

返回一个操作句柄.  

(1)为某一个需要监听的文件描述符加入回调函数,并注册事件类型。

1
2
3
4
5
6
int mc_event_set( mc_event_t *ev , short revent , int fd , mc_ev_callback callback , void *args )  ;
/*
* Initialize a event , add callback and event type
* if the event exists , this function will change the mode of this event
* and fd
*/

这里的 revent 由宏定义为几种类型:

1
2
3
4
5
#define MC_EV_READ     0x0001
#define MC_EV_WRITE 0x0002
#define MC_EV_SIGNAL 0x0004
#define MC_EV_TIMEOUT 0x0008
#define MC_EV_LISTEN 0x0010

相应的操作可以使用 | 运算来并几个需要监听的事件类型。
事件类型定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct mc_event_s {
struct mc_event_s;
struct mc_event_s;
unsigned int;
int ev_fd; // file des of event
short revent; // event type
struct timeval; // event timeout time
mc_ev_callback callback; // callback of this event
void *;
int ev_flags;
mc_event_base_t *base;
} mc_event_t ;

事件结构本身后面解释。 

(2)将需要监听的并且已经初始化的事件加入反应堆。

1
2
3
4
5
6
int mc_event_post( mc_event_t *ev , mc_event_base_t * base ) ;
/*
* Post this event to event_base
* struct base has two queue , active queue and added queue
* this function will post event to added queue , but not in active queue
*/

将刚才注册了事件类型和回调函数的事件加入 base, 即将其看做一个反应堆。

(3)最后提供了一个 dispatch 函数,反应堆开始循环,等待事件的发生。如果对应的 fd 上的事件发生,调用相应的回调函数。由第一步注册。

1
2
3
4
5
6
int mc_dispatch( mc_event_base_t * base ) ;
/*
* start loop
* and dispatch event by
* mc_event_loop
*/

反应堆支持在循环过程中,通过相应的回调函数再注册事件,类似于热加入,热移除。
实现方式很简单,就是在第一个事件的回调函数上调用 mc_event_set()然后注册。再加入 base.

base 的结构如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct mc_event_base_s{
void *added_list;
void *active_list;
unsigned int event_num;
unsigned int event_active_num;

// mc_minheap minheap;

int epoll_fd; //for epoll only
int ev_base_stop;
int magic;
struct timeval event_time;
} mc_event_base_t;

让我们来看一个简单的 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//_____________________test bellow ______________________
#define mc_sock_fd int
#define DEFAULT_NET AF_INET
#define DEFAULT_DATA_GRAM SOCK_STREAM
#define DEFAULT_PORT (1115)
#define DEFAULT_BACKLOG (200)

/* simple connection */
struct _connection {
int fd ;
mc_event_t read ;
mc_event_t write ;
char buf[1024] ;
mc_event_base_t * base ;
};

void setreuseaddr( mc_sock_fd fd ) {
int yes = 1 ;
setsockopt( fd , SOL_SOCKET , SO_REUSEADDR , &yes , sizeof(int) );
}

int mc_socket() {
int retsock = socket(DEFAULT_NET,DEFAULT_DATA_GRAM,0) ;
if( retsock < 0 ) {
/* we should add some debug information here
fprintf(LOGPATH,"socket error\n");
*/
return -1 ;
}
return retsock ;
}

int mc_bind(mc_sock_fd listenfd ) {
struct sockaddr_in serveraddr ;
bzero(&serveraddr,sizeof(serveraddr));

serveraddr.sin_family = AF_INET ;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(DEFAULT_PORT);
return bind(listenfd,(struct sockaddr *)&serveraddr , sizeof(serveraddr ));
}

int mc_isten(mc_sock_fd listenfd) {
return listen(listenfd,DEFAULT_BACKLOG);
}

void handler_accept( int fd , short revent , void *args ) {
struct sockaddr_in in_addr ;
size_t in_len ;
int s ;
int done = 0 ;
struct _connection * lc = (struct _connection *)args ;

in_len = sizeof( in_addr );
mc_setnonblocking(fd) ;
while( !done ) {
s = accept( fd , (struct sockaddr *)&in_addr , &in_len );
if( s == -1 ) {
if( (errno == EAGAIN )|| (errno == EWOULDBLOCK ) ) {
break;
}
else {
perror("accept");
break;
}
}
if( s == 0 ) {
fprintf(stderr,"Accept a connection on %d \n",fd );
}
done = 1 ;
}
mc_setnonblocking(s) ;
lc->fd = s ;
mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );


mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
mc_event_post( &(lc->write) , lc->base );
}

void handler_read( int fd , short revent , void *args ) {
mc_setnonblocking(fd) ;
struct _connection * lc ;
lc = (struct _connection *)args ;
read( fd , lc->buf , 1024 );
mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
}

void handler_write( int fd , short revent , void *args ) {
mc_setnonblocking(fd) ;
struct _connection * lc ;
lc = (struct _connection *)args ;
write( fd , lc->buf , 1024 );
mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
}

void cab( int fd , short revent , void *args ) {
mc_setnonblocking(fd) ;
char buf[1024] = "xx00xx00xx00xx00\n";
write(fd,buf,1024);
}

int main() {
mc_event_t mev ;
mc_event_base_t *base = mc_base_new() ;
struct _connection lc ;
lc.base = base ;

int sockfd = mc_socket() ;
mc_bind(sockfd);
mc_isten(sockfd);

mc_event_set( &(lc.read) , MC_EV_READ , sockfd , handler_accept , &lc );
mc_event_post( &(lc.read) , base );
mc_dispatch(base);
return 0;
}

首先:封装的几个套接口操作没有考虑错误处理,作为简单的实例。

定义了一个 connection 结构,用于表示每一个到来的连接,这里的 struct _connection 中包含读写事件和一个缓冲区,还有指向反应堆的指针和对应注册的fd

工作过程如下:(主要看 main 函数)

(1)创建一个反应堆。
(2)实例化一个 connection
(3)创建套接口,bind,listen 老生常谈,这里就不多说了
(4)将这个监听套接口注册相应的回调函数,这里我们注册的是 handler_accept() 函数,回调函数类型都是 void *XXX( int , short , void *) ;

当监听套接口发生可读事件时,第一次我们认为是相应的监听套接口得到了新的连接,所以,第一次调用的时候直接调用注册了的回调函数 handler_accept().

在handler_accept() 函数中,我们为这个连接的读写事件添加了相应的回调函数,并把连接描述符(不是监听描述符)注册到这个上。下次这个套接口可读的时候调用handler_read(),可写的时候调用handler_write(). 如果需要改变状态或改变回调函数,只需要一个状态机或者别的方式来确定需要的回调函数是哪一个,在我们的handler_write() 和 handler_read()中可以改变回调函数,代码所示。

需要注意的是,我们的事件是一个实例,不管是在connection结构中或是自己定义,都需要不断的向操作系统申请空间,如果采用对象池或者connection池的方式,可以减少服务器的负载。

总结:反应堆模式最基本的操作就是:注册事件(为需要监听的fd加入回调函数)—–>将事件加入反应堆——>开始事件循环——>事件发生,调用回调函数。

异步操作的精髓就是在这里,而不是同步的等待每一个事件。下一章讲解这个反应堆的实现。