1 /*
2  * io_sched.c
3  * (C)2001-2011 by Marc Huber <Marc.Huber@web.de>
4  * All rights reserved.
5  *
6  * $Id: io_sched.c,v 1.58 2019/03/03 12:05:59 marc Exp $
7  *
8  */
9 
10 #define __IO_SCHED_C__
11 
12 #include "misc/sysconf.h"
13 
14 #include <sys/types.h>
15 #include <sys/poll.h>
16 #include <sys/time.h>
17 #include <unistd.h>
18 #include <sys/time.h>
19 #include <sys/resource.h>
20 #include <unistd.h>
21 #include <sysexits.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <limits.h>
27 
28 #include "misc/io_sched.h"
29 #include "misc/rb.h"
30 #include "mavis/debug.h"
31 #include "mavis/log.h"
32 #include "misc/memops.h"
33 
34 static const char rcsid[] __attribute__ ((used)) = "$Id: io_sched.c,v 1.58 2019/03/03 12:05:59 marc Exp $";
35 
36 #ifdef WITH_KQUEUE
37 #  include <sys/event.h>
38 #endif
39 #ifdef WITH_EPOLL
40 #  include <sys/epoll.h>
41 #endif
42 #ifdef WITH_POLL
43 #  include <sys/poll.h>
44 #endif
45 #ifdef WITH_DEVPOLL
46 #  include <sys/devpoll.h>
47 #endif
48 #ifdef WITH_SELECT
49 #  include <sys/select.h>
50 #endif
51 #ifdef WITH_PORT
52 #  include <port.h>
53 #endif
54 
55 #define IO_MODE_kqueue	(1 << 0)
56 #define IO_MODE_devpoll	(1 << 1)
57 #define IO_MODE_epoll	(1 << 2)
58 #define IO_MODE_poll	(1 << 3)
59 #define IO_MODE_select	(1 << 4)
60 #define IO_MODE_port	(1 << 5)
61 
62 #define ARRAYINC 128
63 #define LISTINC 128
64 
/*
 * Per-file-descriptor callback/state slot; struct io_context keeps an
 * array of these indexed directly by fd.
 */
struct io_handler {
    void *i;			/* input handler */
    void *o;			/* output handler */
    void *i_app;		/* application input handler */
    void *o_app;		/* application output handler */
    void *e;			/* error handler */
    void *h;			/* hangup handler */
    u_int want_read:1;		/* interested in reading */
    u_int want_write:1;		/* interested in writing */
    u_int want_read_app:1;	/* App interested in reading */
    u_int want_write_app:1;	/* App interested in writing */
    u_int want_read_ssl:1;	/* TLS interested in reading */
    u_int want_write_ssl:1;	/* TLS interested in writing */
    u_int reneg:1;		/* TLS renegotiation active */
    void *data;			/* opaque context information */
};
81 
#ifdef WITH_KQUEUE
/* Per-context state for the kqueue(2) event mechanism. */
struct kqueue_io_context {
    struct kevent *changelist;
    struct kevent *eventlist;
    int nchanges;
    int nchanges_max;
    int nevents_max;
    int fd;			/* kqueue descriptor */
};
#endif
92 
#ifdef WITH_EPOLL
/*
 * Per-context state for the epoll(7) event mechanism.  The diskfile*
 * arrays track fds that epoll_ctl() rejected with EPERM (regular
 * files); those are treated as always ready instead of being polled.
 */
struct epoll_io_context {
    int *changelist;		/* fds with pending interest changes */
    int *changemap;		/* fd -> changelist index, -1 if none */
    int *diskfilemap;		/* diskfile slot -> fd */
    int *diskfile;		/* fd -> diskfile slot; -1: epoll-managed, -2: idle disk fd */
    struct epoll_event *eventlist;
    int nchanges;
    int ndiskfile;
    int nevents_max;
    int fd;			/* epoll instance fd */
};
#endif
106 
#ifdef WITH_DEVPOLL
/* Per-context state for the Solaris /dev/poll event mechanism. */
struct devpoll_io_context {
    struct pollfd *changelist;	/* pending changes, written to the /dev/poll fd */
    struct pollfd *eventlist;
    int nchanges;
    int nchanges_max;
    int nevents_max;
    int fd;			/* /dev/poll descriptor */
};
#endif
117 
#ifdef WITH_POLL
/* Per-context state for the poll(2) event mechanism. */
struct poll_io_context {
    struct pollfd *ufds;	/* compact array; first nfds entries in use */
    int nfds;
    int *pax;			/* fd -> ufds index, -1 if unregistered */
};
#endif
125 
#ifdef WITH_SELECT
/* Per-context state for the select(2) event mechanism. */
struct select_io_context {
    fd_set rfds;
    fd_set wfds;
    fd_set efds;
    int nfds;			/* highest registered fd, -1 if none */
};
#endif
134 
#ifdef WITH_PORT
/* Per-context state for the Solaris event port mechanism. */
struct port_io_context {
    int *changelist;
    int *changemap;
    port_event_t *eventlist;
    int nchanges;
    int fd;			/* event port descriptor */
    int nfds;
    int nevents_max;
};
#endif
146 
/* One slot of the per-poll result cache: pending events for one fd. */
struct event_cache {
    int fd;			/* -1 marks an unused slot */
    int events;			/* POLL* event bits */
};
151 
/*
 * Main multiplexer state: per-fd handlers, timed-event trees, the poll
 * result cache, and (in the union) the state of the compiled-in event
 * mechanism that was selected at init time.
 */
struct io_context {
    struct io_handler *handler;	/* per-fd callbacks, indexed by fd */
    rb_tree_t *events_by_data;
    rb_tree_t *events_by_time;
    void *io_invalid_i;		/* trap callbacks installed by io_register() */
    void *io_invalid_o;
    void *io_invalid_h;
    void *io_invalid_e;
    int *rcache_map;		/* fd -> rcache map index */
    struct event_cache *rcache;
    int nfds_limit;
    int nfds_max;		/* current size of the per-fd arrays */
    union {
#ifdef WITH_SELECT
	struct select_io_context select;
#define Select mechanism.select
#endif
#ifdef WITH_POLL
	struct poll_io_context poll;
#define Poll mechanism.poll
#endif
#ifdef WITH_EPOLL
	struct epoll_io_context epoll;
#define Epoll mechanism.epoll
#endif
#ifdef WITH_DEVPOLL
	struct devpoll_io_context devpoll;
#define Devpoll mechanism.devpoll
#endif
#ifdef WITH_KQUEUE
	struct kqueue_io_context kqueue;
#define Kqueue mechanism.kqueue
#endif
#ifdef WITH_PORT
	struct port_io_context port;
#define Port mechanism.port
#endif
    } mechanism;
};
191 
/* One scheduled callback in an io_sched chain (used outside this chunk). */
struct io_event {
    void *proc;			/* callback */
    struct timeval time_wait;	/* presumably the relative delay -- not used in this chunk */
    struct io_event *next;
};
197 
/* Scheduler entry: links a context pointer to its event chain and times. */
struct io_sched {
    void *data;			/* context pointer, e.g. */
    struct timeval time_when;	/* when next event is triggered */
    struct timeval time_real;	/* when next event should be triggered */
    struct io_event *event;	/* event pointer */
};
204 
/*
 * Dispatch hooks filled in by whichever event mechanism is active.
 * Some may be left NULL (mech_io_close and mech_io_poll_finish are
 * checked before use below).
 */
static void (*mech_io_set_i) (struct io_context *, int);
static void (*mech_io_set_o) (struct io_context *, int);
static void (*mech_io_clr_i) (struct io_context *, int);
static void (*mech_io_clr_o) (struct io_context *, int);
static void (*mech_io_register) (struct io_context *, int);
static void (*mech_io_unregister) (struct io_context *, int);
static void (*mech_io_close) (struct io_context *, int);
static void (*mech_io_destroy) (struct io_context *);
static int (*mech_io_poll) (struct io_context *, int, int *);
static void (*mech_io_poll_finish) (struct io_context *, int);

static void io_resize(struct io_context *, int fd);
217 
/* Return the smaller of two ints. */
static __inline__ int MINIMUM(int a, int b)
{
    if (a > b)
	return b;
    return a;
}
222 
/* Return the larger of two ints. */
static __inline__ int MAXIMUM(int a, int b)
{
    if (a > b)
	return a;
    return b;
}
227 
228 #define SIOS(A) ((struct io_sched *)(A))
229 
cmp_tv(const void * a,const void * b)230 static int cmp_tv(const void *a, const void *b)
231 {
232     if (SIOS(a)->time_when.tv_sec < SIOS(b)->time_when.tv_sec)
233 	return -1;
234     if (SIOS(a)->time_when.tv_sec > SIOS(b)->time_when.tv_sec)
235 	return +1;
236     if (SIOS(a)->time_when.tv_usec < SIOS(b)->time_when.tv_usec)
237 	return -1;
238     if (SIOS(a)->time_when.tv_usec > SIOS(b)->time_when.tv_usec)
239 	return +1;
240     if (a < b)
241 	return -1;
242     if (a > b)
243 	return +1;
244     return 0;
245 }
246 
cmp_data(const void * a,const void * b)247 static int cmp_data(const void *a, const void *b)
248 {
249     if (SIOS(a)->data < SIOS(b)->data)
250 	return -1;
251     if (SIOS(a)->data > SIOS(b)->data)
252 	return +1;
253     return 0;
254 }
255 
/*
 * Trap callbacks installed by io_register(): if one of these fires, an
 * event arrived on an fd for which the application never installed a
 * real handler -- that is a programming error, so log and abort.
 */
static void io_invalid_i(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_i (%d)", cur);
    abort();
}

static void io_invalid_o(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_o (%d)", cur);
    abort();
}

static void io_invalid_e(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_e (%d)", cur);
    abort();
}

static void io_invalid_h(void *v __attribute__ ((unused)), int cur)
{
    logmsg("io_invalid_h (%d)", cur);
    abort();
}
279 
/*
 * Predicates: is the fd's handler still the trap callback (i.e. the
 * application never installed one)?  Note the read/write variants
 * check the application-level slots (i_app/o_app), not i/o.
 */
int io_is_invalid_i(struct io_context *io, int cur)
{
    return (io->handler[cur].i_app == io->io_invalid_i);
}

int io_is_invalid_o(struct io_context *io, int cur)
{
    return (io->handler[cur].o_app == io->io_invalid_o);
}

int io_is_invalid_e(struct io_context *io, int cur)
{
    return (io->handler[cur].e == io->io_invalid_e);
}

int io_is_invalid_h(struct io_context *io, int cur)
{
    return (io->handler[cur].h == io->io_invalid_h);
}
299 
/*
 * Run one multiplexer iteration: let the active mechanism fill the
 * result cache, then invoke at most one callback per ready fd.
 * Dispatch priority is read > write > error > hangup, and POLLOUT is
 * suppressed while POLLHUP is also set.  Returns the mechanism's raw
 * poll result.
 */
