1 /* PEAK Library
2  *
3  * Copyright (c) 2003, 2004, 2005
4  *      Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  *
29  * $Id: engine_mod_kqueue.c,v 1.6 2005/01/27 16:31:50 mbuna Exp $
30  */
31 #define RCSID "$Id: engine_mod_kqueue.c,v 1.6 2005/01/27 16:31:50 mbuna Exp $"
32 
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36 
37 #include "engine.h"
38 
39 #include <assert.h>
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <sys/time.h>
47 #include <time.h>
48 #include <unistd.h>
49 #include <sys/event.h>
50 #ifdef HAVE_SYS_SIGNAL_H
51 #include <sys/signal.h>
52 #endif
53 #ifdef HAVE_SIGNAL_H
54 #include <signal.h>
55 #endif
56 
57 #include "internal.h"
58 #include "socket.h"
59 #include "spinlock.h"
60 #include "task_private.h"
61 #include "utilities.h"
62 
63 /* We don't systematically register for both read and write events, so we
64  * bother to save if we did or not, so that we can properly remove the events
65  * later, without error.
66  */
67 #define CS_KEVENT_READ  CS_CUSTOM1
68 #define CS_KEVENT_WRITE CS_CUSTOM2
69 
70 static void __peak_engine_init(peak_engine e, va_list vp);
71 static void __peak_engine_finalize(peak_engine e);
72 static void __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
73                                        uint32_t set, uint32_t clear);
74 static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
75 static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
76                                            int event, int info);
77 
78 
79 
/* Engine instance state for the kqueue flavor. */
struct __peak_engine
  {
  PEAK_STRUCT_RT_HEADER;
  peak_task _task;        /* owning task, supplied at construction */
  int _maxfds;            /* descriptor limit (see _peak_engine_set_maxfds) */
  int _nfds;              /* number of descriptor clients currently added */
  int _kq;                /* kqueue(2) descriptor */
  int _ne;                /* events scheduled during the current loop pass */
  volatile int _running;  /* loop flag, cleared by _peak_engine_break */
  };
90 
91 PEAK_CLASS_BASE_DECLARE(engine);
92 
93 __private_extern__ const char *
_peak_engine_get_name(peak_engine e)94 _peak_engine_get_name(peak_engine e)
95   {
96   return "kqueue";
97   }
98 
/* Construct a new kqueue engine bound to `task` via the PEAK class
 * runtime; __peak_engine_init receives `task` as its variadic argument.
 */
__private_extern__ peak_engine
_peak_engine_create(peak_task task)
  {
  return PEAK_CLASS_CONSTRUCT1(engine, task);
  }
104 
/* Class initializer: the single variadic argument is the owning task.
 * Halts the process if the kqueue cannot be created, since the engine
 * is unusable without it.
 */
static void
__peak_engine_init(peak_engine e, va_list vp)
  {
  e->_task = va_arg(vp, peak_task);
  e->_maxfds = PEAK_DEFAULT_FLAVOR_MAXFDS;
  e->_nfds = 0;
  e->_running = 0;

  e->_kq = kqueue();
  if (e->_kq == -1)
    PEAK_HALT;
  }
117 
/* Class destructor hook: nothing is released here.
 * NOTE(review): e->_kq is never closed — presumably the engine lives for
 * the process lifetime; confirm this is intentional.
 */
static void
__peak_engine_finalize(peak_engine e)
  {
  }
122 
123 __private_extern__ int
_peak_engine_get_maxfds(peak_engine e)124 _peak_engine_get_maxfds(peak_engine e)
125   {
126   return e->_maxfds;
127   }
128 
129 __private_extern__ int
_peak_engine_set_maxfds(peak_engine e,int maxfds)130 _peak_engine_set_maxfds(peak_engine e, int maxfds)
131   {
132   if (maxfds <= 0)
133     return -1;
134 
135   e->_maxfds = peak_set_fdlimit(maxfds);
136   return (e->_maxfds == maxfds) ? 0 : -1;
137   }
138 
/* Translate a change of a client's state bits into kqueue registrations.
 *
 * `set` carries the client's new state, `clear` the bits being dropped;
 * a filter is touched only when the readable or writable interest has
 * actually changed (the XOR tests below).  Each kevent is submitted with
 * EV_ADD plus EV_ENABLE or EV_DISABLE: EV_ADD is idempotent, and keeping
 * the (possibly disabled) registration alive means the later EV_DELETE
 * in _peak_engine_remove_client cannot fail with ENOENT.
 */
