1 /*
2 * Copyright (C) 2002, 2003, 2004, 2005 James Antill
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * email: james@and.org
19 */
20 /* IO events, and some help with timed events */
21 #include <vstr.h>
22
23 #include <stdlib.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <errno.h>
29 #include <getopt.h>
30 #include <string.h>
31 #include <netinet/in.h>
32 #include <sys/un.h>
33 #include <netinet/tcp.h>
34 #include <arpa/inet.h>
35 #include <sys/poll.h>
36 #include <netdb.h>
37 #include <sys/time.h>
38 #include <time.h>
39 #include <signal.h>
40 #include <sys/stat.h>
41
42 #include <socket_poll.h>
43 #include <timer_q.h>
44
45 #include <sys/sendfile.h>
46
47 #define CONF_EVNT_NO_EPOLL FALSE
48 #define CONF_EVNT_EPOLL_SZ (10 * 1000) /* size is just a speed hint */
49 #define CONF_EVNT_DUP_EPOLL TRUE /* doesn't work if FALSE and multi proc */
50 #define CONF_GETTIMEOFDAY_TIME TRUE /* does tv_sec contain time(NULL) */
51
52 #ifndef CONF_FULL_STATIC
53 # include <netdb.h>
54 # define EVNT__RESOLVE_NAME(evnt, x) do { \
55 struct hostent *h = gethostbyname(x); \
56 \
57 if (h) \
58 memcpy(&EVNT_SA_IN4(evnt)->sin_addr.s_addr, \
59 h->h_addr_list[0], \
60 sizeof(EVNT_SA_IN4(evnt)->sin_addr.s_addr)); \
61 \
62 } while (FALSE)
63 #else
64 # define EVNT__RESOLVE_NAME(evnt, x) do { \
65 if ((EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE) \
66 EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY); \
67 } while (FALSE)
68 #endif
69
70 #if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER)
71 # define CONF_USE_SOCKET_FILTERS FALSE
72 struct sock_fprog { int dummy; };
73 # define SO_DETACH_FILTER 0
74 # define SO_ATTACH_FILTER 0
75 #else
76 # define CONF_USE_SOCKET_FILTERS TRUE
77
78 /* not in glibc... hope it's not in *BSD etc. */
79 struct sock_filter
80 {
81 uint16_t code; /* Actual filter code */
82 uint8_t jt; /* Jump true */
83 uint8_t jf; /* Jump false */
84 uint32_t k; /* Generic multiuse field */
85 };
86
87 struct sock_fprog
88 {
89 unsigned short len; /* Number of filter blocks */
90 struct sock_filter *filter;
91 };
92 #endif
93
94 #include "vlg.h"
95
96 #define EX_UTILS_NO_USE_INIT 1
97 #define EX_UTILS_NO_USE_EXIT 1
98 #define EX_UTILS_NO_USE_LIMIT 1
99 #define EX_UTILS_NO_USE_BLOCK 1
100 #define EX_UTILS_NO_USE_PUT 1
101 #define EX_UTILS_NO_USE_OPEN 1
102 #define EX_UTILS_NO_USE_IO_FD 1
103 #include "ex_utils.h"
104
105 #include "evnt.h"
106
107 #include "mk.h"
108
109 #define CSTREQ(x,y) (!strcmp(x,y))
110
111 volatile sig_atomic_t evnt_child_exited = FALSE;
112
113 int evnt_opt_nagle = EVNT_CONF_NAGLE;
114
115 static struct Evnt *q_send_now = NULL; /* Try a send "now" */
116 static struct Evnt *q_closed = NULL; /* Close when fin. */
117
118 static struct Evnt *q_none = NULL; /* nothing */
119 static struct Evnt *q_accept = NULL; /* connections - recv */
120 static struct Evnt *q_connect = NULL; /* connections - send */
121 static struct Evnt *q_recv = NULL; /* recv */
122 static struct Evnt *q_send_recv = NULL; /* recv + send */
123
124 static Vlg *vlg = NULL;
125
126 static unsigned int evnt__num = 0;
127
128 /* this should be more configurable... */
129 static unsigned int evnt__accept_limit = 4;
130
131 static struct timeval evnt__tv[1];
132
133 static int evnt__is_child = FALSE;
134
135 #define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL)
136 #define EVNT__COPY_TV(x) memcpy(x, evnt__tv, sizeof(struct timeval))
137
/* Install the Vlg logger instance used by all logging in this module. */
void evnt_logger(Vlg *passed_vlg)
{
  vlg = passed_vlg;
}
142
evnt_fd__set_nonblock(int fd,int val)143 void evnt_fd__set_nonblock(int fd, int val)
144 {
145 int flags = 0;
146
147 ASSERT(val == !!val);
148
149 if ((flags = fcntl(fd, F_GETFL)) == -1)
150 vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__);
151
152 if (!!(flags & O_NONBLOCK) == val)
153 return;
154
155 if (val)
156 flags |= O_NONBLOCK;
157 else
158 flags &= ~O_NONBLOCK;
159
160 if (fcntl(fd, F_SETFL, flags) == -1)
161 vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__);
162 }
163
evnt_add(struct Evnt ** que,struct Evnt * node)164 void evnt_add(struct Evnt **que, struct Evnt *node)
165 {
166 assert(node != *que);
167
168 if ((node->next = *que))
169 node->next->prev = node;
170
171 node->prev = NULL;
172 *que = node;
173 }
174
evnt_del(struct Evnt ** que,struct Evnt * node)175 void evnt_del(struct Evnt **que, struct Evnt *node)
176 {
177 if (node->prev)
178 node->prev->next = node->next;
179 else
180 {
181 assert(*que == node);
182 *que = node->next;
183 }
184
185 if (node->next)
186 node->next->prev = node->prev;
187 }
188
evnt__del_whatever(struct Evnt * evnt)189 static void evnt__del_whatever(struct Evnt *evnt)
190 {
191 if (0) { }
192 else if (evnt->flag_q_accept)
193 evnt_del(&q_accept, evnt);
194 else if (evnt->flag_q_connect)
195 evnt_del(&q_connect, evnt);
196 else if (evnt->flag_q_recv)
197 evnt_del(&q_recv, evnt);
198 else if (evnt->flag_q_send_recv)
199 evnt_del(&q_send_recv, evnt);
200 else if (evnt->flag_q_none)
201 evnt_del(&q_none, evnt);
202 else
203 ASSERT_NOT_REACHED();
204 }
205
/* Put evnt back on the single state queue selected by its flag_q_* bit
 * (the inverse of evnt__del_whatever); exactly one flag must be set. */
static void EVNT__ATTR_USED() evnt__add_whatever(struct Evnt *evnt)
{
  if (0) { }
  else if (evnt->flag_q_accept)
    evnt_add(&q_accept, evnt);
  else if (evnt->flag_q_connect)
    evnt_add(&q_connect, evnt);
  else if (evnt->flag_q_recv)
    evnt_add(&q_recv, evnt);
  else if (evnt->flag_q_send_recv)
    evnt_add(&q_send_recv, evnt);
  else if (evnt->flag_q_none)
    evnt_add(&q_none, evnt);
  else
    ASSERT_NOT_REACHED();
}
222
evnt__debug_num_1(struct Evnt * scan)223 static unsigned int evnt__debug_num_1(struct Evnt *scan)
224 {
225 unsigned int num = 0;
226
227 while (scan)
228 {
229 struct Evnt *scan_next = scan->next;
230
231 ++num;
232
233 scan = scan_next;
234 }
235
236 return (num);
237 }
238
239 #ifndef VSTR_AUTOCONF_NDEBUG
evnt__srch(struct Evnt ** que,struct Evnt * evnt)240 static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt)
241 {
242 struct Evnt **ret = que;
243
244 while (*ret)
245 {
246 if (*ret == evnt)
247 return (ret);
248
249 ret = &(*ret)->next;
250 }
251
252 return (NULL);
253 }
254
/* Debug-build invariant check: verify evnt is on exactly one state
 * queue, that its send-now/closed flags agree with membership of the
 * q_send_now and q_closed singly-linked lists, and that the poll events
 * registered for its fd match the queue it is on.  Returns non-zero
 * when the event was found on its claimed queue. */
static int evnt__valid(struct Evnt *evnt)
{
  int ret = 0;

  ASSERT(evnt_num_all());

  /* the five state-queue flags are mutually exclusive */
  ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
          evnt->flag_q_send_recv + evnt->flag_q_none) == 1);

  if (evnt->flag_q_send_now)
  { /* flagged: must be found on the q_send_now chain */
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(*scan);
  }
  else
  { /* not flagged: must NOT be on the q_send_now chain */
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(!*scan);
  }

  if (evnt->flag_q_closed)
  { /* same membership check for the deferred-close chain */
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(*scan);
  }
  else
  {
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(!*scan);
  }

  /* per-queue: confirm list membership and the expected poll flags */
  if (0) { }
  else if (evnt->flag_q_accept)
  { /* listeners only wait for POLLIN and carry no IO buffers */
    ret = !!evnt__srch(&q_accept, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
    ASSERT(!evnt->io_r && !evnt->io_w);
  }
  else if (evnt->flag_q_connect)
  { /* connect in progress: waiting for POLLOUT only */
    ret = !!evnt__srch(&q_connect, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
  }
  else if (evnt->flag_q_send_recv)
  { /* has pending output: POLLOUT must be armed */
    ret = !!evnt__srch(&q_send_recv, evnt);
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
  }
  else if (evnt->flag_q_recv)
  { /* read-only: POLLOUT must not be armed or pending */
    ret = !!evnt__srch(&q_recv, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else if (evnt->flag_q_none)
  { /* idle: no events armed or pending at all */
    ret = !!evnt__srch(&q_none, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else
    ASSERT_NOT_REACHED();

  return (ret);
}
337
evnt__debug_num_all(void)338 static unsigned int evnt__debug_num_all(void)
339 {
340 unsigned int num = 0;
341
342 num += evnt__debug_num_1(q_connect);
343 num += evnt__debug_num_1(q_accept);
344 num += evnt__debug_num_1(q_recv);
345 num += evnt__debug_num_1(q_send_recv);
346 num += evnt__debug_num_1(q_none);
347
348 return (num);
349 }
350 #endif
351
/* Return the OS file descriptor behind this event. */
int evnt_fd(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));
  return (SOCKET_POLL_INDICATOR(evnt->ind)->fd);
}
357
/* Default connect callback: accept the completed connection as-is. */
int evnt_cb_func_connect(struct Evnt *EVNT__ATTR_UNUSED(evnt))
{
  return (TRUE);
}
362
/* Default accept callback: decline to create an Evnt for the new fd. */
struct Evnt *evnt_cb_func_accept(struct Evnt *EVNT__ATTR_UNUSED(evnt),
                                 int EVNT__ATTR_UNUSED(fd),
                                 struct sockaddr *EVNT__ATTR_UNUSED(sa),
                                 socklen_t EVNT__ATTR_UNUSED(len))
{
  return (NULL);
}
370
/* Default recv callback: read into io_r via evnt_recv().  On EOF, the
 * connection is kept only while there is still output queued (io_w);
 * in that case the read side is shut down so the send can finish. */
int evnt_cb_func_recv(struct Evnt *evnt)
{
  unsigned int ern = 0;
  int ret = evnt_recv(evnt, &ern);

  if (ret)
    return (TRUE);

  /* NOTE(review): only EOF with pending output keeps the Evnt alive;
   * any other read failure drops the connection -- confirm intent. */
  if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len)
    return (evnt_shutdown_r(evnt, TRUE));

  return (FALSE);
}
384
/* Default send callback: cork the socket, flush io_w, and uncork only
 * once the buffer drains (stays corked while output is still pending). */
int evnt_cb_func_send(struct Evnt *evnt)
{
  int ret = -1;

  evnt_fd_set_cork(evnt, TRUE);
  ret = evnt_send(evnt);
  if (!evnt->io_w->len)
    evnt_fd_set_cork(evnt, FALSE);

  return (ret);
}
396
/* Free callback for malloc()ed Evnts: scrub (debug aid) then free. */
void evnt_cb_func_free(struct Evnt *evnt)
{
  MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt));
  free(evnt);
}
402
/* Free callback using the local F() deallocation wrapper. */
void evnt_cb_func_F(struct Evnt *evnt)
{
  F(evnt);
}
407
/* Default read-shutdown callback: shut the read side down, then keep
 * the connection alive only while output is still queued to send. */