int io_poll(struct io_context *io, int poll_timeout)
{
    int count, cax = 0;
    int res = mech_io_poll(io, poll_timeout, &cax);

    for (count = 0; count < cax; count++) {
	int cur = io->rcache[count].fd;
	struct io_context *ctx;

	/* an earlier callback may have unregistered this fd (slot set to -1) */
	if (cur > -1) {
	    ctx = io_get_ctx(io, cur);
	    Debug((DEBUG_PROC, "fd %d ctx %p\n", cur, ctx));
	    if (ctx) {
		void (*cb) (void *, int);
		if (io->handler[cur].want_read && (io->rcache[count].events & POLLIN))
		    cb = (void (*)(void *, int)) (io_get_cb_i(io, cur));
		else if (io->handler[cur].want_write && (io->rcache[count].events & POLLOUT)
			 && !(io->rcache[count].events & POLLHUP))
		    cb = (void (*)(void *, int)) (io_get_cb_o(io, cur));
		else if (io->rcache[count].events & POLLERR)
		    cb = (void (*)(void *, int)) (io_get_cb_e(io, cur));
		else if (io->rcache[count].events & POLLHUP)
		    cb = (void (*)(void *, int)) (io_get_cb_h(io, cur));
		else
		    cb = NULL;

		Debug((DEBUG_PROC, "fd %d cb = %p\n", cur, cb));
		if (cb)
		    cb(ctx, cur);
	    }
	    io->rcache_map[cur] = -1;
	}

	/* reset the cache slot for the next poll round */
	io->rcache[count].fd = -1;
	io->rcache[count].events = 0;
    }
    if (mech_io_poll_finish)
	mech_io_poll_finish(io, res);

    return res;
}
341 
/*
 * Tear down a multiplexer context: delete the event trees, optionally
 * run freeproc on every registered fd's opaque data, shut down the
 * mechanism, and free all per-fd arrays.  Always returns NULL so the
 * caller can write "io = io_destroy(io, f);".
 */
struct io_context *io_destroy(struct io_context *io, void (*freeproc) (void *))
{
    if (io) {
	RB_tree_delete(io->events_by_data);
	RB_tree_delete(io->events_by_time);

	if (freeproc) {
	    int i;
	    for (i = 0; i < io->nfds_max; i++)
		if (io->handler[i].data)
		    freeproc(io->handler[i].data);
	}

	mech_io_destroy(io);

	free(io->handler);
	free(io->rcache_map);
	free(io->rcache);
	free(io);
    }
    return NULL;
}
364 
/*
 * Register fd with the mechanism and reset its handler slot: all
 * callbacks start as the io_invalid_* traps and all interest bits off.
 * NOTE(review): the reneg bit is never cleared here, and the SSL want
 * bits only under WITH_SSL -- stale state could survive fd reuse;
 * confirm against the callers.
 */
void io_register(struct io_context *io, int fd, void *data)
{
    mech_io_register(io, fd);

    io->handler[fd].data = data;
    io->handler[fd].i = io->io_invalid_i;
    io->handler[fd].o = io->io_invalid_o;
    io->handler[fd].i_app = io->io_invalid_i;
    io->handler[fd].o_app = io->io_invalid_o;
    io->handler[fd].e = io->io_invalid_e;
    io->handler[fd].h = io->io_invalid_h;
    io->handler[fd].want_read = 0;
    io->handler[fd].want_write = 0;
    io->handler[fd].want_read_app = 0;
    io->handler[fd].want_write_app = 0;
#ifdef WITH_SSL
    io->handler[fd].want_read_ssl = 0;
    io->handler[fd].want_write_ssl = 0;
#endif
}
385 
/*
 * Remove fd from the mechanism, clear its interest bits and any entry
 * it still has in the result cache (so io_poll() won't dispatch a
 * stale event), and hand the opaque data pointer back to the caller.
 */
void *io_unregister(struct io_context *io, int fd)
{
    void *res = io->handler[fd].data;

    mech_io_unregister(io, fd);

    io->handler[fd].data = NULL;
    io->handler[fd].want_read = 0;
    io->handler[fd].want_write = 0;

    if (io->rcache_map[fd] > -1) {
	io->rcache[io->rcache_map[fd]].fd = -1;
	io->rcache[io->rcache_map[fd]].events = 0;
	io->rcache_map[fd] = -1;
    }

    return res;
}
404 
/*
 * Unregister fd, give the mechanism a chance to purge pending changes
 * (mech_io_close is optional), then close it.  Returns close(2)'s result.
 */
int io_close(struct io_context *io, int fd)
{
    io_unregister(io, fd);
    if (mech_io_close)
	mech_io_close(io, fd);
    return close(fd);
}
412 
/*
 * Application-level interest: declare read (io_set_i) or write
 * (io_set_o) interest on fd.  Idempotent -- the mechanism hook is only
 * called on a 0 -> 1 transition of the app-level bit.
 */
void io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read_app) {
	io->handler[fd].want_read_app = 1;
	mech_io_set_i(io, fd);
    }
}

void io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write_app) {
	io->handler[fd].want_write_app = 1;
	mech_io_set_o(io, fd);
    }
}
428 
/*
 * Drop application-level read/write interest on fd.  The mechanism is
 * only told when the TLS layer doesn't still want the direction, and
 * any already-cached POLLIN/POLLOUT event is masked out so io_poll()
 * won't invoke the callback this round.
 */
void io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read_app) {
	io->handler[fd].want_read_app = 0;
#ifdef WITH_SSL
	if (io->handler[fd].want_read_ssl)
	    return;
#endif
	mech_io_clr_i(io, fd);

	if (io->rcache_map[fd] > -1)
	    io->rcache[io->rcache_map[fd]].events &= ~POLLIN;
    }
}

void io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write_app) {
	io->handler[fd].want_write_app = 0;
#ifdef WITH_SSL
	if (io->handler[fd].want_write_ssl)
	    return;
#endif
	mech_io_clr_o(io, fd);
	if (io->rcache_map[fd] > -1)
	    io->rcache[io->rcache_map[fd]].events &= ~POLLOUT;
    }
}
457 
#ifdef WITH_SSL
/*
 * TLS-level interest, kept separate from the application-level bits so
 * the two layers can toggle independently: the mechanism only sees a
 * clear when neither layer wants the direction any more.
 */
static __inline__ void io_SSL_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read_ssl) {
	io->handler[fd].want_read_ssl = 1;
	mech_io_set_i(io, fd);
    }
}

static __inline__ void io_SSL_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write_ssl) {
	io->handler[fd].want_write_ssl = 1;
	mech_io_set_o(io, fd);
    }
}

static __inline__ void io_SSL_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read_ssl) {
	io->handler[fd].want_read_ssl = 0;
	if (!io->handler[fd].want_read_app)
	    mech_io_clr_i(io, fd);
    }
}

static __inline__ void io_SSL_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write_ssl) {
	io->handler[fd].want_write_ssl = 0;
	if (!io->handler[fd].want_write_app)
	    mech_io_clr_o(io, fd);
    }
}
#endif				/* WITH_SSL */
493 
#ifdef WITH_SELECT
/*
 * select(2) backend: interest changes simply toggle bits in the
 * persistent fd_sets mirrored into want_read/want_write.
 */
static void select_io_set_i(struct io_context *io, int fd)
{
    io->handler[fd].want_read = 1;
    FD_SET(fd, &io->Select.rfds);
}

static void select_io_set_o(struct io_context *io, int fd)
{
    io->handler[fd].want_write = 1;
    FD_SET(fd, &io->Select.wfds);
}

static void select_io_clr_i(struct io_context *io, int fd)
{
    io->handler[fd].want_read = 0;
    FD_CLR(fd, &io->Select.rfds);
}

static void select_io_clr_o(struct io_context *io, int fd)
{
    io->handler[fd].want_write = 0;
    FD_CLR(fd, &io->Select.wfds);
}
518 
/* Initialize the select(2) backend: empty fd_sets, no fd registered yet. */
static void select_io_init(struct io_context *io)
{
    FD_ZERO(&io->Select.rfds);
    FD_ZERO(&io->Select.wfds);
    FD_ZERO(&io->Select.efds);
    io->Select.nfds = -1;
}
526 
/*
 * select(2) backend poll: run select on copies of the persistent
 * fd_sets and translate ready fds into rcache entries with POLL* bits.
 * *cax is set to the number of cache slots used.
 * NOTE(review): any select failure -- including EINTR -- logs and
 * aborts; confirm signals are handled elsewhere before relying on this.
 */
static int select_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int cur = 0, r, res;
    struct timeval timeout;
    fd_set rfds, wfds, efds;

    Debug((DEBUG_PROC, "io_poll (%p, %dms)\n", io, poll_timeout));

    *cax = 0;

    /* split the millisecond timeout into seconds/microseconds */
    timeout.tv_sec = poll_timeout / 1000;
    timeout.tv_usec = 1000 * (u_int) (poll_timeout - 1000 * timeout.tv_sec);
    rfds = io->Select.rfds;
    wfds = io->Select.wfds;
    efds = io->Select.efds;

    /* negative timeout means block indefinitely */
    r = res = select(io->Select.nfds + 1, &rfds, &wfds, &efds, poll_timeout < 0 ? NULL : &timeout);

    if (r < 0) {
	logerr("Fatal select(2) error (%ld, ..., (%lu, %lu))", (long int) io->Select.nfds + 1, (u_long) timeout.tv_sec, (u_long) timeout.tv_usec);
	abort();
    }

    gettimeofday(&io_now, NULL);

    /* r counts remaining ready bits; stop early once all are consumed */
    for (; r > 0 && cur < io->nfds_max; cur++) {
	if (FD_ISSET(cur, &rfds) || FD_ISSET(cur, &wfds) || FD_ISSET(cur, &efds)) {
	    if (io->rcache_map[cur] < 0) {
		io->rcache[*cax].events = 0;
		io->rcache[*cax].fd = cur;
		io->rcache_map[cur] = (*cax)++;
	    }

	    if (FD_ISSET(cur, &rfds))
		r--, io->rcache[io->rcache_map[cur]].events |= POLLIN;
	    if (FD_ISSET(cur, &efds))
		r--, io->rcache[io->rcache_map[cur]].events |= POLLERR;
	    if (FD_ISSET(cur, &wfds))
		r--, io->rcache[io->rcache_map[cur]].events |= POLLOUT;

	    if (r < 0) {
		logmsg("Bug near %s:%d", __FILE__, __LINE__);
		abort();
	    }
	}
    }

    return res;
}
576 
/* select(2) backend teardown: no dynamically allocated state to free. */
static void select_io_destroy(struct io_context *io __attribute__ ((unused)))
{
    /* nothing to do */
}
581 
/*
 * select(2) backend unregister: clear fd from all sets, and if it was
 * the highest registered fd, walk nfds down to the next fd that still
 * has handler data (or -1 if none remain).
 */
static void select_io_unregister(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_unregister %d\n", fd));

    FD_CLR(fd, &io->Select.rfds);
    FD_CLR(fd, &io->Select.wfds);
    FD_CLR(fd, &io->Select.efds);

    if (fd == io->Select.nfds)
	do
	    io->Select.nfds--;
	while (io->Select.nfds > -1 && io->handler[io->Select.nfds].data == NULL);
}
595 
/*
 * select(2) backend register: grow the per-fd arrays if needed, track
 * the highest fd, and watch for exceptions right away (read/write
 * interest is added separately via the set hooks).
 */
