阻塞IO当使用accept会一直阻塞住,等待IO,然后read读取数据等待。当对已建立的链接进行读取的时候,应用进程被阻塞,直到数据从内核缓冲区复制到应用进程缓冲区中才返回。应该注意到,在阻塞的过程中,其它应用进程还可以执行,因此阻塞不意味着整个操作系统都被阻塞。因为其它应用进程还可以执行,所以不消耗 CPU 时间,这种模型的 CPU 利用率会比较高。
int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
/* Linux epoll(2) based ae.c module * * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */
/* * 关联给定事件到 fd */ staticintaeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask){ aeApiState *state = eventLoop->apidata; //创建一个 epoll_event 结构体变量 structepoll_eventee;
/* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. * * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。 * * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
/* A simple event-driven programming library. Originally I wrote this code * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated * it in form of a library for easy reuse. * * Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com> * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */
// 多路复用库的私有数据 void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数 aeBeforeSleepProc *beforesleep;
} aeEventLoop;
/* 定义函数模型 */ aeEventLoop *aeCreateEventLoop(int setsize); void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); int aeGetFileEvents(aeEventLoop *eventLoop, int fd); long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); int aeProcessEvents(aeEventLoop *eventLoop, int flags); int aeWait(int fd, int mask, long long milliseconds); void aeMain(aeEventLoop *eventLoop); char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
/* A simple event-driven programming library. Originally I wrote this code * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated * it in form of a library for easy reuse. * * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com> * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ //判断加载那个操作系统下的事件函数 #ifdef HAVE_EVPORT #include"ae_evport.c" #else #ifdef HAVE_EPOLL #include"ae_epoll.c" #else #ifdef HAVE_KQUEUE #include"ae_kqueue.c" #else #include"ae_select.c" #endif #endif #endif
/* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ // 初始化监听事件 for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE;
// 返回事件循环 return eventLoop;
err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } returnNULL; }
/* Return the current set size. */ // 返回当前事件槽大小 intaeGetSetSize(aeEventLoop *eventLoop){ return eventLoop->setsize; }
/* Resize the maximum set size of the event loop. * * 调整事件槽的大小 * * If the requested set size is smaller than the current set size, but * there is already a file descriptor in use that is >= the requested * set size minus one, AE_ERR is returned and the operation is not * performed at all. * * 如果尝试调整大小为 setsize ,但是有 >= setsize 的文件描述符存在 * 那么返回 AE_ERR ,不进行任何动作。 * * Otherwise AE_OK is returned and the operation is successful. * * 否则,执行大小调整操作,并返回 AE_OK 。 */ intaeResizeSetSize(aeEventLoop *eventLoop, int setsize){ int i;
if (setsize == eventLoop->setsize) return AE_OK; if (eventLoop->maxfd >= setsize) return AE_ERR; if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
/* Make sure that if we created new slots, they are initialized with * an AE_NONE mask. */ for (i = eventLoop->maxfd+1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return AE_OK; }
// 执行清理处理器 if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData);
// 释放时间事件 zfree(te);
return AE_OK; } prev = te; te = te->next; }
return AE_ERR; /* NO event with the specified ID found */ }
/* Search the first timer to fire. * This operation is useful to know how many time the select can be * put in sleep without to delay any event. * If there are no timers NULL is returned. * * Note that's O(N) since time events are unsorted. * Possible optimizations (not needed by Redis so far, but...): * 1) Insert the event in order, so that the nearest is just the head. * Much better but still insertion or deletion of timers is O(N). * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). */ // 寻找里目前时间最近的时间事件 // 因为链表是乱序的,所以查找复杂度为 O(N) static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL;
/* Process time events * * 处理所有已到达的时间事件 */ staticintprocessTimeEvents(aeEventLoop *eventLoop){ int processed = 0; aeTimeEvent *te; longlong maxId; time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ // 通过重置事件的运行时间, // 防止因时间穿插(skew)而造成的事件处理混乱 if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } // 更新最后一次处理时间事件的时间 eventLoop->lastTime = now;
// 遍历链表 // 执行那些已经到达的事件 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; while(te) { long now_sec, now_ms; longlong id;
// 跳过无效事件 if (te->id > maxId) { te = te->next; continue; } // 获取当前时间 aeGetTime(&now_sec, &now_ms);
// 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval;
id = te->id; // 执行事件处理器,并获取返回值 retval = te->timeProc(eventLoop, id, te->clientData); processed++; /* After an event is processed our time event list may * no longer be the same, so we restart from head. * Still we make sure to don't process events registered * by event handlers itself in order to don't loop forever. * To do so we saved the max ID we want to handle. * * FUTURE OPTIMIZATIONS: * Note that this is NOT great algorithmically. Redis uses * a single time event so it's not a problem but the right * way to do this is to add the new elements on head, and * to flag deleted elements in a special way for later * deletion (putting references to the nodes to delete into * another linked list). */
// 因为执行事件之后,事件列表可能已经被改变了 // 因此需要将 te 放回表头,继续开始执行事件 te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * * 处理所有已到达的时间事件,以及所有已就绪的文件事件。 * * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪, * 或者下个时间事件到达(如果有的话)。 * * If flags is 0, the function does nothing and returns. * 如果 flags 为 0 ,那么函数不作动作,直接返回。 * * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。 * * if flags has AE_FILE_EVENTS set, file events are processed. * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。 * * if flags has AE_TIME_EVENTS set, time events are processed. * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。 * * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * 如果 flags 包含 AE_DONT_WAIT , * 那么函数在处理完所有不许阻塞的事件之后,即刻返回。 * * The function returns the number of events processed. * 函数的返回值为已处理事件的数量 */ intaeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents;
/* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return0;
/* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; structtimevaltv, *tvp;
// 获取最近的时间事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果时间事件存在的话 // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间 long now_sec, now_ms;
/* Calculate the time missing for the nearest * timer to fire. */ // 计算距今最近的时间事件还要多久才能达到 // 并将该时间距保存在 tv 结构中 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; }
/* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { // 设置文件事件不阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 文件事件可以阻塞直到有事件到达为止 tvp = NULL; /* wait forever */ } }
int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); }
processed++; } }
/* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */ }
/* Wait for milliseconds until the given file descriptor becomes * writable/readable/exception * * 在给定毫秒内等待,直到 fd 变成可写、可读或异常 */ intaeWait(int fd, int mask, longlong milliseconds){ structpollfdpfd; int retmask = 0, retval;
memset(&pfd, 0, sizeof(pfd)); pfd.fd = fd; if (mask & AE_READABLE) pfd.events |= POLLIN; if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
if ((retval = poll(&pfd, 1, milliseconds))== 1) { if (pfd.revents & POLLIN) retmask |= AE_READABLE; if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; return retmask; } else { return retval; } }