1 /* PEAK Library
2 *
3 * Copyright (c) 2003, 2004, 2005
4 * Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * $Id: engine_mod_kqueue.c,v 1.6 2005/01/27 16:31:50 mbuna Exp $
30 */
31 #define RCSID "$Id: engine_mod_kqueue.c,v 1.6 2005/01/27 16:31:50 mbuna Exp $"
32
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36
37 #include "engine.h"
38
39 #include <assert.h>
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <sys/time.h>
47 #include <time.h>
48 #include <unistd.h>
49 #include <sys/event.h>
50 #ifdef HAVE_SYS_SIGNAL_H
51 #include <sys/signal.h>
52 #endif
53 #ifdef HAVE_SIGNAL_H
54 #include <signal.h>
55 #endif
56
57 #include "internal.h"
58 #include "socket.h"
59 #include "spinlock.h"
60 #include "task_private.h"
61 #include "utilities.h"
62
/* The read and write kqueue filters are only registered when a client's
 * state asks for them. These two custom state bits remember which filters
 * were actually added for a client, so that removal deletes only events
 * that exist and kevent() does not fail on a missing one.
 */
#define CS_KEVENT_READ  CS_CUSTOM1
#define CS_KEVENT_WRITE CS_CUSTOM2
69
70 static void __peak_engine_init(peak_engine e, va_list vp);
71 static void __peak_engine_finalize(peak_engine e);
72 static void __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
73 uint32_t set, uint32_t clear);
74 static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
75 static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
76 int event, int info);
77
78
79
80 struct __peak_engine
81 {
82 PEAK_STRUCT_RT_HEADER;
83 peak_task _task;
84 int _maxfds;
85 int _nfds;
86 int _kq;
87 int _ne;
88 volatile int _running;
89 };
90
91 PEAK_CLASS_BASE_DECLARE(engine);
92
93 __private_extern__ const char *
_peak_engine_get_name(peak_engine e)94 _peak_engine_get_name(peak_engine e)
95 {
96 return "kqueue";
97 }
98
99 __private_extern__ peak_engine
_peak_engine_create(peak_task task)100 _peak_engine_create(peak_task task)
101 {
102 return PEAK_CLASS_CONSTRUCT1(engine, task);
103 }
104
/* Initialize a freshly allocated engine.
 *
 * e:  engine to set up.
 * vp: construction arguments; exactly one peak_task is read from it.
 *
 * Opens the kqueue descriptor and halts (PEAK_HALT) if that fails. The
 * engine starts with no registered descriptors and in the stopped state.
 */
static void
__peak_engine_init(peak_engine e, va_list vp)
{
  int kq;

  e->_task = va_arg(vp, peak_task);
  e->_maxfds = PEAK_DEFAULT_FLAVOR_MAXFDS;
  e->_nfds = 0;

  kq = kqueue();
  e->_kq = kq;
  if (kq == -1)
    PEAK_HALT;

  e->_running = 0;
}
117
118 static void
__peak_engine_finalize(peak_engine e)119 __peak_engine_finalize(peak_engine e)
120 {
121 }
122
123 __private_extern__ int
_peak_engine_get_maxfds(peak_engine e)124 _peak_engine_get_maxfds(peak_engine e)
125 {
126 return e->_maxfds;
127 }
128
129 __private_extern__ int
_peak_engine_set_maxfds(peak_engine e,int maxfds)130 _peak_engine_set_maxfds(peak_engine e, int maxfds)
131 {
132 if (maxfds <= 0)
133 return -1;
134
135 e->_maxfds = peak_set_fdlimit(maxfds);
136 return (e->_maxfds == maxfds) ? 0 : -1;
137 }
138
/* Push a client's read/write interest to the kqueue.
 *
 * e:     engine owning the kqueue.
 * c:     client whose filters are updated; also stored as the event udata.
 * set:   state bits that are wanted.
 * clear: state bits being compared against; a filter is touched only when
 *        `set` and `clear` differ on one of its bits.
 *
 * The read filter follows CS_ACCEPTING/CS_READING and the write filter
 * follows CS_CONNECTING/CS_WRITING. A touched filter is (re)added with
 * EV_ADD and enabled when `set` carries one of its bits, disabled otherwise.
 * The matching CS_KEVENT_* bit is recorded in the client's state so the
 * filter can be deleted later. It is a fatal error when neither filter is
 * touched or when kevent() fails.
 */
