>
引言:

上一篇文章介绍了一个简单的基于 epoll 的反应堆的实现,详述了其工作过程。这一篇文章继续
探讨反应堆的设计。

首先,先来回顾一下 Reactor 的使用方式:
注册事件(为需要监听的 fd 加入回调函数)—–> 将事件加入反应堆 ——> 开始事件循环 ——> 事件发生,调用回调函数。

第一次加入的描述符可以为监听描述符,即由 socket() 函数创建,当这个描述符有事件发生,意味着有新的连接的到来,调用回调函数handler_accept() 其中这个函数里面涉及到调用 accept()系统调用和为这个新连接分配实例,然后设置这个连接的回调函数,即 handler_read() handler_write() 等,设置完后相应的连接描述符如果有事件发生,即可以调用相应的不同种事件的回调函数。这个是我们的总体思路,如果需要多进程方式,可以创建多进程,然后每个进程不同的反应堆,但是需要注意的是,如果父子进程共享监听描述符,会引起进程组的惊群现象,就是说,每一个进程都可能会尝试 accept() 这个新的连接,那么这种方式设计的时候需要为 accept() 加锁,具体方式可以看Nginx关于这方面的设计,不仅实现了 accept() 锁,还达到了进程间的负债均衡。
关于连接对象池,可以下次写一篇博文介绍一下对象池的设计方式。

言归正传,说说反应堆的设计方式,如果需要方便的话,可以使用libevent..memcached的网络模块就是基于这个,相信大家也知道。
从上上篇关于 epoll 的介绍中就可以看到,事件是整个系统设计的核心,我们的整个反应堆都是围绕一个叫做事件的东西来做相应的处理。
那么,作为应用层的事件,可以这么说来,就是那个地方发生了某件事情,而这个事情是在我们的规定范围的,然后,我们需要知道这个事件给予了我们什么样的权利,比如说,我们可以读,可以写,可以操作等等.
(下面不加说明,都是以 epoll 为例 )
操作系统为我们提供的接口也是类似的操作
创建一个 epoll —–> 注册相应的事件 epoll_ctl() —–> 进入事件循环监听事件(可能超时返回 )epoll_wait() —–>返回事件 (event_list[i].event && EPOLLIN)等
我们需要做的事情就是,为这个流程的事件做一个封装,并能够有效的管理整个事件,而不是离散的处理。

看看 event的封装:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct mc_event_s {
struct mc_event_s *next;
struct mc_event_s *prev;
unsigned int min_heap_index;
int ev_fd; // file des of event
short revent; // event type
struct timeval ev_timeval; // event timeout time
mc_ev_callback callback; // callback of this event
void *args;
int ev_flags;
mc_event_base_t *base;
} mc_event_t;

两个指针分别指向事件的前部和后部,标准的双向队列方式,没什么可说的。
min_heap_index 是作为超时管理的最小堆的下标,目前还没有最这方面的设计,可以先忽略。
ev_fd 是作为这个事件的描述符本身, callback 是注册在事件上的回调函数, ev_flags 是事件的状态,由宏定义为:

1
2
3
4
#define MC_EV_INITD    0x0001
#define MC_EV_ADDED 0x0002
#define MC_EV_ACTIVE 0x0004
#define MC_EV_DELED 0x0008

状态分为 是否初始化,是否加入了队列,是否加入了激活的队列,已删除。
这里注意的是事件分为两个队列,一个是已加入的,另一个是激活的。我们在处理的时候会将accept()返回的时间加入到已加入的队列,当有事件发生,将这个事件加入到激活的事件队列中,然后依次轮训处理每一个激活的事件。
整个反应堆需要一个控制块,也就是反应堆的实例,结构是像这样:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct mc_event_base_s {
void *added_list;
void *active_list;
unsigned int event_num;
unsigned int event_active_num;

// mc_minheap minheap;
int epoll_fd; //for epoll only
int ev_base_stop;
int magic;
struct timeval event_time;
} mc_event_base_t;

可以看到,反应堆中维护了两个队列, added_list 和 active_list 为的是能够有效控制所有的事件。
event_num是事件个数, event_active_num 是已激活的事件个数
epoll_fd 是由epoll_create()创建的句柄,这里没有加入宏定义来区分是否操作系统有 epoll
可以这样:

1
2
3
#if (HAVE_EPOLL)
    int   epoll_fd;
#endif

不同的IO多路复用方式不同,操作句柄也不一样。
ev_base_stop是用来判断是否停止的标志位, magic 被定义为一个宏:

#define MC_BASE_MAGIC 0x1989
用来判断整个反应堆是否初始化。

为事件的封装提供了几种操作,初始化,加入队列,删除事件,改变事件类型,循环监听事件。  
然后将struct mc_event_ops 中的函数指针与实际的操作分开,类似于HOOK 方式,这样做的目的是为不同的底层IO多路复用提供了统一的接口,如 select(),epoll(),kqueue()等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typedef  struct mc_event_ops {
void* (*init)(mc_event_base_t *);
int (*add)(void *, mc_event_t *);
int (*del)(void *, mc_event_t *);
int (*mod)(void *, mc_event_t *);
int (*dispatch)(void *, mc_event_base_t *,struct timeval);
} mc_event_opt;
/*
* Functions point of events option
* there points will point to a instance of function
* and other module call there function by ops instance
*/

extern mc_event_opt mc_event_op_val;

#define mc_event_ini mc_event_op_val.init
#define mc_event_add mc_event_op_val.add
#define mc_event_del mc_event_op_val.del
#define mc_event_mod mc_event_op_val.mod
#define mc_event_loop mc_event_op_val.dispatch

看看实际的操作事件方式,我们为这几个操作添加了epoll 的钩子,具体是下面这样:
对应mc_event_ops的第一个钩子

1
2
3
4
5
6
7
8
void *mc_epoll_init(mc_event_base_t *meb){
if(meb->magic != MC_BASE_MAGIC) {
fprintf(stderr,"In function mc_epoll_init %d, %s ",__LINE__,__FILE__);
return NULL;
}
meb->epoll_fd = epoll_create(MC_EVENT_MAX);
return meb;
}