static void select_io_register(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max)
	io_resize(io, fd);

    if (fd > io->Select.nfds)
	io->Select.nfds = fd;

    FD_SET(fd, &io->Select.efds);
}
609 
#ifdef WITH_POLL
/*
 * poll(2) backend: interest changes toggle POLLIN/POLLOUT bits in the
 * fd's pollfd entry, located through the pax fd->index map.
 */
static void poll_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	io->Poll.ufds[io->Poll.pax[fd]].events |= POLLIN;
    }
}

static void poll_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	io->Poll.ufds[io->Poll.pax[fd]].events |= POLLOUT;
    }
}

static void poll_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	io->Poll.ufds[io->Poll.pax[fd]].events &= ~POLLIN;
    }
}

static void poll_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	io->Poll.ufds[io->Poll.pax[fd]].events &= ~POLLOUT;
    }
}
642 
/* poll(2) backend init: allocate the pollfd array and the fd->index map
 * (all entries start unregistered, i.e. -1). */
static void poll_io_init(struct io_context *io)
{
    int i;

    io->Poll.ufds = Xcalloc(io->nfds_max, sizeof(struct pollfd));
    io->Poll.pax = Xcalloc(io->nfds_max, sizeof(int));
    for (i = 0; i < io->nfds_max; i++)
	io->Poll.pax[i] = -1;
}
652 
/*
 * poll(2) backend poll: run poll on the compact pollfd array and copy
 * each nonzero revents into the rcache.  *cax is set to the number of
 * cache slots used.  A negative poll result (e.g. EINTR) simply skips
 * the scan loop and is returned as-is.
 */
static int poll_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res, r;
    Debug((DEBUG_PROC, "io_poll (%p) timeout: %d\n", io, poll_timeout));

    *cax = 0;

    r = res = poll(io->Poll.ufds, (nfds_t) (io->Poll.nfds), poll_timeout);

    Debug((DEBUG_PROC, "io_poll (%p) timeout: %d, res: %d\n", io, poll_timeout, res));

    gettimeofday(&io_now, NULL);

    /* walk backwards; r counts ready fds still unaccounted for */
    for (count = io->Poll.nfds - 1; r > 0 && count > -1; count--) {
	int cur = io->Poll.ufds[count].fd;

	if (cur > -1 && io->Poll.pax[cur] > -1 && io->Poll.ufds[io->Poll.pax[cur]].revents) {
	    r--;

	    if (io->rcache_map[cur] < 0) {
		io->rcache[*cax].events = 0;
		io->rcache[*cax].fd = cur;
		io->rcache_map[cur] = (*cax)++;
	    }

	    io->rcache[io->rcache_map[cur]].events = io->Poll.ufds[io->Poll.pax[cur]].revents;
	}
    }

    return res;
}
684 
/* poll(2) backend teardown: free the pollfd array and fd->index map. */
static void poll_io_destroy(struct io_context *io)
{
    free(io->Poll.ufds);
    free(io->Poll.pax);
}
690 
/*
 * poll(2) backend unregister: remove fd's pollfd entry by swapping the
 * last entry into its slot (keeping the array compact) and fixing up
 * the moved fd's index in pax.
 */
static void poll_io_unregister(struct io_context *io, int fd)
{
    int pos = io->Poll.pax[fd];
    Debug((DEBUG_PROC, " io_unregister %d\n", fd));

    if (pos < 0 || pos >= io->Poll.nfds) {
	logmsg("Ooops ... poll array index for %d out of range (%d)!", fd, pos);
	abort();
    }

    if (pos != --io->Poll.nfds) {
	io->Poll.ufds[pos] = io->Poll.ufds[io->Poll.nfds];
	io->Poll.pax[io->Poll.ufds[pos].fd] = pos;
    }

    io->Poll.pax[fd] = -1;
}
708 
poll_io_register(struct io_context * io,int fd)709 static void poll_io_register(struct io_context *io, int fd)
710 {
711     Debug((DEBUG_PROC, " io_register %d\n", fd));
712 
713     if (fd >= io->nfds_max) {
714 	int i;
715 	int omax = io->nfds_max;
716 	io_resize(io, fd);
717 	io->Poll.ufds = Xrealloc(io->Poll.ufds, io->nfds_max * sizeof(struct pollfd));
718 	io->Poll.pax = Xrealloc(io->Poll.pax, io->nfds_max * sizeof(int));
719 	for (i = omax; i < io->nfds_max; i++)
720 	    io->Poll.pax[i] = -1;
721     }
722 
723     if (io->Poll.pax[fd] != -1) {
724 	logmsg("Ooops ... poll array index for %d already set!", fd);
725 	logmsg("%d %d %d", io->nfds_limit, io->nfds_max, fd);
726 	abort();
727     }
728 
729     memset(&io->Poll.ufds[io->Poll.nfds], 0, sizeof(struct pollfd));
730     io->Poll.ufds[io->Poll.nfds].fd = fd;
731     io->Poll.ufds[io->Poll.nfds].events = 0;
732     io->Poll.pax[fd] = io->Poll.nfds++;
733 }
734 #endif
735 
#ifdef WITH_EPOLL
/*
 * epoll backend close hook: drop any change still queued for fd so a
 * later poll round doesn't issue an epoll_ctl for a closed descriptor.
 */
static void epoll_io_close(struct io_context *io, int fd)
{
    if (io->Epoll.changemap[fd] > -1) {
	io->Epoll.changelist[io->Epoll.changemap[fd]] = -1;
	io->Epoll.changemap[fd] = -1;
	io->Epoll.nchanges--;
    }
}
745 
/*
 * Queue fd on the epoll change list (deduplicated: only if it has no
 * valid entry yet); the actual epoll_ctl happens in epoll_io_poll().
 */
static void epoll_addchange(struct io_context *io, int fd)
{
    if (io->Epoll.changemap[fd] < 0 || io->Epoll.changemap[fd] >= io->Epoll.nchanges || io->Epoll.changelist[io->Epoll.changemap[fd]] != fd) {
	io->Epoll.changemap[fd] = io->Epoll.nchanges;
	io->Epoll.changelist[io->Epoll.nchanges++] = fd;
    }
}
753 
/*
 * epoll backend: declare read interest.  Disk-file fds (diskfile ==
 * -2, rejected by epoll_ctl with EPERM) go onto the always-ready list
 * instead; epoll-managed fds (-1) get a queued interest change.
 */
static void epoll_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	if (io->Epoll.diskfile[fd] == -2) {
	    io->Epoll.diskfilemap[io->Epoll.ndiskfile] = fd;
	    io->Epoll.diskfile[fd] = io->Epoll.ndiskfile++;
	} else if (io->Epoll.diskfile[fd] == -1)
	    epoll_addchange(io, fd);
    }
}
765 
/*
 * epoll backend: drop read interest.  An active disk-file fd is taken
 * off the always-ready list (swap-with-last) once neither direction is
 * wanted; an epoll-managed fd gets a queued interest change.
 */
static void epoll_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	if (io->Epoll.diskfile[fd] > -1) {
	    if (!io->handler[fd].want_write) {
		io->Epoll.ndiskfile--;
		io->Epoll.diskfilemap[io->Epoll.diskfile[fd]] = io->Epoll.diskfilemap[io->Epoll.ndiskfile];
		io->Epoll.diskfile[io->Epoll.diskfilemap[io->Epoll.ndiskfile]] = io->Epoll.diskfile[fd];
		io->Epoll.diskfile[fd] = -2;
	    }
	} else
	    epoll_addchange(io, fd);
    }
}
781 
/* epoll backend: declare write interest; mirror of epoll_io_set_i(). */
static void epoll_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	if (io->Epoll.diskfile[fd] == -2) {
	    io->Epoll.diskfilemap[io->Epoll.ndiskfile] = fd;
	    io->Epoll.diskfile[fd] = io->Epoll.ndiskfile++;
	} else if (io->Epoll.diskfile[fd] == -1)
	    epoll_addchange(io, fd);
    }
}
793 
/* epoll backend: drop write interest; mirror of epoll_io_clr_i(). */
static void epoll_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	if (io->Epoll.diskfile[fd] > -1) {
	    if (!io->handler[fd].want_read) {
		io->Epoll.ndiskfile--;
		io->Epoll.diskfilemap[io->Epoll.diskfile[fd]] = io->Epoll.diskfilemap[io->Epoll.ndiskfile];
		io->Epoll.diskfile[io->Epoll.diskfilemap[io->Epoll.ndiskfile]] = io->Epoll.diskfile[fd];
		io->Epoll.diskfile[fd] = -2;
	    }
	} else
	    epoll_addchange(io, fd);
    }
}
809 
epoll_io_init(struct io_context * io)810 static void epoll_io_init(struct io_context *io)
811 {
812     int i, flags;
813 
814     io->Epoll.fd = epoll_create(io->nfds_max);
815 
816     flags = fcntl(io->Epoll.fd, F_GETFD, 0) | FD_CLOEXEC;
817     fcntl(io->Epoll.fd, F_SETFD, flags);
818 
819     io->Epoll.nchanges = io->Epoll.ndiskfile = 0;
820     io->Epoll.nevents_max = io->nfds_max;
821     io->Epoll.eventlist = Xcalloc(io->Epoll.nevents_max, sizeof(struct epoll_event));
822     io->Epoll.changelist = Xcalloc(io->nfds_max, sizeof(int));
823     io->Epoll.changemap = Xcalloc(io->nfds_max, sizeof(int));
824     io->Epoll.diskfile = Xcalloc(io->nfds_max, sizeof(int));
825     io->Epoll.diskfilemap = Xcalloc(io->nfds_max, sizeof(int));
826     for (i = 0; i < io->nfds_max; i++) {
827 	io->Epoll.changelist[i] = -1;
828 	io->Epoll.changemap[i] = -1;
829 	io->Epoll.diskfile[i] = -1;
830 	io->Epoll.diskfilemap[i] = -1;
831     }
832 }
833 
epoll_io_unregister(struct io_context * io,int fd)834 static void epoll_io_unregister(struct io_context *io __attribute__ ((unused)), int fd __attribute__ ((unused)))
835 {
836     Debug((DEBUG_PROC, " io_unregister %d\n", fd));
837     if (io->Epoll.diskfile[fd] > -1) {
838 	io->Epoll.ndiskfile--;
839 	io->Epoll.diskfilemap[io->Epoll.diskfile[fd]] = io->Epoll.diskfilemap[io->Epoll.ndiskfile];
840 	io->Epoll.diskfile[io->Epoll.diskfilemap[io->Epoll.ndiskfile]] = io->Epoll.diskfile[fd];
841 	io->Epoll.diskfile[fd] = -1;
842     }
843 }
844 
/*
 * epoll backend register: grow the per-fd arrays if needed, then add
 * fd to the epoll instance with no events yet (interest is queued
 * later via epoll_addchange).  EPERM marks fd as a disk file handled
 * by the always-ready path.
 * NOTE(review): epoll_ctl failures other than EPERM are silently
 * ignored here -- confirm that is intentional.
 */
