0%

redis-网络模型

当我们进行网络IO时,Linux给我提供的常用的有seelct 和epoll两个系统级函数,来进行IO操作。现在大部分高并发的网络模型都是基于epoll,比如nginx,redis等。先学习下Linux的网络模型
Linux网络I/O模型简介
Linux的内核将所有外部设备都看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个file descriptor (fd,文件描述符)。而对一个socket的该写也会有相应的描述符,称为socketfd (socket描述符),描述符就是一个数字,它指向内核中的一个结构体(文件路径,数据区等一些属性)。
根据UNIX网络编程对I/O模型的分类, UNIX提供了5种I/O模型,分别如下。
(1)阻塞I/O模型:

​ 最常用的I/O模型就是阻塞I/O模型,缺省情形下,所有文件操作都是阻塞的。我们以套接字接口为例来讲解此模型:在进程空间中调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间一直会等待,进程在从调用recvfrom开始到它返回的整段时间内都是被阻塞的,因此被秘为阻塞I/O模型,如图1-1所示。

阻塞IO当使用accept会一直阻塞住,等待IO,然后read读取数据等待。当对已建立的链接进行读取的时候,应用进程被阻塞,直到数据从内核缓冲区复制到应用进程缓冲区中才返回。应该注意到,在阻塞的过程中,其它应用进程还可以执行,因此阻塞不意味着整个操作系统都被阻塞。因为其它应用进程还可以执行,所以不消耗 CPU 时间,这种模型的 CPU 利用率会比较高。

(2)非阻塞I/O模型:

recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般都对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来。

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。应用进程可以继续执行,但是需要不断的执行系统调用来获知 I/O 是否完成,这种方式称为轮询(polling)。linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:如图1-2所示。

img

(3) I/O复用模型(多路复用):

Linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮我们侦测多个fd是否处于就绪状态。select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用, epoll使用基于事件驱动方式代替顺序扫描,因此性能更高。当有fd就绪时,立即回调函数rollback,如图1-3所示。

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。应用进程可以继续执行,但是需要不断的执行系统调用来获知 I/O 是否完成,这种方式称为轮询(polling)。linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:如图1-2所示。

select和epoll都提供IO多路复用支持的,可以在一个进程内支持很多的网络链接。并且他们在读取数据的时候都是阻塞的模式【因为他们都需要在读写事件就绪后自己负责进行读写(把数据从内核态拷贝到当前用户进程态),也就是说这个读写过程是阻塞的】

(4)信号驱动I/O模型:

首先开启套接口信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据,如图1-4所示。

img

(5)异步I/O:

告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区,其它IO模型都的调用系统函数把数据从内核态拷贝到当前用户进程态,所以其他IO模型可以说是非阻塞同步IO或是阻塞同步IO。)通知我们。这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们何时可以开始一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成,

如图1-5所示。

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

如果想要了解更多的UNIX系统网络编程知识,可以阅读《UNIX网络编程》,里面有非常详细的原理和API介绍。

img

IO多路复用:

就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

img

IO多路复用解决的问题:

非阻塞IO时的用户进程的一直轮询,而是告诉内核我当前所关注的IO事件是哪些,当内核检测到进程所关注的IO有事件的时候,内核会返回就绪的事件IO,这个时候还是需要阻塞的同步去把数据从内核态拷贝用户进程态(拷贝数据是阻塞,而异步IO是内核把数据拷贝好了后通知进程)。select 和epoll都支持IO多路复用,并且阻塞IO,非阻塞IO都属于同步IO。

select :

调用后select函数会阻塞(通过 socket -set设置成非阻塞模式就会一直轮询CPU,linux下使用fcntl函数设置),直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

epoll:

epoll没有IO的限制,而且底层使用红黑树结构,对于事件的添加和删除更高效。是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

epoll相对于select的优势:

1:支持一个进程打开的socket描述符(FD)不受限制(仅受限于操作系统的最大文到

件句柄数)。select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD-SETSIZE设

的置,默认值是1024,对于那些需要支持上万个TCP连接的大型服务器来说显然太少了。

可以选择修改这个宏然后重新编译内核,不过这会带来网络效率的下降。我们也可以通过选择多进程的方案(传统的Apache方案)解决这个问题,不过虽然在Linux上创建进程的代价比较小,但仍旧是不可忽视的。另外,进程间的数据交换非常麻烦,对于Java来说,由于没有共享内存,需要通过Socket通信或者其他方式进行数据同步,这带来了额外的性能损耗,增加了程序复杂度,所以也不是一种完美的解决方案。值得庆幸的是, epoll并没有这个限制,它所支持的FD上限是操作系统的最大文件句柄数,这个数字远远大于1024,例如,在1GB内存的机器上大约是10万个句柄左右,具体的值可以通过cat/proc/sys/fs/file-max察看,通常情况下这个值跟系统的内存关系比较大。

