/* PEAK Library
 *
 * Copyright (c) 2004
 *      Stephane Thiell <mbuna@bugged.org>. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * $Id: engine_mod_devpoll.c,v 1.1 2007/05/24 13:11:53 mbuna Exp $
 */
#define RCSID "$Id: engine_mod_devpoll.c,v 1.1 2007/05/24 13:11:53 mbuna Exp $"

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "engine.h"

#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/devpoll.h>
#include <fcntl.h>
#include <time.h>
#include <unistd.h>
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif

#include "internal.h"
#include "socket.h"
#include "spinlock.h"
#include "task_private.h"
#include "utilities.h"

/* Figure out what bits to set for read */
#if defined(POLLMSG) && defined(POLLIN) && defined(POLLRDNORM)
#  define POLLREADFLAGS (POLLMSG|POLLIN|POLLRDNORM)
#elif defined(POLLIN) && defined(POLLRDNORM)
#  define POLLREADFLAGS (POLLIN|POLLRDNORM)
#elif defined(POLLIN)
#  define POLLREADFLAGS POLLIN
#elif defined(POLLRDNORM)
#  define POLLREADFLAGS POLLRDNORM
#endif

/* Figure out what bits to set for write */
#if defined(POLLOUT) && defined(POLLWRNORM)
#  define POLLWRITEFLAGS (POLLOUT|POLLWRNORM)
#elif defined(POLLOUT)
#  define POLLWRITEFLAGS POLLOUT
#elif defined(POLLWRNORM)
#  define POLLWRITEFLAGS POLLWRNORM
#endif

static peak_spinlock_t pollfdLock = PEAK_SPINLOCK_INITIALIZER;

static void __peak_engine_init(peak_engine e, va_list vp);
static void __peak_engine_finalize(peak_engine e);
static void __peak_engine_allocate_clients(peak_engine e);
static void __peak_engine_set_or_clear(peak_engine e, int fd, uint32_t set,
                                       uint32_t clear);
static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
static void __peak_engine_signal_trap(int signum);
static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
                                           int event, int info);

#ifndef MAX_SIGNUM
#ifdef NSIG
#define MAX_SIGNUM (NSIG-1)
#else
#define MAX_SIGNUM 31
#endif
#endif

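/* Engine instance for the /dev/poll flavor.  Field roles, inferred from
 * their use below: _clients maps a file descriptor to its registered engine
 * client and _alloctotal is the size of that table, _dpfd is the open
 * /dev/poll device, _maxfds the current descriptor limit, _signals maps a
 * signal number to the client interested in it, _ne counts events queued
 * during one loop iteration, and _running controls the event loop.
 */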
struct __peak_engine
  {
  PEAK_STRUCT_RT_HEADER;
  peak_task _task;
  peak_engine_client* _clients;
  int _alloctotal;
  int _dpfd;
  int _maxfds;
  peak_engine_client _signals[MAX_SIGNUM + 1];
  int _ne;
  volatile int _running;
  };

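/* Self-pipe used to turn asynchronous signals into readable I/O: the signal
 * handler writes the signal number to interrupt_write_fd and the event loop
 * picks it up on interrupt_read_fd.  File scope, so the pipe is shared by
 * all engines created in the process.
 */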
static int interrupt_read_fd, interrupt_write_fd;

PEAK_CLASS_BASE_DECLARE(engine);

__private_extern__ const char *
peak_engine_get_name(peak_engine e)
  {
  return "/dev/poll";
  }

__private_extern__ peak_engine
peak_engine_create(peak_task task)
  {
  return PEAK_CLASS_CONSTRUCT1(engine, task);
  }

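/* Instance initializer: set the descriptor limit, open the /dev/poll device
 * and, once per process, create the interrupt pipe used for signal delivery.
 * The client table itself is allocated lazily by
 * __peak_engine_allocate_clients() when the first client is added.
 */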
static void
__peak_engine_init(peak_engine e, va_list vp)
  {
  int i, p[2];

  e->_task = va_arg(vp, peak_task);
  e->_maxfds = peak_set_fdlimit(PEAK_DEFAULT_FLAVOR_MAXFDS);
  e->_clients = NULL;
  e->_alloctotal = 0;

  /* Open /dev/poll device. */
  if ((e->_dpfd = open("/dev/poll", O_RDWR)) < 0)
    PEAK_HALT;

  for (i = 0; i <= MAX_SIGNUM; i++)
    e->_signals[i] = NULL;

  if (interrupt_read_fd == 0)
    {
    if (pipe(p) == -1)
      PEAK_HALT;
    interrupt_read_fd = p[0];
    interrupt_write_fd = p[1];
    }

  e->_running = 0;
  }

static void
__peak_engine_finalize(peak_engine e)
  {
  if (e->_clients)
    peak_deallocate(e->_clients);
  close(e->_dpfd);
  }

__private_extern__ int
peak_engine_get_maxfds(peak_engine e)
  {
  return e->_maxfds;
  }

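/* Adjust the engine's descriptor limit.  Two extra slots are reserved for
 * the signal interrupt pipe.  If the client table already exists and must
 * grow, a larger table is allocated, copied, and swapped in under
 * pollfdLock; otherwise only the process fd limit is updated.  Returns 0 on
 * success, -1 if the requested limit could not be applied.
 */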
__private_extern__ int
peak_engine_set_maxfds(peak_engine e, int maxfds)
  {
  if (maxfds <= 0) /* doh */
    return -1;

  maxfds += 2; /* for signal fds */

  if (e->_alloctotal > 0 && maxfds > e->_alloctotal)
    {
    peak_engine_client * n_clients;
    int i;

    n_clients = (peak_engine_client *) peak_allocate(sizeof(peak_engine_client)
                                                     * maxfds);

    for (i = 0; i < e->_maxfds; i++)
      n_clients[i] = e->_clients[i];
    for (i = e->_maxfds; i < maxfds; i++)
      n_clients[i] = NULL;

    _peak_spinlock_lock(&pollfdLock);
    e->_alloctotal = maxfds;
    e->_maxfds = maxfds;
    peak_deallocate(e->_clients);
    e->_clients = n_clients;
    _peak_spinlock_unlock(&pollfdLock);
    }
  else
    {
    _peak_spinlock_lock(&pollfdLock);
    e->_maxfds = peak_set_fdlimit(maxfds);
    _peak_spinlock_unlock(&pollfdLock);
    }
  return maxfds == e->_maxfds ? 0 : -1;
  }

/* Must be called under the protection of pollfdLock to guard against
 * concurrent allocation (e.g. two timers firing at the same time and
 * creating the engine's first clients on different threads).
 */
static void
__peak_engine_allocate_clients(peak_engine e)
  {
  int i;

  assert(e->_clients == NULL);

  e->_clients = (peak_engine_client *) peak_allocate(sizeof(peak_engine_client)
                                                     * e->_maxfds);
  for (i = 0; i < e->_maxfds; i++)
    e->_clients[i] = NULL;
  e->_alloctotal = e->_maxfds;
  }


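/* Update the /dev/poll interest set for one descriptor.  The /dev/poll
 * interface has no "modify" operation: interest is changed by writing
 * struct pollfd entries to the device.  To change a mask we first write an
 * entry with POLLREMOVE to drop the old registration, then write a new
 * entry with the events derived from the client state bits (CS_ACCEPTING
 * and CS_READING map to POLLREADFLAGS, CS_CONNECTING and CS_WRITING to
 * POLLWRITEFLAGS).
 */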
static void
__peak_engine_set_or_clear(peak_engine e, int fd, uint32_t set, uint32_t clear)
  {
  struct pollfd pfd;
//  printf("fd=%d set=%x clear=%x\n", fd, set, clear);
  pfd.fd = fd;
  if (clear != 0)
    {
    pfd.events = POLLREMOVE;
    if (write(e->_dpfd, &pfd, sizeof(pfd)) != sizeof(pfd))
      PEAK_HALT;
    }

  if (!(set & CS_ANY))
    return;

  pfd.events = 0;
  if (set & (CS_ACCEPTING|CS_READING))
    pfd.events |= POLLREADFLAGS;
  if (set & (CS_CONNECTING|CS_WRITING))
    pfd.events |= POLLWRITEFLAGS;

  if (write(e->_dpfd, &pfd, sizeof(pfd)) != sizeof(pfd))
    PEAK_HALT;
  }

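/* Register a client interested in a signal.  The signal handler itself only
 * writes to the interrupt pipe (see __peak_engine_signal_trap below), so the
 * read end of that pipe is registered with /dev/poll like any other
 * descriptor; the event loop then turns a pipe read into an IOEVENT_SIGNAL
 * for the client stored in e->_signals.
 */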
static void
__peak_engine_add_signal(peak_engine e, peak_engine_client c)
  {
  struct sigaction action;
  sigset_t stop_signal;

  sigemptyset(&stop_signal);
  sigaddset(&stop_signal, c->_ident);

  action.sa_handler = __peak_engine_signal_trap;
  action.sa_mask = stop_signal;
  action.sa_flags = 0;

  if (c->_ident > MAX_SIGNUM)
    PEAK_HALT;

  if (sigaction(c->_ident, &action, NULL) == 0)
    {
    e->_signals[c->_ident] = c;

    /* Register a single real client on the interrupt pipe for all signals;
     * for convenience, the first signal client added is the one used.
     */
    _peak_spinlock_lock(&pollfdLock);
    if (!e->_alloctotal)
      __peak_engine_allocate_clients(e);

    if (e->_clients[interrupt_read_fd] == NULL)
      {
      if (interrupt_read_fd >= e->_maxfds)
        PEAK_HALT; /* TODO */

      e->_clients[interrupt_read_fd] = c;

      c->_state |= CS_READING;
      __peak_engine_set_or_clear(e, interrupt_read_fd, c->_state, 0);
      c->_engine = e;
      }
    _peak_spinlock_unlock(&pollfdLock);
    }
  }

static void
__peak_engine_signal_trap(int signum)
  {
  assert (interrupt_write_fd >= 0);
  write(interrupt_write_fd, &signum, sizeof(signum));
  }

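/* Attach a client to the engine.  Signal clients are routed to
 * __peak_engine_add_signal(); descriptor clients are stored in the
 * fd-indexed client table and their current state bits are pushed to
 * /dev/poll.
 */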
__private_extern__ void
peak_engine_add_client(peak_engine e, peak_engine_client c)
  {
  if (c->_state & CS_SIGNAL)
    {
    __peak_engine_add_signal(e, c);
    return;
    }

  _peak_spinlock_lock(&pollfdLock);

  if (!e->_alloctotal)
    __peak_engine_allocate_clients(e);

  if (c->_ident >= e->_maxfds)
    PEAK_HALT; /* Sorry, that's too much. */

  e->_clients[c->_ident] = c;

  /* Set start flags */
  __peak_engine_set_or_clear(e, c->_ident, c->_state, 0);

  c->_engine = e;

  _peak_spinlock_unlock(&pollfdLock);
  }

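/* Detach a client from the engine.  For a signal client the default signal
 * action is restored; for a descriptor client the fd is withdrawn from
 * /dev/poll (POLLREMOVE) and its slot in the client table is cleared.
 */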
__private_extern__ void
peak_engine_remove_client(peak_engine e, peak_engine_client c)
  {
  _peak_spinlock_lock(&pollfdLock);
  assert(c != NULL);

  if (c->_state & CS_SIGNAL)
    {
    struct sigaction action;

    /* Remove a signal: restore default action. */
    action.sa_handler = SIG_DFL;
    sigemptyset(&action.sa_mask);
    action.sa_flags = 0;

    sigaction(c->_ident, &action, NULL);
    e->_signals[c->_ident] = NULL;
    }
  else
    {
    /* Remove it. */
    __peak_engine_set_or_clear(e, c->_ident, 0, CS_ANY);

    /* Then we can clear the slot. */
    e->_clients[c->_ident] = NULL;
    }
  c->_engine = NULL;
  _peak_spinlock_unlock(&pollfdLock);
  }

/* Precondition: Always called under the protection of c->_lock.
 */
__private_extern__ void
peak_engine_edit_client(peak_engine e, peak_engine_client c)
  {
  assert(!(c->_state & CS_HANDLED));

  if (c->_sstate != c->_state)
    {
    c->_sstate = c->_state;
    _peak_spinlock_lock(&pollfdLock);
    __peak_engine_set_or_clear(e, c->_ident, c->_state, CS_ANY);
    _peak_spinlock_unlock(&pollfdLock);
    }
  }

#define PEAK_POLLS_PER_LOOP 24
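/* Engine event loop.  Each iteration asks the /dev/poll device for up to
 * PEAK_POLLS_PER_LOOP ready descriptors via the DP_POLL ioctl (struct dvpoll
 * carries the result array, its capacity and a timeout in milliseconds taken
 * from the task's timer queue), then dispatches accept/connect, read/write,
 * error, EOF and signal conditions as I/O events before firing pending
 * timers and processing the queued events.
 */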
__private_extern__ void
peak_engine_loop(peak_engine e)
  {
  peak_engine_client c;
  struct dvpoll dopoll;
  struct pollfd polls[PEAK_POLLS_PER_LOOP];
  int maxpolls = PEAK_POLLS_PER_LOOP;
  int i, nfds;
  int err;

  e->_running = 1;

  do  {
    dopoll.dp_fds = polls;
    dopoll.dp_nfds = maxpolls;
    dopoll.dp_timeout = _peak_task_timer_mswait(e->_task);
    nfds = ioctl(e->_dpfd, DP_POLL, &dopoll);

    if (nfds < 0)
      {
      fprintf(stderr, "/dev/poll: ioctl error\n");
      continue;
      }

    e->_ne = 0;

    for (i = 0; i < nfds; i++)
      {
      if ((c = e->_clients[polls[i].fd]) == NULL)
        continue;

      if (c->_state & CS_SIGNAL)
        {
        if (polls[i].revents & POLLREADFLAGS)
          {
          int signum;

          if (read(interrupt_read_fd, &signum, sizeof(signum))
              == sizeof(signum))
            {
            if (signum > 0 && signum <= MAX_SIGNUM)
              __peak_engine_ioevent_generate(e, e->_signals[signum],
                                             IOEVENT_SIGNAL, signum);
            }
          }
        continue;
        }

      assert(!(c->_state & CS_SIGNAL));
      assert(polls[i].fd == c->_ident);

      if ((err = peak_socket_get_error(polls[i].fd)) != 0)
        {
        __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
        continue;
        }

#ifdef POLLHUP
      if (polls[i].revents & POLLHUP)
        __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
      else
#endif
      if (polls[i].revents & POLLREADFLAGS)
        {
        if (c->_state & CS_ACCEPTING)  /* ready for accept */
          __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
        else
          {
          assert(c->_state & CS_READING);

          /* PEEK TEST */
          if (c->_state & CS_PEEKABLE)
            {
            switch (peak_socket_peek(polls[i].fd))
              {
              case -1:
                if (errno == EAGAIN)
                  {
                  PEAK_WARN("peak_socket_peek triggered EAGAIN");
                  continue; /* Resource temporarily unavailable */
                  }
                __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, errno);
                PEAK_FATAL("peak_socket_peek failed", errno);
                break;
              case 0: /* EOF */
                __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
                break;
              default:
                __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
                break;
              }
            }
          else
            __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
          }
        }
      else if (polls[i].revents & POLLWRITEFLAGS)
        {
        if (c->_state & CS_CONNECTING)
          __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
        else
          {
          assert (c->_state & CS_WRITING);

          __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
          }
        }
      }

    /* Prepare to fire any pending timers
     */
    e->_ne += _peak_task_timer_schedule_fire(e->_task);

    /* Process events...
     */
    _peak_task_process_pending_events(e->_task, e->_ne);

    } while (e->_running);
  }

__private_extern__ void
peak_engine_break(peak_engine e)
  {
  e->_running = 0;
  }

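/* Queue an I/O event for a client.  The current state is cached in _sstate,
 * then updated according to the event type (a completed connect clears
 * CS_CONNECTING and sets CS_CONNECTED|CS_READING|CS_WRITING; read, write and
 * accept events clear the corresponding interest bit; EOF and errors clear
 * them all).  CS_HANDLED is set and the event is handed to the task layer;
 * peak_engine_event_postprocess() later re-synchronizes /dev/poll with any
 * state change.
 */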
static void
__peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
                               int ioevent, int info)
  {
  uint16_t mclear = 0, mset = 0;

  switch (ioevent)
    {
    case IOEVENT_CONNECT:
      mclear = CS_CONNECTING;
      mset = CS_CONNECTED|CS_READING|CS_WRITING;
      break;
    case IOEVENT_ACCEPT:
      mclear = CS_ACCEPTING;
      break;
    case IOEVENT_READ:
      mclear = CS_READING;
      break;
    case IOEVENT_WRITE:
      mclear = CS_WRITING;
      break;
    case IOEVENT_EOF:
    case IOEVENT_ERROR:
      mclear = CS_CONNECTED|CS_READING|CS_WRITING;
      break;
    case IOEVENT_SIGNAL:
      break;
    default:
      PEAK_HALT;
      break;
    }

  /* Set "event handled" bit */
  c->_state |= CS_HANDLED;

  /* Cache state */
  c->_sstate = c->_state;

  /* Prepare */
  c->_state &= ~mclear;
  c->_state |= mset;

  /* Schedule for processing */
  _peak_task_op_ioevent_schedule(e->_task, c, ioevent, info);

  e->_ne++;
  }

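/* Post-processing once an event has been handled: under the client lock,
 * push any state change made during handling back to /dev/poll, invalidate
 * the cached state and drop the CS_HANDLED bit.
 */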
__private_extern__ void
peak_engine_event_postprocess(peak_engine_client c)
  {
  peak_engine e = c->_engine;

  /* Commit changes if necessary and restore state.
   */
  _peak_engine_client_lock(c);

  if (c->_sstate != c->_state && e != NULL)
    __peak_engine_set_or_clear(e, c->_ident, c->_state, CS_ANY);

  c->_sstate = 0;              /* invalidate cache */
  c->_state &= ~CS_HANDLED;    /* we don't handle it anymore */

  _peak_engine_client_unlock(c);
  }