static void epoll_io_register(struct io_context *io, int fd)
{
    struct epoll_event e;
    Debug((DEBUG_PROC, " io_register %d\n", fd));

    if (fd >= io->nfds_max) {
	int i;
	int omax = io->nfds_max;
	io_resize(io, fd);
	io->Epoll.changelist = Xrealloc(io->Epoll.changelist, io->nfds_max * sizeof(int));
	io->Epoll.changemap = Xrealloc(io->Epoll.changemap, io->nfds_max * sizeof(int));
	for (i = omax; i < io->nfds_max; i++) {
	    io->Epoll.changelist[i] = -1;
	    io->Epoll.changemap[i] = -1;
	}
    }

    e.data.fd = fd;
    e.events = 0;
    if (-1 == epoll_ctl(io->Epoll.fd, EPOLL_CTL_ADD, fd, &e)
	&& errno == EPERM)
	io->Epoll.diskfile[fd] = -2;
}
868 
/*
 * epoll backend poll: flush queued interest changes via EPOLL_CTL_MOD,
 * wait for events (with a zero timeout if always-ready disk fds
 * exist), translate them into rcache entries, then append synthetic
 * POLLIN/POLLOUT events for every disk-file fd.  *cax is set to the
 * number of cache slots used.
 * NOTE(review): a negative epoll_wait result (e.g. EINTR) is not
 * checked; res += ndiskfile may then mask the error -- verify callers.
 */
static int epoll_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res;
    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    *cax = 0;

    /* apply queued interest changes; skip stale/duplicate entries */
    for (count = 0; count < io->Epoll.nchanges; count++) {
	int fd = io->Epoll.changelist[count];
	if (fd > -1 && io->Epoll.changemap[fd] == count && io->Epoll.diskfilemap[fd] == -1) {
	    struct epoll_event e;
	    e.data.fd = fd;
	    e.events = (io->handler[fd].want_read ? EPOLLIN : 0) | (io->handler[fd].want_write ? EPOLLOUT : 0);
	    if (epoll_ctl(io->Epoll.fd, EPOLL_CTL_MOD, fd, &e) < 0) {
#ifndef DEBUG
		logerr("epoll_ctl (%s:%d)", __FILE__, __LINE__)
#endif
		    ;
	    }
	    io->Epoll.changemap[fd] = -1;
	}
    }
    io->Epoll.nchanges = 0;

    res = epoll_wait(io->Epoll.fd, io->Epoll.eventlist, io->Epoll.nevents_max, io->Epoll.ndiskfile ? 0 : poll_timeout);

    gettimeofday(&io_now, NULL);

    for (count = 0; count < res; count++) {
	int cur = io->Epoll.eventlist[count].data.fd;

	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}

	io->rcache[io->rcache_map[cur]].events = io->Epoll.eventlist[count].events;
    }

    res += io->Epoll.ndiskfile;

    /* disk-file fds are always ready for whatever direction is wanted */
    for (count = 0; count < io->Epoll.ndiskfile; count++) {
	int cur = io->Epoll.diskfilemap[count];

	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}

	if (io->handler[cur].want_write)
	    io->rcache[io->rcache_map[cur]].events |= POLLOUT;
	if (io->handler[cur].want_read)
	    io->rcache[io->rcache_map[cur]].events |= POLLIN;
    }

    return res;
}
928 
/* epoll backend teardown: free all bookkeeping arrays and close the
 * epoll instance. */
static void epoll_io_destroy(struct io_context *io)
{
    free(io->Epoll.eventlist);
    free(io->Epoll.changelist);
    free(io->Epoll.changemap);
    free(io->Epoll.diskfile);
    free(io->Epoll.diskfilemap);
    close(io->Epoll.fd);
}
#endif
939 
#ifdef WITH_DEVPOLL
/* Grow the /dev/poll change list by LISTINC entries. */
static void devpoll_io_changelist_resize(struct io_context *io)
{
    io->Devpoll.nchanges_max += LISTINC;
    io->Devpoll.changelist = Xrealloc(io->Devpoll.changelist, io->Devpoll.nchanges_max * sizeof(struct pollfd));
}
946 
/*
 * /dev/poll backend close hook: compact the change list in place,
 * dropping every pending entry that refers to fd.
 */
static void devpoll_io_close(struct io_context *io, int fd)
{
    int i, j;
    for (i = 0, j = 0; i < io->Devpoll.nchanges; i++) {
	if ((int) io->Devpoll.changelist[i].fd == fd)
	    continue;
	if (i != j)
	    io->Devpoll.changelist[j] = io->Devpoll.changelist[i];
	j++;
    }
    io->Devpoll.nchanges = j;
}
959 
/* /dev/poll backend: declare read interest by queueing a POLLIN change. */
static void devpoll_io_set_i(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLIN;
    }
}
971 
/* /dev/poll backend: declare write interest by queueing a POLLOUT change. */
static void devpoll_io_set_o(struct io_context *io, int fd)
{
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLOUT;
    }
}
983 
/*
 * /dev/poll backend: drop read interest.  POLLREMOVE wipes ALL
 * interest for the fd, so if write interest was also active it is
 * re-queued afterwards (want_write is toggled off first so
 * devpoll_io_set_o() actually emits the change).
 */
static void devpoll_io_clr_i(struct io_context *io, int fd)
{
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLREMOVE;

	if (io->handler[fd].want_write) {
	    io->handler[fd].want_write = 0;
	    devpoll_io_set_o(io, fd);
	}
    }
}
1000 
/* /dev/poll backend: drop write interest; mirror of devpoll_io_clr_i()
 * (POLLREMOVE, then re-queue POLLIN if read interest was active). */
static void devpoll_io_clr_o(struct io_context *io, int fd)
{
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
	    devpoll_io_changelist_resize(io);

	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLREMOVE;

	if (io->handler[fd].want_read) {
	    io->handler[fd].want_read = 0;
	    devpoll_io_set_i(io, fd);
	}
    }
}
1017 
devpoll_io_init(struct io_context * io)1018 static void devpoll_io_init(struct io_context *io)
1019 {
1020     int flags;
1021 
1022     if ((io->Devpoll.fd = open("/dev/poll", O_RDWR)) < 0) {
1023 	logerr("devpoll open (%s:%d)", __FILE__, __LINE__);
1024 	abort();
1025     }
1026     flags = fcntl(io->Devpoll.fd, F_GETFD, 0) | FD_CLOEXEC;
1027     fcntl(io->Devpoll.fd, F_SETFD, flags);
1028 
1029     io->Devpoll.nchanges = 0;
1030     io->Devpoll.nchanges_max = LISTINC;
1031     io->Devpoll.changelist = Xcalloc(io->Devpoll.nchanges_max, sizeof(struct pollfd));
1032     io->Devpoll.nevents_max = LISTINC;
1033     io->Devpoll.eventlist = Xcalloc(io->Devpoll.nevents_max, sizeof(struct pollfd));
1034 }
1035 
/* Poll backend for /dev/poll: flush pending interest changes with a
 * single write(2), fetch ready descriptors via the DP_POLL ioctl and
 * merge the results into the shared rcache (indexed via rcache_map).
 * *cax returns the number of distinct rcache entries filled in.
 * Returns the event count reported by the kernel; aborts on I/O
 * errors against the /dev/poll descriptor. */
static int devpoll_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res;
    struct dvpoll dvp;

    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    *cax = 0;

    dvp.dp_fds = io->Devpoll.eventlist;
    dvp.dp_nfds = io->Devpoll.nevents_max;
    dvp.dp_timeout = poll_timeout > -1 ? poll_timeout : 0;

    /* Submit the accumulated change list in one write; a short write
     * is treated as fatal. */
    if (io->Devpoll.nchanges &&
	(write(io->Devpoll.fd, io->Devpoll.changelist,
	       sizeof(struct pollfd) * io->Devpoll.nchanges) != (ssize_t) sizeof(struct pollfd) * io->Devpoll.nchanges)) {
	logerr("devpoll write (%s:%d)", __FILE__, __LINE__);
	abort();
    }

    res = ioctl(io->Devpoll.fd, DP_POLL, &dvp);
    Debug((DEBUG_PROC, "devpoll ioctl returns %d\n", res));

    if (0 > res) {
	logerr("devpoll ioctl (%s:%d)", __FILE__, __LINE__);
	abort();
    }

    io->Devpoll.nchanges = 0;

    gettimeofday(&io_now, NULL);

    for (count = 0; count < res; count++) {
	int cur = io->Devpoll.eventlist[count].fd;

	/* First event for this fd: claim a fresh rcache slot. */
	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}
	io->rcache[io->rcache_map[cur]].events = io->Devpoll.eventlist[count].revents;
    }

    return res;
}
1081 
devpoll_io_destroy(struct io_context * io)1082 static void devpoll_io_destroy(struct io_context *io)
1083 {
1084     free(io->Devpoll.changelist);
1085     free(io->Devpoll.eventlist);
1086     close(io->Devpoll.fd);
1087 }
1088 
devpoll_io_unregister(struct io_context * io,int fd)1089 static void devpoll_io_unregister(struct io_context *io, int fd)
1090 {
1091     Debug((DEBUG_PROC, " io_unregister %d\n", fd));
1092 
1093     if (io->handler[fd].want_read || io->handler[fd].want_write) {
1094 
1095 	if (io->Devpoll.nchanges == io->Devpoll.nchanges_max)
1096 	    devpoll_io_changelist_resize(io);
1097 
1098 	io->Devpoll.changelist[io->Devpoll.nchanges].fd = fd;
1099 	io->Devpoll.changelist[io->Devpoll.nchanges++].events = POLLREMOVE;
1100     }
1101 }
1102 
devpoll_io_register(struct io_context * io,int fd)1103 static void devpoll_io_register(struct io_context *io, int fd)
1104 {
1105     Debug((DEBUG_PROC, " io_register %d\n", fd));
1106     if (fd >= io->nfds_max)
1107 	io_resize(io, fd);
1108 }
1109 #endif
1110 
1111 #ifdef WITH_KQUEUE
kqueue_io_changelist_resize(struct io_context * io)1112 static void kqueue_io_changelist_resize(struct io_context *io)
1113 {
1114     io->Kqueue.nchanges_max += LISTINC;
1115     io->Kqueue.changelist = Xrealloc(io->Kqueue.changelist, io->Kqueue.nchanges_max * sizeof(struct kevent));
1116 }
1117 
/* Register read interest for fd: queue an EV_ADD/EVFILT_READ kevent.
 * The per-fd context pointer is stored as the kevent udata (NetBSD
 * declares udata as intptr_t, hence the conditional cast). */
static void kqueue_io_set_i(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_set_i(%d)\n", fd));
    if (!io->handler[fd].want_read) {
	io->handler[fd].want_read = 1;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_READ, EV_ADD, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       io_get_ctx(io, fd));

	io->Kqueue.nchanges++;
    }
}
1135 
/* Zero timeout for kevent() calls that must not block; allocated once
 * in kqueue_io_init() (a NULL timespec would mean "wait forever"). */
static struct timespec *timeout_immediately = NULL;

/* Push all pending change-list entries to the kernel without fetching
 * any events (nevents == 0), then reset the change list. */