int evnt_cb_func_shutdown_r(struct Evnt *evnt)
{
  vlg_dbg2(vlg, "SHUTDOWN CB from[$<sa:%p>]\n", EVNT_SA(evnt));

  if (!evnt_shutdown_r(evnt, FALSE))
    return (FALSE);

  /* called from outside read, and read'll never get called again ...
   * so quit if we have nothing to send */
  return (!!evnt->io_w->len);
}
419
/* Common initialisation for a new Evnt wrapping fd: clear every state
 * flag, install the default callbacks, create the io_r/io_w Vstr
 * buffers, register the fd with the poll backend, stamp ctime/mtime,
 * mark the fd close-on-exec and non-blocking, and take a reference on
 * the caller's sockaddr ref.  Returns TRUE on success; on failure all
 * work already done is unwound (goto ladder) and FALSE is returned.
 * NOTE(review): errno is forced to ENOMEM on *every* failure path,
 * even when fcntl()/evnt_poll_add() failed for another reason --
 * confirm callers really want that. */
static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref)
{
  ASSERT(ref);

  /* not on any state queue yet -- the evnt_make_*() caller picks one */
  evnt->flag_q_accept = FALSE;
  evnt->flag_q_connect = FALSE;
  evnt->flag_q_recv = FALSE;
  evnt->flag_q_send_recv = FALSE;
  evnt->flag_q_none = FALSE;

  evnt->flag_q_send_now = FALSE;
  evnt->flag_q_closed = FALSE;

  evnt->flag_q_pkt_move = FALSE;

  /* FIXME: need group settings, default no nagle but cork */
  evnt->flag_io_nagle = evnt_opt_nagle;
  evnt->flag_io_cork = FALSE;

  evnt->flag_io_filter = FALSE;

  evnt->flag_fully_acpt = FALSE;

  evnt->io_r_shutdown = FALSE;
  evnt->io_w_shutdown = FALSE;

  evnt->prev_bytes_r = 0;

  /* zero the request/byte accounting */
  evnt->acct.req_put = 0;
  evnt->acct.req_got = 0;
  evnt->acct.bytes_r = 0;
  evnt->acct.bytes_w = 0;

  /* default callbacks; callers override after init as needed */
  evnt->cbs->cb_func_accept = evnt_cb_func_accept;
  evnt->cbs->cb_func_connect = evnt_cb_func_connect;
  evnt->cbs->cb_func_recv = evnt_cb_func_recv;
  evnt->cbs->cb_func_send = evnt_cb_func_send;
  evnt->cbs->cb_func_free = evnt_cb_func_F;
  evnt->cbs->cb_func_shutdown_r = evnt_cb_func_shutdown_r;

  if (!(evnt->io_r = vstr_make_base(NULL)))
    goto make_vstr_fail;

  if (!(evnt->io_w = vstr_make_base(NULL)))
    goto make_vstr_fail;

  evnt->tm_o = NULL;

  if (!(evnt->ind = evnt_poll_add(evnt, fd)))
    goto poll_add_fail;

  EVNT__UPDATE_TV();
  EVNT__COPY_TV(&evnt->ctime);
  EVNT__COPY_TV(&evnt->mtime);

  evnt->msecs_tm_mtime = 0;

  /* set close-on-exec on the fd */
  if (fcntl(fd, F_SETFD, TRUE) == -1)
    goto fcntl_fail;

  evnt->sa_ref = vstr_ref_add(ref);

  evnt_fd__set_nonblock(fd, TRUE);

  return (TRUE);

 fcntl_fail:
  evnt_poll_del(evnt);
 poll_add_fail:
  vstr_free_base(evnt->io_w);
 make_vstr_fail:
  vstr_free_base(evnt->io_r);

  errno = ENOMEM;
  return (FALSE);
}
496
evnt__free1(struct Evnt * evnt)497 static void evnt__free1(struct Evnt *evnt)
498 {
499 evnt_send_del(evnt);
500
501 if (evnt->io_r && evnt->io_r->len)
502 vlg_dbg2(vlg, "evnt__free1(%p) io_r len = %zu\n", evnt, evnt->io_r->len);
503 if (evnt->io_w && evnt->io_w->len)
504 vlg_dbg2(vlg, "evnt__free1(%p) io_w len = %zu\n", evnt, evnt->io_w->len);
505
506 vstr_free_base(evnt->io_w); evnt->io_w = NULL;
507 vstr_free_base(evnt->io_r); evnt->io_r = NULL;
508
509 evnt_poll_del(evnt);
510 }
511
/* Second stage of teardown, run after the free callback (the Evnt
 * itself no longer exists): release the sockaddr ref and, when a
 * timeout node exists, detach its data pointer and delete it. */
static void evnt__free2(Vstr_ref *sa, Timer_q_node *tm_o)
{
  vstr_ref_del(sa);

  if (!tm_o)
    return;

  timer_q_cntl_node(tm_o, TIMER_Q_CNTL_NODE_SET_DATA, NULL);
  timer_q_quick_del_node(tm_o);
}
522
/* Destroy evnt: tear down its IO state, decrement the global count
 * (done before the free callback in case the callback re-enters this
 * module), hand the struct to its free callback, then release the
 * sockaddr ref and timeout node which must outlive the callback. */
static void evnt__free(struct Evnt *evnt)
{
  if (evnt)
  {
    /* grab these before the free callback invalidates evnt */
    Vstr_ref *sa = evnt->sa_ref;
    Timer_q_node *tm_o = evnt->tm_o;

    evnt__free1(evnt);

    ASSERT(evnt__num >= 1); /* in case they come back in via. the cb */
    --evnt__num;
    ASSERT(evnt__num == evnt__debug_num_all());

    evnt->cbs->cb_func_free(evnt);
    evnt__free2(sa, tm_o);
  }
}
540
/* Remove evnt from its state queue and destroy it.  NULL is ignored. */
void evnt_free(struct Evnt *evnt)
{
  if (!evnt)
    return;

  evnt__del_whatever(evnt);
  evnt__free(evnt);
}
549
/* Undo evnt_init() for an Evnt that never made it onto a state queue
 * (the failure paths of the evnt_make_*() constructors). */
static void evnt__uninit(struct Evnt *evnt)
{
  ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
          evnt->flag_q_send_recv + evnt->flag_q_none) == 0);

  evnt__free1(evnt);
  evnt__free2(evnt->sa_ref, evnt->tm_o);
}
558
/* Enable/disable TCP_NODELAY (Nagle off/on) for fd.
 * Returns non-zero on success, zero when setsockopt() fails. */
static int evnt_fd__set_nodelay(int fd, int val)
{
  int opt = val;

  if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1)
    return (0);

  return (1);
}
563
/* Enable/disable SO_REUSEADDR for fd.
 * Returns non-zero on success, zero when setsockopt() fails. */
static int evnt_fd__set_reuse(int fd, int val)
{
  int opt = val;

  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
    return (0);

  return (1);
}
568
/* close() fd without letting a failed close clobber errno -- used on
 * error-unwind paths where the original errno must reach the caller. */
static void evnt__fd_close_noerrno(int fd)
{
  const int saved_errno = errno;

  close(fd);

  errno = saved_errno;
}
575
/* Final step of the evnt_make_*() constructors: put evnt on the chosen
 * state queue, count it, and arm the initial poll flags.
 * Always returns TRUE (for tail-calling from the constructors). */