2.:IO效率不会随着FD数目的增加而线性下降。

传统select/poll的另一个致命弱点,就是当你拥有一个很大的socket集合时,由于网络延时或者链路空闲,任一时刻只有少部分的socket是“活跃”的,但是selectpoll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作-这是因为在内核实现中, epoll是根据每个fd上面的callback函数实现的。那么,只有“活跃”的socket才会去主动调用callback函数,其他idle状态的socket则不会。在这点上, epoll实现了一个伪AIO。针对epoll和select性能对比的benchmark测试表明:如果所有的socket都处于活跃态-例如一个高速LAN环境, epol并不比select/poll效率高太多;相反,如果过多使用epollctl,效率相比还有稍微地降低但是一旦使用idle connections模拟WAN环境, epoll的效率就远在selectpoll之上了。

3: 使用mmap加速内核与用户空间的消息传递。

无论是select, poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存复制就显得非常重要, epoll是通过内核和用户空间mmap同一块内存来实现的。

4: epoll的API更加简单

包括创建一个epoll描述符、添加监听事件、阻塞等待所监听的事件发生、关闭epoll描述符等

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

\1. int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。

当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

\2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

函数是对指定描述符fd执行op操作。

- epfd:是epoll_create()的返回值。

- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。

- fd:是需要监听的fd(文件描述符)

- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
//感兴趣的事件和被触发的事件
//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

\3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待epfd上的io事件,最多返回maxevents个事件。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

redis在网络模型结构

事件处理主要基于eventLoop结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/* State of an event based program 
*
* 事件处理器的状态
*/
typedef struct aeEventLoop {

// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */

// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */

// 用于生成时间事件 id
long long timeEventNextId;

// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */

// 已注册的文件事件
aeFileEvent *events; /* Registered events */

// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */

// 时间事件
aeTimeEvent *timeEventHead;

// 事件处理器的开关
int stop;

// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */

// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;

} aeEventLoop;

img

1:对 epoll的封装,用来操作加入监听事件,删除事件和获取就绪事件 ae_apoll.c其中也有 ae_kqueue.c

定义这些主要是对外有统一的接口和访问方式,来屏蔽不同操作系统间的差异。并且对外提供一致的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/* Linux epoll(2) based ae.c module
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/


#include <sys/epoll.h>
#include <stdlib.h>
#include <stdio.h>
/*
* 事件状态
*/
typedef struct aeApiState {

// epoll_creat创建返回的ID
int epfd;

// 事件槽,用来保存 epoll_wait返回的事件数组
struct epoll_event *events;

} aeApiState;

/*
* 创建一个新的 epoll 实例,并将它赋值给 eventLoop
*/
static int aeApiCreate(aeEventLoop *eventLoop) {
//申请内存,并且创建 aeApiState变量
aeApiState *state = zmalloc(sizeof(aeApiState));
//类似 aeApiState *state = malloc(sizeof(aeApiState));
// 创建失败
if (!state) return -1;
// 初始化事件槽空间
//malloc(sizeof(struct epoll_event)*eventLoop->setsize);
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
//free(state);
zfree(state);
return -1;
}

// 创建 epoll 实例
//epoll_create的参数只是一个参考值,没有实质行的影响
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
//free(state->events)
zfree(state->events);
//free(state);
zfree(state);
return -1;
}
// 赋值给 eventLoop,作为epoll的记录
eventLoop->apidata = state;
return 0;
}

/*
* 调整事件槽大小
*/
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
aeApiState *state = eventLoop->apidata;
state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
return 0;
}

/*
* 释放 epoll 实例和事件槽
*/
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;

close(state->epfd);
zfree(state->events);
zfree(state);
}

/*
* 关联给定事件到 fd
*/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
//创建一个 epoll_event 结构体变量
struct epoll_event ee;

/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation.
*
* 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
*
* 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。
*/
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;

// 注册事件到 epoll
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
//添加到 epoll事件里
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

return 0;
}

/*
* 从 fd 中删除特定事件
*/
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;