static void
__peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
                           uint32_t set, uint32_t clear)
  {
  int i = 0;
  struct kevent chglist[2];  /* at most one read + one write change */

  if ((clear ^ set) & (CS_ACCEPTING|CS_READING)) /* readable has changed */
    {
    EV_SET(&chglist[i], c->_ident, EVFILT_READ, EV_ADD, 0, 0, c);

    if (set & (CS_ACCEPTING|CS_READING)) /* it's set */
      chglist[i].flags |= EV_ENABLE;
    else /* clear it */
      chglist[i].flags |= EV_DISABLE;

    /* Record the registration so removal knows to EV_DELETE it. */
    c->_state |= CS_KEVENT_READ;
    i++;
    }

  if ((clear ^ set) & (CS_CONNECTING|CS_WRITING)) /* writable has changed */
    {
    EV_SET(&chglist[i], c->_ident, EVFILT_WRITE, EV_ADD, 0, 0, c);

    if (set & (CS_CONNECTING|CS_WRITING)) /* it's set */
      chglist[i].flags |= EV_ENABLE;
    else /* clear it */
      chglist[i].flags |= EV_DISABLE;

    /* Record the registration so removal knows to EV_DELETE it. */
    c->_state |= CS_KEVENT_WRITE;
    i++;
    }

  /* Callers must only invoke us when something changed. */
  if (i == 0)
    PEAK_FATAL("State of engine's client cannot generate event", 0);

  /* Submit the change list only; no events are fetched here. */
  if (kevent(e->_kq, chglist, i, 0, 0, 0) == -1)
    PEAK_FATAL("kevent failure", errno);
  }
178 
/* Register a signal-watching client.  c->_ident holds the signal number.
 * The signal is added as an EVFILT_SIGNAL kevent and its normal delivery
 * is suppressed with SIG_IGN so it is reported only through the kqueue
 * (EVFILT_SIGNAL fires even for ignored signals).
 */
static void
__peak_engine_add_signal(peak_engine e, peak_engine_client c)
  {
  struct kevent sigevent;
  struct sigaction act;

  assert(c->_state & CS_SIGNAL);

  EV_SET(&sigevent, c->_ident, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, c);

  if (kevent(e->_kq, &sigevent, 1, 0, 0, 0) == -1)
    PEAK_FATAL("kevent", errno);

  act.sa_handler = SIG_IGN; /* ignore the signal */
  act.sa_flags = 0;
  sigemptyset(&act.sa_mask);
  sigaction(c->_ident, &act, 0);
  }
197 
198 __private_extern__ void
_peak_engine_add_client(peak_engine e,peak_engine_client c)199 _peak_engine_add_client(peak_engine e, peak_engine_client c)
200   {
201   if (c->_state & CS_SIGNAL)
202     {
203     __peak_engine_add_signal(e, c);
204     return;
205     }
206 
207   if (++e->_nfds >= e->_maxfds)
208     PEAK_HALT;
209 
210   __peak_engine_set_or_clear(e, c, c->_state, 0);
211 
212   c->_engine = e;
213   }
214 
/* Detach a client from the engine, deleting its kqueue registrations.
 * For signal clients the signal's default disposition is restored; for
 * descriptor clients only the filters actually registered (as recorded
 * by the CS_KEVENT_* bits) are deleted, so kevent cannot fail with
 * ENOENT on a filter that was never added.
 */
