1 /* A simple event-driven programming library. Originally I wrote this code
2  * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
3  * it in form of a library for easy reuse.
4  *
5  * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  *   * Redistributions of source code must retain the above copyright notice,
12  *     this list of conditions and the following disclaimer.
13  *   * Redistributions in binary form must reproduce the above copyright
14  *     notice, this list of conditions and the following disclaimer in the
15  *     documentation and/or other materials provided with the distribution.
16  *   * Neither the name of Redis nor the names of its contributors may be used
17  *     to endorse or promote products derived from this software without
18  *     specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>
42 
43 #include "ae.h"
44 #include "zmalloc.h"
45 #include "config.h"
46 
/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performance, descending. */
49 #ifdef HAVE_EVPORT
50 #include "ae_evport.c"
51 #else
52     #ifdef HAVE_EPOLL
53     #include "ae_epoll.c"
54     #else
55         #ifdef HAVE_KQUEUE
56         #include "ae_kqueue.c"
57         #else
58         #include "ae_select.c"
59         #endif
60     #endif
61 #endif
62 
aeCreateEventLoop(int setsize)63 aeEventLoop *aeCreateEventLoop(int setsize) {
64     aeEventLoop *eventLoop;
65     int i;
66 
67     if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
68     eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
69     eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
70     if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
71     eventLoop->setsize = setsize;
72     eventLoop->lastTime = time(NULL);
73     eventLoop->timeEventHead = NULL;
74     eventLoop->timeEventNextId = 0;
75     eventLoop->stop = 0;
76     eventLoop->maxfd = -1;
77     eventLoop->beforesleep = NULL;
78     eventLoop->aftersleep = NULL;
79     if (aeApiCreate(eventLoop) == -1) goto err;
80     /* Events with mask == AE_NONE are not set. So let's initialize the
81      * vector with it. */
82     for (i = 0; i < setsize; i++)
83         eventLoop->events[i].mask = AE_NONE;
84     return eventLoop;
85 
86 err:
87     if (eventLoop) {
88         zfree(eventLoop->events);
89         zfree(eventLoop->fired);
90         zfree(eventLoop);
91     }
92     return NULL;
93 }
94 
95 /* Return the current set size. */
aeGetSetSize(aeEventLoop * eventLoop)96 int aeGetSetSize(aeEventLoop *eventLoop) {
97     return eventLoop->setsize;
98 }
99 
100 /* Resize the maximum set size of the event loop.
101  * If the requested set size is smaller than the current set size, but
102  * there is already a file descriptor in use that is >= the requested
103  * set size minus one, AE_ERR is returned and the operation is not
104  * performed at all.
105  *
106  * Otherwise AE_OK is returned and the operation is successful. */
aeResizeSetSize(aeEventLoop * eventLoop,int setsize)107 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
108     int i;
109 
110     if (setsize == eventLoop->setsize) return AE_OK;
111     if (eventLoop->maxfd >= setsize) return AE_ERR;
112     if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
113 
114     eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
115     eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
116     eventLoop->setsize = setsize;
117 
118     /* Make sure that if we created new slots, they are initialized with
119      * an AE_NONE mask. */
120     for (i = eventLoop->maxfd+1; i < setsize; i++)
121         eventLoop->events[i].mask = AE_NONE;
122     return AE_OK;
123 }
124 
/* Release the event loop: the multiplexing backend state, both file
 * event tables, every time event still linked in the loop and finally
 * the loop structure itself.
 *
 * Time event finalizers are NOT invoked here: only the list nodes are
 * freed, so callers owning resources through clientData must release
 * them before deleting the loop. */
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Free the time events list, otherwise every timer still pending
     * (or only flagged as deleted) would be leaked. */
    aeTimeEvent *te = eventLoop->timeEventHead;
    while (te != NULL) {
        aeTimeEvent *next = te->next;
        zfree(te);
        te = next;
    }
    zfree(eventLoop);
}
131 
/* Raise the stop flag of the event loop. aeMain() tests this flag
 * before every iteration and returns once it is set. */