int mask = eventLoop->events[fd].mask & (~delmask);

ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (mask != AE_NONE) {//判断是否需要删除,如果还有事件,修改
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {//没有事件从epoll里删除事件
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}

/*
* 获取可执行事件
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

// 等待时间
// 如果有事件,epoll会填充 state->events
//state->events数组的大小要和 eventLoop->setsize数字值一致
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

// 有至少一个事件就绪?
if (retval > 0) {
int j;
// 为已就绪事件设置相应的模式
// 并加入到 eventLoop 的 fired 数组中
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
//从 eventLoop->apiData->events里填充到 eventLoop->fired
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}

// 返回已就绪事件个数
return numevents;
}

/*
* 返回当前正在使用的 poll 库的名字
*/
static char *aeApiName(void) {
return "epoll";
}

2:redis对事件模型做了一次封装 在ae.h和ae.c.其中ae.h定义一些头部文件和ae.c是是实现,并且会判断不同操作系统以便加载对应的系统文件,如linux下加载ae_epoll.c。ae.c会调用【ae_apoll,ae_kqueue.c,这些文件对外提供里统一的操作方式,对外提供里统一的接口】里的方法来操作系统提供的IO多路复用函数。

ae.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __AE_H__
#define __AE_H__

/*
* 事件执行状态
*/
// 成功
#define AE_OK 0
// 出错
#define AE_ERR -1

/*
* 文件事件状态
*/
// 未设置
#define AE_NONE 0
// 可读
#define AE_READABLE 1
// 可写
#define AE_WRITABLE 2

/*
* 时间处理器的执行 flags
*/
// 文件事件
#define AE_FILE_EVENTS 1
// 时间事件
#define AE_TIME_EVENTS 2
// 所有事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
// 不阻塞,也不进行等待
#define AE_DONT_WAIT 4

/*
* 决定时间事件是否要持续执行的 flag
*/
#define AE_NOMORE -1

/* Macros */
#define AE_NOTUSED(V) ((void) V)

/*
* 事件处理器状态
*/
struct aeEventLoop;

/* Types and data structures
*
* 事件接口
*/
//定义处理事件的接口函数
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

/* File event structure
*
* 文件事件结构
*/
typedef struct aeFileEvent {

// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */

// 读事件处理器
aeFileProc *rfileProc;

// 写事件处理器
aeFileProc *wfileProc;

// 多路复用库的私有数据
void *clientData;

} aeFileEvent;

/* Time event structure
*
* 时间事件结构
*/
typedef struct aeTimeEvent {

// 时间事件的唯一标识符
long long id; /* time event identifier. */

// 事件的到达时间
long when_sec; /* seconds */
long when_ms; /* milliseconds */

// 事件处理函数
aeTimeProc *timeProc;

// 事件释放函数
aeEventFinalizerProc *finalizerProc;

// 多路复用库的私有数据
void *clientData;

// 指向下个时间事件结构,形成链表
struct aeTimeEvent *next;

} aeTimeEvent;

/* A fired event
*
* 已就绪事件
*/
typedef struct aeFiredEvent {

// 已就绪文件描述符
int fd;

// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;

} aeFiredEvent;

/* State of an event based program
*
* 事件处理器的状态
*/
typedef struct aeEventLoop {

// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */

// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */

// 用于生成时间事件 id
long long timeEventNextId;

// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */

// 已注册的文件事件
aeFileEvent *events; /* Registered events */

// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */

// 时间事件
aeTimeEvent *timeEventHead;

// 事件处理器的开关
int stop;

// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */

// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;

} aeEventLoop;

/* 定义函数模型 */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

#endif

ae.c具体的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>

#include "ae.h"
#include "zmalloc.h"
#include "config.h"

/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
//判断加载那个操作系统下的事件函数
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

/*
* 初始化事件处理器状态
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

// 创建事件状态结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

// 初始化文件事件结构和已就绪文件事件结构数组
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 设置数组大小
eventLoop->setsize = setsize;
// 初始化执行最近一次执行时间
eventLoop->lastTime = time(NULL);

// 初始化时间事件结构
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;

eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;

/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化监听事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;

// 返回事件循环
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

/* Return the current set size. */
// 返回当前事件槽大小
int aeGetSetSize(aeEventLoop *eventLoop) {
return eventLoop->setsize;
}

/* Resize the maximum set size of the event loop.
*
* 调整事件槽的大小
*
* If the requested set size is smaller than the current set size, but
* there is already a file descriptor in use that is >= the requested
* set size minus one, AE_ERR is returned and the operation is not
* performed at all.
*
* 如果尝试调整大小为 setsize ,但是有 >= setsize 的文件描述符存在
* 那么返回 AE_ERR ,不进行任何动作。
*
* Otherwise AE_OK is returned and the operation is successful.
*
* 否则,执行大小调整操作,并返回 AE_OK 。
*/
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
int i;

if (setsize == eventLoop->setsize) return AE_OK;
if (eventLoop->maxfd >= setsize) return AE_ERR;
if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;

eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
eventLoop->setsize = setsize;

/* Make sure that if we created new slots, they are initialized with
* an AE_NONE mask. */
for (i = eventLoop->maxfd+1; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return AE_OK;
}

/*
* 删除事件处理器
*/
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}

