1 /**
2  * @file main.c  Main polling routine
3  *
4  * Copyright (C) 2010 Creytiv.com
5  */
6 #ifdef HAVE_SYS_TIME_H
7 #include <sys/time.h>
8 #endif
9 #include <sys/types.h>
10 #undef _STRICT_ANSI
11 #include <string.h>
12 #ifdef HAVE_UNISTD_H
13 #include <unistd.h>
14 #endif
15 #ifdef WIN32
16 #include <winsock.h>
17 #endif
18 #ifdef HAVE_SIGNAL
19 #include <signal.h>
20 #endif
21 #ifdef HAVE_SELECT_H
22 #include <sys/select.h>
23 #endif
24 #ifdef HAVE_POLL
25 #include <poll.h>
26 #endif
27 #ifdef HAVE_EPOLL
28 #include <sys/epoll.h>
29 #endif
30 #ifdef HAVE_KQUEUE
31 #include <sys/types.h>
32 #include <sys/event.h>
33 #include <sys/time.h>
34 #undef LIST_INIT
35 #undef LIST_FOREACH
36 #endif
37 #include <re_types.h>
38 #include <re_fmt.h>
39 #include <re_mem.h>
40 #include <re_mbuf.h>
41 #include <re_list.h>
42 #include <re_tmr.h>
43 #include <re_main.h>
44 #include "main.h"
45 #ifdef HAVE_PTHREAD
46 #define __USE_GNU 1
47 #include <stdlib.h>
48 #include <pthread.h>
49 #endif
50 
51 
52 #define DEBUG_MODULE "main"
53 #define DEBUG_LEVEL 5
54 #include <re_dbg.h>
55 
56 /*
57   epoll() has been tested successfully on the following kernels:
58 
59   - Linux 2.6.16.29-xen   (Debian 4.0 etch)
60   - Linux 2.6.18-4-amd64  (Debian 4.0 etch)
61 
62 
63   TODO clean this up
64 
65   - The polling method is selectable both in compile-time and run-time
66   - The polling method can be changed in run time. this is cool!
67   - Maximum number of fds can be set from application, but only once!
68   - Look at howto optimise main loop
69  */
70 
71 #if !defined (RELEASE) && !defined (MAIN_DEBUG)
72 #define MAIN_DEBUG 1  /**< Enable main loop debugging */
73 #endif
74 
75 
76 /** Main loop values */
77 enum {
	MAX_BLOCKING = 100,    /**< Maximum time spent in a handler [ms] */
79 #if defined (FD_SETSIZE)
80 	DEFAULT_MAXFDS = FD_SETSIZE
81 #else
82 	DEFAULT_MAXFDS = 128
83 #endif
84 };
85 
86 
87 /** Polling loop data */
88 struct re {
89 	/** File descriptor handler set */
90 	struct {
91 		int flags;           /**< Polling flags (Read, Write, etc.) */
92 		fd_h *fh;            /**< Event handler                     */
93 		void *arg;           /**< Handler argument                  */
94 	} *fhs;
95 	int maxfds;                  /**< Maximum number of polling fds     */
96 	int nfds;                    /**< Number of active file descriptors */
97 	enum poll_method method;     /**< The current polling method        */
	bool update;                 /**< Fd set needs updating             */
	bool polling;                /**< True while the loop is polling    */
100 	int sig;                     /**< Last caught signal                */
101 	struct list tmrl;            /**< List of timers                    */
102 
103 #ifdef HAVE_POLL
104 	struct pollfd *fds;          /**< Event set for poll()              */
105 #endif
106 
107 #ifdef HAVE_EPOLL
108 	struct epoll_event *events;  /**< Event set for epoll()             */
109 	int epfd;                    /**< epoll control file descriptor     */
110 #endif
111 
112 #ifdef HAVE_KQUEUE
	struct kevent *evlist;       /**< Event list for kevent()           */
	int kqfd;                    /**< kqueue control file descriptor    */
115 #endif
116 
117 #ifdef HAVE_PTHREAD
118 	pthread_mutex_t mutex;       /**< Mutex for thread synchronization  */
119 	pthread_mutex_t *mutexp;     /**< Pointer to active mutex           */
120 #endif
121 };
122 
123 static struct re global_re = {
124 	NULL,
125 	0,
126 	0,
127 	METHOD_NULL,
128 	false,
129 	false,
130 	0,
131 	LIST_INIT,
132 #ifdef HAVE_POLL
133 	NULL,
134 #endif
135 #ifdef HAVE_EPOLL
136 	NULL,
137 	-1,
138 #endif
139 #ifdef HAVE_KQUEUE
140 	NULL,
141 	-1,
142 #endif
143 #ifdef HAVE_PTHREAD
144 #if MAIN_DEBUG && defined (PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
145 	PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP,
146 #else
147 	PTHREAD_MUTEX_INITIALIZER,
148 #endif
149 	&global_re.mutex,
150 #endif
151 };
152 
153 
154 #ifdef HAVE_PTHREAD
155 
156 static void poll_close(struct re *re);
157 
158 static pthread_once_t pt_once = PTHREAD_ONCE_INIT;
159 static pthread_key_t  pt_key;
160 
161 
162 static void thread_destructor(void *arg)
163 {
164 	poll_close(arg);
165 	free(arg);
166 }
167 
168 
169 static void re_once(void)
170 {
171 	pthread_key_create(&pt_key, thread_destructor);
172 }
173 
174 
175 static struct re *re_get(void)
176 {
177 	struct re *re;
178 
179 	pthread_once(&pt_once, re_once);
180 
181 	re = pthread_getspecific(pt_key);
182 	if (!re) {
183 		re = &global_re;
184 	}
185 
186 	return re;
187 }
188 
189 
190 static inline void re_lock(struct re *re)
191 {
192 	int err;
193 
194 	err = pthread_mutex_lock(re->mutexp);
195 	if (err) {
196 		DEBUG_WARNING("re_lock: %m\n", err);
197 	}
198 }
199 
200 
201 static inline void re_unlock(struct re *re)
202 {
203 	int err;
204 
205 	err = pthread_mutex_unlock(re->mutexp);
206 	if (err) {
207 		DEBUG_WARNING("re_unlock: %m\n", err);
208 	}
209 }
210 
211 
212 #else
213 
214 static struct re *re_get(void)
215 {
216 	return &global_re;
217 }
218 
219 #define re_lock(x)    /**< Stub */
220 #define re_unlock(x)  /**< Stub */
221 
222 #endif
223 
224 
225 #if MAIN_DEBUG
226 /**
227  * Call the application event handler
228  *
229  * @param re     Poll state
230  * @param fd     File descriptor
231  * @param flags  Event flags
232  */
233 static void fd_handler(struct re *re, int fd, int flags)
234 {
235 	const uint64_t tick = tmr_jiffies();
236 	uint32_t diff;
237 
238 	DEBUG_INFO("event on fd=%d (flags=0x%02x)...\n", fd, flags);
239 
240 	re->fhs[fd].fh(flags, re->fhs[fd].arg);
241 
242 	diff = (uint32_t)(tmr_jiffies() - tick);
243 
244 	if (diff > MAX_BLOCKING) {
245 		DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n",
246 			      diff, MAX_BLOCKING,
247 			      re->fhs[fd].fh, re->fhs[fd].arg);
248 	}
249 }
250 #endif
251 
252 
253 #ifdef HAVE_POLL
254 static int set_poll_fds(struct re *re, int fd, int flags)
255 {
256 	if (!re->fds)
257 		return 0;
258 
259 	if (flags)
260 		re->fds[fd].fd = fd;
261 	else
262 		re->fds[fd].fd = -1;
263 
264 	re->fds[fd].events = 0;
265 	if (flags & FD_READ)
266 		re->fds[fd].events |= POLLIN;
267 	if (flags & FD_WRITE)
268 		re->fds[fd].events |= POLLOUT;
269 	if (flags & FD_EXCEPT)
270 		re->fds[fd].events |= POLLERR;
271 
272 	return 0;
273 }
274 #endif
275 
276 
277 #ifdef HAVE_EPOLL
278 static int set_epoll_fds(struct re *re, int fd, int flags)
279 {
280 	struct epoll_event event;
281 	int err = 0;
282 
283 	if (re->epfd < 0)
284 		return EBADFD;
285 
286 	memset(&event, 0, sizeof(event));
287 
288 	DEBUG_INFO("set_epoll_fds: fd=%d flags=0x%02x\n", fd, flags);
289 
290 	if (flags) {
291 		event.data.fd = fd;
292 
293 		if (flags & FD_READ)
294 			event.events |= EPOLLIN;
295 		if (flags & FD_WRITE)
296 			event.events |= EPOLLOUT;
297 		if (flags & FD_EXCEPT)
298 			event.events |= EPOLLERR;
299 
300 		/* Try to add it first */
301 		if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_ADD, fd, &event)) {
302 
			/* If it already exists, modify it */
304 			if (EEXIST == errno) {
305 
306 				if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_MOD,
307 						    fd, &event)) {
308 					err = errno;
309 					DEBUG_WARNING("epoll_ctl:"
310 						      " EPOLL_CTL_MOD:"
311 						      " fd=%d (%m)\n",
312 						      fd, err);
313 				}
314 			}
315 			else {
316 				err = errno;
317 				DEBUG_WARNING("epoll_ctl: EPOLL_CTL_ADD:"
318 					      " fd=%d (%m)\n",
319 					      fd, err);
320 			}
321 		}
322 	}
323 	else {
324 		if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_DEL, fd, &event)) {
325 			err = errno;
326 			DEBUG_INFO("epoll_ctl: EPOLL_CTL_DEL: fd=%d (%m)\n",
327 				   fd, err);
328 		}
329 	}
330 
331 	return err;
332 }
333 #endif
334 
335 
336 #ifdef HAVE_KQUEUE
337 static int set_kqueue_fds(struct re *re, int fd, int flags)
338 {
339 	struct kevent kev[2];
340 	int r, n = 0;
341 
342 	memset(kev, 0, sizeof(kev));
343 
	/* Always delete both filters first; re-add below as needed */
345 	EV_SET(&kev[0], fd, EVFILT_READ,  EV_DELETE, 0, 0, 0);
346 	EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
347 	kevent(re->kqfd, kev, 2, NULL, 0, NULL);
348 
349 	memset(kev, 0, sizeof(kev));
350 
351 	if (flags & FD_WRITE) {
352 		EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
353 		++n;
354 	}
355 	if (flags & FD_READ) {
356 		EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0);
357 		++n;
358 	}
359 
360 	if (n) {
361 		r = kevent(re->kqfd, kev, n, NULL, 0, NULL);
362 		if (r < 0) {
363 			int err = errno;
364 
365 			DEBUG_WARNING("set: [fd=%d, flags=%x] kevent: %m\n",
366 				      fd, flags, err);
367 			return err;
368 		}
369 	}
370 
371 	return 0;
372 }
373 #endif
374 
375 
376 /**
377  * Rebuild the file descriptor mapping table. This must be done whenever
378  * the polling method is changed.
379  */
380 static int rebuild_fds(struct re *re)
381 {
382 	int i, err = 0;
383 
384 	DEBUG_INFO("rebuilding fds (nfds=%d)\n", re->nfds);
385 
386 	/* Update fd sets */
387 	for (i=0; i<re->nfds; i++) {
388 		if (!re->fhs[i].fh)
389 			continue;
390 
391 		switch (re->method) {
392 
393 #ifdef HAVE_POLL
394 		case METHOD_POLL:
395 			err = set_poll_fds(re, i, re->fhs[i].flags);
396 			break;
397 #endif
398 #ifdef HAVE_EPOLL
399 		case METHOD_EPOLL:
400 			err = set_epoll_fds(re, i, re->fhs[i].flags);
401 			break;
402 #endif
403 
404 #ifdef HAVE_KQUEUE
405 		case METHOD_KQUEUE:
406 			err = set_kqueue_fds(re, i, re->fhs[i].flags);
407 			break;
408 #endif
409 
410 		default:
411 			break;
412 		}
413 
414 		if (err)
415 			break;
416 	}
417 
418 	return err;
419 }
420 
421 
422 static int poll_init(struct re *re)
423 {
424 	DEBUG_INFO("poll init (maxfds=%d)\n", re->maxfds);
425 
426 	if (!re->maxfds) {
427 		DEBUG_WARNING("poll init: maxfds is 0\n");
428 		return EINVAL;
429 	}
430 
431 	switch (re->method) {
432 
433 #ifdef HAVE_POLL
434 	case METHOD_POLL:
435 		if (!re->fds) {
436 			re->fds = mem_zalloc(re->maxfds * sizeof(*re->fds),
437 					    NULL);
438 			if (!re->fds)
439 				return ENOMEM;
440 		}
441 		break;
442 #endif
443 #ifdef HAVE_EPOLL
444 	case METHOD_EPOLL:
445 		if (!re->events) {
446 			DEBUG_INFO("allocate %u bytes for epoll set\n",
447 				   re->maxfds * sizeof(*re->events));
448 			re->events = mem_zalloc(re->maxfds*sizeof(*re->events),
449 					      NULL);
450 			if (!re->events)
451 				return ENOMEM;
452 		}
453 
454 		if (re->epfd < 0
455 		    && -1 == (re->epfd = epoll_create(re->maxfds))) {
456 
457 			int err = errno;
458 
459 			DEBUG_WARNING("epoll_create: %m (maxfds=%d)\n",
460 				      err, re->maxfds);
461 			return err;
462 		}
463 		DEBUG_INFO("init: epoll_create() epfd=%d\n", re->epfd);
464 		break;
465 #endif
466 
467 #ifdef HAVE_KQUEUE
468 	case METHOD_KQUEUE:
469 
470 		if (!re->evlist) {
471 			size_t sz = re->maxfds * sizeof(*re->evlist);
472 			re->evlist = mem_zalloc(sz, NULL);
473 			if (!re->evlist)
474 				return ENOMEM;
475 		}
476 
477 		if (re->kqfd < 0) {
478 			re->kqfd = kqueue();
479 			if (re->kqfd < 0)
480 				return errno;
481 			DEBUG_INFO("kqueue: fd=%d\n", re->kqfd);
482 		}
483 
484 		break;
485 #endif
486 
487 	default:
488 		break;
489 	}
490 	return 0;
491 }
492 
493 
494 /** Free all resources */
495 static void poll_close(struct re *re)
496 {
497 	DEBUG_INFO("poll close\n");
498 
499 	re->fhs = mem_deref(re->fhs);
500 	re->maxfds = 0;
501 
502 #ifdef HAVE_POLL
503 	re->fds = mem_deref(re->fds);
504 #endif
505 #ifdef HAVE_EPOLL
506 	DEBUG_INFO("poll_close: epfd=%d\n", re->epfd);
507 
508 	if (re->epfd >= 0) {
509 		(void)close(re->epfd);
510 		re->epfd = -1;
511 	}
512 
513 	re->events = mem_deref(re->events);
514 #endif
515 
516 #ifdef HAVE_KQUEUE
517 	if (re->kqfd >= 0) {
518 		close(re->kqfd);
519 		re->kqfd = -1;
520 	}
521 
522 	re->evlist = mem_deref(re->evlist);
523 #endif
524 }
525 
526 
527 static int poll_setup(struct re *re)
528 {
529 	int err;
530 
531 	err = fd_setsize(DEFAULT_MAXFDS);
532 	if (err)
533 		goto out;
534 
535 	if (METHOD_NULL == re->method) {
536 		err = poll_method_set(poll_method_best());
537 		if (err)
538 			goto out;
539 
540 		DEBUG_INFO("poll setup: poll method not set - set to `%s'\n",
541 			   poll_method_name(re->method));
542 	}
543 
544 	err = poll_init(re);
545 
546  out:
547 	if (err)
548 		poll_close(re);
549 
550 	return err;
551 }
552 
553 
554 /**
555  * Listen for events on a file descriptor
556  *
557  * @param fd     File descriptor
558  * @param flags  Wanted event flags
559  * @param fh     Event handler
560  * @param arg    Handler argument
561  *
562  * @return 0 if success, otherwise errorcode
563  */
564 int fd_listen(int fd, int flags, fd_h *fh, void *arg)
565 {
566 	struct re *re = re_get();
567 	int err = 0;
568 
569 	DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);
570 
571 	if (fd < 0) {
572 		DEBUG_WARNING("fd_listen: corrupt fd %d\n", fd);
573 		return EBADF;
574 	}
575 
576 	if (flags || fh) {
577 		err = poll_setup(re);
578 		if (err)
579 			return err;
580 	}
581 
582 	if (fd >= re->maxfds) {
583 		if (flags) {
584 			DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x"
585 				      " - Max %d fds\n",
586 				      fd, flags, re->maxfds);
587 		}
588 		return EMFILE;
589 	}
590 
591 	/* Update fh set */
592 	if (re->fhs) {
593 		re->fhs[fd].flags = flags;
594 		re->fhs[fd].fh    = fh;
595 		re->fhs[fd].arg   = arg;
596 	}
597 
598 	re->nfds = max(re->nfds, fd+1);
599 
600 	switch (re->method) {
601 
602 #ifdef HAVE_POLL
603 	case METHOD_POLL:
604 		err = set_poll_fds(re, fd, flags);
605 		break;
606 #endif
607 
608 #ifdef HAVE_EPOLL
609 	case METHOD_EPOLL:
610 		if (re->epfd < 0)
611 			return EBADFD;
612 		err = set_epoll_fds(re, fd, flags);
613 		break;
614 #endif
615 
616 #ifdef HAVE_KQUEUE
617 	case METHOD_KQUEUE:
618 		err = set_kqueue_fds(re, fd, flags);
619 		break;
620 #endif
621 
622 	default:
623 		break;
624 	}
625 
626 	if (err) {
627 		if (flags && fh) {
628 			fd_close(fd);
629 			DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n",
630 				      fd, flags, err);
631 		}
632 	}
633 
634 	return err;
635 }
636 
637 
638 /**
639  * Stop listening for events on a file descriptor
640  *
641  * @param fd     File descriptor
642  */
643 void fd_close(int fd)
644 {
645 	(void)fd_listen(fd, 0, NULL, NULL);
646 }
647 
648 
649 /**
650  * Polling loop
651  *
652  * @param re Poll state.
653  *
654  * @return 0 if success, otherwise errorcode
655  */
656 static int fd_poll(struct re *re)
657 {
658 	const uint64_t to = tmr_next_timeout(&re->tmrl);
659 	int i, n;
660 #ifdef HAVE_SELECT
661 	fd_set rfds, wfds, efds;
662 #endif
663 
664 	DEBUG_INFO("next timer: %llu ms\n", to);
665 
666 	/* Wait for I/O */
667 	switch (re->method) {
668 
669 #ifdef HAVE_POLL
670 	case METHOD_POLL:
671 		re_unlock(re);
672 		n = poll(re->fds, re->nfds, to ? (int)to : -1);
673 		re_lock(re);
674 		break;
675 #endif
676 #ifdef HAVE_SELECT
677 	case METHOD_SELECT: {
678 		struct timeval tv;
679 
680 		/* Clear and update fd sets */
681 		FD_ZERO(&rfds);
682 		FD_ZERO(&wfds);
683 		FD_ZERO(&efds);
684 
685 		for (i=0; i<re->nfds; i++) {
686 			if (!re->fhs[i].fh)
687 				continue;
688 
689 			if (re->fhs[i].flags & FD_READ)
690 				FD_SET(i, &rfds);
691 			if (re->fhs[i].flags & FD_WRITE)
692 				FD_SET(i, &wfds);
693 			if (re->fhs[i].flags & FD_EXCEPT)
694 				FD_SET(i, &efds);
695 		}
696 
697 #ifdef WIN32
698 		tv.tv_sec  = (long) to / 1000;
699 #else
700 		tv.tv_sec  = (time_t) to / 1000;
701 #endif
702 		tv.tv_usec = (uint32_t) (to % 1000) * 1000;
703 		re_unlock(re);
704 		n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL);
705 		re_lock(re);
706 	}
707 		break;
708 #endif
709 #ifdef HAVE_EPOLL
710 	case METHOD_EPOLL:
711 		re_unlock(re);
712 		n = epoll_wait(re->epfd, re->events, re->maxfds,
713 			       to ? (int)to : -1);
714 		re_lock(re);
715 		break;
716 #endif
717 
718 #ifdef HAVE_KQUEUE
719 	case METHOD_KQUEUE: {
720 		struct timespec timeout;
721 
722 		timeout.tv_sec = (time_t) (to / 1000);
723 		timeout.tv_nsec = (to % 1000) * 1000000;
724 
725 		re_unlock(re);
726 		n = kevent(re->kqfd, NULL, 0, re->evlist, re->maxfds,
727 			   to ? &timeout : NULL);
728 		re_lock(re);
729 		}
730 		break;
731 #endif
732 
733 	default:
734 		(void)to;
735 		DEBUG_WARNING("no polling method set\n");
736 		return EINVAL;
737 	}
738 
739 	if (n < 0)
740 		return errno;
741 
742 	/* Check for events */
743 	for (i=0; (n > 0) && (i < re->nfds); i++) {
744 		int fd, flags = 0;
745 
746 		switch (re->method) {
747 
748 #ifdef HAVE_POLL
749 		case METHOD_POLL:
750 			fd = i;
751 			if (re->fds[fd].revents & POLLIN)
752 				flags |= FD_READ;
753 			if (re->fds[fd].revents & POLLOUT)
754 				flags |= FD_WRITE;
755 			if (re->fds[fd].revents & (POLLERR|POLLHUP|POLLNVAL))
756 				flags |= FD_EXCEPT;
757 			if (re->fds[fd].revents & POLLNVAL) {
758 				DEBUG_WARNING("event: fd=%d POLLNVAL"
759 					      " (fds.fd=%d,"
760 					      " fds.events=0x%02x)\n",
761 					      fd, re->fds[fd].fd,
762 					      re->fds[fd].events);
763 			}
764 			/* Clear events */
765 			re->fds[fd].revents = 0;
766 			break;
767 #endif
768 #ifdef HAVE_SELECT
769 		case METHOD_SELECT:
770 			fd = i;
771 			if (FD_ISSET(fd, &rfds))
772 				flags |= FD_READ;
773 			if (FD_ISSET(fd, &wfds))
774 				flags |= FD_WRITE;
775 			if (FD_ISSET(fd, &efds))
776 				flags |= FD_EXCEPT;
777 			break;
778 #endif
779 #ifdef HAVE_EPOLL
780 		case METHOD_EPOLL:
781 			fd = re->events[i].data.fd;
782 
783 			if (re->events[i].events & EPOLLIN)
784 				flags |= FD_READ;
785 			if (re->events[i].events & EPOLLOUT)
786 				flags |= FD_WRITE;
787 			if (re->events[i].events & EPOLLERR)
788 				flags |= FD_EXCEPT;
789 
790 			if (!flags) {
791 				DEBUG_WARNING("epoll: no flags fd=%d\n", fd);
792 			}
793 
794 			break;
795 #endif
796 
797 #ifdef HAVE_KQUEUE
798 		case METHOD_KQUEUE: {
799 
800 			struct kevent *kev = &re->evlist[i];
801 
802 			fd = (int)kev->ident;
803 
804 			if (fd >= re->maxfds) {
805 				DEBUG_WARNING("large fd=%d\n", fd);
806 				break;
807 			}
808 
809 			if (kev->filter == EVFILT_READ)
810 				flags |= FD_READ;
811 			else if (kev->filter == EVFILT_WRITE)
812 				flags |= FD_WRITE;
813 			else {
814 				DEBUG_WARNING("kqueue: unhandled "
815 					      "filter %x\n",
816 					      kev->filter);
817 			}
818 
819 			if (kev->flags & EV_EOF) {
820 				flags |= FD_EXCEPT;
821 			}
822 			if (kev->flags & EV_ERROR) {
823 				DEBUG_WARNING("kqueue: EV_ERROR on fd %d\n",
824 					      fd);
825 			}
826 
827 			if (!flags) {
828 				DEBUG_WARNING("kqueue: no flags fd=%d\n", fd);
829 			}
830 		}
831 			break;
832 #endif
833 
834 		default:
835 			return EINVAL;
836 		}
837 
838 		if (!flags)
839 			continue;
840 
841 		if (re->fhs[fd].fh) {
842 #if MAIN_DEBUG
843 			fd_handler(re, fd, flags);
844 #else
845 			re->fhs[fd].fh(flags, re->fhs[fd].arg);
846 #endif
847 		}
848 
849 		/* Check if polling method was changed */
850 		if (re->update) {
851 			re->update = false;
852 			return 0;
853 		}
854 
855 		--n;
856 	}
857 
858 	return 0;
859 }
860 
861 
862 /**
863  * Set the maximum number of file descriptors
864  *
865  * @param maxfds Max FDs. 0 to free.
866  *
867  * @return 0 if success, otherwise errorcode
868  */
869 int fd_setsize(int maxfds)
870 {
871 	struct re *re = re_get();
872 
873 	if (!maxfds) {
874 		fd_debug();
875 		poll_close(re);
876 		return 0;
877 	}
878 
879 	if (!re->maxfds)
880 		re->maxfds = maxfds;
881 
882 	if (!re->fhs) {
883 		DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n",
884 			   re->maxfds, re->maxfds * sizeof(*re->fhs));
885 
886 		re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL);
887 		if (!re->fhs)
888 			return ENOMEM;
889 	}
890 
891 	return 0;
892 }
893 
894 
895 /**
896  * Print all file descriptors in-use
897  */
898 void fd_debug(void)
899 {
900 	const struct re *re = re_get();
901 	int i;
902 
903 	if (!re->fhs)
904 		return;
905 
906 	for (i=0; i<re->nfds; i++) {
907 
908 		if (!re->fhs[i].flags)
909 			continue;
910 
911 		(void)re_fprintf(stderr,
912 				 "fd %d in use: flags=%x fh=%p arg=%p\n",
913 				 i, re->fhs[i].flags, re->fhs[i].fh,
914 				 re->fhs[i].arg);
915 	}
916 }
917 
918 
919 #ifdef HAVE_SIGNAL
/* Signal handler: re-install itself (for SysV signal semantics) and
 * defer the actual handling to the polling loop */