static void
__peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
                           uint32_t set, uint32_t clear)
{
  const uint32_t read_mask = CS_ACCEPTING | CS_READING;
  const uint32_t write_mask = CS_CONNECTING | CS_WRITING;
  const uint32_t changed = clear ^ set;
  struct kevent changes[2];
  int nchanges = 0;

  if (changed & read_mask)
    {
      EV_SET(&changes[nchanges], c->_ident, EVFILT_READ,
             EV_ADD | ((set & read_mask) ? EV_ENABLE : EV_DISABLE),
             0, 0, c);
      c->_state |= CS_KEVENT_READ;
      nchanges++;
    }

  if (changed & write_mask)
    {
      EV_SET(&changes[nchanges], c->_ident, EVFILT_WRITE,
             EV_ADD | ((set & write_mask) ? EV_ENABLE : EV_DISABLE),
             0, 0, c);
      c->_state |= CS_KEVENT_WRITE;
      nchanges++;
    }

  if (nchanges == 0)
    PEAK_FATAL("State of engine's client cannot generate event", 0);

  if (kevent(e->_kq, changes, nchanges, 0, 0, 0) == -1)
    PEAK_FATAL("kevent failure", errno);
}
178
/* Register a signal client with the kqueue.
 *
 * e: engine owning the kqueue.
 * c: client in CS_SIGNAL state; c->_ident is used as the signal number.
 *
 * Adds an enabled EVFILT_SIGNAL event carrying the client as udata (fatal
 * on kevent() failure), then sets the signal's disposition to SIG_IGN.
 */
static void
__peak_engine_add_signal(peak_engine e, peak_engine_client c)
{
  struct sigaction ignore_act;
  struct kevent sig_kev;

  assert(c->_state & CS_SIGNAL);

  EV_SET(&sig_kev, c->_ident, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, c);
  if (kevent(e->_kq, &sig_kev, 1, 0, 0, 0) == -1)
    PEAK_FATAL("kevent", errno);

  /* ignore the signal */
  sigemptyset(&ignore_act.sa_mask);
  ignore_act.sa_flags = 0;
  ignore_act.sa_handler = SIG_IGN;
  sigaction(c->_ident, &ignore_act, 0);
}
197
198 __private_extern__ void
_peak_engine_add_client(peak_engine e,peak_engine_client c)199 _peak_engine_add_client(peak_engine e, peak_engine_client c)
200 {
201 if (c->_state & CS_SIGNAL)
202 {
203 __peak_engine_add_signal(e, c);
204 return;
205 }
206
207 if (++e->_nfds >= e->_maxfds)
208 PEAK_HALT;
209
210 __peak_engine_set_or_clear(e, c, c->_state, 0);
211
212 c->_engine = e;
213 }
214
/* Detach a client from the engine and delete its kqueue events.
 *
 * e: engine the client was added to.
 * c: client to remove; must not be NULL.
 *
 * Signal clients: the EVFILT_SIGNAL event is deleted and the signal's
 * disposition is put back to the default (SIG_DFL), undoing the SIG_IGN
 * installed when the client was added. They do not touch _nfds, because
 * _peak_engine_add_client() never counted them.
 *
 * Other clients: the descriptor count is decremented and only the filters
 * recorded through CS_KEVENT_READ / CS_KEVENT_WRITE are deleted, so kevent()
 * is never asked to delete an event that was not registered.
 *
 * A kevent() failure is fatal.
 */