static void kqueue_flush(struct io_context *io)
{
    kevent(io->Kqueue.fd, io->Kqueue.changelist, io->Kqueue.nchanges, io->Kqueue.eventlist, 0, timeout_immediately);
    io->Kqueue.nchanges = 0;
}
1143 
/* Drop read interest for fd: queue an EV_DELETE/EVFILT_READ kevent.
 * udata is unused on delete (NetBSD requires the intptr_t cast). */
static void kqueue_io_clr_i(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_clr_i(%d)\n", fd));
    if (io->handler[fd].want_read) {
	io->handler[fd].want_read = 0;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_READ, EV_DELETE, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       NULL);
	io->Kqueue.nchanges++;
    }
}
1160 
/* Register write interest for fd: queue an EV_ADD/EVFILT_WRITE kevent
 * carrying the per-fd context pointer as udata. */
static void kqueue_io_set_o(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_set_o(%d)\n", fd));
    if (!io->handler[fd].want_write) {
	io->handler[fd].want_write = 1;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_WRITE, EV_ADD, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       io_get_ctx(io, fd));
	io->Kqueue.nchanges++;
    }
}
1177 
/* Drop write interest for fd: queue an EV_DELETE/EVFILT_WRITE kevent. */
static void kqueue_io_clr_o(struct io_context *io, int fd)
{
    Debug((DEBUG_PROC, "io_clr_o(%d)\n", fd));
    if (io->handler[fd].want_write) {
	io->handler[fd].want_write = 0;
	if (io->Kqueue.nchanges == io->Kqueue.nchanges_max)
	    kqueue_io_changelist_resize(io);

	EV_SET(&io->Kqueue.changelist[io->Kqueue.nchanges], fd, EVFILT_WRITE, EV_DELETE, 0, 0,
#ifdef __NetBSD__
	       (intptr_t)
#endif
	       NULL);
	io->Kqueue.nchanges++;
    }
}
1194 
kqueue_io_init(struct io_context * io)1195 static void kqueue_io_init(struct io_context *io)
1196 {
1197     io->Kqueue.fd = kqueue();
1198     io->Kqueue.nchanges = 0;
1199     io->Kqueue.nchanges_max = LISTINC;
1200     io->Kqueue.changelist = Xcalloc(io->Kqueue.nchanges_max, sizeof(struct kevent));
1201     io->Kqueue.nevents_max = LISTINC;
1202     io->Kqueue.eventlist = Xcalloc(io->Kqueue.nevents_max, sizeof(struct kevent));
1203     if (!timeout_immediately)
1204 	timeout_immediately = calloc(1, sizeof(struct timespec));
1205 }
1206 
/* Poll backend for kqueue: submit pending changes and fetch events in
 * a single kevent() call, then translate kevent filters/flags into
 * poll-style bits in the shared rcache. Events flagged EV_ERROR with
 * EBADF (fd closed while a change was still queued) are skipped.
 * *cax returns the number of distinct rcache entries; the kevent()
 * result is returned as-is. */
static int kqueue_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    int count, res;
    struct timespec timeout;

    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    *cax = 0;

    /* Only consumed when poll_timeout > -1; -1 means wait forever. */
    timeout.tv_sec = poll_timeout / 1000;
    timeout.tv_nsec = 1000000 * (poll_timeout - 1000 * timeout.tv_sec);
    Debug((DEBUG_PROC, "nchanges is %d\n", io->Kqueue.nchanges));
    res = kevent(io->Kqueue.fd, io->Kqueue.changelist,
		 io->Kqueue.nchanges, io->Kqueue.eventlist, io->Kqueue.nevents_max, poll_timeout > -1 ? &timeout : NULL);
    io->Kqueue.nchanges = 0;

    gettimeofday(&io_now, NULL);

    for (count = 0; count < res; count++) {
	struct kevent *k = &io->Kqueue.eventlist[count];
	if (!(k->flags & EV_ERROR && k->data == EBADF)) {
	    int pos, cur = (int) k->ident;

	    /* First event for this fd: claim a fresh rcache slot. */
	    if (io->rcache_map[cur] < 0) {
		io->rcache[*cax].events = 0;
		io->rcache[*cax].fd = cur;
		io->rcache_map[cur] = (*cax)++;
	    }

	    pos = io->rcache_map[cur];

	    /* Read/write arrive as separate kevents; OR the bits in. */
	    if (k->filter == EVFILT_READ)
		io->rcache[pos].events |= POLLIN;
	    if (k->flags & EV_EOF)
		io->rcache[pos].events |= POLLHUP;
	    if (k->flags & EV_ERROR)
		io->rcache[pos].events |= POLLERR;
	    if (k->filter == EVFILT_WRITE)
		io->rcache[pos].events |= POLLOUT;
	}
    }

    return res;
}
1251 
/* Drop both read and write interest for fd and push the resulting
 * change-list entries to the kernel immediately, so the fd can be
 * closed safely afterwards. */
static void kqueue_io_unregister(struct io_context *io, int fd)
{
    kqueue_io_clr_i(io, fd);
    kqueue_io_clr_o(io, fd);
    kqueue_flush(io);
}
1258 
kqueue_io_close(struct io_context * io,int fd)1259 static void kqueue_io_close(struct io_context *io, int fd)
1260 {
1261     int i, j;
1262 
1263     for (i = 0, j = 0; i < io->Kqueue.nchanges; i++) {
1264 	if ((int) io->Kqueue.changelist[i].ident == fd)
1265 	    continue;
1266 	if (i != j)
1267 	    io->Kqueue.changelist[j] = io->Kqueue.changelist[i];
1268 	j++;
1269     }
1270     io->Kqueue.nchanges = j;
1271 }
1272 
1273 
kqueue_io_register(struct io_context * io,int fd)1274 static void kqueue_io_register(struct io_context *io, int fd)
1275 {
1276     Debug((DEBUG_PROC, " io_register %d\n", fd));
1277 
1278     if (fd >= io->nfds_max)
1279 	io_resize(io, fd);
1280 }
1281 
kqueue_io_destroy(struct io_context * io)1282 static void kqueue_io_destroy(struct io_context *io)
1283 {
1284     free(io->Kqueue.changelist);
1285     free(io->Kqueue.eventlist);
1286     close(io->Kqueue.fd);
1287 }
1288 #endif
1289 
1290 #ifdef WITH_PORT
port_io_close(struct io_context * io,int fd)1291 static void port_io_close(struct io_context *io, int fd)
1292 {
1293     if (io->Port.changemap[fd] > -1) {
1294 	io->Port.changelist[io->Port.changemap[fd]] = -1;
1295 	io->Port.changemap[fd] = -1;
1296 	io->Port.nchanges--;
1297     }
1298 }
1299 
/* Append fd to the change list unless it already occupies a valid
 * slot. The three-part test guards against stale changemap entries:
 * the recorded slot index must be in range and the slot must still
 * refer back to this fd. */
static void port_addchange(struct io_context *io, int fd)
{
    if (io->Port.changemap[fd] < 0 || io->Port.changemap[fd] >= io->Port.nchanges || io->Port.changelist[io->Port.changemap[fd]] != fd) {
	io->Port.changemap[fd] = io->Port.nchanges;
	io->Port.changelist[io->Port.nchanges++] = fd;
    }
}
1307 
port_io_set_i(struct io_context * io,int fd)1308 static void port_io_set_i(struct io_context *io, int fd)
1309 {
1310     if (!io->handler[fd].want_read) {
1311 	io->handler[fd].want_read = 1;
1312 	port_addchange(io, fd);
1313     }
1314 }
1315 
port_io_clr_i(struct io_context * io,int fd)1316 static void port_io_clr_i(struct io_context *io, int fd)
1317 {
1318     if (io->handler[fd].want_read) {
1319 	io->handler[fd].want_read = 0;
1320 	port_addchange(io, fd);
1321     }
1322 }
1323 
port_io_set_o(struct io_context * io,int fd)1324 static void port_io_set_o(struct io_context *io, int fd)
1325 {
1326     if (!io->handler[fd].want_write) {
1327 	io->handler[fd].want_write = 1;
1328 	port_addchange(io, fd);
1329     }
1330 }
1331 
port_io_clr_o(struct io_context * io,int fd)1332 static void port_io_clr_o(struct io_context *io, int fd)
1333 {
1334     if (io->handler[fd].want_write) {
1335 	io->handler[fd].want_write = 0;
1336 	port_addchange(io, fd);
1337     }
1338 }
1339 
port_io_init(struct io_context * io)1340 static void port_io_init(struct io_context *io)
1341 {
1342     int flags, i;
1343 
1344     io->Port.nevents_max = LISTINC;
1345     io->Port.eventlist = Xcalloc(io->Port.nevents_max, sizeof(port_event_t));
1346 
1347     io->Port.fd = port_create();
1348     flags = fcntl(io->Port.fd, F_GETFD, 0) | FD_CLOEXEC;
1349     fcntl(io->Port.fd, F_SETFD, flags);
1350 
1351     io->Port.changelist = Xcalloc(io->nfds_max, sizeof(int));
1352     io->Port.changemap = Xcalloc(io->nfds_max, sizeof(int));
1353     for (i = 0; i < io->Port.nevents_max; i++) {
1354 	io->Port.changelist[i] = -1;
1355 	io->Port.changemap[i] = -1;
1356     }
1357 }
1358 
/* Poll backend for Solaris event ports: (re)associate every queued fd
 * with its current interest mask, wait for events via port_getn() and
 * merge the results into the shared rcache. Port associations are
 * one-shot; port_io_poll_finish() re-queues still-interested fds
 * after dispatch. Returns the number of events fetched. */