static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags)
{
  evnt_add(que, evnt);

  ++evnt__num;

  evnt_wait_cntl_add(evnt, flags);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
588
/* Create an outgoing IPv4 TCP connection to ipv4_string:port.  On
 * success the Evnt ends up on q_connect (connect still in progress,
 * waiting for POLLOUT) or q_none (connected immediately) and TRUE is
 * returned; on any failure the fd is closed (errno preserved) and
 * FALSE is returned. */
int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port)
{
  int fd = -1;
  socklen_t alloc_len = sizeof(struct sockaddr_in);
  Vstr_ref *ref = NULL;

  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;
  /* requeue between recv/none on request accounting (see *_pkt fns) */
  evnt->flag_q_pkt_move = TRUE;

  if (!evnt->flag_io_nagle)
    evnt_fd__set_nodelay(fd, TRUE);

  EVNT_SA_IN4(evnt)->sin_family = AF_INET;
  EVNT_SA_IN4(evnt)->sin_port = htons(port);
  EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(ipv4_string);

  ASSERT(port && (EVNT_SA_IN4(evnt)->sin_addr.s_addr != htonl(INADDR_ANY)));

  if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
  {
    if (errno == EINPROGRESS)
    { /* The connection needs more time....*/
      vstr_ref_del(ref);
      evnt->flag_q_connect = TRUE;
      return (evnt__make_end(&q_connect, evnt, POLLOUT));
    }

    goto connect_fail;
  }

  vstr_ref_del(ref);
  evnt->flag_q_none = TRUE;
  return (evnt__make_end(&q_none, evnt, 0));

 connect_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
637
/* Create an outgoing unix-domain stream connection to path fname.
 * Mirrors evnt_make_con_ipv4(): ends up on q_connect (EINPROGRESS) or
 * q_none (connected), FALSE on error with the fd closed. */
int evnt_make_con_local(struct Evnt *evnt, const char *fname)
{
  int fd = -1;
  size_t len = strlen(fname) + 1; /* include the NUL in sun_path */
  struct sockaddr_un tmp_sun;
  socklen_t alloc_len = 0;
  Vstr_ref *ref = NULL;

  /* size the sockaddr allocation for the header plus the path */
  tmp_sun.sun_path[0] = 0;
  alloc_len = SUN_LEN(&tmp_sun) + len;

  if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;
  evnt->flag_q_pkt_move = TRUE;

  EVNT_SA_UN(evnt)->sun_family = AF_LOCAL;
  memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len);

  if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
  {
    if (errno == EINPROGRESS)
    { /* The connection needs more time....*/
      vstr_ref_del(ref);
      evnt->flag_q_connect = TRUE;
      return (evnt__make_end(&q_connect, evnt, POLLOUT));
    }

    goto connect_fail;
  }

  vstr_ref_del(ref);
  evnt->flag_q_none = TRUE;
  return (evnt__make_end(&q_none, evnt, 0));

 connect_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
685
/* Wrap an accept()ed fd in evnt, taking a reference on the sockaddr
 * ref sa.  New connections start on the recv queue polling POLLIN.
 * Returns FALSE (fd NOT closed) when evnt_init() fails. */
int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa)
{
  if (!evnt_init(evnt, fd, sa))
    return (FALSE);

  if (!evnt->flag_io_nagle)
    evnt_fd__set_nodelay(fd, TRUE);

  evnt->flag_q_recv = TRUE;
  return (evnt__make_end(&q_recv, evnt, POLLIN));
}
697
evnt_make_acpt_dup(struct Evnt * evnt,int fd,struct sockaddr * sa,socklen_t len)698 int evnt_make_acpt_dup(struct Evnt *evnt, int fd,
699 struct sockaddr *sa, socklen_t len)
700 {
701 Vstr_ref *ref = vstr_ref_make_memdup(sa, len);
702 int ret = FALSE;
703
704 if (!ref)
705 {
706 errno = ENOMEM;
707 return (FALSE);
708 }
709
710 ret = evnt_make_acpt_ref(evnt, fd, ref);
711 vstr_ref_del(ref);
712 return (ret);
713 }
714
/* Final step for listening sockets: listeners never do Vstr IO, so the
 * io buffers are dropped, then the Evnt goes on q_accept waiting for
 * POLLIN, with any stale revents cleared.  Always returns TRUE. */
static int evnt__make_bind_end(struct Evnt *evnt)
{
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  vstr_free_base(evnt->io_w); evnt->io_w = NULL;

  evnt->flag_q_accept = TRUE;
  evnt__make_end(&q_accept, evnt, POLLIN);
  SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0;
  return (TRUE);
}
725
/* Create an IPv4 TCP listening socket bound to acpt_addr:server_port
 * with the given listen() backlog.  An empty or unresolvable address
 * silently becomes INADDR_ANY; server_port 0 lets the kernel choose,
 * and the chosen address is read back via getsockname() (fatal on
 * failure there).  Returns TRUE with the Evnt on q_accept, FALSE on
 * error with the fd closed. */
int evnt_make_bind_ipv4(struct Evnt *evnt,
                        const char *acpt_addr, short server_port,
                        unsigned int listen_len)
{
  int fd = -1;
  int saved_errno = 0;
  socklen_t alloc_len = sizeof(struct sockaddr_in);
  Vstr_ref *ref = NULL;

  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;

  EVNT_SA_IN4(evnt)->sin_family = AF_INET;

  EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY);
  if (acpt_addr && *acpt_addr) /* silent error becomes <any> */
    EVNT__RESOLVE_NAME(evnt, acpt_addr);
  if (EVNT_SA_IN4(evnt)->sin_addr.s_addr == htonl(INADDR_ANY))
    acpt_addr = "any"; /* only used for the warning message below */

  EVNT_SA_IN4(evnt)->sin_port = htons(server_port);

  if (!evnt_fd__set_reuse(fd, TRUE))
    goto reuse_fail;

  if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
    goto bind_fail;

  if (!server_port) /* kernel picked the port; read it back */
    if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1)
      vlg_err(vlg, EXIT_FAILURE, "getsockname: %m\n");

  if (listen(fd, listen_len) == -1)
    goto listen_fail;

  vstr_ref_del(ref);
  return (evnt__make_bind_end(evnt));

 bind_fail:
  saved_errno = errno;
  vlg_warn(vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port);
  errno = saved_errno; /* vlg_warn may have clobbered it */
 listen_fail:
 reuse_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
782
/* Create a unix-domain listening socket at path fname with the given
 * listen() backlog; any pre-existing filesystem entry at fname is
 * unlink()ed first.  Returns TRUE with the Evnt on q_accept, FALSE on
 * error with the fd closed.
 * NOTE(review): fchmod() is applied to the *socket fd*, not the path;
 * whether that restricts the filesystem node is system-dependent --
 * confirm the intended permission model. */
int evnt_make_bind_local(struct Evnt *evnt, const char *fname,
                         unsigned int listen_len)
{
  int fd = -1;
  int saved_errno = 0;
  size_t len = strlen(fname) + 1; /* include the NUL in sun_path */
  struct sockaddr_un tmp_sun;
  socklen_t alloc_len = 0;
  Vstr_ref *ref = NULL;

  /* size the sockaddr allocation for the header plus the path */
  tmp_sun.sun_path[0] = 0;
  alloc_len = SUN_LEN(&tmp_sun) + len;

  if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;

  EVNT_SA_UN(evnt)->sun_family = AF_LOCAL;
  memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len);

  unlink(fname); /* best effort; bind() reports any real problem */

  if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
    goto bind_fail;

  if (fchmod(fd, 0600) == -1)
    goto fchmod_fail;

  if (listen(fd, listen_len) == -1)
    goto listen_fail;

  vstr_ref_del(ref);
  return (evnt__make_bind_end(evnt));

 bind_fail:
  saved_errno = errno;
  vlg_warn(vlg, "bind(%s): %m\n", fname);
  errno = saved_errno; /* vlg_warn may have clobbered it */
 fchmod_fail:
 listen_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
834
/* Wrap an arbitrary pre-made fd in evnt.  sa may be NULL, in which
 * case a shared no-op dummy ref stands in for the sockaddr.  When
 * flags has POLLIN set the Evnt starts on the recv queue, otherwise on
 * the idle queue.  Unlike the other constructors, fd IS closed here on
 * init failure. */
int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags)
{
  static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1};

  if (!sa)
    sa = &dummy_sa;

  if (!evnt_init(evnt, fd, sa))
  {
    evnt__fd_close_noerrno(fd);
    return (FALSE);
  }

  if (flags & POLLIN)
  {
    evnt->flag_q_recv = TRUE;
    return (evnt__make_end(&q_recv, evnt, POLLIN));
  }

  evnt->flag_q_none = TRUE;
  return (evnt__make_end(&q_none, evnt, 0));
}
857
/* Request deferred destruction of evnt: it is flagged and pushed onto
 * the q_closed list; the actual free happens in evnt_scan_q_close().
 * NULL and already-queued events are ignored. */
void evnt_close(struct Evnt *evnt)
{
  if (!evnt)
    return;

  ASSERT(evnt__valid(evnt));

  if (evnt->flag_q_closed)
    return; /* already queued */

  /* can't close at this point or we'll race with:
   * socket_poll_add()/socket_poll_del() when using _DIRECT mapping */

  /* queue for deletion ... as the bt might still be using the ptr */
  evnt->flag_q_closed = TRUE;
  evnt->c_next = q_closed;
  q_closed = evnt;

  ASSERT(evnt__valid(evnt));
}
878
/* Account one request queued to go out on evnt.  A pkt-move connection
 * sitting on the idle queue is moved back to the recv queue with
 * POLLIN armed so the reply can be read. */
void evnt_put_pkt(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  if (evnt->flag_q_pkt_move && evnt->flag_q_none)
  {
    ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
    evnt_del(&q_none, evnt); evnt->flag_q_none = FALSE;
    evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
    evnt_wait_cntl_add(evnt, POLLIN);
  }

  ++evnt->acct.req_put;

  ASSERT(evnt__valid(evnt));
}
895
/* Account one completed request.  When a pkt-move connection has no
 * outstanding requests (and is not mid-send), it is parked on the idle
 * queue and POLLIN is disarmed. */
void evnt_got_pkt(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  ++evnt->acct.req_got;

  if (evnt->flag_q_pkt_move &&
      !evnt->flag_q_send_recv && (evnt->acct.req_put == evnt->acct.req_got))
  {
    ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
    evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
    evnt_add(&q_none, evnt), evnt->flag_q_none = TRUE;
    evnt_wait_cntl_del(evnt, POLLIN);
  }

  ASSERT(evnt__valid(evnt));
}
913
/* Write as much of io_w as the fd will take.  EAGAIN is treated as
 * success (nothing sent); any other write failure returns FALSE with
 * the Vstr error code in *ern when ern is non-NULL.  Bytes actually
 * written are added to the accounting. */
static int evnt__call_send(struct Evnt *evnt, unsigned int *ern)
{
  size_t tmp = evnt->io_w->len;
  int fd = evnt_fd(evnt);

  if (!vstr_sc_write_fd(evnt->io_w, 1, tmp, fd, ern) && (errno != EAGAIN))
    return (FALSE);

  tmp -= evnt->io_w->len; /* how much the write actually consumed */
  vlg_dbg3(vlg, "write(%p) = %zu\n", evnt, tmp);

  evnt->acct.bytes_w += tmp;

  return (TRUE);
}
929
/* Arrange for evnt's buffered output to be sent.  If more than max_sz
 * bytes are buffered (and POLLOUT isn't already armed) one immediate
 * write is attempted; when data remains, or force_q is set, the Evnt
 * is pushed onto the send-now list for the main loop to flush.
 * Returns FALSE only on a hard write error. */
