/*
 * io_sched.c
 * (C)2001-2011 by Marc Huber <Marc.Huber@web.de>
 * All rights reserved.
 *
 * $Id: io_sched.c,v 1.58 2019/03/03 12:05:59 marc Exp $
 *
 */

#define __IO_SCHED_C__

#include "misc/sysconf.h"

#include <sys/types.h>
#include <sys/poll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
#include <fcntl.h>		/* open(2), fcntl(2), FD_CLOEXEC */
#include <sysexits.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <limits.h>

#include "misc/io_sched.h"
#include "misc/rb.h"
#include "mavis/debug.h"
#include "mavis/log.h"
#include "misc/memops.h"

static const char rcsid[] __attribute__ ((used)) = "$Id: io_sched.c,v 1.58 2019/03/03 12:05:59 marc Exp $";

#ifdef WITH_KQUEUE
# include <sys/event.h>
#endif
#ifdef WITH_EPOLL
# include <sys/epoll.h>
#endif
#ifdef WITH_POLL
# include <sys/poll.h>
#endif
#ifdef WITH_DEVPOLL
# include <sys/devpoll.h>
#endif
#ifdef WITH_SELECT
# include <sys/select.h>
#endif
#ifdef WITH_PORT
# include <port.h>
#endif
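
/*
 * Which event notification backends get compiled in is decided at build
 * time via the WITH_* macros above. At run time, io_init() walks the
 * available backends in a fixed priority order (kqueue, epoll, poll,
 * /dev/poll, event ports, select) and picks the first one that works;
 * the IO_POLL_MECHANISM environment variable can mask backends off via
 * the IO_MODE_* bits below.
 */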

#define IO_MODE_kqueue (1 << 0)
#define IO_MODE_devpoll (1 << 1)
#define IO_MODE_epoll (1 << 2)
#define IO_MODE_poll (1 << 3)
#define IO_MODE_select (1 << 4)
#define IO_MODE_port (1 << 5)

#define ARRAYINC 128
#define LISTINC 128

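/*
 * Per-fd handler state. The library keeps two layers of interest flags:
 * want_read/want_write reflect what was actually pushed down to the
 * backend, while want_read_app/want_write_app track what the application
 * asked for. With WITH_SSL, the TLS layer may temporarily add its own
 * read/write interest (want_read_ssl/want_write_ssl), e.g. during a
 * renegotiation, without disturbing the application-level flags.
 */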
struct io_handler {
    void *i;			/* input handler */
    void *o;			/* output handler */
    void *i_app;		/* application input handler */
    void *o_app;		/* application output handler */
    void *e;			/* error handler */
    void *h;			/* hangup handler */
    u_int want_read:1;		/* interested in reading */
    u_int want_write:1;		/* interested in writing */
    u_int want_read_app:1;	/* App interested in reading */
    u_int want_write_app:1;	/* App interested in writing */
    u_int want_read_ssl:1;	/* TLS interested in reading */
    u_int want_write_ssl:1;	/* TLS interested in writing */
    u_int reneg:1;		/* TLS renegotiation active */
    void *data;			/* opaque context information */
};

#ifdef WITH_KQUEUE
struct kqueue_io_context {
    struct kevent *changelist;
    struct kevent *eventlist;
    int nchanges;
    int nchanges_max;
    int nevents_max;
    int fd;
};
#endif

#ifdef WITH_EPOLL
struct epoll_io_context {
    int *changelist;
    int *changemap;
    int *diskfilemap;
    int *diskfile;
    struct epoll_event *eventlist;
    int nchanges;
    int ndiskfile;
    int nevents_max;
    int fd;
};
#endif

#ifdef WITH_DEVPOLL
struct devpoll_io_context {
    struct pollfd *changelist;
    struct pollfd *eventlist;
    int nchanges;
    int nchanges_max;
    int nevents_max;
    int fd;
};
#endif

#ifdef WITH_POLL
struct poll_io_context {
    struct pollfd *ufds;
    int nfds;
    int *pax;
};
#endif

#ifdef WITH_SELECT
struct select_io_context {
    fd_set rfds;
    fd_set wfds;
    fd_set efds;
    int nfds;
};
#endif

#ifdef WITH_PORT
struct port_io_context {
    int *changelist;
    int *changemap;
    port_event_t *eventlist;
    int nchanges;
    int fd;
    int nfds;
    int nevents_max;
};
#endif

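/*
 * Results of the most recent poll are normalized into rcache[] entries
 * of (fd, poll(2)-style event mask), regardless of which backend
 * produced them. rcache_map[] maps an fd back to its rcache slot, so a
 * backend can merge multiple reports for the same fd into one entry.
 */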
struct event_cache {
    int fd;
    int events;
};

struct io_context {
    struct io_handler *handler;
    rb_tree_t *events_by_data;
    rb_tree_t *events_by_time;
    void *io_invalid_i;
    void *io_invalid_o;
    void *io_invalid_h;
    void *io_invalid_e;
    int *rcache_map;		/* fd -> rcache map index */
    struct event_cache *rcache;
    int nfds_limit;
    int nfds_max;
    union {
#ifdef WITH_SELECT
	struct select_io_context select;
#define Select mechanism.select
#endif
#ifdef WITH_POLL
	struct poll_io_context poll;
#define Poll mechanism.poll
#endif
#ifdef WITH_EPOLL
	struct epoll_io_context epoll;
#define Epoll mechanism.epoll
#endif
#ifdef WITH_DEVPOLL
	struct devpoll_io_context devpoll;
#define Devpoll mechanism.devpoll
#endif
#ifdef WITH_KQUEUE
	struct kqueue_io_context kqueue;
#define Kqueue mechanism.kqueue
#endif
#ifdef WITH_PORT
	struct port_io_context port;
#define Port mechanism.port
#endif
    } mechanism;
};

struct io_event {
    void *proc;
    struct timeval time_wait;
    struct io_event *next;
};

struct io_sched {
    void *data;			/* opaque context pointer */
    struct timeval time_when;	/* when next event is triggered */
    struct timeval time_real;	/* when next event should be triggered */
    struct io_event *event;	/* event pointer */
};

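/*
 * Timed events are kept in two red-black trees: events_by_data, keyed on
 * the opaque context pointer, and events_by_time, keyed on the absolute
 * trigger time. time_when is the key currently stored in events_by_time;
 * time_real is where the event actually belongs after io_sched_renew_proc()
 * pushed it into the future. io_reschedule() reconciles the two lazily,
 * which keeps renewals cheap.
 */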
static void (*mech_io_set_i) (struct io_context *, int);
static void (*mech_io_set_o) (struct io_context *, int);
static void (*mech_io_clr_i) (struct io_context *, int);
static void (*mech_io_clr_o) (struct io_context *, int);
static void (*mech_io_register) (struct io_context *, int);
static void (*mech_io_unregister) (struct io_context *, int);
static void (*mech_io_close) (struct io_context *, int);
static void (*mech_io_destroy) (struct io_context *);
static int (*mech_io_poll) (struct io_context *, int, int *);
static void (*mech_io_poll_finish) (struct io_context *, int);

static void io_resize(struct io_context *, int fd);

static __inline__ int MINIMUM(int a, int b)
{
    return (a < b) ? a : b;
}

static __inline__ int MAXIMUM(int a, int b)
{
    return (a < b) ? b : a;
}

#define SIOS(A) ((struct io_sched *)(A))

static int cmp_tv(const void *a, const void *b)
{
    if (SIOS(a)->time_when.tv_sec < SIOS(b)->time_when.tv_sec)
	return -1;
    if (SIOS(a)->time_when.tv_sec > SIOS(b)->time_when.tv_sec)
	return +1;
    if (SIOS(a)->time_when.tv_usec < SIOS(b)->time_when.tv_usec)
	return -1;
    if (SIOS(a)->time_when.tv_usec > SIOS(b)->time_when.tv_usec)
	return +1;
    if (a < b)
	return -1;
    if (a > b)
	return +1;
    return 0;
}

static int cmp_data(const void *a, const void *b)
{
    if (SIOS(a)->data < SIOS(b)->data)
	return -1;
    if (SIOS(a)->data > SIOS(b)->data)
	return +1;
    return 0;
}

static void io_invalid_i(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_i (%d)", cur);
    abort();
}

static void io_invalid_o(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_o (%d)", cur);
    abort();
}

static void io_invalid_e(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_e (%d)", cur);
    abort();
}

static void io_invalid_h(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_h (%d)", cur);
    abort();
}

int io_is_invalid_i(struct io_context *io, int cur)
{
    return (io->handler[cur].i_app == io->io_invalid_i);
}

int io_is_invalid_o(struct io_context *io, int cur)
{
    return (io->handler[cur].o_app == io->io_invalid_o);
}

int io_is_invalid_e(struct io_context *io, int cur)
{
    return (io->handler[cur].e == io->io_invalid_e);
}

int io_is_invalid_h(struct io_context *io, int cur)
{
    return (io->handler[cur].h == io->io_invalid_h);
}

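/*
 * Dispatch priority per fd and poll iteration: readable input first, then
 * writable output (suppressed if the peer hung up), then error, then
 * hangup. At most one callback fires per fd per io_poll() round.
 */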
int io_poll(struct io_context *io, int poll_timeout)
{
    int count, cax = 0;
    int res = mech_io_poll(io, poll_timeout, &cax);

    for (count = 0; count < cax; count++) {
	int cur = io->rcache[count].fd;
	struct io_context *ctx;

	if (cur > -1) {
	    ctx = io_get_ctx(io, cur);
	    Debug((DEBUG_PROC, "fd %d ctx %p\n", cur, ctx));
	    if (ctx) {
		void (*cb) (void *, int);
		if (io->handler[cur].want_read && (io->rcache[count].events & POLLIN))
		    cb = (void (*)(void *, int)) (io_get_cb_i(io, cur));
		else if (io->handler[cur].want_write && (io->rcache[count].events & POLLOUT)
			 && !(io->rcache[count].events & POLLHUP))
		    cb = (void (*)(void *, int)) (io_get_cb_o(io, cur));
		else if (io->rcache[count].events & POLLERR)
		    cb = (void (*)(void *, int)) (io_get_cb_e(io, cur));
		else if (io->rcache[count].events & POLLHUP)
		    cb = (void (*)(void *, int)) (io_get_cb_h(io, cur));
		else
		    cb = NULL;

		Debug((DEBUG_PROC, "fd %d cb = %p\n", cur, cb));
		if (cb)
		    cb(ctx, cur);
	    }
	    io->rcache_map[cur] = -1;
	}

	io->rcache[count].fd = -1;
	io->rcache[count].events = 0;
    }
    if (mech_io_poll_finish)
	mech_io_poll_finish(io, res);

    return res;
}

struct io_context *io_destroy(struct io_context *io, void (*freeproc) (void *))
{
    if (io) {
	RB_tree_delete(io->events_by_data);
	RB_tree_delete(io->events_by_time);

	if (freeproc) {
	    int i;
	    for (i = 0; i < io->nfds_max; i++)
		if (io->handler[i].data)
		    freeproc(io->handler[i].data);
	}

	mech_io_destroy(io);

	free(io->handler);
	free(io->rcache_map);
	free(io->rcache);
	free(io);
    }
    return NULL;
}

void io_register(struct io_context *io, int fd, void *data)
{
    mech_io_register(io, fd);

    io->handler[fd].data = data;
    io->handler[fd].i = io->io_invalid_i;
    io->handler[fd].o = io->io_invalid_o;
    io->handler[fd].i_app = io->io_invalid_i;
    io->handler[fd].o_app = io->io_invalid_o;
    io->handler[fd].e = io->io_invalid_e;
    io->handler[fd].h = io->io_invalid_h;
    io->handler[fd].want_read = 0;
    io->handler[fd].want_write = 0;
    io->handler[fd].want_read_app = 0;
    io->handler[fd].want_write_app = 0;
#ifdef WITH_SSL
    io->handler[fd].want_read_ssl = 0;
    io->handler[fd].want_write_ssl = 0;
#endif
}

void *io_unregister(struct io_context *io, int fd)
{
    void *res = io->handler[fd].data;

    mech_io_unregister(io, fd);

    io->handler[fd].data = NULL;
    io->handler[fd].want_read = 0;
    io->handler[fd].want_write = 0;

    if (io->rcache_map[fd] > -1) {
	io->rcache[io->rcache_map[fd]].fd = -1;
	io->rcache[io->rcache_map[fd]].events = 0;
	io->rcache_map[fd] = -1;
    }

    return res;
}

int io_close(struct io_context *io, int fd)
{
    io_unregister(io, fd);
    if (mech_io_close)
	mech_io_close(io, fd);
    return close(fd);
}

void io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read_app) {
	io->handler[fd].want_read_app = 1;
	mech_io_set_i(io, fd);
    }
}

void io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write_app) {
	io->handler[fd].want_write_app = 1;
	mech_io_set_o(io, fd);
    }
}

void io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read_app) {
	io->handler[fd].want_read_app = 0;
#ifdef WITH_SSL
	if (io->handler[fd].want_read_ssl)
	    return;
#endif
	mech_io_clr_i(io, fd);

	if (io->rcache_map[fd] > -1)
	    io->rcache[io->rcache_map[fd]].events &= ~POLLIN;
    }
}

void io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write_app) {
	io->handler[fd].want_write_app = 0;
#ifdef WITH_SSL
	if (io->handler[fd].want_write_ssl)
	    return;
#endif
	mech_io_clr_o(io, fd);
	if (io->rcache_map[fd] > -1)
	    io->rcache[io->rcache_map[fd]].events &= ~POLLOUT;
    }
}

#ifdef WITH_SSL
static __inline__ void io_SSL_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read_ssl) {
	io->handler[fd].want_read_ssl = 1;
	mech_io_set_i(io, fd);
    }
}

static __inline__ void io_SSL_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write_ssl) {
	io->handler[fd].want_write_ssl = 1;
	mech_io_set_o(io, fd);
    }
}

static __inline__ void io_SSL_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read_ssl) {
	io->handler[fd].want_read_ssl = 0;
	if (!io->handler[fd].want_read_app)
	    mech_io_clr_i(io, fd);
    }
}

static __inline__ void io_SSL_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write_ssl) {
	io->handler[fd].want_write_ssl = 0;
	if (!io->handler[fd].want_write_app)
	    mech_io_clr_o(io, fd);
    }
}
#endif				/* WITH_SSL */

#ifdef WITH_SELECT
static void select_io_set_i(struct io_context *io, int fd)
{
    io->handler[fd].want_read = 1;
    FD_SET(fd, &io->Select.rfds);
}

static void select_io_set_o(struct io_context *io, int fd)
{
    io->handler[fd].want_write = 1;
    FD_SET(fd, &io->Select.wfds);
}

static void select_io_clr_i(struct io_context *io, int fd)
{
    io->handler[fd].want_read = 0;
    FD_CLR(fd, &io->Select.rfds);
}

static void select_io_clr_o(struct io_context *io, int fd)
{
    io->handler[fd].want_write = 0;
    FD_CLR(fd, &io->Select.wfds);
}

static void select_io_init(struct io_context *io)
{
    FD_ZERO(&io->Select.rfds);
    FD_ZERO(&io->Select.wfds);
    FD_ZERO(&io->Select.efds);
    io->Select.nfds = -1;
}

static int select_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int cur = 0, r, res;
    struct timeval timeout;
    fd_set rfds, wfds, efds;

    Debug((DEBUG_PROC, "io_poll (%p, %dms)\n", io, poll_timeout));

    *cax = 0;

    timeout.tv_sec = poll_timeout / 1000;
    timeout.tv_usec = 1000 * (u_int) (poll_timeout - 1000 * timeout.tv_sec);
    rfds = io->Select.rfds;
    wfds = io->Select.wfds;
    efds = io->Select.efds;

    r = res = select(io->Select.nfds + 1, &rfds, &wfds, &efds, poll_timeout < 0 ? NULL : &timeout);

    if (r < 0) {
	logerr("Fatal select(2) error (%ld, ..., (%lu, %lu))", (long int) io->Select.nfds + 1, (u_long) timeout.tv_sec, (u_long) timeout.tv_usec);
	abort();
    }

    gettimeofday(&io_now, NULL);

    for (; r > 0 && cur < io->nfds_max; cur++) {
	if (FD_ISSET(cur, &rfds) || FD_ISSET(cur, &wfds) || FD_ISSET(cur, &efds)) {
	    if (io->rcache_map[cur] < 0) {
		io->rcache[*cax].events = 0;
		io->rcache[*cax].fd = cur;
		io->rcache_map[cur] = (*cax)++;
	    }

	    if (FD_ISSET(cur, &rfds))
		r--, io->rcache[io->rcache_map[cur]].events |= POLLIN;
	    if (FD_ISSET(cur, &efds))
		r--, io->rcache[io->rcache_map[cur]].events |= POLLERR;
	    if (FD_ISSET(cur, &wfds))
		r--, io->rcache[io->rcache_map[cur]].events |= POLLOUT;

	    if (r < 0) {
		logmsg("Bug near %s:%d", __FILE__, __LINE__);
		abort();
	    }
	}
    }

    return res;
}

static void select_io_destroy(struct io_context *io __attribute__ ((unused)))
{
    /* nothing to do */
}

static void select_io_unregister(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_unregister %d\n", fd));

    FD_CLR(fd, &io->Select.rfds);
    FD_CLR(fd, &io->Select.wfds);
    FD_CLR(fd, &io->Select.efds);

    if (fd == io->Select.nfds)
	do
	    io->Select.nfds--;
	while (io->Select.nfds > -1 && io->handler[io->Select.nfds].data == NULL);
}

static void select_io_register(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max)
	io_resize(io, fd);

    if (fd > io->Select.nfds)
	io->Select.nfds = fd;

    FD_SET(fd, &io->Select.efds);
}
#endif

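/*
 * poll(2) backend. ufds[] is kept dense: pax[fd] maps a file descriptor
 * to its current index in ufds[], and poll_io_unregister() fills the hole
 * left by a removed entry with the last array element. Registration and
 * removal cost O(1), at the price of an extra indirection on every event.
 */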
#ifdef WITH_POLL
static void poll_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	io->Poll.ufds[io->Poll.pax[fd]].events |= POLLIN;
    }
}

static void poll_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	io->Poll.ufds[io->Poll.pax[fd]].events |= POLLOUT;
    }
}

static void poll_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	io->Poll.ufds[io->Poll.pax[fd]].events &= ~POLLIN;
    }
}

static void poll_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	io->Poll.ufds[io->Poll.pax[fd]].events &= ~POLLOUT;
    }
}

static void poll_io_init(struct io_context *io)
{
    int i;

    io->Poll.ufds = Xcalloc(io->nfds_max, sizeof(struct pollfd));
    io->Poll.pax = Xcalloc(io->nfds_max, sizeof(int));
    for (i = 0; i < io->nfds_max; i++)
	io->Poll.pax[i] = -1;
}

static int poll_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res, r;
    Debug((DEBUG_PROC, "io_poll (%p) timeout: %d\n", io, poll_timeout));

    *cax = 0;

    r = res = poll(io->Poll.ufds, (nfds_t) (io->Poll.nfds), poll_timeout);

    Debug((DEBUG_PROC, "io_poll (%p) timeout: %d, res: %d\n", io, poll_timeout, res));

    gettimeofday(&io_now, NULL);

    for (count = io->Poll.nfds - 1; r > 0 && count > -1; count--) {
	int cur = io->Poll.ufds[count].fd;

	if (cur > -1 && io->Poll.pax[cur] > -1 && io->Poll.ufds[io->Poll.pax[cur]].revents) {
	    r--;

	    if (io->rcache_map[cur] < 0) {
		io->rcache[*cax].events = 0;
		io->rcache[*cax].fd = cur;
		io->rcache_map[cur] = (*cax)++;
	    }

	    io->rcache[io->rcache_map[cur]].events = io->Poll.ufds[io->Poll.pax[cur]].revents;
	}
    }

    return res;
}

static void poll_io_destroy(struct io_context *io)
{
    free(io->Poll.ufds);
    free(io->Poll.pax);
}

static void poll_io_unregister(struct io_context *io, int fd)
{
    int pos = io->Poll.pax[fd];
    Debug((DEBUG_PROC, " io_unregister %d\n", fd));

    if (pos < 0 || pos >= io->Poll.nfds) {
	logmsg("Ooops ... poll array index for %d out of range (%d)!", fd, pos);
	abort();
    }

    if (pos != --io->Poll.nfds) {
	io->Poll.ufds[pos] = io->Poll.ufds[io->Poll.nfds];
	io->Poll.pax[io->Poll.ufds[pos].fd] = pos;
    }

    io->Poll.pax[fd] = -1;
}

static void poll_io_register(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max) {
	int i;
	int omax = io->nfds_max;
	io_resize(io, fd);
	io->Poll.ufds = Xrealloc(io->Poll.ufds, io->nfds_max * sizeof(struct pollfd));
	io->Poll.pax = Xrealloc(io->Poll.pax, io->nfds_max * sizeof(int));
	for (i = omax; i < io->nfds_max; i++)
	    io->Poll.pax[i] = -1;
    }

    if (io->Poll.pax[fd] != -1) {
	logmsg("Ooops ... poll array index for %d already set!", fd);
	logmsg("%d %d %d", io->nfds_limit, io->nfds_max, fd);
	abort();
    }

    memset(&io->Poll.ufds[io->Poll.nfds], 0, sizeof(struct pollfd));
    io->Poll.ufds[io->Poll.nfds].fd = fd;
    io->Poll.ufds[io->Poll.nfds].events = 0;
    io->Poll.pax[fd] = io->Poll.nfds++;
}
#endif

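/*
 * epoll(4) backend. Interest changes are batched in changelist[]/
 * changemap[] and flushed via EPOLL_CTL_MOD at the start of the next
 * epoll_io_poll() call. Regular files cannot be added to an epoll set
 * (epoll_ctl() fails with EPERM); such descriptors are tracked in
 * diskfile[]/diskfilemap[] and simply reported as always ready, with the
 * wait timeout forced to 0 while any of them are registered.
 */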
#ifdef WITH_EPOLL
static void epoll_io_close(struct io_context *io, int fd)
{
    if (io->Epoll.changemap[fd] > -1) {
	io->Epoll.changelist[io->Epoll.changemap[fd]] = -1;
	io->Epoll.changemap[fd] = -1;
	io->Epoll.nchanges--;
    }
}

static void epoll_addchange(struct io_context *io, int fd)
{
    if (io->Epoll.changemap[fd] < 0 || io->Epoll.changemap[fd] >= io->Epoll.nchanges || io->Epoll.changelist[io->Epoll.changemap[fd]] != fd) {
	io->Epoll.changemap[fd] = io->Epoll.nchanges;
	io->Epoll.changelist[io->Epoll.nchanges++] = fd;
    }
}

static void epoll_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	if (io->Epoll.diskfile[fd] == -2) {
	    io->Epoll.diskfilemap[io->Epoll.ndiskfile] = fd;
	    io->Epoll.diskfile[fd] = io->Epoll.ndiskfile++;
	} else if (io->Epoll.diskfile[fd] == -1)
	    epoll_addchange(io, fd);
    }
}

static void epoll_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	if (io->Epoll.diskfile[fd] > -1) {
	    if (!io->handler[fd].want_write) {
		io->Epoll.ndiskfile--;
		io->Epoll.diskfilemap[io->Epoll.diskfile[fd]] = io->Epoll.diskfilemap[io->Epoll.ndiskfile];
		io->Epoll.diskfile[io->Epoll.diskfilemap[io->Epoll.ndiskfile]] = io->Epoll.diskfile[fd];
		io->Epoll.diskfile[fd] = -2;
	    }
	} else
	    epoll_addchange(io, fd);
    }
}

static void epoll_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	if (io->Epoll.diskfile[fd] == -2) {
	    io->Epoll.diskfilemap[io->Epoll.ndiskfile] = fd;
	    io->Epoll.diskfile[fd] = io->Epoll.ndiskfile++;
	} else if (io->Epoll.diskfile[fd] == -1)
	    epoll_addchange(io, fd);
    }
}

static void epoll_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	if (io->Epoll.diskfile[fd] > -1) {
	    if (!io->handler[fd].want_read) {
		io->Epoll.ndiskfile--;
		io->Epoll.diskfilemap[io->Epoll.diskfile[fd]] = io->Epoll.diskfilemap[io->Epoll.ndiskfile];
		io->Epoll.diskfile[io->Epoll.diskfilemap[io->Epoll.ndiskfile]] = io->Epoll.diskfile[fd];
		io->Epoll.diskfile[fd] = -2;
	    }
	} else
	    epoll_addchange(io, fd);
    }
}

static void epoll_io_init(struct io_context *io)
{
    int i, flags;

    io->Epoll.fd = epoll_create(io->nfds_max);

    flags = fcntl(io->Epoll.fd, F_GETFD, 0) | FD_CLOEXEC;
    fcntl(io->Epoll.fd, F_SETFD, flags);

    io->Epoll.nchanges = io->Epoll.ndiskfile = 0;
    io->Epoll.nevents_max = io->nfds_max;
    io->Epoll.eventlist = Xcalloc(io->Epoll.nevents_max, sizeof(struct epoll_event));
    io->Epoll.changelist = Xcalloc(io->nfds_max, sizeof(int));
    io->Epoll.changemap = Xcalloc(io->nfds_max, sizeof(int));
    io->Epoll.diskfile = Xcalloc(io->nfds_max, sizeof(int));
    io->Epoll.diskfilemap = Xcalloc(io->nfds_max, sizeof(int));
    for (i = 0; i < io->nfds_max; i++) {
	io->Epoll.changelist[i] = -1;
	io->Epoll.changemap[i] = -1;
	io->Epoll.diskfile[i] = -1;
	io->Epoll.diskfilemap[i] = -1;
    }
}

static void epoll_io_unregister(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_unregister %d\n", fd));
    if (io->Epoll.diskfile[fd] > -1) {
	io->Epoll.ndiskfile--;
	io->Epoll.diskfilemap[io->Epoll.diskfile[fd]] = io->Epoll.diskfilemap[io->Epoll.ndiskfile];
	io->Epoll.diskfile[io->Epoll.diskfilemap[io->Epoll.ndiskfile]] = io->Epoll.diskfile[fd];
	io->Epoll.diskfile[fd] = -1;
    }
}

static void epoll_io_register(struct io_context *io, int fd)
{
    struct epoll_event e;
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max) {
	int i;
	int omax = io->nfds_max;
	io_resize(io, fd);
	io->Epoll.changelist = Xrealloc(io->Epoll.changelist, io->nfds_max * sizeof(int));
	io->Epoll.changemap = Xrealloc(io->Epoll.changemap, io->nfds_max * sizeof(int));
	for (i = omax; i < io->nfds_max; i++) {
	    io->Epoll.changelist[i] = -1;
	    io->Epoll.changemap[i] = -1;
	}
    }

    e.data.fd = fd;
    e.events = 0;
    if (-1 == epoll_ctl(io->Epoll.fd, EPOLL_CTL_ADD, fd, &e)
	&& errno == EPERM)
	io->Epoll.diskfile[fd] = -2;
}

static int epoll_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res;
    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    *cax = 0;

    for (count = 0; count < io->Epoll.nchanges; count++) {
	int fd = io->Epoll.changelist[count];
	if (fd > -1 && io->Epoll.changemap[fd] == count && io->Epoll.diskfilemap[fd] == -1) {
	    struct epoll_event e;
	    e.data.fd = fd;
	    e.events = (io->handler[fd].want_read ? EPOLLIN : 0) | (io->handler[fd].want_write ? EPOLLOUT : 0);
	    if (epoll_ctl(io->Epoll.fd, EPOLL_CTL_MOD, fd, &e) < 0) {
#ifndef DEBUG
		logerr("epoll_ctl (%s:%d)", __FILE__, __LINE__);
#endif
	    }
	    io->Epoll.changemap[fd] = -1;
	}
    }
    io->Epoll.nchanges = 0;

    res = epoll_wait(io->Epoll.fd, io->Epoll.eventlist, io->Epoll.nevents_max, io->Epoll.ndiskfile ? 0 : poll_timeout);

    gettimeofday(&io_now, NULL);

    for (count = 0; count < res; count++) {
	int cur = io->Epoll.eventlist[count].data.fd;

	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}

	io->rcache[io->rcache_map[cur]].events = io->Epoll.eventlist[count].events;
    }

    res += io->Epoll.ndiskfile;

    for (count = 0; count < io->Epoll.ndiskfile; count++) {
	int cur = io->Epoll.diskfilemap[count];

	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}

	if (io->handler[cur].want_write)
	    io->rcache[io->rcache_map[cur]].events |= POLLOUT;
	if (io->handler[cur].want_read)
	    io->rcache[io->rcache_map[cur]].events |= POLLIN;
    }

    return res;
}

static void epoll_io_destroy(struct io_context *io)
{
    free(io->Epoll.eventlist);
    free(io->Epoll.changelist);
    free(io->Epoll.changemap);
    free(io->Epoll.diskfile);
    free(io->Epoll.diskfilemap);
    close(io->Epoll.fd);
}
#endif

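/*
 * Solaris /dev/poll backend. Interest changes are queued as struct pollfd
 * records and written to the /dev/poll fd in one batch before the DP_POLL
 * ioctl. POLLREMOVE drops the descriptor entirely, so clearing one
 * direction re-adds the other if it is still wanted.
 */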
#ifdef WITH_DEVPOLL
static void devpoll_io_changelist_resize(struct io_context *io)
{
    io->Devpoll.nchanges_max += LISTINC;
    io->Devpoll.changelist = Xrealloc(io->Devpoll.changelist, io->Devpoll.nchanges_max * sizeof(struct pollfd));
}

static void devpoll_io_close(struct io_context *io, int fd)
{
    int i, j;
    for (i = 0, j = 0; i < io->Devpoll.nchanges; i++) {
	if ((int) io->Devpoll.changelist[i].fd == fd)
	    continue;
	if (i != j)
	    io->Devpoll.changelist[j] = io->Devpoll.changelist[i];
	j++;
    }
    io->Devpoll.nchanges = j;
}

static void devpoll_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLIN;
    }
}

static void devpoll_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLOUT;
    }
}

static void devpoll_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLREMOVE;

	if (io->handler[fd].want_write) {
	    io->handler[fd].want_write = 0;
	    devpoll_io_set_o(io, fd);
	}
    }
}

static void devpoll_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLREMOVE;

	if (io->handler[fd].want_read) {
	    io->handler[fd].want_read = 0;
	    devpoll_io_set_i(io, fd);
	}
    }
}

static void devpoll_io_init(struct io_context *io)
{
    int flags;

    if ((io->Devpoll.fd = open("/dev/poll", O_RDWR)) < 0) {
	logerr("devpoll open (%s:%d)", __FILE__, __LINE__);
	abort();
    }
    flags = fcntl(io->Devpoll.fd, F_GETFD, 0) | FD_CLOEXEC;
    fcntl(io->Devpoll.fd, F_SETFD, flags);

    io->Devpoll.nchanges = 0;
    io->Devpoll.nchanges_max = LISTINC;
    io->Devpoll.changelist = Xcalloc(io->Devpoll.nchanges_max, sizeof(struct pollfd));
    io->Devpoll.nevents_max = LISTINC;
    io->Devpoll.eventlist = Xcalloc(io->Devpoll.nevents_max, sizeof(struct pollfd));
}

static int devpoll_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res;
    struct dvpoll dvp;

    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    *cax = 0;

    dvp.dp_fds = io->Devpoll.eventlist;
    dvp.dp_nfds = io->Devpoll.nevents_max;
    dvp.dp_timeout = poll_timeout > -1 ? poll_timeout : 0;

    if (io->Devpoll.nchanges &&
	(write(io->Devpoll.fd, io->Devpoll.changelist,
	       sizeof(struct pollfd) * io->Devpoll.nchanges) != (ssize_t) sizeof(struct pollfd) * io->Devpoll.nchanges)) {
	logerr("devpoll write (%s:%d)", __FILE__, __LINE__);
	abort();
    }

    res = ioctl(io->Devpoll.fd, DP_POLL, &dvp);
    Debug((DEBUG_PROC, "devpoll ioctl returns %d\n", res));

    if (0 > res) {
	logerr("devpoll ioctl (%s:%d)", __FILE__, __LINE__);
	abort();
    }

    io->Devpoll.nchanges = 0;

    gettimeofday(&io_now, NULL);

    for (count = 0; count < res; count++) {
	int cur = io->Devpoll.eventlist[count].fd;

	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}
	io->rcache[io->rcache_map[cur]].events = io->Devpoll.eventlist[count].revents;
    }

    return res;
}

static void devpoll_io_destroy(struct io_context *io)
{
    free(io->Devpoll.changelist);
    free(io->Devpoll.eventlist);
    close(io->Devpoll.fd);
}

static void devpoll_io_unregister(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_unregister %d\n", fd));

    if (io->handler[fd].want_read || io->handler[fd].want_write) {

	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLREMOVE;
    }
}

static void devpoll_io_register(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_register %d\n", fd));
    if (fd >= io->nfds_max)
	io_resize(io, fd);
}
#endif

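/*
 * BSD kqueue(2) backend. Read and write interest are separate EVFILT_READ/
 * EVFILT_WRITE filters, added and deleted through a batched changelist that
 * is handed to kevent(2) together with the next wait. The udata cast
 * differs on NetBSD, where udata is an intptr_t rather than a pointer.
 */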
#ifdef WITH_KQUEUE
static void kqueue_io_changelist_resize(struct io_context *io)
{
    io->Kqueue.nchanges_max += LISTINC;
    io->Kqueue.changelist = Xrealloc(io->Kqueue.changelist, io->Kqueue.nchanges_max * sizeof(struct kevent));
}

static void kqueue_io_set_i(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_set_i(%d)\n", fd));
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_READ, EV_ADD, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       io_get_ctx(io, fd));

	io->Kqueue.nchanges++;
    }
}

static struct timespec *timeout_immediately = NULL;

static void kqueue_flush(struct io_context *io)
{
    kevent(io->Kqueue.fd, io->Kqueue.changelist, io->Kqueue.nchanges, io->Kqueue.eventlist, 0, timeout_immediately);
    io->Kqueue.nchanges = 0;
}

static void kqueue_io_clr_i(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_clr_i(%d)\n", fd));
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_READ, EV_DELETE, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       NULL);
	io->Kqueue.nchanges++;
    }
}

static void kqueue_io_set_o(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_set_o(%d)\n", fd));
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_WRITE, EV_ADD, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       io_get_ctx(io, fd));
	io->Kqueue.nchanges++;
    }
}

static void kqueue_io_clr_o(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_clr_o(%d)\n", fd));
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_WRITE, EV_DELETE, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       NULL);
	io->Kqueue.nchanges++;
    }
}

static void kqueue_io_init(struct io_context *io)
{
    io->Kqueue.fd = kqueue();
    io->Kqueue.nchanges = 0;
    io->Kqueue.nchanges_max = LISTINC;
    io->Kqueue.changelist = Xcalloc(io->Kqueue.nchanges_max, sizeof(struct kevent));
    io->Kqueue.nevents_max = LISTINC;
    io->Kqueue.eventlist = Xcalloc(io->Kqueue.nevents_max, sizeof(struct kevent));
    if (!timeout_immediately)
	timeout_immediately = calloc(1, sizeof(struct timespec));
}

static int kqueue_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res;
    struct timespec timeout;

    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    *cax = 0;

    timeout.tv_sec = poll_timeout / 1000;
    timeout.tv_nsec = 1000000 * (poll_timeout - 1000 * timeout.tv_sec);
    Debug((DEBUG_PROC, "nchanges is %d\n", io->Kqueue.nchanges));
    res = kevent(io->Kqueue.fd, io->Kqueue.changelist,
		 io->Kqueue.nchanges, io->Kqueue.eventlist, io->Kqueue.nevents_max, poll_timeout > -1 ? &timeout : NULL);
    io->Kqueue.nchanges = 0;

    gettimeofday(&io_now, NULL);

    for (count = 0; count < res; count++) {
	struct kevent *k = &io->Kqueue.eventlist[count];
	if (!(k->flags & EV_ERROR && k->data == EBADF)) {
	    int pos, cur = (int) k->ident;

	    if (io->rcache_map[cur] < 0) {
		io->rcache[*cax].events = 0;
		io->rcache[*cax].fd = cur;
		io->rcache_map[cur] = (*cax)++;
	    }

	    pos = io->rcache_map[cur];

	    if (k->filter == EVFILT_READ)
		io->rcache[pos].events |= POLLIN;
	    if (k->flags & EV_EOF)
		io->rcache[pos].events |= POLLHUP;
	    if (k->flags & EV_ERROR)
		io->rcache[pos].events |= POLLERR;
	    if (k->filter == EVFILT_WRITE)
		io->rcache[pos].events |= POLLOUT;
	}
    }

    return res;
}

static void kqueue_io_unregister(struct io_context *io, int fd)
{
    kqueue_io_clr_i(io, fd);
    kqueue_io_clr_o(io, fd);
    kqueue_flush(io);
}

static void kqueue_io_close(struct io_context *io, int fd)
{
    int i, j;

    for (i = 0, j = 0; i < io->Kqueue.nchanges; i++) {
	if ((int) io->Kqueue.changelist[i].ident == fd)
	    continue;
	if (i != j)
	    io->Kqueue.changelist[j] = io->Kqueue.changelist[i];
	j++;
    }
    io->Kqueue.nchanges = j;
}

static void kqueue_io_register(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max)
	io_resize(io, fd);
}

static void kqueue_io_destroy(struct io_context *io)
{
    free(io->Kqueue.changelist);
    free(io->Kqueue.eventlist);
    close(io->Kqueue.fd);
}
#endif

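/*
 * Solaris event port backend. Associations are one-shot: once port_getn()
 * delivers an event, the fd is dissociated and has to be re-associated.
 * port_io_poll_finish() queues the re-association after the callbacks have
 * run, and the next port_io_poll() flushes it via port_associate(3C).
 */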
#ifdef WITH_PORT
static void port_io_close(struct io_context *io, int fd)
{
    if (io->Port.changemap[fd] > -1) {
	io->Port.changelist[io->Port.changemap[fd]] = -1;
	io->Port.changemap[fd] = -1;
	io->Port.nchanges--;
    }
}

static void port_addchange(struct io_context *io, int fd)
{
    if (io->Port.changemap[fd] < 0 || io->Port.changemap[fd] >= io->Port.nchanges || io->Port.changelist[io->Port.changemap[fd]] != fd) {
	io->Port.changemap[fd] = io->Port.nchanges;
	io->Port.changelist[io->Port.nchanges++] = fd;
    }
}

static void port_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	port_addchange(io, fd);
    }
}

static void port_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	port_addchange(io, fd);
    }
}

static void port_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	port_addchange(io, fd);
    }
}

static void port_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	port_addchange(io, fd);
    }
}

static void port_io_init(struct io_context *io)
{
    int flags, i;

    io->Port.nevents_max = LISTINC;
    io->Port.eventlist = Xcalloc(io->Port.nevents_max, sizeof(port_event_t));

    io->Port.fd = port_create();
    flags = fcntl(io->Port.fd, F_GETFD, 0) | FD_CLOEXEC;
    fcntl(io->Port.fd, F_SETFD, flags);

    io->Port.changelist = Xcalloc(io->nfds_max, sizeof(int));
    io->Port.changemap = Xcalloc(io->nfds_max, sizeof(int));
    /* changelist/changemap hold nfds_max entries, so initialize that many: */
    for (i = 0; i < io->nfds_max; i++) {
	io->Port.changelist[i] = -1;
	io->Port.changemap[i] = -1;
    }
}

static int port_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    uint_t count;
    uint_t nevents = 1;
    struct timespec timeout;

    *cax = 0;

    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    for (count = 0; count < (uint_t) io->Port.nchanges; count++) {
	int fd = io->Port.changelist[count];
	if (fd > -1 && io->Port.changemap[fd] == (int) count) {
	    if (0 > port_associate(io->Port.fd, PORT_SOURCE_FD, fd,
				   (io->handler[fd].want_read ? POLLIN : 0) | (io->handler[fd].want_write ? POLLOUT : 0), &io->handler[fd]))
		logerr("port_associate (%s:%d)", __FILE__, __LINE__);

	    io->Port.changemap[fd] = -1;
	}
    }
    io->Port.nchanges = 0;

    timeout.tv_sec = poll_timeout / 1000;
    timeout.tv_nsec = 1000000 * (poll_timeout - 1000 * timeout.tv_sec);

    if (-1 == port_getn(io->Port.fd, io->Port.eventlist, io->Port.nevents_max, &nevents, poll_timeout < 0 ? NULL : &timeout)) {
	if (errno != ETIME) {
	    logerr("port_getn (errno = %d)", errno);
	    abort();
	}
	nevents = 0;
    }

    gettimeofday(&io_now, NULL);

    for (count = 0; count < nevents; count++) {
	int pos, cur = io->Port.eventlist[count].portev_object;

	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}

	pos = io->rcache_map[cur];

	io->rcache[pos].events = io->Port.eventlist[count].portev_events;
    }

    return (int) nevents;
}

static void port_io_poll_finish(struct io_context *io, int nevents)
{
    int count;
    for (count = 0; count < nevents; count++) {
	int cur = io->Port.eventlist[count].portev_object;
	if (io->handler[cur].want_read || io->handler[cur].want_write)
	    port_addchange(io, cur);
    }
}

static void port_io_unregister(struct io_context *io, int fd)
{
    port_dissociate(io->Port.fd, PORT_SOURCE_FD, fd);
    io->Port.changemap[fd] = -1;
}

static void port_io_register(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max) {
	int i;
	int omax = io->nfds_max;

	/* Grow the handler arrays first so nfds_max reflects the new size
	 * before changelist/changemap are reallocated (mirrors
	 * epoll_io_register()): */
	io_resize(io, fd);

	io->Port.changelist = Xrealloc(io->Port.changelist, io->nfds_max * sizeof(int));
	io->Port.changemap = Xrealloc(io->Port.changemap, io->nfds_max * sizeof(int));

	for (i = omax; i < io->nfds_max; i++) {
	    io->Port.changelist[i] = -1;
	    io->Port.changemap[i] = -1;
	}
    }
}

static void port_io_destroy(struct io_context *io)
{
    free(io->Port.eventlist);
    free(io->Port.changelist);
    free(io->Port.changemap);
    close(io->Port.fd);
}
#endif

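/*
 * events_by_time needs unique keys; cmp_tv() falls back to comparing node
 * addresses, and insert_isc() additionally bumps tv_usec until the insert
 * succeeds, so two events scheduled for the same microsecond still get
 * distinct tree positions.
 */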
static void insert_isc(rb_tree_t * t, struct io_sched *isc)
{
    while (!RB_insert(t, isc)) {
	isc->time_when.tv_usec++;
	if (isc->time_when.tv_usec >= 1000000)
	    isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
	isc->time_real.tv_sec = isc->time_when.tv_sec;
	isc->time_real.tv_usec = isc->time_when.tv_usec;
    }
}

void io_sched_add(struct io_context *io, void *data, void *proc, time_t tv_sec, suseconds_t tv_usec)
{
    rb_node_t *rbn;
    struct io_event *ioe = Xcalloc(1, sizeof(struct io_event));
    struct io_sched *isc, is;

    Debug((DEBUG_PROC, "io_sched_add %p %ld.%ld\n", data, (long) tv_sec, (long) tv_usec));

    gettimeofday(&io_now, NULL);

    is.data = data;
    rbn = RB_search(io->events_by_data, &is);

    ioe->proc = proc;
    ioe->time_wait.tv_sec = tv_sec;
    ioe->time_wait.tv_usec = tv_usec;

    if (rbn) {
	isc = RB_payload(rbn, struct io_sched *);
	ioe->next = isc->event;
	RB_search_and_delete(io->events_by_time, isc);
    } else {
	isc = Xcalloc(1, sizeof(struct io_sched));
	isc->data = data;
	RB_insert(io->events_by_data, isc);
    }
    isc->event = ioe;
    isc->time_when.tv_sec = io_now.tv_sec + ioe->time_wait.tv_sec;
    isc->time_when.tv_usec = io_now.tv_usec + ioe->time_wait.tv_usec;
    if (isc->time_when.tv_usec >= 1000000)
	isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
    isc->time_real.tv_sec = isc->time_when.tv_sec;
    isc->time_real.tv_usec = isc->time_when.tv_usec;
    insert_isc(io->events_by_time, isc);
}

void io_sched_app(struct io_context *io, void *data, void *proc, time_t tv_sec, suseconds_t tv_usec)
{
    rb_node_t *rbn;
    struct io_event *ioe = Xcalloc(1, sizeof(struct io_event));
    struct io_sched is;

    DebugIn(DEBUG_PROC);

    is.data = data;
    rbn = RB_search(io->events_by_data, &is);

    ioe->proc = proc;
    ioe->time_wait.tv_sec = tv_sec;
    ioe->time_wait.tv_usec = tv_usec;

    if (rbn) {
	struct io_event *i = RB_payload(rbn, struct io_sched *)->event;
	while (i->next)
	    i = i->next;
	i->next = ioe;
    } else {
	struct io_sched *isc;
	isc = Xcalloc(1, sizeof(struct io_sched));
	isc->data = data;
	isc->event = ioe;
	isc->time_when.tv_sec = io_now.tv_sec + ioe->time_wait.tv_sec;
	isc->time_when.tv_usec = io_now.tv_usec + ioe->time_wait.tv_usec;
	if (isc->time_when.tv_usec >= 1000000)
	    isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
	isc->time_real.tv_sec = isc->time_when.tv_sec;
	isc->time_real.tv_usec = isc->time_when.tv_usec;
	RB_insert(io->events_by_data, isc);
	insert_isc(io->events_by_time, isc);
    }

    DebugOut(DEBUG_PROC);
}

void *io_sched_pop(struct io_context *io, void *data)
{
    rb_node_t *rbn;
    struct io_sched is;
    void *result = NULL;

    DebugIn(DEBUG_PROC);

    is.data = data;
    rbn = RB_search(io->events_by_data, &is);
    if (rbn) {
	struct io_sched *isc = RB_payload(rbn, struct io_sched *);
	struct io_event *i = isc->event;

	isc->event = i->next;
	free(i);
	RB_search_and_delete(io->events_by_time, isc);
	if (isc->event) {
	    isc->time_when.tv_sec = io_now.tv_sec + isc->event->time_wait.tv_sec;
	    isc->time_when.tv_usec = io_now.tv_usec + isc->event->time_wait.tv_usec;
	    if (isc->time_when.tv_usec >= 1000000)
		isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
	    isc->time_real.tv_sec = isc->time_when.tv_sec;
	    isc->time_real.tv_usec = isc->time_when.tv_usec;
	    insert_isc(io->events_by_time, isc);
	    result = isc->event->proc;
	} else {
	    RB_delete(io->events_by_data, rbn);
	    free(isc);
	}
    }
    DebugOut(DEBUG_PROC);
    return result;
}

int io_sched_del(struct io_context *io, void *data, void *proc)
{
    int result = 0;
    struct io_sched is;
    rb_node_t *rbn;

    DebugIn(DEBUG_PROC);

    is.data = data;
    rbn = RB_search(io->events_by_data, &is);
    if (rbn) {
	struct io_sched *isc = RB_payload(rbn, struct io_sched *);
	struct io_event *i = isc->event;
	if (i) {
	    if (i->proc == proc)
		io_sched_pop(io, data), result = -1;
	    else {
		struct io_event *next;
		while (i->next)
		    if (i->next->proc == proc) {
			next = i->next;
			i->next = next->next;
			free(next);
			result = -1;
		    } else
			i = i->next;
	    }
	}
    }
    DebugOut(DEBUG_PROC);
    return result;
}

int io_sched_renew_proc(struct io_context *io, void *data, void *proc)
{
    struct io_sched is;
    rb_node_t *rbn;
    Debug((DEBUG_PROC, "io_sched_renew_proc %p\n", data));
    is.data = data;
    rbn = RB_search(io->events_by_data, &is);
    if (rbn) {
	struct io_sched *isc = RB_payload(rbn, struct io_sched *);
	if (isc && isc->event && (!proc || isc->event->proc == proc)) {
	    isc->time_real.tv_sec = io_now.tv_sec + isc->event->time_wait.tv_sec;
	    isc->time_real.tv_usec = io_now.tv_usec + isc->event->time_wait.tv_usec;
	    if (isc->time_real.tv_usec >= 1000000)
		isc->time_real.tv_usec -= 1000000, isc->time_real.tv_sec++;
	    Debug((DEBUG_PROC, "to be fired at %.8lx:%.8lx\n", (long) (isc->time_real.tv_sec), (long) (isc->time_real.tv_usec)));
	    return 0;
	}
    }
    return -1;
}

void *io_sched_peek(struct io_context *io, void *data)
{
    struct io_event *ioe;
    rb_node_t *rbn;
    struct io_sched is;
    is.data = data;
    rbn = RB_search(io->events_by_data, &is);
    if (rbn && (ioe = RB_payload(rbn, struct io_sched *)->event))
	return (void *) (ioe->proc);
    return NULL;
}

struct timeval *io_sched_peek_time(struct io_context *io, void *data)
{
    struct io_sched *ios;
    rb_node_t *rbn;
    struct io_sched is;
    is.data = data;
    rbn = RB_search(io->events_by_data, &is);
    if (rbn && (ios = RB_payload(rbn, struct io_sched *))->event)
	return &ios->time_real;
    return NULL;
}

static void io_reschedule(struct io_context *io)
{
    rb_node_t *rbn, *rbnext;
    struct io_sched *ios;

    for (rbn = RB_first(io->events_by_time);
	 rbn
	 && ((ios = RB_payload(rbn, struct io_sched *))->time_when.tv_sec < io_now.tv_sec
	     || (ios->time_when.tv_sec == io_now.tv_sec && ios->time_when.tv_usec <= io_now.tv_usec)); rbn = rbnext) {
	rbnext = RB_next(rbn);
	if (ios->time_when.tv_sec != ios->time_real.tv_sec || ios->time_when.tv_usec != ios->time_real.tv_usec) {
	    RB_delete(io->events_by_time, rbn);
	    ios->time_when.tv_sec = ios->time_real.tv_sec;
	    ios->time_when.tv_usec = ios->time_real.tv_usec;
	    insert_isc(io->events_by_time, ios);
	    Debug((DEBUG_PROC, " rescheduled at %.8lx:%.8lx (%lds)\n",
		   (long) (ios->time_when.tv_sec), (long) (ios->time_when.tv_usec), (long) (ios->time_when.tv_sec) - (long) io_now.tv_sec));
	}
    }
}

int io_sched_exec(struct io_context *io)
{
    rb_node_t *rbn, *rbnext;
    int poll_timeout;
    struct io_sched *ios;

    Debug((DEBUG_PROC, "io_sched_exec (%p)\n", io));

    io_reschedule(io);

    for (rbn = RB_first(io->events_by_time);
	 rbn
	 && ((ios = RB_payload(rbn, struct io_sched *))->time_when.tv_sec < io_now.tv_sec
	     || (ios->time_when.tv_sec == io_now.tv_sec && ios->time_when.tv_usec <= io_now.tv_usec)); rbn = rbnext) {
	rbnext = RB_next(rbn);
	Debug((DEBUG_PROC, " executing ...\n"));
	((void (*)(void *, int)) (ios->event->proc)) (ios->data, -1);
	Debug((DEBUG_PROC, "... done.\n"));
    }

    io_reschedule(io);

    rbn = RB_first(io->events_by_time);
    if (rbn) {
	ios = RB_payload(rbn, struct io_sched *);
	poll_timeout = 1 + (int) ((ios->time_when.tv_sec - io_now.tv_sec) * 1000) + (int) ((ios->time_when.tv_usec - io_now.tv_usec) / 1000);

	Debug((DEBUG_PROC, "poll_timeout = %dms\n", poll_timeout));
    } else
	poll_timeout = -1;

    return poll_timeout;
}

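/*
 * io_main() is the classic event loop: run due timers, compute the next
 * timer deadline, then block in the backend for at most that long. A
 * minimal usage sketch (illustrative only; the callback setter
 * io_set_cb_i() is assumed to be declared in misc/io_sched.h, mirroring
 * the io_get_cb_i() accessor used above):
 *
 *   struct io_context *io = io_init();
 *   io_register(io, fd, session_ctx);     // attach opaque context to fd
 *   io_set_cb_i(io, fd, read_handler);    // assumed input-callback setter
 *   io_set_i(io, fd);                     // declare read interest
 *   io_main(io);                          // loops forever
 */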
void io_main(struct io_context *io)
{
    Debug((DEBUG_PROC, "io_main (%p)\n", io));
    do {
	gettimeofday(&io_now, NULL);
	io_poll(io, io_sched_exec(io));
    }
    while (1);
}

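/*
 * io_init() probes the compiled-in backends in priority order and wires
 * the mech_io_* function pointers to the first usable one. It also raises
 * RLIMIT_NOFILE to the hard limit (capped at FD_SETSIZE for select and
 * OPEN_MAX for /dev/poll) and sizes the per-fd arrays accordingly.
 */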
io_init()1722 struct io_context *io_init()
1723 {
1724 static int once = 0;
1725 void (*mech_io_init) (struct io_context *);
1726 int mode = 0
1727 #ifdef WITH_POLL
1728 | IO_MODE_poll
1729 #endif
1730 #ifdef WITH_EPOLL
1731 | IO_MODE_epoll
1732 #endif
1733 #ifdef WITH_DEVPOLL
1734 | IO_MODE_devpoll
1735 #endif
1736 #ifdef WITH_KQUEUE
1737 | IO_MODE_kqueue
1738 #endif
1739 #ifdef WITH_SELECT
1740 | IO_MODE_select
1741 #endif
1742 #ifdef WITH_PORT
1743 | IO_MODE_port
1744 #endif
1745 ;
1746 char *mech, *e;
1747 int i;
1748 struct rlimit rlim;
1749 struct io_context *io = Xcalloc(1, sizeof(struct io_context));
1750
1751 Debug((DEBUG_PROC, "io_init\n"));
1752
1753 if (getrlimit(RLIMIT_NOFILE, &rlim)) {
1754 logerr("getrlimit");
1755 exit(EX_SOFTWARE);
1756 }
1757
1758 if ((e = getenv("IO_POLL_MECHANISM")))
1759 mode &= atoi(e);
1760
1761 mech_io_poll_finish = NULL;
1762 mech_io_close = NULL;
1763
1764 #define EVENT_MECHANISM_DEFUNCT "%s event mechanism is unavailable"
1765
#ifdef WITH_KQUEUE
    if (mode & IO_MODE_kqueue) {
	int fd = kqueue();
	mech = "kqueue";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = kqueue_io_poll;
	    mech_io_set_i = kqueue_io_set_i;
	    mech_io_set_o = kqueue_io_set_o;
	    mech_io_clr_i = kqueue_io_clr_i;
	    mech_io_clr_o = kqueue_io_clr_o;
	    mech_io_register = kqueue_io_register;
	    mech_io_unregister = kqueue_io_unregister;
	    mech_io_destroy = kqueue_io_destroy;
	    mech_io_init = kqueue_io_init;
	    mech_io_close = kqueue_io_close;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_EPOLL
    if (mode & IO_MODE_epoll) {
	int fd = epoll_create(1);
	mech = "epoll";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = epoll_io_poll;
	    mech_io_set_i = epoll_io_set_i;
	    mech_io_set_o = epoll_io_set_o;
	    mech_io_clr_i = epoll_io_clr_i;
	    mech_io_clr_o = epoll_io_clr_o;
	    mech_io_register = epoll_io_register;
	    mech_io_unregister = epoll_io_unregister;
	    mech_io_destroy = epoll_io_destroy;
	    mech_io_init = epoll_io_init;
	    mech_io_close = epoll_io_close;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_POLL
    if (mode & IO_MODE_poll) {
	mech = "poll";
	mech_io_poll = poll_io_poll;
	mech_io_set_i = poll_io_set_i;
	mech_io_set_o = poll_io_set_o;
	mech_io_clr_i = poll_io_clr_i;
	mech_io_clr_o = poll_io_clr_o;
	mech_io_register = poll_io_register;
	mech_io_unregister = poll_io_unregister;
	mech_io_destroy = poll_io_destroy;
	mech_io_init = poll_io_init;
	goto gotit;
    }
#endif

#ifdef WITH_DEVPOLL
    // Solaris /dev/poll may or may not work correctly. Placed *after* standard
    // poll on purpose, so it won't be used unless explicitly selected via
    // IO_POLL_MECHANISM.
    if (mode & IO_MODE_devpoll) {
	int fd = open("/dev/poll", O_RDWR);
	mech = "/dev/poll";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = devpoll_io_poll;
	    mech_io_set_i = devpoll_io_set_i;
	    mech_io_set_o = devpoll_io_set_o;
	    mech_io_clr_i = devpoll_io_clr_i;
	    mech_io_clr_o = devpoll_io_clr_o;
	    mech_io_register = devpoll_io_register;
	    mech_io_unregister = devpoll_io_unregister;
	    mech_io_destroy = devpoll_io_destroy;
	    mech_io_init = devpoll_io_init;
	    mech_io_close = devpoll_io_close;
	    if (rlim.rlim_max > OPEN_MAX)
		rlim.rlim_max = OPEN_MAX;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_PORT
    // Solaris port(2) may or may not work correctly. Placed *after* standard
    // poll on purpose, so it won't be used unless explicitly selected via
    // IO_POLL_MECHANISM.
    if (mode & IO_MODE_port) {
	int fd = port_create();
	mech = "port";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = port_io_poll;
	    mech_io_poll_finish = port_io_poll_finish;
	    mech_io_set_i = port_io_set_i;
	    mech_io_set_o = port_io_set_o;
	    mech_io_clr_i = port_io_clr_i;
	    mech_io_clr_o = port_io_clr_o;
	    mech_io_register = port_io_register;
	    mech_io_unregister = port_io_unregister;
	    mech_io_destroy = port_io_destroy;
	    mech_io_init = port_io_init;
	    mech_io_close = port_io_close;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_SELECT
    // select(2) comes last and won't be used unless explicitly selected.
    if (mode & IO_MODE_select) {
	mech = "select";
	mech_io_poll = select_io_poll;
	mech_io_set_i = select_io_set_i;
	mech_io_set_o = select_io_set_o;
	mech_io_clr_i = select_io_clr_i;
	mech_io_clr_o = select_io_clr_o;
	mech_io_register = select_io_register;
	mech_io_unregister = select_io_unregister;
	mech_io_destroy = select_io_destroy;
	mech_io_init = select_io_init;
	if (rlim.rlim_max > FD_SETSIZE)
	    rlim.rlim_max = FD_SETSIZE;
	goto gotit;
    }
#endif
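
    /*
     * Note: the rlim_max clamps above exist because select(2) cannot
     * handle descriptors >= FD_SETSIZE and the /dev/poll backend is
     * limited to OPEN_MAX descriptors; the adjusted limit takes effect
     * via the setrlimit() call below.
     */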

    logmsg("no working event notification mechanism found");
    abort();

  gotit:
    if (!once) {
	logmsg("%s event notification mechanism is being used", mech);
	once++;
    }

    rlim.rlim_cur = rlim.rlim_max;
    setrlimit(RLIMIT_NOFILE, &rlim);
    getrlimit(RLIMIT_NOFILE, &rlim);
    io->nfds_limit = (int) rlim.rlim_cur;
    io->nfds_max = MINIMUM(io->nfds_limit, ARRAYINC);
    io->handler = Xcalloc(io->nfds_max, sizeof(struct io_handler));

    mech_io_init(io);

    io->events_by_time = RB_tree_new(cmp_tv, NULL);
    io->events_by_data = RB_tree_new(cmp_data, NULL);
    io->io_invalid_i = (void *) io_invalid_i;
    io->io_invalid_o = (void *) io_invalid_o;
    io->io_invalid_e = (void *) io_invalid_e;
    io->io_invalid_h = (void *) io_invalid_h;

    io->rcache_map = Xcalloc(io->nfds_max, sizeof(int));
    for (i = 0; i < io->nfds_max; i++)
	io->rcache_map[i] = -1;
    io->rcache = Xcalloc(io->nfds_max, sizeof(struct event_cache));

    gettimeofday(&io_now, NULL);

    return io;
}

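/*
 * Grow the per-fd arrays on demand. The growth policy: extend by
 * ARRAYINC (128) slots, or directly to fd + 1 if that is further, but
 * never beyond nfds_limit. Worked example: with nfds_max = 128 and an
 * incoming fd of 300, the new nfds_max becomes
 * MINIMUM(nfds_limit, MAXIMUM(301, 256)) = 301, assuming the limit
 * permits it.
 */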
static void io_resize(struct io_context *io, int fd)
{
    int i, omax = io->nfds_max;

    if (io->nfds_limit == io->nfds_max) {
	logmsg("BUG: Can handle at most %d file descriptors", io->nfds_limit);
	abort();
    }

    io->nfds_max = MINIMUM(io->nfds_limit, MAXIMUM(fd + 1, io->nfds_max + ARRAYINC));

    if (io->nfds_max <= fd) {
	logmsg("BUG: Cannot handle file descriptor %d", fd);
	abort();
    }

    io->handler = Xrealloc(io->handler, io->nfds_max * sizeof(struct io_handler));

    memset(&io->handler[omax], 0, (io->nfds_max - omax) * sizeof(struct io_handler));

    io->rcache_map = Xrealloc(io->rcache_map, io->nfds_max * sizeof(int));

    for (i = omax; i < io->nfds_max; i++)
	io->rcache_map[i] = -1;

    io->rcache = Xrealloc(io->rcache, io->nfds_max * sizeof(struct event_cache));
}

void *io_get_cb_i(struct io_context *io, int fd)
{
    return io->handler[fd].i;
}

void *io_get_cb_o(struct io_context *io, int fd)
{
    return io->handler[fd].o;
}

void *io_get_cb_h(struct io_context *io, int fd)
{
    return io->handler[fd].h;
}

void *io_get_cb_e(struct io_context *io, int fd)
{
    return io->handler[fd].e;
}

void *io_get_ctx(struct io_context *io, int fd)
{
    return io->handler[fd].data;
}

int io_want_read(struct io_context *io, int fd)
{
    return io->handler[fd].want_read_app;
}

int io_want_write(struct io_context *io, int fd)
{
    return io->handler[fd].want_write_app;
}

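/*
 * The input/output setters keep two copies of each callback: i_app and
 * o_app always track what the application asked for, while i and o hold
 * whatever is currently active. During a TLS renegotiation
 * (handler[fd].reneg) the active slots are temporarily owned by
 * io_SSL_rw() below, so only the application copy is updated here; the
 * active callback is restored once the renegotiation completes.
 */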
void io_set_cb_i(struct io_context *io, int fd, void *f)
{
    io->handler[fd].i_app = f;
    if (!io->handler[fd].reneg)
	io->handler[fd].i = f;
}

void io_set_cb_o(struct io_context *io, int fd, void *f)
{
    io->handler[fd].o_app = f;
    if (!io->handler[fd].reneg)
	io->handler[fd].o = f;
}

void io_set_cb_e(struct io_context *io, int fd, void *f)
{
    io->handler[fd].e = f;
}

void io_set_cb_h(struct io_context *io, int fd, void *f)
{
    io->handler[fd].h = f;
}

void io_set_cb_inv_i(struct io_context *io, void *f)
{
    io->io_invalid_i = f;
}

void io_set_cb_inv_o(struct io_context *io, void *f)
{
    io->io_invalid_o = f;
}

void io_set_cb_inv_h(struct io_context *io, void *f)
{
    io->io_invalid_h = f;
}

void io_set_cb_inv_e(struct io_context *io, void *f)
{
    io->io_invalid_e = f;
}

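/*
 * Clearing a callback only disarms the kernel-level event if TLS does
 * not still need it: while want_read_ssl/want_write_ssl is set, the TLS
 * layer keeps the descriptor armed and the application merely drops its
 * own interest (want_read_app/want_write_app).
 */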
void io_clr_cb_i(struct io_context *io, int fd)
{
    io->handler[fd].want_read_app = 0;
#ifdef WITH_SSL
    if (io->handler[fd].want_read_ssl)
	return;
#endif
    io->handler[fd].want_read = 0;
    io_set_cb_i(io, fd, io->io_invalid_i);
    io_clr_i(io, fd);
}

void io_clr_cb_o(struct io_context *io, int fd)
{
    io->handler[fd].want_write_app = 0;
#ifdef WITH_SSL
    if (io->handler[fd].want_write_ssl)
	return;
#endif
    io->handler[fd].want_write = 0;
    io_set_cb_o(io, fd, io->io_invalid_o);
    io_clr_o(io, fd);
}

void io_clr_cb_e(struct io_context *io, int fd)
{
    io_set_cb_e(io, fd, io->io_invalid_e);
}

void io_clr_cb_h(struct io_context *io, int fd)
{
    io_set_cb_h(io, fd, io->io_invalid_h);
}

#ifdef WITH_SSL
#include <openssl/ssl.h>

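/*
 * Common post-processing for TLS reads and writes: if OpenSSL signalled
 * WANT_READ/WANT_WRITE on a failed call, a handshake (renegotiation or
 * shutdown) is in flight, so the application callbacks are parked and
 * the descriptor is re-armed in whichever direction OpenSSL asked for.
 * Once neither direction is wanted anymore, the application callbacks
 * and poll interests are restored.
 */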
static ssize_t io_SSL_rw(SSL * ssl, struct io_context *io, int fd, void *cb, int res)
{
    DebugIn(DEBUG_PROC | DEBUG_NET);
    if (io->handler[fd].reneg && !SSL_want_read(ssl)
	&& !SSL_want_write(ssl)) {
	io->handler[fd].reneg = 0;
	io_SSL_clr_i(io, fd);
	io_SSL_clr_o(io, fd);
	io->handler[fd].i = io->handler[fd].i_app;
	io->handler[fd].o = io->handler[fd].o_app;
    } else if (res < 0 && !io->handler[fd].reneg && (SSL_want_read(ssl) || SSL_want_write(ssl))) {
	Debug((DEBUG_PROC | DEBUG_NET, "TLS shutdown or renegotiation initiated (res = %d, error = %d)\n", res, SSL_get_error(ssl, res)));
	io->handler[fd].reneg = 1;
	io->handler[fd].i = cb;
	io->handler[fd].o = cb;

	if (SSL_want_read(ssl)) {
	    io_SSL_clr_o(io, fd);
	    io_SSL_set_i(io, fd);
	} else {
	    io_SSL_clr_i(io, fd);
	    io_SSL_set_o(io, fd);
	}

	errno = EAGAIN;
    }
    DebugOut(DEBUG_PROC | DEBUG_NET);
    return res;
}

ssize_t io_SSL_read(SSL * ssl, void *buf, size_t num, struct io_context *io, int fd, void *cb)
{
    return io_SSL_rw(ssl, io, fd, cb, SSL_read(ssl, buf, (int) num));
}

ssize_t io_SSL_write(SSL * ssl, void *buf, size_t num, struct io_context *io, int fd, void *cb)
{
    return io_SSL_rw(ssl, io, fd, cb, SSL_write(ssl, buf, (int) num));
}
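
/*
 * Caller sketch (illustrative; my_ctx and my_read_cb are hypothetical
 * application names): the application passes its own callback as cb so
 * that io_SSL_rw() can route events back to it while a renegotiation is
 * pending:
 *
 *	void my_read_cb(struct my_ctx *ctx, int fd)
 *	{
 *	    char buf[4096];
 *	    ssize_t n = io_SSL_read(ctx->ssl, buf, sizeof(buf),
 *				    ctx->io, fd, (void *) my_read_cb);
 *	    if (n < 0 && errno == EAGAIN)
 *		return;	// handshake in progress; we'll be called again
 *	    // ... process n bytes, or handle EOF/error ...
 *	}
 */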

int io_SSL_shutdown(SSL * ssl, struct io_context *io, int fd, void *cb)
{
    Debug((DEBUG_PROC | DEBUG_NET, "%s\n", __func__));
    int res = SSL_shutdown(ssl);
    Debug((DEBUG_PROC | DEBUG_NET, "SSL_shutdown = %d\n", res));
    if (res < 1)
	res = -1;
    return (int) io_SSL_rw(ssl, io, fd, cb, res);
}
#endif				/* WITH_SSL */

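/*
 * Copy the complete handler state from one descriptor to another,
 * presumably after the caller has duplicated the descriptor (dup2(2) or
 * similar). The want_read/want_write flags are reset before re-setting
 * them so that the mechanism backend arms the new descriptor from a
 * clean slate.
 */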
void io_clone(struct io_context *io, int to, int from)
{
    Debug((DEBUG_PROC, "io_clone (%d, %d)\n", to, from));

    io->handler[to] = io->handler[from];

    if (io->handler[to].want_read) {
	io->handler[to].want_read = 0;
	mech_io_set_i(io, to);
    }
    if (io->handler[to].want_write) {
	io->handler[to].want_write = 0;
	mech_io_set_o(io, to);
    }
}