__private_extern__ void
_peak_engine_remove_client(peak_engine e, peak_engine_client c)
{
  int i = 0;
  struct kevent dellist[2];

  assert(c != NULL);

  c->_engine = NULL;

  if (c->_state & CS_SIGNAL)
    {
      struct sigaction act;

      EV_SET(&dellist[i], c->_ident, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
      i++;

      act.sa_handler = SIG_DFL; /* reset default */
      act.sa_flags = 0;
      sigemptyset(&act.sa_mask);
      sigaction(c->_ident, &act, 0);
    }
  else
    {
      /* Only descriptor clients were counted when added. */
      e->_nfds--;

      if (c->_state & CS_KEVENT_READ)
        {
          EV_SET(&dellist[i], c->_ident, EVFILT_READ, EV_DELETE, 0, 0, 0);
          i++;
        }
      if (c->_state & CS_KEVENT_WRITE)
        {
          EV_SET(&dellist[i], c->_ident, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
          i++;
        }
    }

  if (i > 0 && kevent(e->_kq, dellist, i, 0, 0, 0) == -1)
    PEAK_FATAL("kevent", errno);
}
255
256 /* Precondition: Always called under the protection of c->_lock.
257 */
258 __private_extern__ void
_peak_engine_edit_client(peak_engine e,peak_engine_client c)259 _peak_engine_edit_client(peak_engine e, peak_engine_client c)
260 {
261 assert(!(c->_state & CS_HANDLED));
262
263 if (c->_sstate != c->_state)
264 {
265 c->_sstate = c->_state;
266 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
267 }
268 }
269
/* Run the engine's event loop until _peak_engine_break() clears _running.
 *
 * Each pass waits in kevent() for up to the timeout computed by
 * _peak_task_timer_tswait(), turns every returned kqueue event into at most
 * one I/O event for its client, then fires pending timers and lets the task
 * process what was scheduled.
 *
 * A failed kevent() skips the pass. When the wait was merely interrupted by
 * a signal (EINTR) nothing is logged, since that is routine; any other
 * failure is reported on stderr. Either way the loop condition is checked
 * again and the wait is retried.
 */
__private_extern__ void
_peak_engine_loop(peak_engine e)
{
  struct timespec ts;
  peak_engine_client c;
  int i, nevs, err;
  struct kevent events[24];
  const int events_count = (int)(sizeof(events) / sizeof(events[0]));

  e->_running = 1;

  do {
    nevs = kevent(e->_kq, 0, 0, events, events_count,
                  _peak_task_timer_tswait(e->_task, &ts));
    if (nevs < 0)
      {
        if (errno != EINTR)
          fprintf(stderr, "kevent failure\n");
        continue; /* re-checks e->_running, then waits again */
      }

    e->_ne = 0;

    for (i = 0; i < nevs; i++)
      {
        if ((c = (peak_engine_client)events[i].udata) == NULL)
          PEAK_HALT;

        /* Although implementations of kqueue support it, the library's
         * design doesn't allow us to handle more than one event at a time
         * for the same client.
         */
        if (c->_state & CS_HANDLED)
          continue;

        switch (events[i].filter)
          {
          case EVFILT_SIGNAL:
            __peak_engine_ioevent_generate(e, c, IOEVENT_SIGNAL,
                                           events[i].ident);
            break;
          case EVFILT_READ:
            /* A pending socket error takes precedence over readability. */
            if ((err = peak_socket_get_error(events[i].ident)) != 0)
              {
                __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
                continue;
              }

            if (c->_state & CS_ACCEPTING)
              __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
            else
              {
                if (c->_state & CS_READING)
                  __peak_engine_ioevent_generate(e, c,
                      events[i].flags & EV_EOF ? IOEVENT_EOF : IOEVENT_READ,
                      0);
              }
            break;
          case EVFILT_WRITE:
            /* A pending socket error takes precedence over writability. */
            if ((err = peak_socket_get_error(events[i].ident)) != 0)
              {
                __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
                continue;
              }

            if (c->_state & CS_CONNECTING)
              __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
            else /* CS_CONNECTED or accepted socket */
              {
                if (c->_state & CS_WRITING)
                  __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
              }
            break;

          default:
            PEAK_HALT;
          }
      }

    /* Prepare to fire any pending timers
     */
    e->_ne += _peak_task_timer_schedule_fire(e->_task);
    _peak_task_process_pending_events(e->_task, e->_ne);

  } while (e->_running);
}
354
355 __private_extern__ void
_peak_engine_break(peak_engine e)356 _peak_engine_break(peak_engine e)
357 {
358 e->_running = 0;
359 }
360
361 static void
__peak_engine_ioevent_generate(peak_engine e,peak_engine_client c,int event,int info)362 __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
363 int event, int info)
364 {
365 uint16_t mclear = 0, mset = 0;
366
367 switch (event)
368 {
369 case IOEVENT_CONNECT:
370 mclear = CS_CONNECTING;
371 mset = CS_CONNECTED|CS_READING|CS_WRITING;
372 break;
373 case IOEVENT_ACCEPT:
374 mclear = CS_ACCEPTING;
375 break;
376 case IOEVENT_READ:
377 mclear = CS_READING;
378 break;
379 case IOEVENT_WRITE:
380 mclear = CS_WRITING;
381 break;
382 case IOEVENT_EOF:
383 case IOEVENT_ERROR:
384 mclear = CS_CONNECTED|CS_READING|CS_WRITING;
385 break;
386 case IOEVENT_SIGNAL:
387 break;
388 default:
389 PEAK_HALT;
390 break;
391 }
392
393 #if 0
394 printf("gen: c->_state=%x\n", c->_state);
395 if (c->_state & CS_HANDLED)
396 {
397 printf("gen: handling several events for the same object fd=%d\n",
398 c->_ident);
399 }
400 #endif
401
402 /* Set "event handled" bit */
403 c->_state |= CS_HANDLED;
404
405 /* Cache state */
406 c->_sstate = c->_state;
407
408 /* Prepare */
409 c->_state &= ~mclear;
410 c->_state |= mset;
411
412 /* Schedule for processing */
413 _peak_task_op_ioevent_schedule(e->_task, c, event, info);
414
415 e->_ne++;
416 }
417
418 __private_extern__ void
_peak_engine_event_postprocess(peak_engine_client c)419 _peak_engine_event_postprocess(peak_engine_client c)
420 {
421 peak_engine e = c->_engine;
422
423 /* Commit changes if necessary, restore stuffs.
424 */
425 _peak_engine_client_lock(c);
426
427 if (c->_sstate != c->_state && e != NULL)
428 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
429
430 c->_sstate = 0; /* invalidate cache */
431 c->_state &= ~CS_HANDLED; /* we don't handle it anymore */
432
433 _peak_engine_client_unlock(c);
434 }
435