int evnt_send_add(struct Evnt *evnt, int force_q, size_t max_sz)
{
  ASSERT(evnt__valid(evnt));

  vlg_dbg3(vlg, "q now = %u, q send recv = %u, force = %u\n",
           evnt->flag_q_send_now, evnt->flag_q_send_recv, force_q);

  if (!evnt->flag_q_send_recv && (evnt->io_w->len > max_sz))
  { /* try to drain immediately before falling back to the queue */
    if (!evnt__call_send(evnt, NULL))
    {
      ASSERT(evnt__valid(evnt));
      return (FALSE);
    }
    if (!evnt->io_w->len && !force_q)
    { /* everything went out; nothing to queue */
      ASSERT(evnt__valid(evnt));
      return (TRUE);
    }
  }

  /* already on send_q -- or already polling (and not forcing) */
  if (evnt->flag_q_send_now || (evnt->flag_q_send_recv && !force_q))
  {
    ASSERT(evnt__valid(evnt));
    return (TRUE);
  }

  evnt->s_next = q_send_now;
  q_send_now = evnt;
  evnt->flag_q_send_now = TRUE;

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
966
967 /* if a connection is on the send now q, then remove them ... this is only
968 * done when the client gets killed, so it doesn't matter if it's slow */
evnt_send_del(struct Evnt * evnt)969 void evnt_send_del(struct Evnt *evnt)
970 {
971 struct Evnt **scan = &q_send_now;
972
973 if (!evnt->flag_q_send_now)
974 return;
975
976 while (*scan && (*scan != evnt))
977 scan = &(*scan)->s_next;
978
979 ASSERT(*scan);
980
981 *scan = evnt->s_next;
982
983 evnt->flag_q_send_now = FALSE;
984 }
985
/* Shut down the read side of the connection -- or just record an EOF
 * the peer already delivered (got_eof), in which case no shutdown()
 * syscall is made.  Disarms POLLIN.  Returns FALSE when either side
 * was already shut down or shutdown() failed. */
int evnt_shutdown_r(struct Evnt *evnt, int got_eof)
{
  ASSERT(evnt__valid(evnt));

  if (evnt->io_r_shutdown || evnt->io_w_shutdown)
    return (FALSE);

  evnt_wait_cntl_del(evnt, POLLIN);

  vlg_dbg2(vlg, "shutdown(SHUT_RD, %d) from[$<sa:%p>]\n",
           got_eof, EVNT_SA(evnt));

  if (!got_eof && (shutdown(evnt_fd(evnt), SHUT_RD) == -1))
  {
    if (errno != ENOTCONN)
      vlg_warn(vlg, "shutdown(SHUT_RD): %m\n");
    return (FALSE);
  }

  evnt->io_r_shutdown = TRUE;

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
1011
/* After a send, fix up queue membership and poll flags:
 *  - output fully drained on a send+recv Evnt: drop to the recv queue,
 *    or to the idle queue for a quiesced pkt-move connection;
 *  - new output on an idle/recv-only Evnt: promote to send+recv and
 *    arm POLLOUT (plus POLLIN unless reads are shut down).
 * All other combinations need no change. */
static void evnt__send_fin(struct Evnt *evnt)
{
  if (0)
  { /* nothing */ }
  else if ( evnt->flag_q_send_recv && !evnt->io_w->len)
  { /* finished sending: demote from the send+recv queue */
    evnt_del(&q_send_recv, evnt); evnt->flag_q_send_recv = FALSE;
    if (evnt->flag_q_pkt_move && (evnt->acct.req_put == evnt->acct.req_got))
    { /* no outstanding requests either: go fully idle */
      evnt_add(&q_none, evnt); evnt->flag_q_none = TRUE;
      evnt_wait_cntl_del(evnt, POLLIN | POLLOUT);
    }
    else
    { /* keep reading */
      evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
      evnt_wait_cntl_del(evnt, POLLOUT);
    }
  }
  else if (!evnt->flag_q_send_recv && evnt->io_w->len)
  { /* data appeared: promote to the send+recv queue */
    int pflags = POLLOUT;
    ASSERT(evnt->flag_q_none || evnt->flag_q_recv);
    if (evnt->flag_q_none)
      evnt_del(&q_none, evnt), evnt->flag_q_none = FALSE;
    else
      evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
    evnt_add(&q_send_recv, evnt); evnt->flag_q_send_recv = TRUE;
    if (!evnt->io_r_shutdown) pflags |= POLLIN;
    evnt_wait_cntl_add(evnt, pflags);
  }
}
1043
/* Shut down the write side of the connection.  Any unsent buffered
 * output is discarded and evnt__send_fin() requeues the Evnt to match.
 * Returns FALSE when either side was already shut down or shutdown()
 * failed. */
int evnt_shutdown_w(struct Evnt *evnt)
{
  Vstr_base *out = evnt->io_w;

  ASSERT(evnt__valid(evnt));

  vlg_dbg2(vlg, "shutdown(SHUT_WR) from[$<sa:%p>]\n", EVNT_SA(evnt));

  /* evnt_fd_set_cork(evnt, FALSE); eats data in 2.4.22-1.2199.4.legacy.npt */

  if (evnt->io_r_shutdown || evnt->io_w_shutdown)
    return (FALSE);

  if (shutdown(evnt_fd(evnt), SHUT_WR) == -1)
  {
    if (errno != ENOTCONN)
      vlg_warn(vlg, "shutdown(SHUT_WR): %m\n");
    return (FALSE);
  }
  evnt->io_w_shutdown = TRUE;

  /* the kernel won't take it now, so drop anything still buffered */
  vstr_del(out, 1, out->len);

  evnt__send_fin(evnt);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
1073
/* Read from the socket into io_r using an adaptive iovec count (scaled
 * up when the previous read was large).  Returns TRUE while the
 * connection should stay up: data read (mtime stamped), EAGAIN, or
 * data discarded because the write side is already shut down.  Returns
 * FALSE on EOF, memory failure or a hard read error; *ern holds the
 * Vstr read result code either way. */
int evnt_recv(struct Evnt *evnt, unsigned int *ern)
{
  Vstr_base *data = evnt->io_r;
  size_t tmp = evnt->io_r->len;
  unsigned int num_min = 2;
  unsigned int num_max = 6; /* ave. browser reqs are 500ish, ab is much less */

  ASSERT(evnt__valid(evnt) && ern);

  /* FIXME: this is set for HTTPD's default buf sizes of 120 (128 - 8) */
  if (evnt->prev_bytes_r > (120 * 3))
    num_max = 8;
  if (evnt->prev_bytes_r > (120 * 6))
    num_max = 32;

  vstr_sc_read_iov_fd(data, data->len, evnt_fd(evnt), num_min, num_max, ern);
  evnt->prev_bytes_r = (evnt->io_r->len - tmp); /* feeds next scaling */

  vlg_dbg3(vlg, "read(%p) = %ju\n", evnt, evnt->prev_bytes_r);
  evnt->acct.bytes_r += evnt->prev_bytes_r;

  switch (*ern)
  {
    case VSTR_TYPE_SC_READ_FD_ERR_NONE:
      if (evnt->io_w_shutdown) /* doesn't count if we can't respond */
        vstr_del(data, 1, data->len);
      else
        EVNT__COPY_TV(&evnt->mtime);
      return (TRUE);

    case VSTR_TYPE_SC_READ_FD_ERR_MEM:
      vlg_warn(vlg, "%s\n", __func__);
      break;

    case VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO:
      if (errno == EAGAIN) /* no data right now; not an error */
        return (TRUE);
      break;

    case VSTR_TYPE_SC_READ_FD_ERR_EOF:
      break;

    default: /* unknown */
      vlg_warn(vlg, "read_iov() = %d: %m\n", *ern);
  }

  return (FALSE);
}
1122
/* Flush evnt's write buffer to the socket, stamp mtime, and let
 * evnt__send_fin() fix up queue membership and poll flags.
 * Returns FALSE on a hard write error. */
int evnt_send(struct Evnt *evnt)
{
  unsigned int ern = 0;

  ASSERT(evnt__valid(evnt));

  if (!evnt__call_send(evnt, &ern))
    return (FALSE);

  EVNT__COPY_TV(&evnt->mtime);

  evnt__send_fin(evnt);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
1140
1141 #ifdef VSTR_AUTOCONF_lseek64 /* lseek64 doesn't exist */
1142 # define sendfile64 sendfile
1143 #endif
1144
/* Send up to *f_len bytes of file ffd starting at offset *f_off
 * straight to the socket with sendfile64(); requires an empty io_w
 * buffer.  On success *f_off/*f_len are advanced and the byte
 * accounting updated; EAGAIN counts as success with nothing moved.
 * Returns FALSE with *ern distinguishing an errno failure from EOF
 * (sendfile returned 0). */
int evnt_sendfile(struct Evnt *evnt, int ffd, VSTR_AUTOCONF_uintmax_t *f_off,
                  VSTR_AUTOCONF_uintmax_t *f_len, unsigned int *ern)
{
  ssize_t ret = 0;
  off64_t tmp_off = *f_off;
  size_t tmp_len = *f_len;

  *ern = 0;

  ASSERT(evnt__valid(evnt));

  ASSERT(!evnt->io_w->len);

  /* clamp a single transfer to what sendfile can report back */
  if (*f_len > SSIZE_MAX)
    tmp_len = SSIZE_MAX;

  if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, tmp_len)) == -1)
  {
    if (errno == EAGAIN)
      return (TRUE); /* socket full; caller retries via poll */

    *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
    return (FALSE);
  }

  if (!ret)
  { /* 0 bytes transferred == end of file */
    *ern = VSTR_TYPE_SC_READ_FD_ERR_EOF;
    return (FALSE);
  }

  *f_off = tmp_off;

  evnt->acct.bytes_w += ret;
  EVNT__COPY_TV(&evnt->mtime);

  *f_len -= ret;

  return (TRUE);
}
1185
/* Pump data from fd into evnt's write buffer and try to send it.
 * *len (must be non-zero) is reduced by the amount buffered; when the
 * requested length is reached any excess read is trimmed, *len hits 0
 * and EVNT_IO_READ_FIN is returned WITHOUT sending (the caller does
 * the final flush).  Otherwise returns one of the other EVNT_IO_*
 * codes for read error/EOF/send error, or EVNT_IO_OK to continue. */
int evnt_sc_read_send(struct Evnt *evnt, int fd, VSTR_AUTOCONF_uintmax_t *len)
{
  Vstr_base *out = evnt->io_w;
  size_t orig_len = out->len;
  size_t tmp = 0;
  int ret = IO_OK;

  ASSERT(len && *len);

  if ((ret = io_get(out, fd)) == IO_FAIL)
    return (EVNT_IO_READ_ERR);

  if (ret == IO_EOF)
    return (EVNT_IO_READ_EOF);

  tmp = out->len - orig_len; /* bytes this call added to the buffer */

  if (tmp >= *len)
  { /* we might not be transfering to EOF, so reduce if needed */
    vstr_sc_reduce(out, 1, out->len, tmp - *len);
    ASSERT((out->len - orig_len) == *len);
    *len = 0;
    return (EVNT_IO_READ_FIN);
  }

  *len -= tmp;

  if (!evnt_send(evnt))
    return (EVNT_IO_SEND_ERR);

  return (EVNT_IO_OK);
}
1218
evnt__get_timeout(void)1219 static int evnt__get_timeout(void)
1220 {
1221 const struct timeval *tv = NULL;
1222 int msecs = -1;
1223
1224 if (q_send_now)
1225 msecs = 0;
1226 else if ((tv = timer_q_first_timeval()))
1227 {
1228 long diff = 0;
1229 struct timeval now_timeval;
1230
1231 EVNT__COPY_TV(&now_timeval);
1232
1233 diff = timer_q_timeval_diff_msecs(tv, &now_timeval);
1234
1235 if (diff > 0)
1236 {
1237 if (diff >= INT_MAX)
1238 msecs = INT_MAX - 1;
1239 else
1240 msecs = diff;
1241 }
1242 else
1243 msecs = 0;
1244 }
1245
1246 vlg_dbg2(vlg, "get_timeout = %d\n", msecs);
1247
1248 return (msecs);
1249 }
1250
evnt_scan_q_close(void)1251 void evnt_scan_q_close(void)
1252 {
1253 struct Evnt *scan = NULL;
1254
1255 while ((scan = q_closed))
1256 {
1257 scan->flag_q_closed = FALSE;
1258 q_closed = scan->c_next;
1259
1260 evnt_free(scan);
1261 }
1262
1263 ASSERT(!q_closed);
1264 }
1265
evnt__close_now(struct Evnt * evnt)1266 static void evnt__close_now(struct Evnt *evnt)
1267 {
1268 if (evnt->flag_q_closed)
1269 return;
1270
1271 evnt_free(evnt);
1272 }
1273
1274 /* if something goes wrong drop all accept'ing events */
evnt_acpt_close_all(void)1275 void evnt_acpt_close_all(void)
1276 {
1277 struct Evnt *evnt = q_accept; /* struct Evnt *evnt = evnt_queue("accept"); */
1278
1279 while (evnt)
1280 {
1281 evnt_close(evnt);
1282
1283 evnt = evnt->next;
1284 }
1285 }
1286
/* Dispatch the results of a poll/epoll wait.  Each queue is scanned in
 * priority order (connect, accept, recv, send_recv, none); "ready" counts
 * fds that still have unprocessed events, so every scan stops once it
 * reaches zero.  max_sz is passed through to evnt_send_add() to bound
 * per-event send work.  With direct (epoll) polling enabled, an event
 * with no relevant revents ends the scan of that queue early. */
void evnt_scan_fds(unsigned int ready, size_t max_sz)
{
  const int bad_poll_flags = (POLLERR | POLLHUP | POLLNVAL);
  struct Evnt *scan = NULL;

  EVNT__UPDATE_TV();

  /* --- connect queue: fds finishing a non-blocking connect() --- */
  scan = q_connect;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_connect;

    assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN));
    /* done as one so we get error code */
    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & (POLLOUT|bad_poll_flags))
    {
      int ern = 0;
      socklen_t len = sizeof(int);
      int ret = 0;

      done = TRUE;

      /* SO_ERROR tells us whether the async connect() succeeded */
      ret = getsockopt(evnt_fd(scan), SOL_SOCKET, SO_ERROR, &ern, &len);
      if (ret == -1)
        vlg_err(vlg, EXIT_FAILURE, "getsockopt(SO_ERROR): %m\n");
      else if (ern)
      {
        errno = ern;
        vlg_warn(vlg, "connect(): %m\n");

        evnt__close_now(scan);
      }
      else
      { /* connected: move to the idle queue and notify the owner */
        evnt_del(&q_connect, scan); scan->flag_q_connect = FALSE;
        evnt_add(&q_none, scan); scan->flag_q_none = TRUE;
        evnt_wait_cntl_del(scan, POLLOUT);

        if (!scan->cbs->cb_func_connect(scan))
          evnt__close_now(scan);
      }
      goto next_connect;
    }
    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_connect:
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
    if (done)
      --ready;

    scan = scan_next;
  }

  /* --- accept queue: listening sockets with pending connections --- */
  scan = q_accept;
  while (scan && ready)
  { /* Papers have suggested that preferring read over accept is better
     * -- edge triggering needs to requeue on non failure */
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_accept;

    assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT));
    if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
    { /* done first as it's an error with the accept fd, whereas accept
       * generates new fds */
      done = TRUE;
      evnt__close_now(scan);
      goto next_accept;
    }

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
    {
      struct sockaddr_in sa;
      socklen_t len = sizeof(struct sockaddr_in);
      int fd = -1;
      struct Evnt *tmp = NULL;
      unsigned int acpt_num = 0;

      done = TRUE;

      /* ignore all accept() errors -- bad_poll_flags fixes here */
      /* FIXME: apache seems to assume certain errors are really bad and we
       * should just kill the listen socket and wait to die. But for instance.
       * we can't just kill the socket on EMFILE, as we might have hit our
       * resource limit */
      while ((SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) &&
             (fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len)) != -1)
      {
        if (!(tmp = scan->cbs->cb_func_accept(scan, fd,
                                              (struct sockaddr *) &sa, len)))
        { /* owner rejected the connection: drop the new fd */
          close(fd);
          goto next_accept;
        }

        if (!tmp->flag_q_closed)
        {
          ++ready; /* give a read event to this new event */
          tmp->flag_fully_acpt = TRUE;
        }
        assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
        assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
        assert(tmp == q_recv);

        /* bound the number of accepts per wakeup for fairness */
        if (++acpt_num >= evnt__accept_limit)
          break;
      }

      goto next_accept;
    }
    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_accept:
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
    if (done)
      --ready;

    scan = scan_next;
  }

  /* --- recv queue: connections we only read from --- */
  scan = q_recv;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_recv;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
    {
      done = TRUE;
      if (!scan->cbs->cb_func_recv(scan))
        evnt__close_now(scan);
    }

    if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
    { /* error/hangup: give the owner a chance via the shutdown callback */
      done = TRUE;
      if (scan->io_r_shutdown || scan->io_w_shutdown ||
          !scan->cbs->cb_func_shutdown_r(scan))
        evnt__close_now(scan);
    }

    if (!done && evnt_poll_direct_enabled()) break;

   next_recv:
    if (done)
      --ready;

    scan = scan_next;
  }

  /* --- send_recv queue: connections with buffered output pending --- */
  scan = q_send_recv;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_send;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
    {
      done = TRUE;
      if (!scan->cbs->cb_func_recv(scan))
      {
        evnt__close_now(scan);
        goto next_send;
      }
    }

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT)
    {
      done = TRUE; /* need groups so we can do direct send here */
      if (!evnt_send_add(scan, TRUE, max_sz))
        evnt__close_now(scan);
    }

    if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
    {
      done = TRUE;
      if (scan->io_r_shutdown || !scan->cbs->cb_func_shutdown_r(scan))
        evnt__close_now(scan);
    }

    if (!done && evnt_poll_direct_enabled()) break;

   next_send:
    if (done)
      --ready;

    scan = scan_next;
  }

  /* --- none queue: idle events; any event at all means trouble --- */
  scan = q_none;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_none;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents)
    { /* POLLIN == EOF ? */
      /* FIXME: failure cb */
      done = TRUE;

      evnt__close_now(scan);
      goto next_none;
    }
    else
      assert(!SOCKET_POLL_INDICATOR(scan->ind)->revents);

    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_none:
    if (done)
      --ready;

    scan = scan_next;
  }

  /* every counted event must have been consumed by exactly one queue */
  if (q_closed)
    evnt_scan_q_close();
  else if (ready)
    vlg_abort(vlg, "ready = %d\n", ready);
}
1535
/* Drain the "send now" queue: invoke cb_func_send() once for each event
 * flagged for immediate sending.  A callback may re-queue its own event;
 * that case is detected by "tmp == *scan" below (the event put itself
 * back at the position we just unlinked it from), and we then step past
 * it instead of re-processing it.  Closed events are reaped both before
 * and after the walk. */