/*
* 停止事件处理器
*/
void aeStop(aeEventLoop *eventLoop) {
eventLoop->stop = 1;
}

/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}

if (fd >= eventLoop->setsize) return AE_ERR;

// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];

// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;

// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;

// 私有数据
fe->clientData = clientData;

// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;

return AE_OK;
}

/*
* 将 fd 从 mask 指定的监听队列中删除
*/
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;

// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];

// 未设置监听的事件类型,直接返回
if (fe->mask == AE_NONE) return;

// 计算新掩码
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;

for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}

// 取消对给定 fd 的给定事件的监视
aeApiDelEvent(eventLoop, fd, mask);
}

/*
* 获取给定 fd 正在监听的事件类型
*/
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
if (fd >= eventLoop->setsize) return 0;
aeFileEvent *fe = &eventLoop->events[fd];

return fe->mask;
}

/*
* 取出当前时间的秒和毫秒,
* 并分别将它们保存到 seconds 和 milliseconds 参数中
*/
static void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;

gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec/1000;
}

/*
* 在当前时间上加上 milliseconds 毫秒,
* 并且将加上之后的秒数和毫秒数分别保存在 sec 和 ms 指针中。
*/
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;

// 获取当前时间
aeGetTime(&cur_sec, &cur_ms);

// 计算增加 milliseconds 之后的秒数和毫秒数
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;

// 进位:
// 如果 when_ms 大于等于 1000
// 那么将 when_sec 增大一秒
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}

// 保存到指针中
*sec = when_sec;
*ms = when_ms;
}

/*
* 创建时间事件
*/
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
// 更新时间计数器
long long id = eventLoop->timeEventNextId++;

// 创建时间事件结构
aeTimeEvent *te;

te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;

// 设置 ID
te->id = id;

// 设定处理事件的时间
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
// 设置事件处理器
te->timeProc = proc;
te->finalizerProc = finalizerProc;
// 设置私有数据
te->clientData = clientData;

// 将新事件放入表头
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;

return id;
}

/*
* 删除给定 id 的时间事件
*/
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
aeTimeEvent *te, *prev = NULL;

// 遍历链表
te = eventLoop->timeEventHead;
while(te) {

// 发现目标事件,删除
if (te->id == id) {

if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;

// 执行清理处理器
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);

// 释放时间事件
zfree(te);

return AE_OK;
}
prev = te;
te = te->next;
}

return AE_ERR; /* NO event with the specified ID found */
}

/* Search the first timer to fire.
* This operation is useful to know how many time the select can be
* put in sleep without to delay any event.
* If there are no timers NULL is returned.
*
* Note that's O(N) since time events are unsorted.
* Possible optimizations (not needed by Redis so far, but...):
* 1) Insert the event in order, so that the nearest is just the head.
* Much better but still insertion or deletion of timers is O(N).
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
*/
// 寻找里目前时间最近的时间事件
// 因为链表是乱序的,所以查找复杂度为 O(N)
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;

while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}

/* Process time events
*
* 处理所有已到达的时间事件
*/
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);

/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
// 通过重置事件的运行时间,
// 防止因时间穿插(skew)而造成的事件处理混乱
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次处理时间事件的时间
eventLoop->lastTime = now;

// 遍历链表
// 执行那些已经到达的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;

// 跳过无效事件
if (te->id > maxId) {
te = te->next;
continue;
}

// 获取当前时间
aeGetTime(&now_sec, &now_ms);

// 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;

id = te->id;
// 执行事件处理器,并获取返回值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */

// 记录是否有需要循环执行这个事件时间
if (retval != AE_NOMORE) {
// 是的, retval 毫秒之后继续执行这个时间事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 不,将这个事件删除
aeDeleteTimeEvent(eventLoop, id);
}

// 因为执行事件之后,事件列表可能已经被改变了
// 因此需要将 te 放回表头,继续开始执行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}