static int port_io_poll(struct io_context *io, int poll_timeout, int *cax)
{
    uint_t count;
    uint_t nevents = 1;		/* port_getn: block until >= 1 event */
    struct timespec timeout;

    *cax = 0;

    Debug((DEBUG_PROC, "io_poll (%p)\n", io));

    for (count = 0; count < (uint_t) io->Port.nchanges; count++) {
	int fd = io->Port.changelist[count];
	/* Skip slots invalidated by port_io_close() or superseded by a
	 * later change entry for the same fd. */
	if (fd > -1 && io->Port.changemap[fd] == (int) count) {
	    if (0 > port_associate(io->Port.fd, PORT_SOURCE_FD, fd,
				   (io->handler[fd].want_read ? POLLIN : 0) | (io->handler[fd].want_write ? POLLOUT : 0), &io->handler[fd]))
		logerr("port_associate (%s:%d)", __FILE__, __LINE__);

	    io->Port.changemap[fd] = -1;
	}
    }
    io->Port.nchanges = 0;

    timeout.tv_sec = poll_timeout / 1000;
    timeout.tv_nsec = 1000000 * (poll_timeout - 1000 * timeout.tv_sec);

    /* ETIME just means the timeout expired without events. */
    if (-1 == port_getn(io->Port.fd, io->Port.eventlist, io->Port.nevents_max, &nevents, poll_timeout < 0 ? NULL : &timeout)) {
	if (errno != ETIME) {
	    logerr("port_getn (errno = %d)", errno);
	    abort();
	}
	nevents = 0;
    }

    gettimeofday(&io_now, NULL);

    for (count = 0; count < nevents; count++) {
	int pos, cur = io->Port.eventlist[count].portev_object;

	/* First event for this fd: claim a fresh rcache slot. */
	if (io->rcache_map[cur] < 0) {
	    io->rcache[*cax].events = 0;
	    io->rcache[*cax].fd = cur;
	    io->rcache_map[cur] = (*cax)++;
	}

	pos = io->rcache_map[cur];

	io->rcache[pos].events = io->Port.eventlist[count].portev_events;
    }

    return (int) nevents;
}
1410 
port_io_poll_finish(struct io_context * io,int nevents)1411 static void port_io_poll_finish(struct io_context *io, int nevents)
1412 {
1413     int count;
1414     for (count = 0; count < nevents; count++) {
1415 	int cur = io->Port.eventlist[count].portev_object;
1416 	if (io->handler[cur].want_read || io->handler[cur].want_write)
1417 	    port_addchange(io, cur);
1418     }
1419 }
1420 
port_io_unregister(struct io_context * io,int fd)1421 static void port_io_unregister(struct io_context *io, int fd)
1422 {
1423     port_dissociate(io->Port.fd, PORT_SOURCE_FD, fd);
1424     io->Port.changemap[fd] = -1;
1425 }
1426 
port_io_register(struct io_context * io,int fd)1427 static void port_io_register(struct io_context *io, int fd)
1428 {
1429     Debug((DEBUG_PROC, " io_register %d\n", fd));
1430 
1431     if (fd >= io->nfds_max) {
1432 	int i;
1433 	int omax = io->nfds_max;
1434 
1435 	io->Port.changelist = Xrealloc(io->Port.changelist, io->nfds_max * sizeof(int));
1436 	io->Port.changemap = Xrealloc(io->Port.changemap, io->nfds_max * sizeof(int));
1437 
1438 	for (i = omax; i < io->Port.nevents_max; i++) {
1439 	    io->Port.changelist[i] = -1;
1440 	    io->Port.changemap[i] = -1;
1441 	}
1442 	io_resize(io, fd);
1443     }
1444 }
1445 
port_io_destroy(struct io_context * io)1446 static void port_io_destroy(struct io_context *io)
1447 {
1448     free(io->Port.eventlist);
1449     free(io->Port.changelist);
1450     free(io->Port.changemap);
1451     close(io->Port.fd);
1452 }
1453 #endif
1454 
/* Insert isc into the by-time tree. The tree needs unique keys, so on
 * collision the timestamp is bumped by one microsecond (normalizing
 * overflow into tv_sec) and the insert retried; time_real is kept in
 * sync with time_when. */
static void insert_isc(rb_tree_t * t, struct io_sched *isc)
{
    while (!RB_insert(t, isc)) {
	isc->time_when.tv_usec++;
	/* Normalize tv_usec into [0, 999999]; the previous '>' test
	 * left tv_usec == 1000000 unnormalized. */
	if (isc->time_when.tv_usec >= 1000000) {
	    isc->time_when.tv_usec -= 1000000;
	    isc->time_when.tv_sec++;
	}
	isc->time_real.tv_sec = isc->time_when.tv_sec;
	isc->time_real.tv_usec = isc->time_when.tv_usec;
    }
}
1465 
io_sched_add(struct io_context * io,void * data,void * proc,time_t tv_sec,suseconds_t tv_usec)1466 void io_sched_add(struct io_context *io, void *data, void *proc, time_t tv_sec, suseconds_t tv_usec)
1467 {
1468     rb_node_t *rbn;
1469     struct io_event *ioe = Xcalloc(1, sizeof(struct io_event));
1470     struct io_sched *isc, is;
1471 
1472     Debug((DEBUG_PROC, "io_sched_add %p %ld.%ld\n", data, (long) tv_sec, (long) tv_usec));
1473 
1474     gettimeofday(&io_now, NULL);
1475 
1476     is.data = data;
1477     rbn = RB_search(io->events_by_data, &is);
1478 
1479     ioe->proc = proc;
1480     ioe->time_wait.tv_sec = tv_sec;
1481     ioe->time_wait.tv_usec = tv_usec;
1482 
1483     if (rbn) {
1484 	isc = RB_payload(rbn, struct io_sched *);
1485 	ioe->next = isc->event;
1486 	RB_search_and_delete(io->events_by_time, isc);
1487     } else {
1488 	isc = Xcalloc(1, sizeof(struct io_sched));
1489 	isc->data = data;
1490 	RB_insert(io->events_by_data, isc);
1491     }
1492     isc->event = ioe;
1493     isc->time_when.tv_sec = io_now.tv_sec + ioe->time_wait.tv_sec;
1494     isc->time_when.tv_usec = io_now.tv_usec + ioe->time_wait.tv_usec;
1495     if (isc->time_when.tv_usec > 1000000)
1496 	isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
1497     isc->time_real.tv_sec = isc->time_when.tv_sec;
1498     isc->time_real.tv_usec = isc->time_when.tv_usec;
1499     insert_isc(io->events_by_time, isc);
1500 }
1501 
io_sched_app(struct io_context * io,void * data,void * proc,time_t tv_sec,suseconds_t tv_usec)1502 void io_sched_app(struct io_context *io, void *data, void *proc, time_t tv_sec, suseconds_t tv_usec)
1503 {
1504     rb_node_t *rbn;
1505     struct io_event *ioe = Xcalloc(1, sizeof(struct io_event));
1506     struct io_sched is;
1507 
1508     DebugIn(DEBUG_PROC);
1509 
1510     is.data = data;
1511     rbn = RB_search(io->events_by_data, &is);
1512 
1513     ioe->proc = proc;
1514     ioe->time_wait.tv_sec = tv_sec;
1515     ioe->time_wait.tv_usec = tv_usec;
1516 
1517     if (rbn) {
1518 	struct io_event *i = RB_payload(rbn, struct io_sched *)->event;
1519 	while (i->next)
1520 	    i = i->next;
1521 	i->next = ioe;
1522     } else {
1523 	struct io_sched *isc;
1524 	isc = Xcalloc(1, sizeof(struct io_sched));
1525 	isc->data = data;
1526 	isc->event = ioe;
1527 	isc->time_when.tv_sec = io_now.tv_sec + ioe->time_wait.tv_sec;
1528 	isc->time_when.tv_usec = io_now.tv_usec + ioe->time_wait.tv_usec;
1529 	if (isc->time_when.tv_usec > 1000000)
1530 	    isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
1531 	isc->time_real.tv_sec = isc->time_when.tv_sec;
1532 	isc->time_real.tv_usec = isc->time_when.tv_usec;
1533 	RB_insert(io->events_by_data, isc);
1534 	insert_isc(io->events_by_time, isc);
1535     }
1536 
1537     DebugOut(DEBUG_PROC);
1538 }
1539 
io_sched_pop(struct io_context * io,void * data)1540 void *io_sched_pop(struct io_context *io, void *data)
1541 {
1542     rb_node_t *rbn;
1543     struct io_sched is;
1544     void *result = NULL;
1545 
1546     DebugIn(DEBUG_PROC);
1547 
1548     is.data = data;
1549     rbn = RB_search(io->events_by_data, &is);
1550     if (rbn) {
1551 	struct io_sched *isc = RB_payload(rbn, struct io_sched *);
1552 	struct io_event *i = isc->event;
1553 
1554 	isc->event = i->next;
1555 	free(i);
1556 	RB_search_and_delete(io->events_by_time, isc);
1557 	if (isc->event) {
1558 	    isc->time_when.tv_sec = io_now.tv_sec + isc->event->time_wait.tv_sec;
1559 	    isc->time_when.tv_usec = io_now.tv_usec + isc->event->time_wait.tv_usec;
1560 	    if (isc->time_when.tv_usec > 1000000)
1561 		isc->time_when.tv_usec -= 1000000, isc->time_when.tv_sec++;
1562 	    isc->time_real.tv_sec = isc->time_when.tv_sec;
1563 	    isc->time_real.tv_usec = isc->time_when.tv_usec;
1564 	    insert_isc(io->events_by_time, isc);
1565 	    result = isc->event->proc;
1566 	} else {
1567 	    RB_delete(io->events_by_data, rbn);
1568 	    free(isc);
1569 	}
1570     }
1571     DebugOut(DEBUG_PROC);
1572     return result;
1573 }
1574 
/* Delete every event for data whose callback equals proc. A matching
 * chain head goes through io_sched_pop() (which also re-keys or frees
 * the entry); inner matches are unlinked and freed directly. Returns
 * -1 if at least one event was removed, 0 otherwise. */
int io_sched_del(struct io_context *io, void *data, void *proc)
{
    int result = 0;
    struct io_sched is;
    rb_node_t *rbn;

    DebugIn(DEBUG_PROC);

    is.data = data;
    rbn = RB_search(io->events_by_data, &is);
    if (rbn) {
	struct io_sched *isc = RB_payload(rbn, struct io_sched *);
	struct io_event *i = isc->event;
	if (i) {
	    if (i->proc == proc)
		io_sched_pop(io, data), result = -1;
	    else {
		struct io_event *next;
		/* i stays put after a removal so consecutive matches
		 * are handled, too. */
		while (i->next)
		    if (i->next->proc == proc) {
			next = i->next;
			i->next = next->next;
			free(next);
			result = -1;
		    } else
			i = i->next;
	    }
	}
    }
    DebugOut(DEBUG_PROC);
    return result;
}
1607 
io_sched_renew_proc(struct io_context * io,void * data,void * proc)1608 int io_sched_renew_proc(struct io_context *io, void *data, void *proc)
1609 {
1610     struct io_sched is;
1611     rb_node_t *rbn;
1612     Debug((DEBUG_PROC, "io_sched_renew_proc %p\n", data));
1613     is.data = data;
1614     rbn = RB_search(io->events_by_data, &is);
1615     if (rbn) {
1616 	struct io_sched *isc = RB_payload(rbn, struct io_sched *);
1617 	if (isc && isc->event && (!proc || isc->event->proc == proc)) {
1618 	    isc->time_real.tv_sec = io_now.tv_sec + isc->event->time_wait.tv_sec;
1619 	    isc->time_real.tv_usec = io_now.tv_usec + isc->event->time_wait.tv_usec;
1620 	    if (isc->time_real.tv_usec > 1000000)
1621 		isc->time_real.tv_usec -= 1000000, isc->time_real.tv_sec++;
1622 	    Debug((DEBUG_PROC, "to be fired at %.8lx:%.8lx\n", (long) (isc->time_real.tv_sec), (long) (isc->time_real.tv_usec)));
1623 	    return 0;
1624 	}
1625     }
1626     return -1;
1627 }
1628 
io_sched_peek(struct io_context * io,void * data)1629 void *io_sched_peek(struct io_context *io, void *data)
1630 {
1631     struct io_event *ioe;
1632     rb_node_t *rbn;
1633     struct io_sched is;
1634     is.data = data;
1635     rbn = RB_search(io->events_by_data, &is);
1636     if (rbn && (ioe = RB_payload(rbn, struct io_sched *)->event))
1637 	 return (void *) (ioe->proc);
1638     return NULL;
1639 }
1640 
io_sched_peek_time(struct io_context * io,void * data)1641 struct timeval *io_sched_peek_time(struct io_context *io, void *data)
1642 {
1643     struct io_sched *ios;
1644     rb_node_t *rbn;
1645     struct io_sched is;
1646     is.data = data;
1647     rbn = RB_search(io->events_by_data, &is);
1648     if (rbn && (ios = RB_payload(rbn, struct io_sched *))->event)
1649 	 return &ios->time_real;
1650     return NULL;
1651 }
1652 
/* Re-key due events whose deadline was pushed back: for every entry at
 * the head of the by-time tree with time_when <= io_now, if
 * io_sched_renew_proc() moved time_real past time_when, remove the
 * entry, adopt the renewed time and re-insert it. */