void evnt_scan_send_fds(void)
{
  struct Evnt **scan = NULL;

  evnt_scan_q_close();

  scan = &q_send_now;
  while (*scan)
  {
    struct Evnt *tmp = *scan;

    /* unlink first: the callback may re-add the event */
    tmp->flag_q_send_now = FALSE;
    *scan = tmp->s_next;
    if (!tmp->cbs->cb_func_send(tmp))
    {
      evnt__close_now(tmp);
      continue;
    }
    if (tmp == *scan) /* added back to q */
    {
      ASSERT(tmp->flag_q_send_now == TRUE);
      scan = &tmp->s_next;
    }
  }

  evnt_scan_q_close();
}
1563
evnt__close_1(struct Evnt ** root)1564 static void evnt__close_1(struct Evnt **root)
1565 {
1566 struct Evnt *scan = *root;
1567
1568 *root = NULL;
1569
1570 while (scan)
1571 {
1572 struct Evnt *scan_next = scan->next;
1573 Vstr_ref *sa = scan->sa_ref;
1574 Timer_q_node *tm_o = scan->tm_o;
1575
1576 vstr_free_base(scan->io_w); scan->io_w = NULL;
1577 vstr_free_base(scan->io_r); scan->io_r = NULL;
1578
1579 evnt_poll_del(scan);
1580
1581 --evnt__num;
1582 evnt__free2(sa, tm_o);
1583 scan->cbs->cb_func_free(scan);
1584
1585 scan = scan_next;
1586 }
1587 }
1588
evnt_close_all(void)1589 void evnt_close_all(void)
1590 {
1591 q_send_now = NULL;
1592 q_closed = NULL;
1593
1594 evnt__close_1(&q_connect);
1595 evnt__close_1(&q_accept);
1596 evnt__close_1(&q_recv);
1597 evnt__close_1(&q_send_recv);
1598 evnt__close_1(&q_none);
1599 ASSERT(evnt__num == evnt__debug_num_all());
1600 }
1601
evnt_out_dbg3(const char * prefix)1602 void evnt_out_dbg3(const char *prefix)
1603 {
1604 if (vlg->out_dbg < 3)
1605 return;
1606
1607 vlg_dbg3(vlg, "%s T=%u c=%u a=%u r=%u s=%u n=%u [SN=%u]\n",
1608 prefix, evnt_num_all(),
1609 evnt__debug_num_1(q_connect),
1610 evnt__debug_num_1(q_accept),
1611 evnt__debug_num_1(q_recv),
1612 evnt__debug_num_1(q_send_recv),
1613 evnt__debug_num_1(q_none),
1614 evnt__debug_num_1(q_send_now));
1615 }
1616
evnt_stats_add(struct Evnt * dst,const struct Evnt * src)1617 void evnt_stats_add(struct Evnt *dst, const struct Evnt *src)
1618 {
1619 dst->acct.req_put += src->acct.req_put;
1620 dst->acct.req_put += src->acct.req_got;
1621
1622 dst->acct.bytes_r += src->acct.bytes_r;
1623 dst->acct.bytes_w += src->acct.bytes_w;
1624 }
1625
/* Total number of live events, cross-checked against a queue walk. */
unsigned int evnt_num_all(void)
{
  ASSERT(evnt__num == evnt__debug_num_all());
  return (evnt__num);
}
1631
evnt_waiting(void)1632 int evnt_waiting(void)
1633 {
1634 return (q_connect || q_accept || q_recv || q_send_recv || q_send_now);
1635 }
1636
evnt_find_least_used(void)1637 struct Evnt *evnt_find_least_used(void)
1638 {
1639 struct Evnt *con = NULL;
1640 struct Evnt *con_min = NULL;
1641
1642 /* Find a usable connection, tries to find the least used connection
1643 * preferring ones not blocking on send IO */
1644 if (!(con = q_none) &&
1645 !(con = q_recv) &&
1646 !(con = q_send_recv))
1647 return (NULL);
1648
1649 /* FIXME: not optimal, only want to change after a certain level */
1650 con_min = con;
1651 while (con)
1652 {
1653 if (con_min->io_w->len > con->io_w->len)
1654 con_min = con;
1655 con = con->next;
1656 }
1657
1658 return (con_min);
1659 }
1660
1661 #define MATCH_Q_NAME(x) \
1662 if (CSTREQ(qname, #x )) \
1663 return ( q_ ## x ) \
1664
evnt_queue(const char * qname)1665 struct Evnt *evnt_queue(const char *qname)
1666 {
1667 MATCH_Q_NAME(connect);
1668 MATCH_Q_NAME(accept);
1669 MATCH_Q_NAME(send_recv);
1670 MATCH_Q_NAME(recv);
1671 MATCH_Q_NAME(none);
1672 MATCH_Q_NAME(send_now);
1673
1674 return (NULL);
1675 }
1676
1677 #ifdef VSTR_AUTOCONF_HAVE_TCP_CORK
1678 # define USE_TCP_CORK 1
1679 #else
1680 # define USE_TCP_CORK 0
1681 # define TCP_CORK 0
1682 #endif
1683
evnt_fd_set_cork(struct Evnt * evnt,int val)1684 void evnt_fd_set_cork(struct Evnt *evnt, int val)
1685 { /* assume it can't work for set and fail for unset */
1686 ASSERT(evnt__valid(evnt));
1687
1688 if (!USE_TCP_CORK)
1689 return;
1690
1691 if (!evnt->flag_io_cork == !val)
1692 return;
1693
1694 if (!evnt->flag_io_nagle) /* flags can't be combined ... stupid */
1695 {
1696 evnt_fd__set_nodelay(evnt_fd(evnt), FALSE);
1697 evnt->flag_io_nagle = TRUE;
1698 }
1699
1700 if (setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_CORK, &val, sizeof(val)) == -1)
1701 return;
1702
1703 evnt->flag_io_cork = !!val;
1704 }
1705
/* Free a Vstr base while preserving errno for the caller (the free
 * path may clobber it, and callers log with %m afterwards). */
static void evnt__free_base_noerrno(Vstr_base *s1)
{
  const int saved = errno;

  vstr_free_base(s1);
  errno = saved;
}
1712
evnt_fd_set_filter(struct Evnt * evnt,const char * fname)1713 int evnt_fd_set_filter(struct Evnt *evnt, const char *fname)
1714 {
1715 int fd = evnt_fd(evnt);
1716 Vstr_base *s1 = NULL;
1717 unsigned int ern = 0;
1718
1719 if (!CONF_USE_SOCKET_FILTERS)
1720 return (TRUE);
1721
1722 if (!(s1 = vstr_make_base(NULL)))
1723 VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach0: %m\n"));
1724
1725 vstr_sc_read_len_file(s1, 0, fname, 0, 0, &ern);
1726
1727 if (ern &&
1728 ((ern != VSTR_TYPE_SC_READ_FILE_ERR_OPEN_ERRNO) || (errno != ENOENT)))
1729 {
1730 evnt__free_base_noerrno(s1);
1731 VLG_WARN_RET(FALSE, (vlg, "filter_attach1(%s): %m\n", fname));
1732 }
1733 else if ((s1->len / sizeof(struct sock_filter)) > USHRT_MAX)
1734 {
1735 vstr_free_base(s1);
1736 errno = E2BIG;
1737 VLG_WARN_RET(FALSE, (vlg, "filter_attach2(%s): %m\n", fname));
1738 }
1739 else if (!s1->len)
1740 {
1741 vstr_free_base(s1);
1742 if (!evnt->flag_io_filter)
1743 vlg_warn(vlg, "filter_attach3(%s): Empty file\n", fname);
1744 else if (setsockopt(fd, SOL_SOCKET, SO_DETACH_FILTER, NULL, 0) == -1)
1745 {
1746 evnt__free_base_noerrno(s1);
1747 VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, DETACH_FILTER, %s): %m\n",
1748 fname));
1749 }
1750
1751 evnt->flag_io_filter = FALSE;
1752 return (TRUE);
1753 }
1754 else
1755 {
1756 struct sock_fprog filter[1];
1757 socklen_t len = sizeof(filter);
1758
1759 filter->len = s1->len / sizeof(struct sock_filter);
1760 filter->filter = (void *)vstr_export_cstr_ptr(s1, 1, s1->len);
1761
1762 if (!filter->filter)
1763 VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach4: %m\n"));
1764
1765 if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_FILTER, filter, len) == -1)
1766 {
1767 evnt__free_base_noerrno(s1);
1768 VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, ATTACH_FILTER, %s): %m\n",
1769 fname));
1770 }
1771 }
1772
1773 evnt->flag_io_filter = TRUE;
1774
1775 vstr_free_base(s1);
1776
1777 return (TRUE);
1778 }
1779
1780 static Timer_q_base *evnt__timeout_1 = NULL;
1781 static Timer_q_base *evnt__timeout_10 = NULL;
1782 static Timer_q_base *evnt__timeout_100 = NULL;
1783
evnt__timeout_mtime_make(struct Evnt * evnt,struct timeval * tv,unsigned long msecs)1784 static Timer_q_node *evnt__timeout_mtime_make(struct Evnt *evnt,
1785 struct timeval *tv,
1786 unsigned long msecs)
1787 {
1788 Timer_q_node *tm_o = NULL;
1789
1790 vlg_dbg2(vlg, "mtime_make(%p, %lu)\n", evnt, msecs);
1791
1792 if (0) { }
1793 else if (msecs >= ( 99 * 1000))
1794 {
1795 TIMER_Q_TIMEVAL_ADD_SECS(tv, 100, 0);
1796 tm_o = timer_q_add_node(evnt__timeout_100, evnt, tv,
1797 TIMER_Q_FLAG_NODE_DEFAULT);
1798 }
1799 else if (msecs >= ( 9 * 1000))
1800 {
1801 TIMER_Q_TIMEVAL_ADD_SECS(tv, 10, 0);
1802 tm_o = timer_q_add_node(evnt__timeout_10, evnt, tv,
1803 TIMER_Q_FLAG_NODE_DEFAULT);
1804 }
1805 else
1806 {
1807 TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
1808 tm_o = timer_q_add_node(evnt__timeout_1, evnt, tv,
1809 TIMER_Q_FLAG_NODE_DEFAULT);
1810 }
1811
1812 return (tm_o);
1813 }
1814
/* Timer-queue callback implementing per-event inactivity timeouts.
 * If the event has been idle (mtime untouched) for at least
 * msecs_tm_mtime, shut its write side down (or close it outright);
 * otherwise re-arm a timer for the remaining time. */