__private_extern__ void
_peak_engine_remove_client(peak_engine e, peak_engine_client c)
  {
  int i = 0;
  struct kevent dellist[2];

  assert(c != NULL);

  e->_nfds--;
  c->_engine = NULL;

  if (c->_state & CS_SIGNAL)
    {
    struct sigaction act;

    EV_SET(&dellist[i], c->_ident, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
    i++;

    /* Reset the default disposition.  The add path installed SIG_IGN;
     * SIG_DFL is required here — reinstalling SIG_IGN would leave the
     * signal permanently ignored after the client is gone.
     */
    act.sa_handler = SIG_DFL; /* reset default */
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);
    sigaction(c->_ident, &act, 0);
    }
  else
    {
    if (c->_state & CS_KEVENT_READ)
      {
      EV_SET(&dellist[i], c->_ident, EVFILT_READ, EV_DELETE, 0, 0, 0);
      i++;
      }
    if (c->_state & CS_KEVENT_WRITE)
      {
      EV_SET(&dellist[i], c->_ident, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
      i++;
      }
    }

  if (i > 0 && kevent(e->_kq, dellist, i, 0, 0, 0) == -1)
    PEAK_FATAL("kevent", errno);
  }
255 
/* Precondition: Always called under the protection of c->_lock.
 *
 * Re-sync the client's kqueue registration with its state bits.  The
 * shadow copy c->_sstate caches the last state pushed to the kernel;
 * nothing is submitted when the state is unchanged.
 */
__private_extern__ void
_peak_engine_edit_client(peak_engine e, peak_engine_client c)
  {
  /* Clients currently being handled are re-synced later by
   * _peak_engine_event_postprocess instead.
   */
  assert(!(c->_state & CS_HANDLED));

  if (c->_sstate != c->_state)
    {
    c->_sstate = c->_state;
    __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
    }
  }
269 
/* Engine main loop: wait for kqueue events (bounded by the task's next
 * timer deadline), translate each kevent into a PEAK ioevent, then fire
 * pending timers and process all scheduled events.  Runs until
 * _peak_engine_break clears e->_running.
 */
__private_extern__ void
_peak_engine_loop(peak_engine e)
  {
  struct timespec ts;
  peak_engine_client c;
  int i, nevs, err;
  int events_count = 24;     /* max events fetched per kevent call */
  struct kevent events[24];

  e->_running = 1;

  do  {
    /* _peak_task_timer_tswait fills `ts` with the wait timeout (or
     * returns a null pointer semantics for indefinite wait — determined
     * by the task's timer queue).
     */
    nevs = kevent(e->_kq, 0, 0, events, events_count,
                  _peak_task_timer_tswait(e->_task, &ts));
    if (nevs < 0)
      {
      /* NOTE(review): all errors are retried, not just EINTR — a
       * persistent kevent failure would busy-loop here; confirm.
       */
      fprintf(stderr, "kevent failure\n");
      continue;
      }

    /* Count of events scheduled during this pass (ioevents + timers). */
    e->_ne = 0;

    for (i = 0; i < nevs; i++)
      {
      /* udata was set to the owning client at registration time. */
      if ((c = (peak_engine_client)events[i].udata) == NULL)
        PEAK_HALT;

      /* Although implementations of kqueue support it, the library's
       * design doesn't allow us to handle more than one event at a time for
       * the same client.
       */
      if (c->_state & CS_HANDLED)
        continue;

      switch (events[i].filter)
        {
        case EVFILT_SIGNAL:
          /* ident is the signal number for EVFILT_SIGNAL. */
          __peak_engine_ioevent_generate(e, c, IOEVENT_SIGNAL,
              events[i].ident);
          break;
        case EVFILT_READ:
          /* Surface any pending socket error instead of a read event. */
          if ((err = peak_socket_get_error(events[i].ident)) != 0)
            {
            __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
            continue;
            }

          if (c->_state & CS_ACCEPTING)
            __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
          else
            {
            /* EV_EOF on a reading client maps to IOEVENT_EOF. */
            if (c->_state & CS_READING)
            __peak_engine_ioevent_generate(e, c,
                events[i].flags & EV_EOF ? IOEVENT_EOF : IOEVENT_READ, 0);
            }
          break;
        case EVFILT_WRITE:
          /* Surface any pending socket error instead of a write event. */
          if ((err = peak_socket_get_error(events[i].ident)) != 0)
            {
            __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
            continue;
            }

          if (c->_state & CS_CONNECTING)
            __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
          else /* CS_CONNECTED or accepted socket */
            {
            if (c->_state & CS_WRITING)
            __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
            }
          break;

        default:
          PEAK_HALT;
        }
      }

    /* Prepare to fire any pending timers
     */
    e->_ne += _peak_task_timer_schedule_fire(e->_task);
    _peak_task_process_pending_events(e->_task, e->_ne);

    } while (e->_running);
  }
354 
/* Request loop termination: the volatile flag is re-checked at the
 * bottom of each _peak_engine_loop iteration.
 */
__private_extern__ void
_peak_engine_break(peak_engine e)
  {
  e->_running = 0;
  }
360 
/* Convert a kernel event into a PEAK ioevent and hand it to the task
 * for processing.  Each ioevent type implies a state transition: the
 * masks computed below clear the interest bits the event consumes
 * (e.g. a completed connect clears CS_CONNECTING and sets
 * CS_CONNECTED|CS_READING|CS_WRITING).  The pre-event state is cached
 * in c->_sstate so _peak_engine_event_postprocess can detect whether
 * the kernel registration must be re-synced afterwards.
 */
static void
__peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
                               int event, int info)
  {
  uint16_t mclear = 0, mset = 0;

  switch (event)
    {
    case IOEVENT_CONNECT:
      mclear = CS_CONNECTING;
      mset = CS_CONNECTED|CS_READING|CS_WRITING;
      break;
    case IOEVENT_ACCEPT:
      mclear = CS_ACCEPTING;
      break;
    case IOEVENT_READ:
      mclear = CS_READING;
      break;
    case IOEVENT_WRITE:
      mclear = CS_WRITING;
      break;
    case IOEVENT_EOF:
    case IOEVENT_ERROR:
      /* Terminal conditions drop all I/O interest at once. */
      mclear = CS_CONNECTED|CS_READING|CS_WRITING;
      break;
    case IOEVENT_SIGNAL:
      /* Signals carry no state transition. */
      break;
    default:
      PEAK_HALT;
      break;
    }

#if 0
  printf("gen: c->_state=%x\n", c->_state);
  if (c->_state & CS_HANDLED)
    {
    printf("gen: handling several events for the same object fd=%d\n",
           c->_ident);
    }
#endif

  /* Set "event handled" bit */
  c->_state |= CS_HANDLED;

  /* Cache state */
  c->_sstate = c->_state;

  /* Prepare */
  c->_state &= ~mclear;
  c->_state |= mset;

  /* Schedule for processing */
  _peak_task_op_ioevent_schedule(e->_task, c, event, info);

  /* Bump this pass's event count, consumed by the loop's final
   * _peak_task_process_pending_events call.
   */
  e->_ne++;
  }
417 
/* Called after a scheduled ioevent has been processed.  Under the
 * client lock, push any state change made during handling back to the
 * kernel (the cached pre-event state c->_sstate tells us whether the
 * registration diverged), then invalidate the cache and drop the
 * CS_HANDLED bit so the loop may dispatch events to this client again.
 */
__private_extern__ void
_peak_engine_event_postprocess(peak_engine_client c)
  {
  peak_engine e = c->_engine;  /* may be NULL if the client was removed */

  /* Commit changes if necessary, restore stuffs.
   */
  _peak_engine_client_lock(c);

  if (c->_sstate != c->_state && e != NULL)
    __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);

  c->_sstate = 0;              /* invalidate cache */
  c->_state &= ~CS_HANDLED;    /* we don't handle it anymore */

  _peak_engine_client_unlock(c);
  }
435