1 /* PEAK Library
2 *
3 * Copyright (c) 2004
4 * Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * $Id: engine_mod_devpoll.c,v 1.1 2007/05/24 13:11:53 mbuna Exp $
30 */
31 #define RCSID "$Id: engine_mod_devpoll.c,v 1.1 2007/05/24 13:11:53 mbuna Exp $"
32
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36
37 #include "engine.h"
38
39 #include <assert.h>
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <sys/types.h>
44 #include <sys/socket.h>
45 #include <sys/stat.h>
46 #include <sys/devpoll.h>
47 #include <fcntl.h>
48 #include <time.h>
49 #include <unistd.h>
50 #ifdef HAVE_SIGNAL_H
51 #include <signal.h>
52 #endif
53
54 #include "internal.h"
55 #include "socket.h"
56 #include "spinlock.h"
57 #include "task_private.h"
58 #include "utilities.h"
59
60 /* Figure out what bits to set for read */
61 #if defined(POLLMSG) && defined(POLLIN) && defined(POLLRDNORM)
62 # define POLLREADFLAGS (POLLMSG|POLLIN|POLLRDNORM)
63 #elif defined(POLLIN) && defined(POLLRDNORM)
64 # define POLLREADFLAGS (POLLIN|POLLRDNORM)
65 #elif defined(POLLIN)
66 # define POLLREADFLAGS POLLIN
67 #elif defined(POLLRDNORM)
68 # define POLLREADFLAGS POLLRDNORM
69 #endif
70
71 /* Figure out what bits to set for write */
72 #if defined(POLLOUT) && defined(POLLWRNORM)
73 # define POLLWRITEFLAGS (POLLOUT|POLLWRNORM)
74 #elif defined(POLLOUT)
75 # define POLLWRITEFLAGS POLLOUT
76 #elif defined(POLLWRNORM)
77 # define POLLWRITEFLAGS POLLWRNORM
78 #endif
79
80 static peak_spinlock_t pollfdLock = PEAK_SPINLOCK_INITIALIZER;
81
82 static void __peak_engine_init(peak_engine e, va_list vp);
83 static void __peak_engine_finalize(peak_engine e);
84 static void __peak_engine_allocate_clients(peak_engine e);
85 static void __peak_engine_set_or_clear(peak_engine e, int fd, uint32_t set,
86 uint32_t clear);
87 static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
88 static void __peak_engine_signal_trap(int signum);
89 static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
90 int event, int info);
91
92 #ifndef MAX_SIGNUM
93 #ifdef NSIG
94 #define MAX_SIGNUM (NSIG-1)
95 #else
96 #define MAX_SIGNUM 31
97 #endif
98 #endif
99
100 struct __peak_engine
101 {
102 PEAK_STRUCT_RT_HEADER;
103 peak_task _task;
104 peak_engine_client* _clients;
105 int _alloctotal;
106 int _dpfd;
107 int _maxfds;
108 peak_engine_client _signals[MAX_SIGNUM + 1];
109 int _ne;
110 volatile int _running;
111 };
112
113 static int interrupt_read_fd, interrupt_write_fd;
114
115 PEAK_CLASS_BASE_DECLARE(engine);
116
117 __private_extern__ const char *
peak_engine_get_name(peak_engine e)118 peak_engine_get_name(peak_engine e)
119 {
120 return "/dev/poll";
121 }
122
123 __private_extern__ peak_engine
peak_engine_create(peak_task task)124 peak_engine_create(peak_task task)
125 {
126 return PEAK_CLASS_CONSTRUCT1(engine, task);
127 }
128
129 static void
__peak_engine_init(peak_engine e,va_list vp)130 __peak_engine_init(peak_engine e, va_list vp)
131 {
132 int i, p[2];
133
134 e->_task = va_arg(vp, peak_task);
135 e->_maxfds = peak_set_fdlimit(PEAK_DEFAULT_FLAVOR_MAXFDS);
136 e->_clients = NULL;
137 e->_alloctotal = 0;
138
139 /* Open /dev/poll device. */
140 if ((e->_dpfd = open("/dev/poll", O_RDWR)) < 0)
141 PEAK_HALT;
142
143 for (i = 0; i <= MAX_SIGNUM; i++)
144 e->_signals[i] = NULL;
145
146 if (interrupt_read_fd == 0)
147 {
148 if (pipe(p) == -1)
149 PEAK_HALT;
150 interrupt_read_fd = p[0];
151 interrupt_write_fd = p[1];
152 }
153
154 e->_running = 0;
155 }
156
157 static void
__peak_engine_finalize(peak_engine e)158 __peak_engine_finalize(peak_engine e)
159 {
160 if (e->_clients)
161 peak_deallocate(e->_clients);
162 close(e->_dpfd);
163 }
164
165 __private_extern__ int
peak_engine_get_maxfds(peak_engine e)166 peak_engine_get_maxfds(peak_engine e)
167 {
168 return e->_maxfds;
169 }
170
171 __private_extern__ int
peak_engine_set_maxfds(peak_engine e,int maxfds)172 peak_engine_set_maxfds(peak_engine e, int maxfds)
173 {
174 if (maxfds <= 0) /* doh */
175 return -1;
176
177 maxfds += 2; /* for signal fds */
178
179 if (e->_alloctotal > 0 && maxfds > e->_alloctotal)
180 {
181 peak_engine_client * n_clients;
182 int i;
183
184 n_clients = (peak_engine_client *) peak_allocate(sizeof(peak_engine_client)
185 * maxfds);
186
187 for (i = 0; i < e->_maxfds; i++)
188 n_clients[i] = e->_clients[i];
189 for (i = e->_maxfds; i < maxfds; i++)
190 n_clients[i] = NULL;
191
192 _peak_spinlock_lock(&pollfdLock);
193 e->_alloctotal = maxfds;
194 e->_maxfds = maxfds;
195 peak_deallocate(e->_clients);
196 e->_clients = n_clients;
197 _peak_spinlock_unlock(&pollfdLock);
198 }
199 else
200 {
201 _peak_spinlock_lock(&pollfdLock);
202 e->_maxfds = peak_set_fdlimit(maxfds);
203 _peak_spinlock_unlock(&pollfdLock);
204 }
205 return maxfds == e->_maxfds ? 0 : -1;
206 }
207
208 /* Should be called under the protection of pollfdLock for eventual
209 * concurrent allocation (eg. 2 timers are fired at the same time to create
210 * the first engine's clients on different threads).
211 */
212 static void
__peak_engine_allocate_clients(peak_engine e)213 __peak_engine_allocate_clients(peak_engine e)
214 {
215 int i;
216
217 assert(e->_clients == NULL);
218
219 e->_clients = (peak_engine_client *) peak_allocate(sizeof(peak_engine_client)
220 * e->_maxfds);
221 for (i = 0; i < e->_maxfds; i++)
222 e->_clients[i] = NULL;
223 e->_alloctotal = e->_maxfds;
224 }
225
226
227 static void
__peak_engine_set_or_clear(peak_engine e,int fd,uint32_t set,uint32_t clear)228 __peak_engine_set_or_clear(peak_engine e, int fd, uint32_t set, uint32_t clear)
229 {
230 struct pollfd pfd;
231 // printf("fd=%d set=%x clear=%x\n", fd, set, clear);
232 pfd.fd = fd;
233 if (clear != 0)
234 {
235 pfd.events = POLLREMOVE;
236 if (write(e->_dpfd, &pfd, sizeof(pfd)) != sizeof(pfd))
237 PEAK_HALT;
238 }
239
240 if (!(set & CS_ANY))
241 return;
242
243 pfd.events = 0;
244 if (set & (CS_ACCEPTING|CS_READING))
245 pfd.events |= POLLREADFLAGS;
246 if (set & (CS_CONNECTING|CS_WRITING))
247 pfd.events |= POLLWRITEFLAGS;
248
249 if (write(e->_dpfd, &pfd, sizeof(pfd)) != sizeof(pfd))
250 PEAK_HALT;
251 }
252
253 static void
__peak_engine_add_signal(peak_engine e,peak_engine_client c)254 __peak_engine_add_signal(peak_engine e, peak_engine_client c)
255 {
256 struct sigaction action;
257 sigset_t stop_signal;
258
259 sigemptyset(&stop_signal);
260 sigaddset(&stop_signal, c->_ident);
261
262 action.sa_handler = __peak_engine_signal_trap;
263 action.sa_mask = stop_signal;
264 action.sa_flags = 0;
265
266 if (c->_ident > MAX_SIGNUM)
267 PEAK_HALT;
268
269 if (sigaction(c->_ident, &action, NULL) == 0)
270 {
271 e->_signals[c->_ident] = c;
272
273 /* Register one real client for all signals, the first signal client is
274 * effectively used for convenience.
275 */
276 _peak_spinlock_lock(&pollfdLock);
277 if (!e->_alloctotal)
278 __peak_engine_allocate_clients(e);
279
280 if (e->_clients[interrupt_read_fd] == NULL)
281 {
282 if (interrupt_read_fd >= e->_maxfds)
283 PEAK_HALT; /* TODO */
284
285 e->_clients[interrupt_read_fd] = c;
286
287 c->_state |= CS_READING;
288 __peak_engine_set_or_clear(e, interrupt_read_fd, c->_state, 0);
289 c->_engine = e;
290 }
291 _peak_spinlock_unlock(&pollfdLock);
292 }
293 }
294
295 static void
__peak_engine_signal_trap(int signum)296 __peak_engine_signal_trap(int signum)
297 {
298 assert (interrupt_write_fd >= 0);
299 write(interrupt_write_fd, &signum, sizeof(interrupt_write_fd));
300 }
301
302 __private_extern__ void
peak_engine_add_client(peak_engine e,peak_engine_client c)303 peak_engine_add_client(peak_engine e, peak_engine_client c)
304 {
305 if (c->_state & CS_SIGNAL)
306 {
307 __peak_engine_add_signal(e, c);
308 return;
309 }
310
311 _peak_spinlock_lock(&pollfdLock);
312
313 if (!e->_alloctotal)
314 __peak_engine_allocate_clients(e);
315
316 if (c->_ident >= e->_maxfds)
317 PEAK_HALT; /* Sorry, that's too much. */
318
319 e->_clients[c->_ident] = c;
320
321 /* Set start flags */
322 __peak_engine_set_or_clear(e, c->_ident, c->_state, 0);
323
324 c->_engine = e;
325
326 _peak_spinlock_unlock(&pollfdLock);
327 }
328
329 __private_extern__ void
peak_engine_remove_client(peak_engine e,peak_engine_client c)330 peak_engine_remove_client(peak_engine e, peak_engine_client c)
331 {
332 _peak_spinlock_lock(&pollfdLock);
333 assert(c != NULL);
334
335 if (c->_state & CS_SIGNAL)
336 {
337 struct sigaction action;
338
339 /* Remove a signal: restore default action. */
340 action.sa_handler = SIG_DFL;
341 sigemptyset(&action.sa_mask);
342 action.sa_flags = 0;
343
344 sigaction(c->_ident, &action, NULL);
345 e->_signals[c->_ident] = NULL;
346 }
347 else
348 {
349 /* Remove it. */
350 __peak_engine_set_or_clear(e, c->_ident, 0, CS_ANY);
351
352 /* Then we can clear the slot. */
353 e->_clients[c->_ident] = NULL;
354 }
355 c->_engine = NULL;
356 _peak_spinlock_unlock(&pollfdLock);
357 }
358
359 /* Precondition: Always called under the protection of c->_lock.
360 */
361 __private_extern__ void
peak_engine_edit_client(peak_engine e,peak_engine_client c)362 peak_engine_edit_client(peak_engine e, peak_engine_client c)
363 {
364 assert(!(c->_state & CS_HANDLED));
365
366 if (c->_sstate != c->_state)
367 {
368 c->_sstate = c->_state;
369 _peak_spinlock_lock(&pollfdLock);
370 __peak_engine_set_or_clear(e, c->_ident, c->_state, CS_ANY);
371 _peak_spinlock_unlock(&pollfdLock);
372 }
373 }
374
375 #define PEAK_POLLS_PER_LOOP 24
376 __private_extern__ void
peak_engine_loop(peak_engine e)377 peak_engine_loop(peak_engine e)
378 {
379 peak_engine_client c;
380 struct dvpoll dopoll;
381 struct pollfd polls[PEAK_POLLS_PER_LOOP];
382 int maxpolls = PEAK_POLLS_PER_LOOP;
383 int i, nfds;
384 int err;
385
386 e->_running = 1;
387
388 do {
389 dopoll.dp_fds = polls;
390 dopoll.dp_nfds = maxpolls;
391 dopoll.dp_timeout = _peak_task_timer_mswait(e->_task);
392 nfds = ioctl(e->_dpfd, DP_POLL, &dopoll);
393
394 if (nfds < 0)
395 {
396 fprintf(stderr, "/dev/poll: ioctl error\n");
397 continue;
398 }
399
400 e->_ne = 0;
401
402 for (i = 0; i < nfds; i++)
403 {
404 if ((c = e->_clients[polls[i].fd]) == NULL)
405 continue;
406
407 if (c->_state & CS_SIGNAL)
408 {
409 if (polls[i].revents & POLLREADFLAGS)
410 {
411 int signum;
412
413 if (read(interrupt_read_fd, &signum, sizeof(signum))
414 == sizeof(signum))
415 {
416 if (signum > 0 && signum <= MAX_SIGNUM)
417 __peak_engine_ioevent_generate(e, e->_signals[signum],
418 IOEVENT_SIGNAL, signum);
419 }
420 }
421 continue;
422 }
423
424 assert(!(c->_state & CS_SIGNAL));
425 assert(polls[i].fd == c->_ident);
426
427 if ((err = peak_socket_get_error(polls[i].fd)) != 0)
428 {
429 __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
430 continue;
431 }
432
433 #ifdef POLLHUP
434 if (polls[i].revents & POLLHUP)
435 __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
436 else
437 #endif
438 if (polls[i].revents & POLLREADFLAGS)
439 {
440 if (c->_state & CS_ACCEPTING) /* ready for accept */
441 __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
442 else
443 {
444 assert(c->_state & CS_READING);
445
446 /* PEEK TEST */
447 if (c->_state & CS_PEEKABLE)
448 {
449 switch (peak_socket_peek(polls[i].fd))
450 {
451 case -1:
452 if (errno == EAGAIN)
453 {
454 PEAK_WARN("peak_socket_peek triggered EAGAIN");
455 continue; /* Resource temporarily unavailable */
456 }
457 __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, errno);
458 PEAK_FATAL("peak_socket_peek failed", errno);
459 break;
460 case 0: /* EOF */
461 __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
462 break;
463 default:
464 __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
465 break;
466 }
467 }
468 else
469 __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
470 }
471 }
472 else if (polls[i].revents & POLLWRITEFLAGS)
473 {
474 if (c->_state & CS_CONNECTING)
475 __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
476 else
477 {
478 assert (c->_state & CS_WRITING);
479
480 __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
481 }
482 }
483 }
484
485 /* Prepare to fire any pending timers
486 */
487 e->_ne += _peak_task_timer_schedule_fire(e->_task);
488
489 /* Process events...
490 */
491 _peak_task_process_pending_events(e->_task, e->_ne);
492
493 } while (e->_running);
494 }
495
496 __private_extern__ void
peak_engine_break(peak_engine e)497 peak_engine_break(peak_engine e)
498 {
499 e->_running = 0;
500 }
501
502 static void
__peak_engine_ioevent_generate(peak_engine e,peak_engine_client c,int ioevent,int info)503 __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
504 int ioevent, int info)
505 {
506 uint16_t mclear = 0, mset = 0;
507
508 switch (ioevent)
509 {
510 case IOEVENT_CONNECT:
511 mclear = CS_CONNECTING;
512 mset = CS_CONNECTED|CS_READING|CS_WRITING;
513 break;
514 case IOEVENT_ACCEPT:
515 mclear = CS_ACCEPTING;
516 break;
517 case IOEVENT_READ:
518 mclear = CS_READING;
519 break;
520 case IOEVENT_WRITE:
521 mclear = CS_WRITING;
522 break;
523 case IOEVENT_EOF:
524 case IOEVENT_ERROR:
525 mclear = CS_CONNECTED|CS_READING|CS_WRITING;
526 break;
527 case IOEVENT_SIGNAL:
528 break;
529 default:
530 PEAK_HALT;
531 break;
532 }
533
534 /* Set "event handled" bit */
535 c->_state |= CS_HANDLED;
536
537 /* Cache state */
538 c->_sstate = c->_state;
539
540 /* Prepare */
541 c->_state &= ~mclear;
542 c->_state |= mset;
543
544 /* Schedule for processing */
545 _peak_task_op_ioevent_schedule(e->_task, c, ioevent, info);
546
547 e->_ne++;
548 }
549
550 __private_extern__ void
peak_engine_event_postprocess(peak_engine_client c)551 peak_engine_event_postprocess(peak_engine_client c)
552 {
553 peak_engine e = c->_engine;
554
555 /* Commit changes if necessary, restore stuffs.
556 */
557 _peak_engine_client_lock(c);
558
559 if (c->_sstate != c->_state && e != NULL)
560 __peak_engine_set_or_clear(e, c->_ident, c->_state, CS_ANY);
561
562 c->_sstate = 0; /* invalidate cache */
563 c->_state &= ~CS_HANDLED; /* we don't handle it anymore */
564
565 _peak_engine_client_unlock(c);
566 }
567