static void evnt__timer_cb_mtime(int type, void *data)
{
  struct Evnt *evnt = data;
  struct timeval tv[1];
  unsigned long diff = 0;

  if (!evnt) /* deleted */
    return;

  ASSERT(evnt__valid(evnt));

  if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
    return;

  /* our node is being consumed either way: forget it before re-arming */
  evnt->tm_o = NULL;

  if (type == TIMER_Q_TYPE_CALL_DEL)
    return;

  EVNT__COPY_TV(tv);

  /* find out time elapsed */
  diff = timer_q_timeval_udiff_msecs(tv, &evnt->mtime);
  if (diff < evnt->msecs_tm_mtime)
    diff = evnt->msecs_tm_mtime - diff; /* msecs left until timeout */
  else
  { /* timed out: try a write-side shutdown first, close if that fails */
    vlg_dbg2(vlg, "timeout = %p[$<sa:%p>] (%lu, %lu)\n",
             evnt, EVNT_SA(evnt), diff, evnt->msecs_tm_mtime);
    if (!evnt_shutdown_w(evnt))
    {
      evnt_close(evnt);
      return;
    }

    /* FIXME: linger close time configurable? */
    EVNT__COPY_TV(&evnt->mtime);
    diff = evnt->msecs_tm_mtime;
  }

  if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, diff)))
  { /* can't re-arm: treat as fatal for this event */
    errno = ENOMEM;
    vlg_warn(vlg, "%s: %m\n", "timer reinit");
    evnt_close(evnt);
  }
}
1862
/* Create the three timeout timer wheels (1s/10s/100s granularity).
 * Fatal on allocation failure.  Must be called exactly once before any
 * evnt_sc_timeout_via_mtime() use. */