static void io_reschedule(struct io_context *io)
{
    rb_node_t *rbn, *rbnext;
    struct io_sched *ios;

    for (rbn = RB_first(io->events_by_time);
	 rbn &&
	 ((ios =
	   RB_payload(rbn,
		      struct io_sched *))->time_when.tv_sec < io_now.tv_sec
	  || (ios->time_when.tv_sec == io_now.tv_sec && ios->time_when.tv_usec <= io_now.tv_usec)); rbn = rbnext) {
	/* Grab the successor first; rbn may be deleted below. */
	rbnext = RB_next(rbn);
	if (ios->time_when.tv_sec != ios->time_real.tv_sec || ios->time_when.tv_usec != ios->time_real.tv_usec) {
	    RB_delete(io->events_by_time, rbn);
	    ios->time_when.tv_sec = ios->time_real.tv_sec;
	    ios->time_when.tv_usec = ios->time_real.tv_usec;
	    insert_isc(io->events_by_time, ios);
	    Debug((DEBUG_PROC, " rescheduled at %.8lx:%.8lx (%lds)\n",
		   (long) (ios->time_when.tv_sec), (long) (ios->time_when.tv_usec), (long) (ios->time_when.tv_sec) - (long) io_now.tv_sec));
	}
    }
}
1675 
/* Fire all scheduled events that are due (time_when <= io_now), then
 * return the poll timeout in milliseconds until the next pending
 * event (+1ms so the timer has actually expired when poll returns),
 * or -1 when nothing is scheduled. Callbacks are invoked as
 * proc(data, -1); -1 marks a timer (rather than fd) invocation. */
int io_sched_exec(struct io_context *io)
{
    rb_node_t *rbn, *rbnext;
    int poll_timeout;
    struct io_sched *ios;

    Debug((DEBUG_PROC, "io_sched_exec (%p)\n", io));

    /* Honor deadlines pushed back via io_sched_renew_proc() first, so
     * renewed timers don't fire early. */
    io_reschedule(io);

    for (rbn = RB_first(io->events_by_time);
	 rbn &&
	 ((ios =
	   RB_payload(rbn,
		      struct io_sched *))->time_when.tv_sec < io_now.tv_sec
	  || (ios->time_when.tv_sec == io_now.tv_sec && ios->time_when.tv_usec <= io_now.tv_usec)); rbn = rbnext) {
	rbnext = RB_next(rbn);
	Debug((DEBUG_PROC, " executing ...\n"));
	((void (*)(void *, int)) (ios->event->proc)) (ios->data, -1);
	Debug((DEBUG_PROC, "... done.\n"));
    }

    /* Callbacks may have renewed their own timers. */
    io_reschedule(io);

    rbn = RB_first(io->events_by_time);
    if (rbn) {
	ios = RB_payload(rbn, struct io_sched *);
	poll_timeout = 1 + (int) ((ios->time_when.tv_sec - io_now.tv_sec) * 1000) + (int) ((ios->time_when.tv_usec - io_now.tv_usec) / 1000);

	Debug((DEBUG_PROC, "poll_timeout = %dms\n", poll_timeout));
    } else
	poll_timeout = -1;

    return poll_timeout;
}
1711 
io_main(struct io_context * io)1712 void io_main(struct io_context *io)
1713 {
1714     Debug((DEBUG_PROC, "io_main (%p)\n", io));
1715     do {
1716 	gettimeofday(&io_now, NULL);
1717 	io_poll(io, io_sched_exec(io));
1718     }
1719     while (1);
1720 }
1721 
/* Create and initialize an I/O context: select the best available
 * event notification mechanism (restrictable via the decimal bitmask
 * in the IO_POLL_MECHANISM environment variable), raise RLIMIT_NOFILE
 * to its hard limit, and set up the per-fd handler and event-cache
 * arrays. Probe order: kqueue, epoll, poll, /dev/poll, port, select;
 * aborts if no mechanism works. */
struct io_context *io_init()
{
    static int once = 0;	/* log the chosen mechanism only once */
    void (*mech_io_init) (struct io_context *);
    int mode = 0
#ifdef WITH_POLL
	| IO_MODE_poll
#endif
#ifdef WITH_EPOLL
	| IO_MODE_epoll
#endif
#ifdef WITH_DEVPOLL
	| IO_MODE_devpoll
#endif
#ifdef WITH_KQUEUE
	| IO_MODE_kqueue
#endif
#ifdef WITH_SELECT
	| IO_MODE_select
#endif
#ifdef WITH_PORT
	| IO_MODE_port
#endif
	;
    char *mech, *e;
    int i;
    struct rlimit rlim;
    struct io_context *io = Xcalloc(1, sizeof(struct io_context));

    Debug((DEBUG_PROC, "io_init\n"));

    if (getrlimit(RLIMIT_NOFILE, &rlim)) {
	logerr("getrlimit");
	exit(EX_SOFTWARE);
    }

    /* IO_POLL_MECHANISM masks out compiled-in mechanisms. */
    if ((e = getenv("IO_POLL_MECHANISM")))
	mode &= atoi(e);