/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
*
* 处理所有已到达的时间事件,以及所有已就绪的文件事件。
*
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,
* 或者下个时间事件到达(如果有的话)。
*
* If flags is 0, the function does nothing and returns.
* 如果 flags 为 0 ,那么函数不作动作,直接返回。
*
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。
*
* if flags has AE_FILE_EVENTS set, file events are processed.
* 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。
*
* if flags has AE_TIME_EVENTS set, time events are processed.
* 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。
*
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* 如果 flags 包含 AE_DONT_WAIT ,
* 那么函数在处理完所有不许阻塞的事件之后,即刻返回。
*
* The function returns the number of events processed.
* 函数的返回值为已处理事件的数量
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;

/* Calculate the time missing for the nearest
* timer to fire. */
// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}

// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {

// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 文件事件可以阻塞直到有事件到达为止
tvp = NULL; /* wait forever */
}
}

// 处理文件事件,阻塞时间由 tvp 决定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}

processed++;
}
}

/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

/* Wait for milliseconds until the given file descriptor becomes
* writable/readable/exception
*
* 在给定毫秒内等待,直到 fd 变成可写、可读或异常
*/
int aeWait(int fd, int mask, long long milliseconds) {
struct pollfd pfd;
int retmask = 0, retval;

memset(&pfd, 0, sizeof(pfd));
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;

if ((retval = poll(&pfd, 1, milliseconds))== 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
return retval;
}
}

/*
* 事件处理器的主循环
*/
void aeMain(aeEventLoop *eventLoop) {

eventLoop->stop = 0;

while (!eventLoop->stop) {

// 如果有需要在事件处理前执行的函数,那么运行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);

// 开始处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

/*
* 返回所使用的多路复用库的名称
*/
char *aeGetApiName(void) {
return aeApiName();
}

/*
* 设置处理事件前需要被执行的函数
*/
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}

3:我们可以自己写一些测试代码来测试上边的代码

比如接受tcp链接和监听端口的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
typedef struct server {
int serLink;
int port;
struct sockaddr_in serv_addr;
} server_t;

int TcpCreate(server_t *ser)
{
//struct sockaddr_in serv_addr; // 保存服务器套接字地址信息
socklen_t server_addr_size;
//服务器端 句柄
int serverHandler;
// 创建一个TCP服务器套接字
serverHandler = socket(PF_INET, SOCK_STREAM, 0);
// 配置套接字IP和端口信息
//memset(&serv_addr, 0, sizeof(serv_addr));
printf(";;;;set nrtwork config\n");
ser->serv_addr.sin_family = AF_INET;
ser->serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
ser->serv_addr.sin_port = htons(ser->port);
// 绑定服务器套接字
server_addr_size = sizeof(ser->serv_addr);
if(-1 == bind(serverHandler, (struct sockaddr*) &ser->serv_addr, server_addr_size )){
printf(";;;;set nrtwork config fail\n");
return -1;
}
// 监听服务器套接字
if(-1 == listen(serverHandler, 5)){
printf("listen() error");
}
ser->serLink = serverHandler;
}



int TcpAccept(int serLink)
{
struct sockaddr_in clnt_addr; // 保存客户端套接字地址信息
socklen_t clnt_addr_size; // 客户端套接字地址变量的大小
clnt_addr_size = sizeof(clnt_addr);
//客户端句柄
int clHandler;
clHandler = accept(serLink, (struct sockaddr*) &clnt_addr, &clnt_addr_size);
return clHandler;
}

建立一个test文件用来测试上边的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include "tcp_connect.c"
#include "ae.h"
#include "ae.c"
#include <sys/socket.h>
/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)
#define MAX_SIZE 1024

aeEventLoop *eventLoop;
server_t *ser;

/*
* 读取客户端的查询缓冲区内容
*/
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[1024]; //缓存字符
int nread = read(fd,buf,sizeof(buf));
// 读入出错
if (nread == -1) {

} else if (nread == 0) {//客户端断开链接
aeDeleteFileEvent(el, fd, AE_READABLE);
close(fd);
}
printf(buf);
}

/**
* 接受客户端链接
*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
int clientFd;
clientFd = TcpAccept(fd);
aeCreateFileEvent(eventLoop, clientFd, AE_READABLE, readQueryFromClient, NULL);
}

/**
*
*/
void start()
{
eventLoop = aeCreateEventLoop(MAX_SIZE);
ser = malloc(sizeof(server_t));
memset(&ser->serv_addr, 0, sizeof(ser->serv_addr));
ser->port = 3336;
TcpCreate(ser);
aeCreateFileEvent(eventLoop, ser->serLink, AE_READABLE, acceptTcpHandler, NULL);
printf("端口开启%d",ser->port);
aeMain(eventLoop);


}

int main(int argc, char *argv[]) {
start();
return 0;
}

我们使用gcc编译过后,可以连接到监听的端口。使用telnet命令链接测试