void evnt_timeout_init(void)
{
  ASSERT(!evnt__timeout_1);

  EVNT__UPDATE_TV();

  /* move when empty is buggy, *sigh* */
  evnt__timeout_1   = timer_q_add_base(evnt__timer_cb_mtime,
                                       TIMER_Q_FLAG_BASE_INSERT_FROM_END);
  evnt__timeout_10  = timer_q_add_base(evnt__timer_cb_mtime,
                                       TIMER_Q_FLAG_BASE_INSERT_FROM_END);
  evnt__timeout_100 = timer_q_add_base(evnt__timer_cb_mtime,
                                       TIMER_Q_FLAG_BASE_INSERT_FROM_END);

  if (!evnt__timeout_1 || !evnt__timeout_10 || !evnt__timeout_100)
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));

  /* FIXME: massive hack 1.0.5 is broken */
  timer_q_cntl_base(evnt__timeout_1,
                    TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
  timer_q_cntl_base(evnt__timeout_10,
                    TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
  timer_q_cntl_base(evnt__timeout_100,
                    TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
}
1888
/* Destroy the timer wheels created by evnt_timeout_init(). */
void evnt_timeout_exit(void)
{
  ASSERT(evnt__timeout_1);

  timer_q_del_base(evnt__timeout_1);   evnt__timeout_1   = NULL;
  timer_q_del_base(evnt__timeout_10);  evnt__timeout_10  = NULL;
  timer_q_del_base(evnt__timeout_100); evnt__timeout_100 = NULL;
}
1897
/* Arm an inactivity timeout of "msecs" milliseconds for the event,
 * measured against its mtime.  Returns FALSE on allocation failure. */
int evnt_sc_timeout_via_mtime(struct Evnt *evnt, unsigned long msecs)
{
  struct timeval now[1];

  evnt->msecs_tm_mtime = msecs;

  EVNT__COPY_TV(now);

  evnt->tm_o = evnt__timeout_mtime_make(evnt, now, msecs);

  return (evnt->tm_o ? TRUE : FALSE);
}
1911
evnt_sc_main_loop(size_t max_sz)1912 void evnt_sc_main_loop(size_t max_sz)
1913 {
1914 int ready = 0;
1915 struct timeval tv[1];
1916
1917 ready = evnt_poll();
1918 if ((ready == -1) && (errno != EINTR))
1919 vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", "poll");
1920 if (ready == -1)
1921 return;
1922
1923 evnt_out_dbg3("1");
1924 evnt_scan_fds(ready, max_sz);
1925 evnt_out_dbg3("2");
1926 evnt_scan_send_fds();
1927 evnt_out_dbg3("3");
1928
1929 EVNT__COPY_TV(tv);
1930 timer_q_run_norm(tv);
1931
1932 evnt_out_dbg3("4");
1933 evnt_scan_send_fds();
1934 evnt_out_dbg3("5");
1935 }
1936
evnt_sc_time(void)1937 time_t evnt_sc_time(void)
1938 {
1939 time_t ret = -1;
1940
1941 if (!CONF_GETTIMEOFDAY_TIME)
1942 ret = time(NULL);
1943 else
1944 {
1945 struct timeval tv[1];
1946
1947 EVNT__COPY_TV(tv);
1948 ret = tv->tv_sec;
1949 }
1950
1951 return (ret);
1952 }
1953
evnt_sc_serv_cb_func_acpt_free(struct Evnt * evnt)1954 void evnt_sc_serv_cb_func_acpt_free(struct Evnt *evnt)
1955 {
1956 struct Acpt_listener *acpt_listener = (struct Acpt_listener *)evnt;
1957 struct Acpt_data *acpt_data = acpt_listener->ref->ptr;
1958
1959 evnt_vlg_stats_info(acpt_listener->evnt, "ACCEPT FREE");
1960
1961 acpt_data->evnt = NULL;
1962
1963 vstr_ref_del(acpt_listener->ref);
1964
1965 F(acpt_listener);
1966 }
1967
/* Vstr_ref destructor for the shared Acpt_data: releases the stored
 * sockaddr ref, the data struct, then the ref object itself. */
static void evnt__sc_serv_make_acpt_data_cb(Vstr_ref *ref)
{
  struct Acpt_data *data = NULL;

  if (!ref)
    return;

  data = ref->ptr;
  vstr_ref_del(data->sa);
  F(data);
  free(ref);
}
1980
/* Create a listening IPv4 socket event bound to acpt_addr:acpt_port,
 * wrapped in an Acpt_listener with ref-counted shared Acpt_data.
 * Optionally enables TCP_DEFER_ACCEPT and attaches a socket filter.
 * All failures are fatal (vlg_err/VLG_ERRNOMEM exit the process).
 * Returns the listener's event. */
struct Evnt *evnt_sc_serv_make_bind(const char *acpt_addr,
                                    unsigned short acpt_port,
                                    unsigned int q_listen_len,
                                    unsigned int max_connections,
                                    unsigned int defer_accept,
                                    const char *acpt_filter_file)
{
  struct sockaddr_in *sinv4 = NULL;
  Acpt_listener *acpt_listener = NULL;
  Acpt_data *acpt_data = NULL;
  Vstr_ref *ref = NULL;

  if (!(acpt_listener = MK(sizeof(Acpt_listener))))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_listener->max_connections = max_connections;

  if (!(acpt_data = MK(sizeof(Acpt_data))))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_data->evnt = NULL;
  acpt_data->sa = NULL;

  /* ref owns acpt_data; evnt__sc_serv_make_acpt_data_cb frees it */
  if (!(ref = vstr_ref_make_ptr(acpt_data, evnt__sc_serv_make_acpt_data_cb)))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_listener->ref = ref;

  if (!evnt_make_bind_ipv4(acpt_listener->evnt, acpt_addr, acpt_port,
                           q_listen_len))
    vlg_err(vlg, 2, "%s: %m\n", __func__);
  acpt_data->evnt = acpt_listener->evnt;
  acpt_data->sa = vstr_ref_add(acpt_data->evnt->sa_ref);

  /* port 0 means "kernel chooses"; otherwise it must match what we asked */
  sinv4 = EVNT_SA_IN4(acpt_listener->evnt);
  ASSERT(!acpt_port || (acpt_port == ntohs(sinv4->sin_port)));

  if (defer_accept)
    evnt_fd_set_defer_accept(acpt_listener->evnt, defer_accept);

  if (acpt_filter_file &&
      !evnt_fd_set_filter(acpt_listener->evnt, acpt_filter_file))
    vlg_err(vlg, 3, "set_filter(%s): %m\n", acpt_filter_file);

  acpt_listener->evnt->cbs->cb_func_free = evnt_sc_serv_cb_func_acpt_free;

  return (acpt_listener->evnt);
}
2029
/* Log the event's accounting counters (requests and byte totals) with
 * the given prefix.  Each counter is passed twice for vlg's custom
 * formatters ($<sa:%p>, ${BKMG...} are vlg extensions, not printf). */
void evnt_vlg_stats_info(struct Evnt *evnt, const char *prefix)
{
  vlg_info(vlg, "%s from[$<sa:%p>] req_got[%'u:%u] req_put[%'u:%u]"
           " recv[${BKMG.ju:%ju}:%ju] send[${BKMG.ju:%ju}:%ju]\n",
           prefix, EVNT_SA(evnt),
           evnt->acct.req_got, evnt->acct.req_got,
           evnt->acct.req_put, evnt->acct.req_put,
           evnt->acct.bytes_r, evnt->acct.bytes_r,
           evnt->acct.bytes_w, evnt->acct.bytes_w);
}
2040
2041 #ifdef TCP_DEFER_ACCEPT
2042 # define USE_TCP_DEFER_ACCEPT 1
2043 #else
2044 # define USE_TCP_DEFER_ACCEPT 0
2045 # define TCP_DEFER_ACCEPT 0
2046 #endif
2047
evnt_fd_set_defer_accept(struct Evnt * evnt,int val)2048 void evnt_fd_set_defer_accept(struct Evnt *evnt, int val)
2049 {
2050 socklen_t len = sizeof(int);
2051
2052 ASSERT(evnt__valid(evnt));
2053
2054 if (!USE_TCP_DEFER_ACCEPT)
2055 return;
2056
2057 /* ignore return val */
2058 setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
2059 }
2060
evnt_make_child(void)2061 pid_t evnt_make_child(void)
2062 {
2063 pid_t ret = fork();
2064
2065 if (!ret)
2066 {
2067 evnt__is_child = TRUE;
2068 evnt_poll_child_init();
2069 }
2070
2071 return (ret);
2072 }
2073
/* TRUE when this process was created via evnt_make_child(). */
int evnt_is_child(void)
{
  return (evnt__is_child);
}
2078
evnt_child_block_beg(void)2079 int evnt_child_block_beg(void)
2080 {
2081 if (!evnt_is_child())
2082 return (TRUE);
2083
2084 if (PROC_CNTL_PDEATHSIG(SIGTERM) == -1)
2085 vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n");
2086
2087 if (evnt_child_exited)
2088 return (FALSE);
2089
2090 return (TRUE);
2091 }
2092
evnt_child_block_end(void)2093 int evnt_child_block_end(void)
2094 {
2095 if (!evnt_is_child())
2096 return (TRUE);
2097
2098 if (PROC_CNTL_PDEATHSIG(SIGCHLD) == -1)
2099 vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n");
2100
2101 return (TRUE);
2102 }
2103
2104 /* if we are blocking forever, and the only thing we are waiting for is
2105 * a single accept() fd, just call accept() */
static int evnt__poll_tst_accept(int msecs)
{ /* TRUE when poll would block forever on nothing but one accept() fd */
  return ((msecs == -1) && (evnt_num_all() == 1) && q_accept);
}
2110
/* Fast path used instead of poll() when the sole event is one listening
 * socket: do a *blocking* accept() directly.  Returns 1 on a successful
 * accept (mimicking a 1-fd poll result) or -1 with errno = EINTR so the
 * main loop simply retries. */
static int evnt__poll_accept(void)
{
  int fd = -1;
  struct Evnt *scan = q_accept;
  struct sockaddr_in sa;
  socklen_t len = sizeof(struct sockaddr_in);
  struct Evnt *tmp = NULL;

  /* need to make sure we die if the parent does */
  evnt_fd__set_nonblock(evnt_fd(scan), FALSE);
  if (!evnt_child_block_beg())
    goto block_beg_fail;

  fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len);
  evnt_child_block_end();
  evnt_fd__set_nonblock(evnt_fd(scan), TRUE);

  if (fd == -1)
    goto accept_fail;

  if (!(tmp = scan->cbs->cb_func_accept(scan,
                                        fd, (struct sockaddr *) &sa, len)))
    goto cb_accept_fail;

  if (!tmp->flag_q_closed)
    tmp->flag_fully_acpt = TRUE;
  /* the new event must be waiting for (and "have") a read event */
  assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
  assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
  assert(tmp == q_recv);

  return (1);

 block_beg_fail:
  evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
  goto accept_fail;

 cb_accept_fail:
  close(fd);
 accept_fail:
  /* report as an interrupted poll so the caller retries */
  errno = EINTR;
  return (-1);
}
2153
2154
2155 #ifndef EVNT_USE_EPOLL
2156 # ifdef VSTR_AUTOCONF_HAVE_SYS_EPOLL_H
2157 # define EVNT_USE_EPOLL 1
2158 # else
2159 # define EVNT_USE_EPOLL 0
2160 # endif
2161 #endif
2162
2163 #if !EVNT_USE_EPOLL
/* poll() backend: nothing to set up. */
int evnt_poll_init(void)
{
  return (TRUE);
}
2168
/* poll() backend: direct (edge-style) event delivery is never available. */
int evnt_poll_direct_enabled(void)
{
  return (FALSE);
}
2173
/* poll() backend: no per-process poll state to rebuild after fork(). */
int evnt_poll_child_init(void)
{
  return (TRUE);
}
2178
/* Start waiting for the given poll flags on this event's fd.  Setting
 * revents too fakes an immediate event, so the next scan services the
 * fd without waiting for a real poll() hit. */
void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
{
  SOCKET_POLL_INDICATOR(evnt->ind)->events  |= flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= flags;
}
2184
/* Stop waiting for the given poll flags, clearing any pending hit too. */
void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
{
  SOCKET_POLL_INDICATOR(evnt->ind)->events  &= ~flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
}
2190
/* Register an fd with the poll set; returns its indicator (0 on failure). */
unsigned int evnt_poll_add(struct Evnt *EVNT__ATTR_UNUSED(evnt), int fd)
{
  return (socket_poll_add(fd));
}
2195
evnt_poll_del(struct Evnt * evnt)2196 void evnt_poll_del(struct Evnt *evnt)
2197 {
2198 if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
2199 close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
2200 socket_poll_del(evnt->ind);
2201 }
2202
2203 /* NOTE: that because of socket_poll direct mapping etc. we can't be "clever" */
/* Convert a listening event into a normal connection event on "fd":
 * allocate a fresh poll indicator and IO buffers, fake a POLLIN hit so
 * the new connection is serviced immediately, and move the event from
 * the accept queue to the recv queue. */
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
  unsigned int old_ind = evnt->ind;

  assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);

  ASSERT(evnt__valid(evnt));

  if (!(evnt->ind = socket_poll_add(fd)))
    goto poll_add_fail;

  /* listeners carry no IO buffers; create them for the connection */
  if (!(evnt->io_r = vstr_make_base(NULL)) ||
      !(evnt->io_w = vstr_make_base(NULL)))
    goto malloc_base_fail;

  SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;

  socket_poll_del(old_ind);

  evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
  evnt_add(&q_recv, evnt);   evnt->flag_q_recv  = TRUE;

  evnt_fd__set_nonblock(fd, TRUE);

  ASSERT(evnt__valid(evnt));

  return (TRUE);

 malloc_base_fail:
  /* restore the listener's indicator; only io_r can be non-NULL here
   * (io_w is assigned last and its failure is what brought us here) */
  socket_poll_del(evnt->ind);
  evnt->ind = old_ind;
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  ASSERT(!evnt->io_r && !evnt->io_w);
 poll_add_fail:
  return (FALSE);
}
2241
evnt_poll(void)2242 int evnt_poll(void)
2243 {
2244 int msecs = evnt__get_timeout();
2245
2246 if (evnt_child_exited)
2247 return (errno = EINTR, -1);
2248
2249 if (evnt__poll_tst_accept(msecs))
2250 return (evnt__poll_accept());
2251
2252 return (socket_poll_update_all(msecs));
2253 }
2254 #else
2255
2256 #include <sys/epoll.h>
2257
2258 static int evnt__epoll_fd = -1;
2259
evnt_poll_init(void)2260 int evnt_poll_init(void)
2261 {
2262 assert(POLLIN == EPOLLIN);
2263 assert(POLLOUT == EPOLLOUT);
2264 assert(POLLHUP == EPOLLHUP);
2265 assert(POLLERR == EPOLLERR);
2266
2267 if (!CONF_EVNT_NO_EPOLL)
2268 {
2269 evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ);
2270
2271 vlg_dbg2(vlg, "epoll_create(%d): %m\n", evnt__epoll_fd);
2272 }
2273
2274 return (evnt__epoll_fd != -1);
2275 }
2276
evnt_poll_direct_enabled(void)2277 int evnt_poll_direct_enabled(void)
2278 {
2279 return (evnt__epoll_fd != -1);
2280 }
2281
evnt__epoll_readd(struct Evnt * evnt)2282 static int evnt__epoll_readd(struct Evnt *evnt)
2283 {
2284 struct epoll_event epevent[1];
2285
2286 while (evnt)
2287 {
2288 int flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
2289 vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags);
2290 epevent->events = flags;
2291 epevent->data.ptr = evnt;
2292 if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, evnt_fd(evnt), epevent) == -1)
2293 vlg_err(vlg, EXIT_FAILURE, "epoll_readd: %m\n");
2294
2295 evnt = evnt->next;
2296 }
2297
2298 return (TRUE);
2299 }
2300
evnt_poll_child_init(void)2301 int evnt_poll_child_init(void)
2302 { /* Can't share epoll() fd's between tasks ... */
2303 if (CONF_EVNT_DUP_EPOLL && evnt_poll_direct_enabled())
2304 {
2305 close(evnt__epoll_fd);
2306 evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); /* size does nothing */
2307 if (evnt__epoll_fd == -1)
2308 VLG_WARN_RET(FALSE, (vlg, "epoll_recreate(): %m\n"));
2309
2310 evnt__epoll_readd(q_connect);
2311 evnt__epoll_readd(q_accept);
2312 evnt__epoll_readd(q_recv);
2313 evnt__epoll_readd(q_send_recv);
2314 evnt__epoll_readd(q_none);
2315 }
2316
2317 return (TRUE);
2318 }
2319
/* epoll backend: start waiting for "flags" on this event's fd, updating
 * both the local indicator and the kernel's interest mask.  Only POLLIN
 * is faked into revents (a new read waiter is scanned immediately;
 * write readiness must come from the kernel).  Fatal on epoll failure. */