921 static void signal_handler(int sig)
922 {
923 	(void)signal(sig, signal_handler);
924 	re_get()->sig = sig;
925 }
926 #endif
927 
928 
929 /**
 * Main polling loop for async I/O events. This function will only return
 * when re_cancel() is called or an error occurs.
932  *
933  * @param signalh Optional Signal handler
934  *
935  * @return 0 if success, otherwise errorcode
936  */
937 int re_main(re_signal_h *signalh)
938 {
939 	struct re *re = re_get();
940 	int err;
941 
942 #ifdef HAVE_SIGNAL
943 	if (signalh) {
944 		(void)signal(SIGINT, signal_handler);
945 		(void)signal(SIGALRM, signal_handler);
946 		(void)signal(SIGTERM, signal_handler);
947 	}
948 #endif
949 
950 	if (re->polling) {
951 		DEBUG_WARNING("main loop already polling\n");
952 		return EALREADY;
953 	}
954 
955 	err = poll_setup(re);
956 	if (err)
957 		goto out;
958 
959 	DEBUG_INFO("Using async I/O polling method: `%s'\n",
960 		   poll_method_name(re->method));
961 
962 	re->polling = true;
963 
964 	re_lock(re);
965 	for (;;) {
966 
967 		if (re->sig) {
968 			if (signalh)
969 				signalh(re->sig);
970 
971 			re->sig = 0;
972 		}
973 
974 		if (!re->polling) {
975 			err = 0;
976 			break;
977 		}
978 
979 		err = fd_poll(re);
980 		if (err) {
981 			if (EINTR == err)
982 				continue;
983 
984 #ifdef DARWIN
985 			/* NOTE: workaround for Darwin */
986 			if (EBADF == err)
987 				continue;
988 #endif
989 
990 			break;
991 		}
992 
993 		tmr_poll(&re->tmrl);
994 	}
995 	re_unlock(re);
996 
997  out:
998 	re->polling = false;
999 
1000 	return err;
1001 }
1002 
1003 
1004 /**
1005  * Cancel the main polling loop
1006  */
1007 void re_cancel(void)
1008 {
1009 	struct re *re = re_get();
1010 
1011 	re->polling = false;
1012 }
1013 
1014 
1015 /**
1016  * Debug the main polling loop
1017  *
1018  * @param pf     Print handler where debug output is printed to
1019  * @param unused Unused parameter
1020  *
1021  * @return 0 if success, otherwise errorcode
1022  */
1023 int re_debug(struct re_printf *pf, void *unused)
1024 {
1025 	struct re *re = re_get();
1026 	int err = 0;
1027 
1028 	(void)unused;
1029 
1030 	err |= re_hprintf(pf, "re main loop:\n");
1031 	err |= re_hprintf(pf, "  maxfds:  %d\n", re->maxfds);
1032 	err |= re_hprintf(pf, "  nfds:    %d\n", re->nfds);
1033 	err |= re_hprintf(pf, "  method:  %d (%s)\n", re->method,
1034 			  poll_method_name(re->method));
1035 
1036 	return err;
1037 }
1038 
1039 
1040 /**
1041  * Set async I/O polling method. This function can also be called while the
1042  * program is running.
1043  *
1044  * @param method New polling method
1045  *
1046  * @return 0 if success, otherwise errorcode
1047  */
1048 int poll_method_set(enum poll_method method)
1049 {
1050 	struct re *re = re_get();
1051 	int err;
1052 
1053 	err = fd_setsize(DEFAULT_MAXFDS);
1054 	if (err)
1055 		return err;
1056 
1057 	switch (method) {
1058 
1059 #ifdef HAVE_POLL
1060 	case METHOD_POLL:
1061 		break;
1062 #endif
1063 #ifdef HAVE_SELECT
1064 	case METHOD_SELECT:
1065 		if (re->maxfds > (int)FD_SETSIZE) {
1066 			DEBUG_WARNING("SELECT: maxfds > FD_SETSIZE\n");
1067 			return EMFILE;
1068 		}
1069 		break;
1070 #endif
1071 #ifdef HAVE_EPOLL
1072 	case METHOD_EPOLL:
1073 		if (!epoll_check())
1074 			return EINVAL;
1075 		break;
1076 #endif
1077 #ifdef HAVE_KQUEUE
1078 	case METHOD_KQUEUE:
1079 		break;
1080 #endif
1081 	default:
1082 		DEBUG_WARNING("poll method not supported: '%s'\n",
1083 			      poll_method_name(method));
1084 		return EINVAL;
1085 	}
1086 
1087 	re->method = method;
1088 	re->update = true;
1089 
1090 	DEBUG_INFO("Setting async I/O polling method to `%s'\n",
1091 		   poll_method_name(re->method));
1092 
1093 	err = poll_init(re);
1094 	if (err)
1095 		return err;
1096 
1097 	return rebuild_fds(re);
1098 }
1099 
1100 
1101 /**
1102  * Add a worker thread for this thread
1103  *
1104  * @return 0 if success, otherwise errorcode
1105  */
1106 int re_thread_init(void)
1107 {
1108 #ifdef HAVE_PTHREAD
1109 	struct re *re;
1110 
1111 	pthread_once(&pt_once, re_once);
1112 
1113 	re = pthread_getspecific(pt_key);
1114 	if (re) {
1115 		DEBUG_WARNING("thread_init: already added for thread %d\n",
1116 			      pthread_self());
1117 		return EALREADY;
1118 	}
1119 
1120 	re = malloc(sizeof(*re));
1121 	if (!re)
1122 		return ENOMEM;
1123 
1124 	memset(re, 0, sizeof(*re));
1125 	pthread_mutex_init(&re->mutex, NULL);
1126 	re->mutexp = &re->mutex;
1127 
1128 #ifdef HAVE_EPOLL
1129 	re->epfd = -1;
1130 #endif
1131 
1132 #ifdef HAVE_KQUEUE
1133 	re->kqfd = -1;
1134 #endif
1135 
1136 	pthread_setspecific(pt_key, re);
1137 	return 0;
1138 #else
1139 	return ENOSYS;
1140 #endif
1141 }
1142 
1143 
1144 /**
1145  * Remove the worker thread for this thread
1146  */
1147 void re_thread_close(void)
1148 {
1149 #ifdef HAVE_PTHREAD
1150 	struct re *re;
1151 
1152 	pthread_once(&pt_once, re_once);
1153 
1154 	re = pthread_getspecific(pt_key);
1155 	if (re) {
1156 		poll_close(re);
1157 		free(re);
1158 		pthread_setspecific(pt_key, NULL);
1159 	}
1160 #endif
1161 }
1162 
1163 
1164 /**
1165  * Enter an 're' thread
1166  *
1167  * @note Must only be called from a non-re thread
1168  */
1169 void re_thread_enter(void)
1170 {
1171 	re_lock(re_get());
1172 }
1173 
1174 
1175 /**
1176  * Leave an 're' thread
1177  *
1178  * @note Must only be called from a non-re thread
1179  */
1180 void re_thread_leave(void)
1181 {
1182 	re_unlock(re_get());
1183 }
1184 
1185 
1186 /**
1187  * Set an external mutex for this thread
1188  *
1189  * @param mutexp Pointer to external mutex, NULL to use internal
1190  */
1191 void re_set_mutex(void *mutexp)
1192 {
1193 #ifdef HAVE_PTHREAD
1194 	struct re *re = re_get();
1195 
1196 	re->mutexp = mutexp ? mutexp : &re->mutex;
1197 #else
1198 	(void)mutexp;
1199 #endif
1200 }
1201 
1202 
1203 /**
1204  * Get the timer-list for this thread
1205  *
1206  * @return Timer list
1207  *
1208  * @note only used by tmr module
1209  */
1210 struct list *tmrl_get(void);
1211 struct list *tmrl_get(void)
1212 {
1213 	return &re_get()->tmrl;
1214 }
1215