1 /** 2 * @file main.c Main polling routine 3 * 4 * Copyright (C) 2010 Creytiv.com 5 */ 6 #ifdef HAVE_SYS_TIME_H 7 #include <sys/time.h> 8 #endif 9 #include <sys/types.h> 10 #undef _STRICT_ANSI 11 #include <string.h> 12 #ifdef HAVE_UNISTD_H 13 #include <unistd.h> 14 #endif 15 #ifdef WIN32 16 #include <winsock.h> 17 #endif 18 #ifdef HAVE_SIGNAL 19 #include <signal.h> 20 #endif 21 #ifdef HAVE_SELECT_H 22 #include <sys/select.h> 23 #endif 24 #ifdef HAVE_POLL 25 #include <poll.h> 26 #endif 27 #ifdef HAVE_EPOLL 28 #include <sys/epoll.h> 29 #endif 30 #ifdef HAVE_KQUEUE 31 #include <sys/types.h> 32 #include <sys/event.h> 33 #include <sys/time.h> 34 #undef LIST_INIT 35 #undef LIST_FOREACH 36 #endif 37 #include <re_types.h> 38 #include <re_fmt.h> 39 #include <re_mem.h> 40 #include <re_mbuf.h> 41 #include <re_list.h> 42 #include <re_tmr.h> 43 #include <re_main.h> 44 #include "main.h" 45 #ifdef HAVE_PTHREAD 46 #define __USE_GNU 1 47 #include <stdlib.h> 48 #include <pthread.h> 49 #endif 50 51 52 #define DEBUG_MODULE "main" 53 #define DEBUG_LEVEL 5 54 #include <re_dbg.h> 55 56 /* 57 epoll() has been tested successfully on the following kernels: 58 59 - Linux 2.6.16.29-xen (Debian 4.0 etch) 60 - Linux 2.6.18-4-amd64 (Debian 4.0 etch) 61 62 63 TODO clean this up 64 65 - The polling method is selectable both in compile-time and run-time 66 - The polling method can be changed in run time. this is cool! 67 - Maximum number of fds can be set from application, but only once! 68 - Look at howto optimise main loop 69 */ 70 71 #if !defined (RELEASE) && !defined (MAIN_DEBUG) 72 #define MAIN_DEBUG 1 /**< Enable main loop debugging */ 73 #endif 74 75 76 /** Main loop values */ 77 enum { 78 MAX_BLOCKING = 100, /**< Maximum time spent in handler in [ms] */ 79 #if defined (FD_SETSIZE) 80 DEFAULT_MAXFDS = FD_SETSIZE 81 #else 82 DEFAULT_MAXFDS = 128 83 #endif 84 }; 85 86 87 /** Polling loop data */ 88 struct re { 89 /** File descriptor handler set */ 90 struct { 91 int flags; /**< Polling flags (Read, Write, etc.) */ 92 fd_h *fh; /**< Event handler */ 93 void *arg; /**< Handler argument */ 94 } *fhs; 95 int maxfds; /**< Maximum number of polling fds */ 96 int nfds; /**< Number of active file descriptors */ 97 enum poll_method method; /**< The current polling method */ 98 bool update; /**< File descriptor set need updating */ 99 bool polling; /**< Is polling flag */ 100 int sig; /**< Last caught signal */ 101 struct list tmrl; /**< List of timers */ 102 103 #ifdef HAVE_POLL 104 struct pollfd *fds; /**< Event set for poll() */ 105 #endif 106 107 #ifdef HAVE_EPOLL 108 struct epoll_event *events; /**< Event set for epoll() */ 109 int epfd; /**< epoll control file descriptor */ 110 #endif 111 112 #ifdef HAVE_KQUEUE 113 struct kevent *evlist; 114 int kqfd; 115 #endif 116 117 #ifdef HAVE_PTHREAD 118 pthread_mutex_t mutex; /**< Mutex for thread synchronization */ 119 pthread_mutex_t *mutexp; /**< Pointer to active mutex */ 120 #endif 121 }; 122 123 static struct re global_re = { 124 NULL, 125 0, 126 0, 127 METHOD_NULL, 128 false, 129 false, 130 0, 131 LIST_INIT, 132 #ifdef HAVE_POLL 133 NULL, 134 #endif 135 #ifdef HAVE_EPOLL 136 NULL, 137 -1, 138 #endif 139 #ifdef HAVE_KQUEUE 140 NULL, 141 -1, 142 #endif 143 #ifdef HAVE_PTHREAD 144 #if MAIN_DEBUG && defined (PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) 145 PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP, 146 #else 147 PTHREAD_MUTEX_INITIALIZER, 148 #endif 149 &global_re.mutex, 150 #endif 151 }; 152 153 154 #ifdef HAVE_PTHREAD 155 156 static void poll_close(struct re *re); 157 158 static pthread_once_t pt_once = PTHREAD_ONCE_INIT; 159 static pthread_key_t pt_key; 160 161 162 static void thread_destructor(void *arg) 163 { 164 poll_close(arg); 165 free(arg); 166 } 167 168 169 static void re_once(void) 170 { 171 pthread_key_create(&pt_key, thread_destructor); 172 } 173 174 175 static struct re *re_get(void) 176 { 177 struct re *re; 178 179 pthread_once(&pt_once, re_once); 180 181 re = pthread_getspecific(pt_key); 182 if (!re) { 183 re = &global_re; 184 } 185 186 return re; 187 } 188 189 190 static inline void re_lock(struct re *re) 191 { 192 int err; 193 194 err = pthread_mutex_lock(re->mutexp); 195 if (err) { 196 DEBUG_WARNING("re_lock: %m\n", err); 197 } 198 } 199 200 201 static inline void re_unlock(struct re *re) 202 { 203 int err; 204 205 err = pthread_mutex_unlock(re->mutexp); 206 if (err) { 207 DEBUG_WARNING("re_unlock: %m\n", err); 208 } 209 } 210 211 212 #else 213 214 static struct re *re_get(void) 215 { 216 return &global_re; 217 } 218 219 #define re_lock(x) /**< Stub */ 220 #define re_unlock(x) /**< Stub */ 221 222 #endif 223 224 225 #if MAIN_DEBUG 226 /** 227 * Call the application event handler 228 * 229 * @param re Poll state 230 * @param fd File descriptor 231 * @param flags Event flags 232 */ 233 static void fd_handler(struct re *re, int fd, int flags) 234 { 235 const uint64_t tick = tmr_jiffies(); 236 uint32_t diff; 237 238 DEBUG_INFO("event on fd=%d (flags=0x%02x)...\n", fd, flags); 239 240 re->fhs[fd].fh(flags, re->fhs[fd].arg); 241 242 diff = (uint32_t)(tmr_jiffies() - tick); 243 244 if (diff > MAX_BLOCKING) { 245 DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n", 246 diff, MAX_BLOCKING, 247 re->fhs[fd].fh, re->fhs[fd].arg); 248 } 249 } 250 #endif 251 252 253 #ifdef HAVE_POLL 254 static int set_poll_fds(struct re *re, int fd, int flags) 255 { 256 if (!re->fds) 257 return 0; 258 259 if (flags) 260 re->fds[fd].fd = fd; 261 else 262 re->fds[fd].fd = -1; 263 264 re->fds[fd].events = 0; 265 if (flags & FD_READ) 266 re->fds[fd].events |= POLLIN; 267 if (flags & FD_WRITE) 268 re->fds[fd].events |= POLLOUT; 269 if (flags & FD_EXCEPT) 270 re->fds[fd].events |= POLLERR; 271 272 return 0; 273 } 274 #endif 275 276 277 #ifdef HAVE_EPOLL 278 static int set_epoll_fds(struct re *re, int fd, int flags) 279 { 280 struct epoll_event event; 281 int err = 0; 282 283 if (re->epfd < 0) 284 return EBADFD; 285 286 memset(&event, 0, sizeof(event)); 287 288 DEBUG_INFO("set_epoll_fds: fd=%d flags=0x%02x\n", fd, flags); 289 290 if (flags) { 291 event.data.fd = fd; 292 293 if (flags & FD_READ) 294 event.events |= EPOLLIN; 295 if (flags & FD_WRITE) 296 event.events |= EPOLLOUT; 297 if (flags & FD_EXCEPT) 298 event.events |= EPOLLERR; 299 300 /* Try to add it first */ 301 if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_ADD, fd, &event)) { 302 303 /* If already exist then modify it */ 304 if (EEXIST == errno) { 305 306 if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_MOD, 307 fd, &event)) { 308 err = errno; 309 DEBUG_WARNING("epoll_ctl:" 310 " EPOLL_CTL_MOD:" 311 " fd=%d (%m)\n", 312 fd, err); 313 } 314 } 315 else { 316 err = errno; 317 DEBUG_WARNING("epoll_ctl: EPOLL_CTL_ADD:" 318 " fd=%d (%m)\n", 319 fd, err); 320 } 321 } 322 } 323 else { 324 if (-1 == epoll_ctl(re->epfd, EPOLL_CTL_DEL, fd, &event)) { 325 err = errno; 326 DEBUG_INFO("epoll_ctl: EPOLL_CTL_DEL: fd=%d (%m)\n", 327 fd, err); 328 } 329 } 330 331 return err; 332 } 333 #endif 334 335 336 #ifdef HAVE_KQUEUE 337 static int set_kqueue_fds(struct re *re, int fd, int flags) 338 { 339 struct kevent kev[2]; 340 int r, n = 0; 341 342 memset(kev, 0, sizeof(kev)); 343 344 /* always delete the events */ 345 EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, 0); 346 EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); 347 kevent(re->kqfd, kev, 2, NULL, 0, NULL); 348 349 memset(kev, 0, sizeof(kev)); 350 351 if (flags & FD_WRITE) { 352 EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0); 353 ++n; 354 } 355 if (flags & FD_READ) { 356 EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0); 357 ++n; 358 } 359 360 if (n) { 361 r = kevent(re->kqfd, kev, n, NULL, 0, NULL); 362 if (r < 0) { 363 int err = errno; 364 365 DEBUG_WARNING("set: [fd=%d, flags=%x] kevent: %m\n", 366 fd, flags, err); 367 return err; 368 } 369 } 370 371 return 0; 372 } 373 #endif 374 375 376 /** 377 * Rebuild the file descriptor mapping table. This must be done whenever 378 * the polling method is changed. 379 */ 380 static int rebuild_fds(struct re *re) 381 { 382 int i, err = 0; 383 384 DEBUG_INFO("rebuilding fds (nfds=%d)\n", re->nfds); 385 386 /* Update fd sets */ 387 for (i=0; i<re->nfds; i++) { 388 if (!re->fhs[i].fh) 389 continue; 390 391 switch (re->method) { 392 393 #ifdef HAVE_POLL 394 case METHOD_POLL: 395 err = set_poll_fds(re, i, re->fhs[i].flags); 396 break; 397 #endif 398 #ifdef HAVE_EPOLL 399 case METHOD_EPOLL: 400 err = set_epoll_fds(re, i, re->fhs[i].flags); 401 break; 402 #endif 403 404 #ifdef HAVE_KQUEUE 405 case METHOD_KQUEUE: 406 err = set_kqueue_fds(re, i, re->fhs[i].flags); 407 break; 408 #endif 409 410 default: 411 break; 412 } 413 414 if (err) 415 break; 416 } 417 418 return err; 419 } 420 421 422 static int poll_init(struct re *re) 423 { 424 DEBUG_INFO("poll init (maxfds=%d)\n", re->maxfds); 425 426 if (!re->maxfds) { 427 DEBUG_WARNING("poll init: maxfds is 0\n"); 428 return EINVAL; 429 } 430 431 switch (re->method) { 432 433 #ifdef HAVE_POLL 434 case METHOD_POLL: 435 if (!re->fds) { 436 re->fds = mem_zalloc(re->maxfds * sizeof(*re->fds), 437 NULL); 438 if (!re->fds) 439 return ENOMEM; 440 } 441 break; 442 #endif 443 #ifdef HAVE_EPOLL 444 case METHOD_EPOLL: 445 if (!re->events) { 446 DEBUG_INFO("allocate %u bytes for epoll set\n", 447 re->maxfds * sizeof(*re->events)); 448 re->events = mem_zalloc(re->maxfds*sizeof(*re->events), 449 NULL); 450 if (!re->events) 451 return ENOMEM; 452 } 453 454 if (re->epfd < 0 455 && -1 == (re->epfd = epoll_create(re->maxfds))) { 456 457 int err = errno; 458 459 DEBUG_WARNING("epoll_create: %m (maxfds=%d)\n", 460 err, re->maxfds); 461 return err; 462 } 463 DEBUG_INFO("init: epoll_create() epfd=%d\n", re->epfd); 464 break; 465 #endif 466 467 #ifdef HAVE_KQUEUE 468 case METHOD_KQUEUE: 469 470 if (!re->evlist) { 471 size_t sz = re->maxfds * sizeof(*re->evlist); 472 re->evlist = mem_zalloc(sz, NULL); 473 if (!re->evlist) 474 return ENOMEM; 475 } 476 477 if (re->kqfd < 0) { 478 re->kqfd = kqueue(); 479 if (re->kqfd < 0) 480 return errno; 481 DEBUG_INFO("kqueue: fd=%d\n", re->kqfd); 482 } 483 484 break; 485 #endif 486 487 default: 488 break; 489 } 490 return 0; 491 } 492 493 494 /** Free all resources */ 495 static void poll_close(struct re *re) 496 { 497 DEBUG_INFO("poll close\n"); 498 499 re->fhs = mem_deref(re->fhs); 500 re->maxfds = 0; 501 502 #ifdef HAVE_POLL 503 re->fds = mem_deref(re->fds); 504 #endif 505 #ifdef HAVE_EPOLL 506 DEBUG_INFO("poll_close: epfd=%d\n", re->epfd); 507 508 if (re->epfd >= 0) { 509 (void)close(re->epfd); 510 re->epfd = -1; 511 } 512 513 re->events = mem_deref(re->events); 514 #endif 515 516 #ifdef HAVE_KQUEUE 517 if (re->kqfd >= 0) { 518 close(re->kqfd); 519 re->kqfd = -1; 520 } 521 522 re->evlist = mem_deref(re->evlist); 523 #endif 524 } 525 526 527 static int poll_setup(struct re *re) 528 { 529 int err; 530 531 err = fd_setsize(DEFAULT_MAXFDS); 532 if (err) 533 goto out; 534 535 if (METHOD_NULL == re->method) { 536 err = poll_method_set(poll_method_best()); 537 if (err) 538 goto out; 539 540 DEBUG_INFO("poll setup: poll method not set - set to `%s'\n", 541 poll_method_name(re->method)); 542 } 543 544 err = poll_init(re); 545 546 out: 547 if (err) 548 poll_close(re); 549 550 return err; 551 } 552 553 554 /** 555 * Listen for events on a file descriptor 556 * 557 * @param fd File descriptor 558 * @param flags Wanted event flags 559 * @param fh Event handler 560 * @param arg Handler argument 561 * 562 * @return 0 if success, otherwise errorcode 563 */ 564 int fd_listen(int fd, int flags, fd_h *fh, void *arg) 565 { 566 struct re *re = re_get(); 567 int err = 0; 568 569 DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags); 570 571 if (fd < 0) { 572 DEBUG_WARNING("fd_listen: corrupt fd %d\n", fd); 573 return EBADF; 574 } 575 576 if (flags || fh) { 577 err = poll_setup(re); 578 if (err) 579 return err; 580 } 581 582 if (fd >= re->maxfds) { 583 if (flags) { 584 DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x" 585 " - Max %d fds\n", 586 fd, flags, re->maxfds); 587 } 588 return EMFILE; 589 } 590 591 /* Update fh set */ 592 if (re->fhs) { 593 re->fhs[fd].flags = flags; 594 re->fhs[fd].fh = fh; 595 re->fhs[fd].arg = arg; 596 } 597 598 re->nfds = max(re->nfds, fd+1); 599 600 switch (re->method) { 601 602 #ifdef HAVE_POLL 603 case METHOD_POLL: 604 err = set_poll_fds(re, fd, flags); 605 break; 606 #endif 607 608 #ifdef HAVE_EPOLL 609 case METHOD_EPOLL: 610 if (re->epfd < 0) 611 return EBADFD; 612 err = set_epoll_fds(re, fd, flags); 613 break; 614 #endif 615 616 #ifdef HAVE_KQUEUE 617 case METHOD_KQUEUE: 618 err = set_kqueue_fds(re, fd, flags); 619 break; 620 #endif 621 622 default: 623 break; 624 } 625 626 if (err) { 627 if (flags && fh) { 628 fd_close(fd); 629 DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n", 630 fd, flags, err); 631 } 632 } 633 634 return err; 635 } 636 637 638 /** 639 * Stop listening for events on a file descriptor 640 * 641 * @param fd File descriptor 642 */ 643 void fd_close(int fd) 644 { 645 (void)fd_listen(fd, 0, NULL, NULL); 646 } 647 648 649 /** 650 * Polling loop 651 * 652 * @param re Poll state. 653 * 654 * @return 0 if success, otherwise errorcode 655 */ 656 static int fd_poll(struct re *re) 657 { 658 const uint64_t to = tmr_next_timeout(&re->tmrl); 659 int i, n; 660 #ifdef HAVE_SELECT 661 fd_set rfds, wfds, efds; 662 #endif 663 664 DEBUG_INFO("next timer: %llu ms\n", to); 665 666 /* Wait for I/O */ 667 switch (re->method) { 668 669 #ifdef HAVE_POLL 670 case METHOD_POLL: 671 re_unlock(re); 672 n = poll(re->fds, re->nfds, to ? (int)to : -1); 673 re_lock(re); 674 break; 675 #endif 676 #ifdef HAVE_SELECT 677 case METHOD_SELECT: { 678 struct timeval tv; 679 680 /* Clear and update fd sets */ 681 FD_ZERO(&rfds); 682 FD_ZERO(&wfds); 683 FD_ZERO(&efds); 684 685 for (i=0; i<re->nfds; i++) { 686 if (!re->fhs[i].fh) 687 continue; 688 689 if (re->fhs[i].flags & FD_READ) 690 FD_SET(i, &rfds); 691 if (re->fhs[i].flags & FD_WRITE) 692 FD_SET(i, &wfds); 693 if (re->fhs[i].flags & FD_EXCEPT) 694 FD_SET(i, &efds); 695 } 696 697 #ifdef WIN32 698 tv.tv_sec = (long) to / 1000; 699 #else 700 tv.tv_sec = (time_t) to / 1000; 701 #endif 702 tv.tv_usec = (uint32_t) (to % 1000) * 1000; 703 re_unlock(re); 704 n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL); 705 re_lock(re); 706 } 707 break; 708 #endif 709 #ifdef HAVE_EPOLL 710 case METHOD_EPOLL: 711 re_unlock(re); 712 n = epoll_wait(re->epfd, re->events, re->maxfds, 713 to ? (int)to : -1); 714 re_lock(re); 715 break; 716 #endif 717 718 #ifdef HAVE_KQUEUE 719 case METHOD_KQUEUE: { 720 struct timespec timeout; 721 722 timeout.tv_sec = (time_t) (to / 1000); 723 timeout.tv_nsec = (to % 1000) * 1000000; 724 725 re_unlock(re); 726 n = kevent(re->kqfd, NULL, 0, re->evlist, re->maxfds, 727 to ? &timeout : NULL); 728 re_lock(re); 729 } 730 break; 731 #endif 732 733 default: 734 (void)to; 735 DEBUG_WARNING("no polling method set\n"); 736 return EINVAL; 737 } 738 739 if (n < 0) 740 return errno; 741 742 /* Check for events */ 743 for (i=0; (n > 0) && (i < re->nfds); i++) { 744 int fd, flags = 0; 745 746 switch (re->method) { 747 748 #ifdef HAVE_POLL 749 case METHOD_POLL: 750 fd = i; 751 if (re->fds[fd].revents & POLLIN) 752 flags |= FD_READ; 753 if (re->fds[fd].revents & POLLOUT) 754 flags |= FD_WRITE; 755 if (re->fds[fd].revents & (POLLERR|POLLHUP|POLLNVAL)) 756 flags |= FD_EXCEPT; 757 if (re->fds[fd].revents & POLLNVAL) { 758 DEBUG_WARNING("event: fd=%d POLLNVAL" 759 " (fds.fd=%d," 760 " fds.events=0x%02x)\n", 761 fd, re->fds[fd].fd, 762 re->fds[fd].events); 763 } 764 /* Clear events */ 765 re->fds[fd].revents = 0; 766 break; 767 #endif 768 #ifdef HAVE_SELECT 769 case METHOD_SELECT: 770 fd = i; 771 if (FD_ISSET(fd, &rfds)) 772 flags |= FD_READ; 773 if (FD_ISSET(fd, &wfds)) 774 flags |= FD_WRITE; 775 if (FD_ISSET(fd, &efds)) 776 flags |= FD_EXCEPT; 777 break; 778 #endif 779 #ifdef HAVE_EPOLL 780 case METHOD_EPOLL: 781 fd = re->events[i].data.fd; 782 783 if (re->events[i].events & EPOLLIN) 784 flags |= FD_READ; 785 if (re->events[i].events & EPOLLOUT) 786 flags |= FD_WRITE; 787 if (re->events[i].events & EPOLLERR) 788 flags |= FD_EXCEPT; 789 790 if (!flags) { 791 DEBUG_WARNING("epoll: no flags fd=%d\n", fd); 792 } 793 794 break; 795 #endif 796 797 #ifdef HAVE_KQUEUE 798 case METHOD_KQUEUE: { 799 800 struct kevent *kev = &re->evlist[i]; 801 802 fd = (int)kev->ident; 803 804 if (fd >= re->maxfds) { 805 DEBUG_WARNING("large fd=%d\n", fd); 806 break; 807 } 808 809 if (kev->filter == EVFILT_READ) 810 flags |= FD_READ; 811 else if (kev->filter == EVFILT_WRITE) 812 flags |= FD_WRITE; 813 else { 814 DEBUG_WARNING("kqueue: unhandled " 815 "filter %x\n", 816 kev->filter); 817 } 818 819 if (kev->flags & EV_EOF) { 820 flags |= FD_EXCEPT; 821 } 822 if (kev->flags & EV_ERROR) { 823 DEBUG_WARNING("kqueue: EV_ERROR on fd %d\n", 824 fd); 825 } 826 827 if (!flags) { 828 DEBUG_WARNING("kqueue: no flags fd=%d\n", fd); 829 } 830 } 831 break; 832 #endif 833 834 default: 835 return EINVAL; 836 } 837 838 if (!flags) 839 continue; 840 841 if (re->fhs[fd].fh) { 842 #if MAIN_DEBUG 843 fd_handler(re, fd, flags); 844 #else 845 re->fhs[fd].fh(flags, re->fhs[fd].arg); 846 #endif 847 } 848 849 /* Check if polling method was changed */ 850 if (re->update) { 851 re->update = false; 852 return 0; 853 } 854 855 --n; 856 } 857 858 return 0; 859 } 860 861 862 /** 863 * Set the maximum number of file descriptors 864 * 865 * @param maxfds Max FDs. 0 to free. 866 * 867 * @return 0 if success, otherwise errorcode 868 */ 869 int fd_setsize(int maxfds) 870 { 871 struct re *re = re_get(); 872 873 if (!maxfds) { 874 fd_debug(); 875 poll_close(re); 876 return 0; 877 } 878 879 if (!re->maxfds) 880 re->maxfds = maxfds; 881 882 if (!re->fhs) { 883 DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n", 884 re->maxfds, re->maxfds * sizeof(*re->fhs)); 885 886 re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL); 887 if (!re->fhs) 888 return ENOMEM; 889 } 890 891 return 0; 892 } 893 894 895 /** 896 * Print all file descriptors in-use 897 */ 898 void fd_debug(void) 899 { 900 const struct re *re = re_get(); 901 int i; 902 903 if (!re->fhs) 904 return; 905 906 for (i=0; i<re->nfds; i++) { 907 908 if (!re->fhs[i].flags) 909 continue; 910 911 (void)re_fprintf(stderr, 912 "fd %d in use: flags=%x fh=%p arg=%p\n", 913 i, re->fhs[i].flags, re->fhs[i].fh, 914 re->fhs[i].arg); 915 } 916 } 917 918 919 #ifdef HAVE_SIGNAL 920 /* Thread-safe signal handling */ 921 static void signal_handler(int sig) 922 { 923 (void)signal(sig, signal_handler); 924 re_get()->sig = sig; 925 } 926 #endif 927 928 929 /** 930 * Main polling loop for async I/O events. This function will only return when 931 * re_cancel() is called or an error occured. 932 * 933 * @param signalh Optional Signal handler 934 * 935 * @return 0 if success, otherwise errorcode 936 */ 937 int re_main(re_signal_h *signalh) 938 { 939 struct re *re = re_get(); 940 int err; 941 942 #ifdef HAVE_SIGNAL 943 if (signalh) { 944 (void)signal(SIGINT, signal_handler); 945 (void)signal(SIGALRM, signal_handler); 946 (void)signal(SIGTERM, signal_handler); 947 } 948 #endif 949 950 if (re->polling) { 951 DEBUG_WARNING("main loop already polling\n"); 952 return EALREADY; 953 } 954 955 err = poll_setup(re); 956 if (err) 957 goto out; 958 959 DEBUG_INFO("Using async I/O polling method: `%s'\n", 960 poll_method_name(re->method)); 961 962 re->polling = true; 963 964 re_lock(re); 965 for (;;) { 966 967 if (re->sig) { 968 if (signalh) 969 signalh(re->sig); 970 971 re->sig = 0; 972 } 973 974 if (!re->polling) { 975 err = 0; 976 break; 977 } 978 979 err = fd_poll(re); 980 if (err) { 981 if (EINTR == err) 982 continue; 983 984 #ifdef DARWIN 985 /* NOTE: workaround for Darwin */ 986 if (EBADF == err) 987 continue; 988 #endif 989 990 break; 991 } 992 993 tmr_poll(&re->tmrl); 994 } 995 re_unlock(re); 996 997 out: 998 re->polling = false; 999 1000 return err; 1001 } 1002 1003 1004 /** 1005 * Cancel the main polling loop 1006 */ 1007 void re_cancel(void) 1008 { 1009 struct re *re = re_get(); 1010 1011 re->polling = false; 1012 } 1013 1014 1015 /** 1016 * Debug the main polling loop 1017 * 1018 * @param pf Print handler where debug output is printed to 1019 * @param unused Unused parameter 1020 * 1021 * @return 0 if success, otherwise errorcode 1022 */ 1023 int re_debug(struct re_printf *pf, void *unused) 1024 { 1025 struct re *re = re_get(); 1026 int err = 0; 1027 1028 (void)unused; 1029 1030 err |= re_hprintf(pf, "re main loop:\n"); 1031 err |= re_hprintf(pf, " maxfds: %d\n", re->maxfds); 1032 err |= re_hprintf(pf, " nfds: %d\n", re->nfds); 1033 err |= re_hprintf(pf, " method: %d (%s)\n", re->method, 1034 poll_method_name(re->method)); 1035 1036 return err; 1037 } 1038 1039 1040 /** 1041 * Set async I/O polling method. This function can also be called while the 1042 * program is running. 1043 * 1044 * @param method New polling method 1045 * 1046 * @return 0 if success, otherwise errorcode 1047 */ 1048 int poll_method_set(enum poll_method method) 1049 { 1050 struct re *re = re_get(); 1051 int err; 1052 1053 err = fd_setsize(DEFAULT_MAXFDS); 1054 if (err) 1055 return err; 1056 1057 switch (method) { 1058 1059 #ifdef HAVE_POLL 1060 case METHOD_POLL: 1061 break; 1062 #endif 1063 #ifdef HAVE_SELECT 1064 case METHOD_SELECT: 1065 if (re->maxfds > (int)FD_SETSIZE) { 1066 DEBUG_WARNING("SELECT: maxfds > FD_SETSIZE\n"); 1067 return EMFILE; 1068 } 1069 break; 1070 #endif 1071 #ifdef HAVE_EPOLL 1072 case METHOD_EPOLL: 1073 if (!epoll_check()) 1074 return EINVAL; 1075 break; 1076 #endif 1077 #ifdef HAVE_KQUEUE 1078 case METHOD_KQUEUE: 1079 break; 1080 #endif 1081 default: 1082 DEBUG_WARNING("poll method not supported: '%s'\n", 1083 poll_method_name(method)); 1084 return EINVAL; 1085 } 1086 1087 re->method = method; 1088 re->update = true; 1089 1090 DEBUG_INFO("Setting async I/O polling method to `%s'\n", 1091 poll_method_name(re->method)); 1092 1093 err = poll_init(re); 1094 if (err) 1095 return err; 1096 1097 return rebuild_fds(re); 1098 } 1099 1100 1101 /** 1102 * Add a worker thread for this thread 1103 * 1104 * @return 0 if success, otherwise errorcode 1105 */ 1106 int re_thread_init(void) 1107 { 1108 #ifdef HAVE_PTHREAD 1109 struct re *re; 1110 1111 pthread_once(&pt_once, re_once); 1112 1113 re = pthread_getspecific(pt_key); 1114 if (re) { 1115 DEBUG_WARNING("thread_init: already added for thread %d\n", 1116 pthread_self()); 1117 return EALREADY; 1118 } 1119 1120 re = malloc(sizeof(*re)); 1121 if (!re) 1122 return ENOMEM; 1123 1124 memset(re, 0, sizeof(*re)); 1125 pthread_mutex_init(&re->mutex, NULL); 1126 re->mutexp = &re->mutex; 1127 1128 #ifdef HAVE_EPOLL 1129 re->epfd = -1; 1130 #endif 1131 1132 #ifdef HAVE_KQUEUE 1133 re->kqfd = -1; 1134 #endif 1135 1136 pthread_setspecific(pt_key, re); 1137 return 0; 1138 #else 1139 return ENOSYS; 1140 #endif 1141 } 1142 1143 1144 /** 1145 * Remove the worker thread for this thread 1146 */ 1147 void re_thread_close(void) 1148 { 1149 #ifdef HAVE_PTHREAD 1150 struct re *re; 1151 1152 pthread_once(&pt_once, re_once); 1153 1154 re = pthread_getspecific(pt_key); 1155 if (re) { 1156 poll_close(re); 1157 free(re); 1158 pthread_setspecific(pt_key, NULL); 1159 } 1160 #endif 1161 } 1162 1163 1164 /** 1165 * Enter an 're' thread 1166 * 1167 * @note Must only be called from a non-re thread 1168 */ 1169 void re_thread_enter(void) 1170 { 1171 re_lock(re_get()); 1172 } 1173 1174 1175 /** 1176 * Leave an 're' thread 1177 * 1178 * @note Must only be called from a non-re thread 1179 */ 1180 void re_thread_leave(void) 1181 { 1182 re_unlock(re_get()); 1183 } 1184 1185 1186 /** 1187 * Set an external mutex for this thread 1188 * 1189 * @param mutexp Pointer to external mutex, NULL to use internal 1190 */ 1191 void re_set_mutex(void *mutexp) 1192 { 1193 #ifdef HAVE_PTHREAD 1194 struct re *re = re_get(); 1195 1196 re->mutexp = mutexp ? mutexp : &re->mutex; 1197 #else 1198 (void)mutexp; 1199 #endif 1200 } 1201 1202 1203 /** 1204 * Get the timer-list for this thread 1205 * 1206 * @return Timer list 1207 * 1208 * @note only used by tmr module 1209 */ 1210 struct list *tmrl_get(void); 1211 struct list *tmrl_get(void) 1212 { 1213 return &re_get()->tmrl; 1214 } 1215