void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
{
  if ((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags)
    return; /* already waiting on all of them */

  SOCKET_POLL_INDICATOR(evnt->ind)->events  |= flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN);

  if (evnt_poll_direct_enabled())
  {
    struct epoll_event epevent[1];

    flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
    vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags);
    epevent->events   = flags;
    epevent->data.ptr = evnt;
    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
      vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
  }
}
2340
/* epoll backend: stop waiting for "flags", syncing the kernel's interest
 * mask to whatever remains.  No-op if none of the flags were set. */
void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
{
  if (!(SOCKET_POLL_INDICATOR(evnt->ind)->events & flags))
    return;

  SOCKET_POLL_INDICATOR(evnt->ind)->events  &= ~flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;

  if (flags && evnt_poll_direct_enabled())
  {
    struct epoll_event epevent[1];

    flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; /* remaining mask */
    vlg_dbg2(vlg, "epoll_mod_del(%p,%d)\n", evnt, flags);
    epevent->events   = flags;
    epevent->data.ptr = evnt;
    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
      vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
  }
}
2361
evnt_poll_add(struct Evnt * evnt,int fd)2362 unsigned int evnt_poll_add(struct Evnt *evnt, int fd)
2363 {
2364 unsigned int ind = socket_poll_add(fd);
2365
2366 if (ind && evnt_poll_direct_enabled())
2367 {
2368 struct epoll_event epevent[1];
2369 int flags = 0;
2370
2371 vlg_dbg2(vlg, "epoll_add(%p,%d)\n", evnt, flags);
2372 epevent->events = flags;
2373 epevent->data.ptr = evnt;
2374
2375 if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
2376 {
2377 vlg_warn(vlg, "epoll: %m\n");
2378 socket_poll_del(fd);
2379 }
2380 }
2381
2382 return (ind);
2383 }
2384
/* Remove an event's fd from polling and close it.  Closing an fd removes
 * it from any epoll set automatically, so no explicit EPOLL_CTL_DEL is
 * performed (see the disabled branch below). */
void evnt_poll_del(struct Evnt *evnt)
{
  if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
    close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
  socket_poll_del(evnt->ind);

  /* done via. the close() */
  if (FALSE && evnt_poll_direct_enabled())
  {
    /* NOTE(review): deliberately dead code, kept as documentation of the
     * explicit-delete alternative.  If it were ever re-enabled, note that
     * it reads the fd *after* socket_poll_del() above -- the indicator
     * entry would be stale by then. */
    int fd = SOCKET_POLL_INDICATOR(evnt->ind)->fd;
    struct epoll_event epevent[1];

    vlg_dbg2(vlg, "epoll_del(%p,%d)\n", evnt, 0);
    epevent->events = 0;
    epevent->data.ptr = evnt;

    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, fd, epevent) == -1)
      vlg_abort(vlg, "epoll: %m\n");
  }
}
2405
/* Convert an accept()-queue event into a normal read event on a new fd:
 * register the new fd, allocate the read/write io buffers, move the evnt
 * from q_accept to q_recv, and (for direct epoll) swap the kernel
 * registration from the old fd to the new one.
 * Returns TRUE on success, FALSE on allocation/registration failure
 * (in which case the evnt is left on its original fd/index). */
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
  unsigned int old_ind = evnt->ind;
  int old_fd = SOCKET_POLL_INDICATOR(old_ind)->fd;

  assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);

  ASSERT(evnt__valid(evnt));

  if (!(evnt->ind = socket_poll_add(fd)))
    goto poll_add_fail;

  /* create io buffers for the connection -- NOTE(review): assumes accept
   * evnts start with io_r/io_w NULL (the failure path's ASSERT relies on
   * it); confirm against the accept-event constructor */
  if (!(evnt->io_r = vstr_make_base(NULL)) ||
      !(evnt->io_w = vstr_make_base(NULL)))
    goto malloc_base_fail;

  /* mark input pending so the new connection is serviced immediately */
  SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;

  socket_poll_del(old_ind);

  /* migrate from the accept queue to the recv queue */
  evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
  evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;

  evnt_fd__set_nonblock(fd, TRUE);

  if (evnt_poll_direct_enabled())
  {
    struct epoll_event epevent[1];

    vlg_dbg2(vlg, "epoll_swap(%p,%d,%d)\n", evnt, old_fd, fd);

    /* drop the old fd's epoll registration... */
    epevent->events = POLLIN;
    epevent->data.ptr = evnt;

    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, old_fd, epevent) == -1)
      vlg_abort(vlg, "epoll: %m\n");

    /* ...and register the new fd with the current event mask */
    epevent->events = SOCKET_POLL_INDICATOR(evnt->ind)->events;
    epevent->data.ptr = evnt;

    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
      vlg_abort(vlg, "epoll: %m\n");
  }

  ASSERT(evnt__valid(evnt));

  return (TRUE);

 malloc_base_fail:
  socket_poll_del(evnt->ind);
  evnt->ind = old_ind;
  /* io_w is NULL on this path (either never assigned, or it was the
   * allocation that failed), so only io_r may need freeing;
   * vstr_free_base(NULL) is presumably a no-op -- TODO confirm */
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  ASSERT(!evnt->io_r && !evnt->io_w);
 poll_add_fail:
  return (FALSE);
}
2463
2464 #define EVNT__EPOLL_EVENTS 128
evnt_poll(void)2465 int evnt_poll(void)
2466 {
2467 struct epoll_event events[EVNT__EPOLL_EVENTS];
2468 int msecs = evnt__get_timeout();
2469 int ret = -1;
2470 unsigned int scan = 0;
2471
2472 if (evnt_child_exited)
2473 return (errno = EINTR, -1);
2474
2475 if (evnt__poll_tst_accept(msecs))
2476 return (evnt__poll_accept());
2477
2478 if (!evnt_poll_direct_enabled())
2479 return (socket_poll_update_all(msecs));
2480
2481 ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, msecs);
2482 if (ret == -1)
2483 return (ret);
2484
2485 scan = ret;
2486 ASSERT(scan <= EVNT__EPOLL_EVENTS);
2487 while (scan-- > 0)
2488 {
2489 struct Evnt *evnt = NULL;
2490 unsigned int flags = 0;
2491
2492 flags = events[scan].events;
2493 evnt = events[scan].data.ptr;
2494
2495 ASSERT(evnt__valid(evnt));
2496
2497 vlg_dbg2(vlg, "epoll_wait(%p,%u)\n", evnt, flags);
2498 vlg_dbg2(vlg, "epoll_wait[flags]=a=%u|r=%u\n",
2499 evnt->flag_q_accept, evnt->flag_q_recv);
2500
2501 assert(((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) ||
2502 ((POLLHUP|POLLERR) & flags));
2503
2504 SOCKET_POLL_INDICATOR(evnt->ind)->revents = flags;
2505
2506 evnt__del_whatever(evnt); /* move to front of queue */
2507 evnt__add_whatever(evnt);
2508 }
2509
2510 return (ret);
2511 }
2512 #endif
2513