void aeStop(aeEventLoop *eventLoop) {
    eventLoop->stop = 1;
}
135 
/* Register 'proc' as the handler called when 'fd' becomes readable
 * and/or writable, according to 'mask'. 'clientData' is stored in the
 * slot and passed back to the handler.
 *
 * Returns AE_OK on success. Returns AE_ERR with errno set to ERANGE if
 * 'fd' is outside the [0, setsize) range of the event table, or AE_ERR
 * if the multiplexing backend fails to add the event. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    /* A negative fd would index before the start of the events table,
     * so it is rejected together with descriptors past the end. */
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
155 
/* Stop monitoring 'fd' for the events listed in 'mask'.
 *
 * The call is a no-op when 'fd' is outside the [0, setsize) range of
 * the event table or when nothing is registered for it. When the last
 * event of the highest registered descriptor is removed, maxfd is
 * recomputed (it becomes -1 if no descriptor is left). */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    /* A negative fd (for instance -1 after a close) would index before
     * the start of the events table: ignore it like any other
     * out-of-range descriptor. */
    if (fd < 0 || fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd: scan downwards for the next slot in use. */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}
177 
/* Return the mask of events currently registered for 'fd', or 0 when
 * 'fd' is outside the [0, setsize) range of the event table. */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
    /* Negative descriptors are rejected too: they would read memory
     * located before the events table. */
    if (fd < 0 || fd >= eventLoop->setsize) return 0;
    aeFileEvent *fe = &eventLoop->events[fd];

    return fe->mask;
}
184 
/* Store the current wall-clock time, as reported by gettimeofday(),
 * into '*seconds' (whole seconds) and '*milliseconds' (the sub-second
 * part, truncated to milliseconds). */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;
    long whole, fraction;

    gettimeofday(&now, NULL);
    whole = now.tv_sec;
    fraction = now.tv_usec/1000;
    *seconds = whole;
    *milliseconds = fraction;
}
193 
/* Compute the absolute time that is 'milliseconds' from now and store
 * it as seconds in '*sec' plus a milliseconds part in '*ms'. A single
 * carry is applied when the milliseconds part reaches 1000. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long now_sec, now_ms;

    aeGetTime(&now_sec, &now_ms);

    long target_sec = now_sec + milliseconds/1000;
    long target_ms = now_ms + milliseconds%1000;

    if (target_ms >= 1000) {
        target_ms -= 1000;
        target_sec += 1;
    }
    *sec = target_sec;
    *ms = target_ms;
}
207 
/* Create a timer firing 'milliseconds' from now and link it at the
 * head of the loop's time event list.
 *
 * 'proc' is called when the timer expires, 'finalizerProc' (may be
 * NULL) when the timer is finally removed from the list; both receive
 * 'clientData'.
 *
 * Returns the id assigned to the new timer, or AE_ERR if the
 * allocation fails (the id counter is advanced in both cases). */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long newId = eventLoop->timeEventNextId++;
    aeTimeEvent *timer = zmalloc(sizeof(*timer));

    if (timer == NULL) return AE_ERR;

    timer->id = newId;
    aeAddMillisecondsToNow(milliseconds,&timer->when_sec,&timer->when_ms);
    timer->timeProc = proc;
    timer->finalizerProc = finalizerProc;
    timer->clientData = clientData;

    /* Push the new node in front of the doubly linked list. */
    aeTimeEvent *oldHead = eventLoop->timeEventHead;
    timer->prev = NULL;
    timer->next = oldHead;
    if (oldHead != NULL) {
        oldHead->prev = timer;
    }
    eventLoop->timeEventHead = timer;
    return newId;
}
229 
/* Flag the timer with the given id for deletion.
 *
 * The node is not unlinked here: its id is replaced with
 * AE_DELETED_EVENT_ID and the actual removal is performed later by
 * processTimeEvents(). Returns AE_OK if a timer with that id was
 * found, AE_ERR otherwise. */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *cur;

    for (cur = eventLoop->timeEventHead; cur != NULL; cur = cur->next) {
        if (cur->id != id) continue;
        cur->id = AE_DELETED_EVENT_ID;
        return AE_OK;
    }
    return AE_ERR; /* NO event with the specified ID found */
}
242 
243 /* Search the first timer to fire.
244  * This operation is useful to know how many time the select can be
245  * put in sleep without to delay any event.
246  * If there are no timers NULL is returned.
247  *
248  * Note that's O(N) since time events are unsorted.
249  * Possible optimizations (not needed by Redis so far, but...):
250  * 1) Insert the event in order, so that the nearest is just the head.
251  *    Much better but still insertion or deletion of timers is O(N).
252  * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
253  */
aeSearchNearestTimer(aeEventLoop * eventLoop)254 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
255 {
256     aeTimeEvent *te = eventLoop->timeEventHead;
257     aeTimeEvent *nearest = NULL;
258 
259     while(te) {
260         if (!nearest || te->when_sec < nearest->when_sec ||
261                 (te->when_sec == nearest->when_sec &&
262                  te->when_ms < nearest->when_ms))
263             nearest = te;
264         te = te->next;
265     }
266     return nearest;
267 }
268 
269 /* Process time events */
processTimeEvents(aeEventLoop * eventLoop)270 static int processTimeEvents(aeEventLoop *eventLoop) {
271     int processed = 0;
272     aeTimeEvent *te;
273     long long maxId;
274     time_t now = time(NULL);
275 
276     /* If the system clock is moved to the future, and then set back to the
277      * right value, time events may be delayed in a random way. Often this
278      * means that scheduled operations will not be performed soon enough.
279      *
280      * Here we try to detect system clock skews, and force all the time
281      * events to be processed ASAP when this happens: the idea is that
282      * processing events earlier is less dangerous than delaying them
283      * indefinitely, and practice suggests it is. */
284     if (now < eventLoop->lastTime) {
285         te = eventLoop->timeEventHead;
286         while(te) {
287             te->when_sec = 0;
288             te = te->next;
289         }
290     }
291     eventLoop->lastTime = now;
292 
293     te = eventLoop->timeEventHead;
294     maxId = eventLoop->timeEventNextId-1;
295     while(te) {
296         long now_sec, now_ms;
297         long long id;
298 
299         /* Remove events scheduled for deletion. */
300         if (te->id == AE_DELETED_EVENT_ID) {
301             aeTimeEvent *next = te->next;
302             if (te->prev)
303                 te->prev->next = te->next;
304             else
305                 eventLoop->timeEventHead = te->next;
306             if (te->next)
307                 te->next->prev = te->prev;
308             if (te->finalizerProc)
309                 te->finalizerProc(eventLoop, te->clientData);
310             zfree(te);
311             te = next;
312             continue;
313         }
314 
315         /* Make sure we don't process time events created by time events in
316          * this iteration. Note that this check is currently useless: we always
317          * add new timers on the head, however if we change the implementation
318          * detail, this check may be useful again: we keep it here for future
319          * defense. */
320         if (te->id > maxId) {
321             te = te->next;
322             continue;
323         }
324         aeGetTime(&now_sec, &now_ms);
325         if (now_sec > te->when_sec ||
326             (now_sec == te->when_sec && now_ms >= te->when_ms))
327         {
328             int retval;
329 
330             id = te->id;
331             retval = te->timeProc(eventLoop, id, te->clientData);
332             processed++;
333             if (retval != AE_NOMORE) {
334                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
335             } else {
336                 te->id = AE_DELETED_EVENT_ID;
337             }
338         }
339         te = te->next;
340     }
341     return processed;
342 }
343 
344 /* Process every pending time event, then every pending file event
345  * (that may be registered by time event callbacks just processed).
346  * Without special flags the function sleeps until some file event
347  * fires, or when the next time event occurs (if any).
348  *
349  * If flags is 0, the function does nothing and returns.
350  * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
351  * if flags has AE_FILE_EVENTS set, file events are processed.
352  * if flags has AE_TIME_EVENTS set, time events are processed.
353  * if flags has AE_DONT_WAIT set the function returns ASAP until all
354  * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
355  * the events that's possible to process without to wait are processed.
356  *
357  * The function returns the number of events processed. */
aeProcessEvents(aeEventLoop * eventLoop,int flags)358 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
359 {
360     int processed = 0, numevents;
361 
362     /* Nothing to do? return ASAP */
363     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
364 
365     /* Note that we want call select() even if there are no
366      * file events to process as long as we want to process time
367      * events, in order to sleep until the next time event is ready
368      * to fire. */
369     if (eventLoop->maxfd != -1 ||
370         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
371         int j;
372         aeTimeEvent *shortest = NULL;
373         struct timeval tv, *tvp;
374 
375         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
376             shortest = aeSearchNearestTimer(eventLoop);
377         if (shortest) {
378             long now_sec, now_ms;
379 
380             aeGetTime(&now_sec, &now_ms);
381             tvp = &tv;
382 
383             /* How many milliseconds we need to wait for the next
384              * time event to fire? */
385             long long ms =
386                 (shortest->when_sec - now_sec)*1000 +
387                 shortest->when_ms - now_ms;
388 
389             if (ms > 0) {
390                 tvp->tv_sec = ms/1000;
391                 tvp->tv_usec = (ms % 1000)*1000;
392             } else {
393                 tvp->tv_sec = 0;
394                 tvp->tv_usec = 0;
395             }
396         } else {
397             /* If we have to check for events but need to return
398              * ASAP because of AE_DONT_WAIT we need to set the timeout
399              * to zero */
400             if (flags & AE_DONT_WAIT) {
401                 tv.tv_sec = tv.tv_usec = 0;
402                 tvp = &tv;
403             } else {
404                 /* Otherwise we can block */
405                 tvp = NULL; /* wait forever */
406             }
407         }
408 
409         /* Call the multiplexing API, will return only on timeout or when
410          * some event fires. */
411         numevents = aeApiPoll(eventLoop, tvp);
412 
413         /* After sleep callback. */
414         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
415             eventLoop->aftersleep(eventLoop);
416 
417         for (j = 0; j < numevents; j++) {
418             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
419             int mask = eventLoop->fired[j].mask;
420             int fd = eventLoop->fired[j].fd;
421             int fired = 0; /* Number of events fired for current fd. */
422 
423             /* Normally we execute the readable event first, and the writable
424              * event laster. This is useful as sometimes we may be able
425              * to serve the reply of a query immediately after processing the
426              * query.
427              *
428              * However if AE_BARRIER is set in the mask, our application is
429              * asking us to do the reverse: never fire the writable event
430              * after the readable. In such a case, we invert the calls.
431              * This is useful when, for instance, we want to do things
432              * in the beforeSleep() hook, like fsynching a file to disk,
433              * before replying to a client. */
434             int invert = fe->mask & AE_BARRIER;
435 
436             /* Note the "fe->mask & mask & ..." code: maybe an already
437              * processed event removed an element that fired and we still
438              * didn't processed, so we check if the event is still valid.
439              *
440              * Fire the readable event if the call sequence is not
441              * inverted. */
442             if (!invert && fe->mask & mask & AE_READABLE) {
443                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
444                 fired++;
445             }
446 
447             /* Fire the writable event. */
448             if (fe->mask & mask & AE_WRITABLE) {
449                 if (!fired || fe->wfileProc != fe->rfileProc) {
450                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
451                     fired++;
452                 }
453             }
454 
455             /* If we have to invert the call, fire the readable event now
456              * after the writable one. */
457             if (invert && fe->mask & mask & AE_READABLE) {
458                 if (!fired || fe->wfileProc != fe->rfileProc) {
459                     fe->rfileProc(eventLoop,fd,fe->clientData,mask);
460                     fired++;
461                 }
462             }
463 
464             processed++;
465         }
466     }
467     /* Check time events */
468     if (flags & AE_TIME_EVENTS)
469         processed += processTimeEvents(eventLoop);
470 
471     return processed; /* return the number of processed file/time events */
472 }
473 
474 /* Wait for milliseconds until the given file descriptor becomes
475  * writable/readable/exception */
aeWait(int fd,int mask,long long milliseconds)476 int aeWait(int fd, int mask, long long milliseconds) {
477     struct pollfd pfd;
478     int retmask = 0, retval;
479 
480     memset(&pfd, 0, sizeof(pfd));
481     pfd.fd = fd;
482     if (mask & AE_READABLE) pfd.events |= POLLIN;
483     if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
484 
485     if ((retval = poll(&pfd, 1, milliseconds))== 1) {
486         if (pfd.revents & POLLIN) retmask |= AE_READABLE;
487         if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
488         if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
489         if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
490         return retmask;
491     } else {
492         return retval;
493     }
494 }
495 
/* Run the event loop until the stop flag is raised (see aeStop()).
 * The flag is cleared on entry; each iteration calls the beforesleep
 * hook, if one is installed, and then processes all kinds of events
 * with the aftersleep callback enabled. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
504 
/* Return the name reported by the multiplexing layer compiled into
 * this file (the value of aeApiName()). */
char *aeGetApiName(void) {
    char *name = aeApiName();

    return name;
}
508 
/* Install the hook that aeMain() calls at the start of every iteration,
 * before events are processed. Pass NULL to remove the hook. */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}
512 
/* Install the hook that aeProcessEvents() calls right after the
 * multiplexing API returns, when the AE_CALL_AFTER_SLEEP flag is set.
 * Pass NULL to remove the hook. */
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
    eventLoop->aftersleep = aftersleep;
}
516