    mech_io_poll_finish = NULL;
    mech_io_close = NULL;

#define EVENT_MECHANISM_DEFUNCT "%s event mechanism is unavailable"

#ifdef WITH_KQUEUE
    /* Probe each mechanism by creating (and closing) a live instance. */
    if (mode & IO_MODE_kqueue) {
	int fd = kqueue();
	mech = "kqueue";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = kqueue_io_poll;
	    mech_io_set_i = kqueue_io_set_i;
	    mech_io_set_o = kqueue_io_set_o;
	    mech_io_clr_i = kqueue_io_clr_i;
	    mech_io_clr_o = kqueue_io_clr_o;
	    mech_io_register = kqueue_io_register;
	    mech_io_unregister = kqueue_io_unregister;
	    mech_io_destroy = kqueue_io_destroy;
	    mech_io_init = kqueue_io_init;
	    mech_io_close = kqueue_io_close;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_EPOLL
    if (mode & IO_MODE_epoll) {
	int fd = epoll_create(1);
	mech = "epoll";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = epoll_io_poll;
	    mech_io_set_i = epoll_io_set_i;
	    mech_io_set_o = epoll_io_set_o;
	    mech_io_clr_i = epoll_io_clr_i;
	    mech_io_clr_o = epoll_io_clr_o;
	    mech_io_register = epoll_io_register;
	    mech_io_unregister = epoll_io_unregister;
	    mech_io_destroy = epoll_io_destroy;
	    mech_io_init = epoll_io_init;
	    mech_io_close = epoll_io_close;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_POLL
    /* poll(2) needs no probing; it either compiles in or it doesn't. */
    if (mode & IO_MODE_poll) {
	mech = "poll";
	mech_io_poll = poll_io_poll;
	mech_io_set_i = poll_io_set_i;
	mech_io_set_o = poll_io_set_o;
	mech_io_clr_i = poll_io_clr_i;
	mech_io_clr_o = poll_io_clr_o;
	mech_io_register = poll_io_register;
	mech_io_unregister = poll_io_unregister;
	mech_io_destroy = poll_io_destroy;
	mech_io_init = poll_io_init;
	goto gotit;
    }
#endif

#ifdef WITH_DEVPOLL
// Solaris /dev/poll may or may not work correctly. Placed *after* standard
// poll on purpose, so it won't be used unless specified.
    if (mode & IO_MODE_devpoll) {
	int fd = open("/dev/poll", O_RDWR);
	mech = "/dev/poll";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = devpoll_io_poll;
	    mech_io_set_i = devpoll_io_set_i;
	    mech_io_set_o = devpoll_io_set_o;
	    mech_io_clr_i = devpoll_io_clr_i;
	    mech_io_clr_o = devpoll_io_clr_o;
	    mech_io_register = devpoll_io_register;
	    mech_io_unregister = devpoll_io_unregister;
	    mech_io_destroy = devpoll_io_destroy;
	    mech_io_init = devpoll_io_init;
	    mech_io_close = devpoll_io_close;
	    if (rlim.rlim_max > OPEN_MAX)
		rlim.rlim_max = OPEN_MAX;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_PORT
// Solaris port(2) may or may not work correctly. Placed *after* standard poll
// on purpose, so it won't be used unless specified.
    if (mode & IO_MODE_port) {
	int fd = port_create();
	mech = "port";
	if (fd > -1) {
	    close(fd);
	    mech_io_poll = port_io_poll;
	    mech_io_poll_finish = port_io_poll_finish;
	    mech_io_set_i = port_io_set_i;
	    mech_io_set_o = port_io_set_o;
	    mech_io_clr_i = port_io_clr_i;
	    mech_io_clr_o = port_io_clr_o;
	    mech_io_register = port_io_register;
	    mech_io_unregister = port_io_unregister;
	    mech_io_destroy = port_io_destroy;
	    mech_io_init = port_io_init;
	    mech_io_close = port_io_close;
	    goto gotit;
	}
	logerr(EVENT_MECHANISM_DEFUNCT, mech);
    }
#endif

#ifdef WITH_SELECT
// select(2) comes last and won't be used unless manually chosen.
    if (mode & IO_MODE_select) {
	mech = "select";
	mech_io_poll = select_io_poll;
	mech_io_set_i = select_io_set_i;
	mech_io_set_o = select_io_set_o;
	mech_io_clr_i = select_io_clr_i;
	mech_io_clr_o = select_io_clr_o;
	mech_io_register = select_io_register;
	mech_io_unregister = select_io_unregister;
	mech_io_destroy = select_io_destroy;
	mech_io_init = select_io_init;
	/* select is limited to FD_SETSIZE descriptors. */
	if (rlim.rlim_max > FD_SETSIZE)
	    rlim.rlim_max = FD_SETSIZE;
	goto gotit;
    }
#endif

    logmsg("no working event notification mechanism found");
    abort();

  gotit:
    if (!once) {
	logmsg("%s event notification mechanism is being used", mech);
	once++;
    }

    /* Raise the soft fd limit to the (possibly capped) hard limit and
     * size all per-fd arrays from the result. */
    rlim.rlim_cur = rlim.rlim_max;
    setrlimit(RLIMIT_NOFILE, &rlim);
    getrlimit(RLIMIT_NOFILE, &rlim);
    io->nfds_limit = (int) rlim.rlim_cur;
    io->nfds_max = MINIMUM(io->nfds_limit, ARRAYINC);
    io->handler = Xcalloc(io->nfds_max, sizeof(struct io_handler));

    mech_io_init(io);

    io->events_by_time = RB_tree_new(cmp_tv, NULL);
    io->events_by_data = RB_tree_new(cmp_data, NULL);
    io->io_invalid_i = (void *) io_invalid_i;
    io->io_invalid_o = (void *) io_invalid_o;
    io->io_invalid_e = (void *) io_invalid_e;
    io->io_invalid_h = (void *) io_invalid_h;

    /* rcache_map entries start at -1 == "no cached event for this fd". */
    io->rcache_map = Xcalloc(io->nfds_max, sizeof(int));
    for (i = 0; i < io->nfds_max; i++)
	io->rcache_map[i] = -1;
    io->rcache = Xcalloc(io->nfds_max, sizeof(struct event_cache));

    gettimeofday(&io_now, NULL);

    return io;
}
1930 
io_resize(struct io_context * io,int fd)1931 static void io_resize(struct io_context *io, int fd)
1932 {
1933     int i, omax = io->nfds_max;
1934 
1935     if (io->nfds_limit == io->nfds_max) {
1936 	logmsg("BUG: Can handle at most %d file descriptors", io->nfds_limit);
1937 	abort();
1938     }
1939 
1940     io->nfds_max = MINIMUM(io->nfds_limit, MAXIMUM(fd + 1, io->nfds_max + ARRAYINC));
1941 
1942     if (io->nfds_max <= fd) {
1943 	logmsg("BUG: Can handle at file descriptor %d", fd);
1944 	abort();
1945     }
1946 
1947     io->handler = Xrealloc(io->handler, io->nfds_max * sizeof(struct io_handler));
1948 
1949     memset(&io->handler[omax], 0, (io->nfds_max - omax) * sizeof(struct io_handler));
1950 
1951     io->rcache_map = Xrealloc(io->rcache_map, io->nfds_max * sizeof(int));
1952 
1953     for (i = omax; i < io->nfds_max; i++)
1954 	io->rcache_map[i] = -1;
1955 
1956     io->rcache = Xrealloc(io->rcache, io->nfds_max * sizeof(struct event_cache));
1957 }
1958 
io_get_cb_i(struct io_context * io,int fd)1959 void *io_get_cb_i(struct io_context *io, int fd)
1960 {
1961     return io->handler[fd].i;
1962 }
1963 
io_get_cb_o(struct io_context * io,int fd)1964 void *io_get_cb_o(struct io_context *io, int fd)
1965 {
1966     return io->handler[fd].o;
1967 }
1968 
io_get_cb_h(struct io_context * io,int fd)1969 void *io_get_cb_h(struct io_context *io, int fd)
1970 {
1971     return io->handler[fd].h;
1972 }
1973 
io_get_cb_e(struct io_context * io,int fd)1974 void *io_get_cb_e(struct io_context *io, int fd)
1975 {
1976     return io->handler[fd].e;
1977 }
1978 
io_get_ctx(struct io_context * io,int fd)1979 void *io_get_ctx(struct io_context *io, int fd)
1980 {
1981     return io->handler[fd].data;
1982 }
1983 
io_want_read(struct io_context * io,int fd)1984 int io_want_read(struct io_context *io, int fd)
1985 {
1986     return io->handler[fd].want_read_app;
1987 }
1988 
io_want_write(struct io_context * io,int fd)1989 int io_want_write(struct io_context *io, int fd)
1990 {
1991     return io->handler[fd].want_write_app;
1992 }
1993 
io_set_cb_i(struct io_context * io,int fd,void * f)1994 void io_set_cb_i(struct io_context *io, int fd, void *f)
1995 {
1996     io->handler[fd].i_app = f;
1997     if (!io->handler[fd].reneg)
1998 	io->handler[fd].i = f;
1999 }
2000 
io_set_cb_o(struct io_context * io,int fd,void * f)2001 void io_set_cb_o(struct io_context *io, int fd, void *f)
2002 {
2003     io->handler[fd].o_app = f;
2004     if (!io->handler[fd].reneg)
2005 	io->handler[fd].o = f;
2006 }
2007 
io_set_cb_e(struct io_context * io,int fd,void * f)2008 void io_set_cb_e(struct io_context *io, int fd, void *f)
2009 {
2010     io->handler[fd].e = f;
2011 }
2012 
io_set_cb_h(struct io_context * io,int fd,void * f)2013 void io_set_cb_h(struct io_context *io, int fd, void *f)
2014 {
2015     io->handler[fd].h = f;
2016 }
2017 
io_set_cb_inv_i(struct io_context * io,void * f)2018 void io_set_cb_inv_i(struct io_context *io, void *f)
2019 {
2020     io->io_invalid_i = f;
2021 }
2022 
io_set_cb_inv_o(struct io_context * io,void * f)2023 void io_set_cb_inv_o(struct io_context *io, void *f)
2024 {
2025     io->io_invalid_o = f;
2026 }
2027 
io_set_cb_inv_h(struct io_context * io,void * f)2028 void io_set_cb_inv_h(struct io_context *io, void *f)
2029 {
2030     io->io_invalid_h = f;
2031 }
2032 
io_set_cb_inv_e(struct io_context * io,void * f)2033 void io_set_cb_inv_e(struct io_context *io, void *f)
2034 {
2035     io->io_invalid_e = f;
2036 }
2037 
io_clr_cb_i(struct io_context * io,int fd)2038 void io_clr_cb_i(struct io_context *io, int fd)
2039 {
2040     io->handler[fd].want_read_app = 0;
2041 #ifdef WITH_SSL
2042     if (io->handler[fd].want_read_ssl)
2043 	return;
2044 #endif
2045     io->handler[fd].want_read = 0;
2046     io_set_cb_i(io, fd, io->io_invalid_i);
2047     io_clr_i(io, fd);
2048 }
2049 
io_clr_cb_o(struct io_context * io,int fd)2050 void io_clr_cb_o(struct io_context *io, int fd)
2051 {
2052     io->handler[fd].want_write_app = 0;
2053 #ifdef WITH_SSL
2054     if (io->handler[fd].want_write_ssl)
2055 	return;
2056 #endif
2057     io->handler[fd].want_write = 0;
2058     io_set_cb_o(io, fd, io->io_invalid_o);
2059     io_clr_o(io, fd);
2060 }
2061 
io_clr_cb_e(struct io_context * io,int fd)2062 void io_clr_cb_e(struct io_context *io, int fd)
2063 {
2064     io_set_cb_e(io, fd, io->io_invalid_e);
2065 }
2066 
io_clr_cb_h(struct io_context * io,int fd)2067 void io_clr_cb_h(struct io_context *io, int fd)
2068 {
2069     io_set_cb_h(io, fd, io->io_invalid_h);
2070 }
2071 
2072 #ifdef WITH_SSL
2073 #include <openssl/ssl.h>
2074 
/*
 * Common post-processing for SSL_read()/SSL_write()/SSL_shutdown() results.
 * Tracks TLS renegotiation/handshake state for fd: while OpenSSL wants more
 * I/O to finish the handshake, the descriptor's active callbacks are diverted
 * to cb and the event registration follows SSL_want_read()/SSL_want_write().
 * Once the handshake no longer wants I/O, the application's saved callbacks
 * (i_app/o_app) and its original read/write interest are restored.
 * Returns res (the SSL call's result) unchanged.
 */
static ssize_t io_SSL_rw(SSL * ssl, struct io_context *io, int fd, void *cb, int res)
{
    DebugIn(DEBUG_PROC | DEBUG_NET);
    /* Renegotiation finished: OpenSSL no longer wants I/O, so restore the
     * application's callbacks and event interest. */
    if (io->handler[fd].reneg && !SSL_want_read(ssl)
	&& !SSL_want_write(ssl)) {
	io->handler[fd].reneg = 0;
	io_SSL_clr_i(io, fd);
	io_SSL_clr_o(io, fd);
	io->handler[fd].i = io->handler[fd].i_app;
	io->handler[fd].o = io->handler[fd].o_app;
    } else if (res < 0 && !io->handler[fd].reneg && (SSL_want_read(ssl) || SSL_want_write(ssl))) {
	/* The SSL call failed because the handshake needs more I/O:
	 * enter renegotiation mode and route both directions to cb. */
	Debug((DEBUG_PROC | DEBUG_NET, "TLS shutdown or renegotiation initiated " "(res = %d, error = %d)\n", res, SSL_get_error(ssl, res)));
	io->handler[fd].reneg = 1;
	io->handler[fd].i = cb;
	io->handler[fd].o = cb;

	/* Register exactly the direction OpenSSL is waiting for. */
	if (SSL_want_read(ssl)) {
	    io_SSL_clr_o(io, fd);
	    io_SSL_set_i(io, fd);
	} else {
	    io_SSL_clr_i(io, fd);
	    io_SSL_set_o(io, fd);
	}

	/* Make the failure look like a retryable one to the caller. */
	errno = EAGAIN;
    }
    DebugOut(DEBUG_PROC | DEBUG_NET);
    return res;
}
2104 
io_SSL_read(SSL * ssl,void * buf,size_t num,struct io_context * io,int fd,void * cb)2105 ssize_t io_SSL_read(SSL * ssl, void *buf, size_t num, struct io_context * io, int fd, void *cb)
2106 {
2107     return io_SSL_rw(ssl, io, fd, cb, SSL_read(ssl, buf, (int) num));
2108 }
2109 
io_SSL_write(SSL * ssl,void * buf,size_t num,struct io_context * io,int fd,void * cb)2110 ssize_t io_SSL_write(SSL * ssl, void *buf, size_t num, struct io_context *io, int fd, void *cb)
2111 {
2112     return io_SSL_rw(ssl, io, fd, cb, SSL_write(ssl, buf, (int) num));
2113 }
2114 
/*
 * SSL_shutdown() wrapper.  Anything short of a completed shutdown (return
 * value < 1) is normalized to -1 before being handed to the renegotiation
 * tracker, so an in-progress shutdown is treated like a retryable failure.
 */
int io_SSL_shutdown(SSL * ssl, struct io_context *io, int fd, void *cb)
{
    int res;

    Debug((DEBUG_PROC | DEBUG_NET, "%s\n", __func__));
    res = SSL_shutdown(ssl);
    Debug((DEBUG_PROC | DEBUG_NET, "SSL_shutdown = %d\n", res));
    if (res < 1)
	res = -1;
    return (int) io_SSL_rw(ssl, io, fd, cb, res);
}
2124 #endif				/* WITH_SSL */
2125 
io_clone(struct io_context * io,int to,int from)2126 void io_clone(struct io_context *io, int to, int from)
2127 {
2128     Debug((DEBUG_PROC, "io_clone (%d, %d)\n", to, from));
2129 
2130     io->handler[to] = io->handler[from];
2131 
2132     if (io->handler[to].want_read) {
2133 	io->handler[to].want_read = 0;
2134 	mech_io_set_i(io, to);
2135     }
2136     if (io->handler[to].want_write) {
2137 	io->handler[to].want_write = 0;
2138 	mech_io_set_o(io, to);
2139     }
2140 }
2141