方式很简单,调用 epoll_create() 创建一个 epoll 实例,并把这个实例的句柄赋给反应堆。
第二个加入,对应mc_event_ops的第二个钩子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int mc_epoll_add(void * arg,mc_event_t *ev) {
if(ev->base->magic != MC_BASE_MAGIC) {
fprintf(stderr,"In function mc_epoll_add %d, %s ",__LINE__,__FILE__);
return -1;
}
mc_event_base_t *base = ev->base;
int epoll_fd = base->epoll_fd;
int err;
struct epoll_event epoll_ev ;
epoll_ev.data.ptr = ev;
epoll_ev.events = EPOLLIN|EPOLLET;

if(!(ev->ev_flags & MC_EV_ADDED)) {
err = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ev->ev_fd, &epoll_ev);
if(err != 0) {
perror("epoll_ctl");
fprintf(stderr, "In function mc_epoll_add the epoll_ctl error in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
ev->ev_flags |= MC_EV_ADDED;
}
return 0;
}

具体来说是调用了 epoll_ctl 并设置宏为 EPOLL_CTL_ADD然后设置事件类型
第三个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int mc_epoll_del(void * arg,mc_event_t *ev) {
if(ev->base->magic != MC_BASE_MAGIC) {
fprintf(stderr,"In function mc_epoll_add %d, %s ",__LINE__,__FILE__);
return -1;
}
mc_event_base_t *base = ev->base;
int epoll_fd = base->epoll_fd;
int err;

if(!(ev->ev_flags & MC_EV_INITD)) {
return -1;
}
err = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->ev_fd, NULL);
if(err != 0) {
fprintf(stderr, "In function mc_epoll_del the epoll_ctl error in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
ev->ev_flags = 0x0000;
return 0;
}

第四个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int mc_epoll_mod(void * arg,mc_event_t *ev) {
if(ev->base->magic != MC_BASE_MAGIC) {
fprintf(stderr,"In function mc_epoll_mod %d, %s ",__LINE__,__FILE__);
return -1;
}
mc_event_base_t *base = ev->base;
int epoll_fd = base->epoll_fd;
int err;
unsigned int mode;
if(!(ev->ev_flags & MC_EV_INITD)) {
return -1;
}

struct epoll_event epoll_ev ;
epoll_ev.data.ptr = ev;
if(arg == NULL) {
return 0;
}
mode = *(unsigned int *)arg;
epoll_ev.events = mode;
err = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, ev->ev_fd, &epoll_ev);
if(err != 0) {
fprintf(stderr, "In function mc_epoll_del the epoll_ctl error in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
return 0;
}

第五个钩子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int  mc_epoll_loop(void * args, mc_event_base_t *base, struct timeval ev_time) {
if(base == NULL) {
fprintf(stderr,"base == NULL in mc_epoll_loop in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}

if(base->magic != MC_BASE_MAGIC) {
fprintf(stderr,"In function mc_epoll_mod %d, %s ",__LINE__,__FILE__);
return -1;
}

int nfds;

/* we pass args as nevents in this function */
struct epoll_event *nevents = (struct epoll_event *)args;

struct epoll_event epoll_ev ;
nfds = epoll_wait( base->epoll_fd, nevents, MC_EVENT_MAX, 1);
if(nfds <= -1) {
fprintf(stderr,"epoll wait function in mc_epoll_loop in file:%s,line:%d\n",__FILE__,__LINE__);
return nfds;
}
return nfds;
}

实现方式具体可以看相应的代码。文章有点长了,很多代码就不贴出来了。
然后看看最后的一开始提到的几个操作:

注册事件(为需要监听的fd加入回调函数)—–>将事件加入反应堆——>开始事件循环——>事件发生,调用回调函数
+ View Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
mc_event_base_t * mc_base_new(void) {
mc_event_base_t * base = (mc_event_base_t *)malloc(sizeof(mc_event_base_t));
if(base == NULL) {
fprintf(stderr,"Init the base moudle in mc_base_new error in file:%s,line:%d\n",__FILE__,__LINE__);
return NULL;
}

// init the base lists
base->added_list = NULL;
base->active_list = NULL;
base->magic = MC_BASE_MAGIC;
base->event_num = 0;
base->event_active_num = 0;
base->ev_base_stop = MC_BASE_STOP;
base->magic = MC_BASE_MAGIC;
gettimeofday(&base->event_time,NULL);
mc_event_ini(base);
return base;
}

int mc_event_set(mc_event_t *ev, short revent, int fd, mc_ev_callback callback, void *args) {
if(ev == NULL) {
fprintf(stderr, " mc_event_set error, ev == NULL or other segment error in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
#if (HAVE_EPOLL)
unsigned int epoll_flag;
#endif
int err;
memset(ev,0,sizeof(mc_event_t));
ev->revent = revent;
ev->ev_fd = fd ;
ev->callback = callback;
ev->next = NULL;
ev->prev = NULL;
if(args == NULL)
ev->args = NULL ;
else
ev->args = args ;

/* This job post to mc_event_post
*if(revent & MC_EV_LISTEN)
*{
* err = mc_event_add(NULL, ev);
* if(err != 0)
* fprintf(stderr,"mc_event_add in mc_event_set \n");
*}
*/

/* event should post to base */
if(ev->base == NULL)
return 0;

#if (HAVE_EPOLL)
if(revent & MC_EV_READ) {
epoll_flag = EPOLLIN|EPOLLET;

err = mc_event_mod((void *)&epoll_flag, ev);
if(err != 0)
fprintf(stderr,"mc_event_mod (MC_EVENT_READ) in mc_event_set in file:%s,line:%d\n",__FILE__,__LINE__);
}

if(revent & MC_EV_WRITE) {
epoll_flag = EPOLLOUT|EPOLLET;
err = mc_event_mod((void *)&epoll_flag,ev);
if(err != 0)
fprintf(stderr,"mc_event_mod (MC_EVENT_WRITE) in mc_event_set in file:%s,line:%d\n",__FILE__,__LINE__);
}

ev->ev_flags |= MC_EV_INITD ;
#endif

return 0;
}

int mc_event_post(mc_event_t *ev, mc_event_base_t * base) {
if(ev == NULL || base == NULL) {
fprintf(stderr," In function mc_event_post, the args error, please check your arguments in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
if(base->magic != MC_BASE_MAGIC) {
fprintf(stderr,"The mc_event_base_t * points base non inited in mc_event_post in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
int err;
ev->base = base;
add_event_to_queue(ev,(mc_event_t **)&(base->added_list));
base->event_num++;

err = mc_event_add(NULL, ev);
if(err == -1) {
fprintf(stderr,"In function mc_event_add error in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
}

int mc_dispatch(mc_event_base_t * base) {
if(base == NULL) {
fprintf(stderr, "base == NULL in function mc_dispatch in file:%s,line:%d\n",__FILE__,__LINE__);
return -1;
}
if(base->magic != MC_BASE_MAGIC) {
fprintf(stderr,"In function mc_disptahch noinitlized line:%d, in file:%s ",__LINE__,__FILE__);
return -1;
}

struct epoll_event *nevents = (struct epoll_event *)malloc(sizeof(struct epoll_event));
int done = 0;

int nevent;
int i;
mc_event_t *levent;
mc_event_t *retevent;
while(!done) {
nevent = mc_event_loop(nevents, base, base->event_time);

if(nevent == -1) {
fprintf(stderr,"No event check, return in file:%s line:%d \n",__FILE__,__LINE__);
goto err1;
}
for(i = 0; i < nevent; i++) {
if(nevents[i].events & EPOLLERR || nevents[i].events & EPOLLHUP) {
levent = nevents[i].data.ptr;
if(!(levent->ev_flags & MC_EV_INITD))
continue;
if((levent->ev_flags & MC_EV_ACTIVE) || (levent->ev_flags & MC_EV_ADDED))
del_event_from_queue(levent);
}
if(nevents[i].events & EPOLLIN) {
levent = nevents[i].data.ptr;
levent->revent = MC_EV_READ ;
add_event_to_queue(levent, (mc_event_t **)&(base->active_list));
levent->ev_flags |= MC_EV_ACTIVE;
base->event_active_num++;
}
else if(nevents[i].events & EPOLLOUT) {
levent = nevents[i].data.ptr;
levent->revent = MC_EV_WRITE;
add_event_to_queue(levent, (mc_event_t **)&(base->active_list));
levent->ev_flags |= MC_EV_ACTIVE;
base->event_active_num++;
}
else {
fprintf(stderr,"Unknow err in file:%s,line:%d\n",__FILE__,__LINE__);
goto err1;
}
}

retevent = (mc_event_t *)(base->active_list);
for(i = 0;i < nevent; i++) {
fprintf(stderr," %d event(s)\n",nevent);
if(retevent == NULL)
break;

retevent = get_event_and_del((mc_event_t *)(base->active_list));
/* If we want to reuse this event we should set event again */
retevent->ev_flags = retevent->ev_flags&(~MC_EV_ACTIVE);
base->event_active_num--;
if(retevent == NULL)
fprintf(stderr,"event is NULL file:%s,line:%d\n",__FILE__,__LINE__);

retevent->callback(retevent->ev_fd,retevent->revent,retevent->args);
}

}
return 0;
err1:
return -1;
}

在 dispatch函数中,我们的每一次 epoll 返回后都会轮训 active 事件列表,然后调用事件相应的回调函数。

附上一开头所说的一些宏定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define HAVE_EPOLL     1
#define MC_EV_READ 0x0001
#define MC_EV_WRITE 0x0002
#define MC_EV_SIGNAL 0x0004
#define MC_EV_TIMEOUT 0x0008
#define MC_EV_LISTEN 0x0010

/* the ev_flags value in mc_event_s */
#define MC_EV_INITD 0x0001
#define MC_EV_ADDED 0x0002
#define MC_EV_ACTIVE 0x0004
#define MC_EV_DELED 0x0008

#define MC_BASE_STOP 0x0000
#define MC_BASE_ACTIVE 0x0001

#define MC_BASE_MAGIC 0x1989
#define MC_EVENT_MAX 10240

总结:这篇文章不是那么的完善,涉及的内容太多,希望大家谅解,后续会有更新。
贴出了反应堆的设计方式和简单的思路,仅供参考。系列文章服务器端的模型就到这里。后续可能会写关于在这个模型上不同的知名的软件的设计方式,比如说 libevent的,apache ,Nginx 等。