1 /*
2  *  Copyright (C) 2002, 2003, 2004, 2005  James Antill
3  *
4  *  This library is free software; you can redistribute it and/or
5  *  modify it under the terms of the GNU Lesser General Public
6  *  License as published by the Free Software Foundation; either
7  *  version 2 of the License, or (at your option) any later version.
8  *
9  *  This library is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  *  Lesser General Public License for more details.
13  *
14  *  You should have received a copy of the GNU Lesser General Public
15  *  License along with this library; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  *
18  *  email: james@and.org
19  */
20 /* IO events, and some help with timed events */
21 #include <vstr.h>
22 
23 #include <stdlib.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <errno.h>
29 #include <getopt.h>
30 #include <string.h>
31 #include <netinet/in.h>
32 #include <sys/un.h>
33 #include <netinet/tcp.h>
34 #include <arpa/inet.h>
35 #include <sys/poll.h>
36 #include <netdb.h>
37 #include <sys/time.h>
38 #include <time.h>
39 #include <signal.h>
40 #include <sys/stat.h>
41 
42 #include <socket_poll.h>
43 #include <timer_q.h>
44 
45 #include <sys/sendfile.h>
46 
/* Compile-time tuning for the event core. */
#define CONF_EVNT_NO_EPOLL FALSE
#define CONF_EVNT_EPOLL_SZ (10 * 1000) /* size is just a speed hint */
#define CONF_EVNT_DUP_EPOLL TRUE /* doesn't work if FALSE and multi proc */
#define CONF_GETTIMEOFDAY_TIME TRUE /* does tv_sec contain time(NULL) */

#ifndef CONF_FULL_STATIC
# include <netdb.h>
/* Resolve host name x into evnt's IPv4 sockaddr using DNS.
 * On lookup failure the sockaddr is left untouched (silent fallback). */
# define EVNT__RESOLVE_NAME(evnt, x) do {                       \
      struct hostent *h = gethostbyname(x);                     \
                                                                \
      if (h)                                                    \
        memcpy(&EVNT_SA_IN4(evnt)->sin_addr.s_addr,              \
               h->h_addr_list[0],                               \
               sizeof(EVNT_SA_IN4(evnt)->sin_addr.s_addr));      \
                                                                \
    } while (FALSE)
#else
/* Static builds avoid the resolver: only dotted-quad input works, and a
 * parse failure silently becomes INADDR_ANY. */
# define EVNT__RESOLVE_NAME(evnt, x) do {               \
      if ((EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE) \
        EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY);          \
    } while (FALSE)
#endif
69 
/* Socket-filter (BPF) support shims: if the platform lacks the setsockopt
 * options we compile the feature out but keep the code paths type-correct
 * with dummy definitions. */
#if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER)
# define CONF_USE_SOCKET_FILTERS FALSE
struct sock_fprog { int dummy; }; /* placeholder so prototypes still compile */
# define SO_DETACH_FILTER 0
# define SO_ATTACH_FILTER 0
#else
# define CONF_USE_SOCKET_FILTERS TRUE

/* not in glibc... hope it's not in *BSD etc. */
struct sock_filter
{
 uint16_t   code;   /* Actual filter code */
 uint8_t    jt;     /* Jump true */
 uint8_t    jf;     /* Jump false */
 uint32_t   k;      /* Generic multiuse field */
};

struct sock_fprog
{
 unsigned short len;    /* Number of filter blocks */
 struct sock_filter *filter;
};
#endif
93 
94 #include "vlg.h"
95 
/* Trim ex_utils.h down to just the helpers this file needs. */
#define EX_UTILS_NO_USE_INIT  1
#define EX_UTILS_NO_USE_EXIT  1
#define EX_UTILS_NO_USE_LIMIT 1
#define EX_UTILS_NO_USE_BLOCK 1
#define EX_UTILS_NO_USE_PUT   1
#define EX_UTILS_NO_USE_OPEN  1
#define EX_UTILS_NO_USE_IO_FD 1
#include "ex_utils.h"

#include "evnt.h"

#include "mk.h"

/* C-string equality: strcmp() == 0 */
#define CSTREQ(x,y) (!strcmp(x,y))
110 
/* Flag flipped from a signal handler (presumably SIGCHLD -- confirm at the
 * handler's registration site); sig_atomic_t keeps the access async-safe. */
volatile sig_atomic_t evnt_child_exited = FALSE;

/* Default Nagle setting copied into each new Evnt by evnt_init(). */
int evnt_opt_nagle = EVNT_CONF_NAGLE;

static struct Evnt *q_send_now = NULL;  /* Try a send "now" */
static struct Evnt *q_closed   = NULL;  /* Close when fin. */

/* Every live Evnt is on exactly one of these five state queues
 * (see evnt__valid() for the invariant). */
static struct Evnt *q_none      = NULL; /* nothing */
static struct Evnt *q_accept    = NULL; /* connections - recv */
static struct Evnt *q_connect   = NULL; /* connections - send */
static struct Evnt *q_recv      = NULL; /* recv */
static struct Evnt *q_send_recv = NULL; /* recv + send */

/* Logging sink, installed via evnt_logger(). */
static Vlg *vlg = NULL;

/* Count of live Evnt objects, cross-checked against the queues in debug. */
static unsigned int evnt__num = 0;

/* this should be more configurable... */
static unsigned int evnt__accept_limit = 4;

/* Cached gettimeofday() result, refreshed by EVNT__UPDATE_TV(). */
static struct timeval evnt__tv[1];

static int evnt__is_child = FALSE;

#define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL)
#define EVNT__COPY_TV(x)  memcpy(x, evnt__tv, sizeof(struct timeval))
137 
/* Install the Vlg object used for all logging/debug output in this file. */
void evnt_logger(Vlg *passed_vlg)
{
  vlg = passed_vlg;
}
142 
evnt_fd__set_nonblock(int fd,int val)143 void evnt_fd__set_nonblock(int fd, int val)
144 {
145   int flags = 0;
146 
147   ASSERT(val == !!val);
148 
149   if ((flags = fcntl(fd, F_GETFL)) == -1)
150     vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__);
151 
152   if (!!(flags & O_NONBLOCK) == val)
153     return;
154 
155   if (val)
156     flags |=  O_NONBLOCK;
157   else
158     flags &= ~O_NONBLOCK;
159 
160   if (fcntl(fd, F_SETFL, flags) == -1)
161     vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__);
162 }
163 
evnt_add(struct Evnt ** que,struct Evnt * node)164 void evnt_add(struct Evnt **que, struct Evnt *node)
165 {
166   assert(node != *que);
167 
168   if ((node->next = *que))
169     node->next->prev = node;
170 
171   node->prev = NULL;
172   *que = node;
173 }
174 
evnt_del(struct Evnt ** que,struct Evnt * node)175 void evnt_del(struct Evnt **que, struct Evnt *node)
176 {
177   if (node->prev)
178     node->prev->next = node->next;
179   else
180   {
181     assert(*que == node);
182     *que = node->next;
183   }
184 
185   if (node->next)
186     node->next->prev = node->prev;
187 }
188 
evnt__del_whatever(struct Evnt * evnt)189 static void evnt__del_whatever(struct Evnt *evnt)
190 {
191   if (0) { }
192   else if (evnt->flag_q_accept)
193     evnt_del(&q_accept, evnt);
194   else if (evnt->flag_q_connect)
195     evnt_del(&q_connect, evnt);
196   else if (evnt->flag_q_recv)
197     evnt_del(&q_recv, evnt);
198   else if (evnt->flag_q_send_recv)
199     evnt_del(&q_send_recv, evnt);
200   else if (evnt->flag_q_none)
201     evnt_del(&q_none, evnt);
202   else
203     ASSERT_NOT_REACHED();
204 }
205 
EVNT__ATTR_USED()206 static void EVNT__ATTR_USED() evnt__add_whatever(struct Evnt *evnt)
207 {
208   if (0) { }
209   else if (evnt->flag_q_accept)
210     evnt_add(&q_accept, evnt);
211   else if (evnt->flag_q_connect)
212     evnt_add(&q_connect, evnt);
213   else if (evnt->flag_q_recv)
214     evnt_add(&q_recv, evnt);
215   else if (evnt->flag_q_send_recv)
216     evnt_add(&q_send_recv, evnt);
217   else if (evnt->flag_q_none)
218     evnt_add(&q_none, evnt);
219   else
220     ASSERT_NOT_REACHED();
221 }
222 
evnt__debug_num_1(struct Evnt * scan)223 static unsigned int evnt__debug_num_1(struct Evnt *scan)
224 {
225   unsigned int num = 0;
226 
227   while (scan)
228   {
229     struct Evnt *scan_next = scan->next;
230 
231     ++num;
232 
233     scan = scan_next;
234   }
235 
236   return (num);
237 }
238 
239 #ifndef VSTR_AUTOCONF_NDEBUG
evnt__srch(struct Evnt ** que,struct Evnt * evnt)240 static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt)
241 {
242   struct Evnt **ret = que;
243 
244   while (*ret)
245   {
246     if (*ret == evnt)
247       return (ret);
248 
249     ret = &(*ret)->next;
250   }
251 
252   return (NULL);
253 }
254 
/* Debug-only invariant checker: every Evnt is on exactly one state queue,
 * its send-now/closed flags agree with membership of those side lists, and
 * its poll-event mask matches its queue.  Returns non-zero when the Evnt
 * was found on its claimed queue. */
static int evnt__valid(struct Evnt *evnt)
{
  int ret = 0;

  ASSERT(evnt_num_all());

  /* exactly one primary state flag may be set */
  ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
          evnt->flag_q_send_recv + evnt->flag_q_none) == 1);

  /* flag_q_send_now <=> present on the q_send_now (s_next) list */
  if (evnt->flag_q_send_now)
  {
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(*scan);
  }
  else
  {
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(!*scan);
  }

  /* flag_q_closed <=> present on the q_closed (c_next) list */
  if (evnt->flag_q_closed)
  {
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(*scan);
  }
  else
  {
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(!*scan);
  }

  /* per-queue poll-mask invariants */
  if (0) { }
  else   if (evnt->flag_q_accept)
  { /* listeners: never poll for writability, and have no IO buffers */
    ret = !!evnt__srch(&q_accept, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
    ASSERT(!evnt->io_r && !evnt->io_w);
  }
  else   if (evnt->flag_q_connect)
  { /* connecting: wait only for POLLOUT (connect completion) */
    ret = !!evnt__srch(&q_connect, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLIN));
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
  }
  else   if (evnt->flag_q_send_recv)
  { /* sending (and maybe receiving): POLLOUT must be armed */
    ret = !!evnt__srch(&q_send_recv, evnt);
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLOUT));
  }
  else   if (evnt->flag_q_recv)
  { /* receive-only: POLLOUT must not be armed */
    ret = !!evnt__srch(&q_recv, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else   if (evnt->flag_q_none)
  { /* idle: no poll events at all */
    ret = !!evnt__srch(&q_none, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events  & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else
    ASSERT_NOT_REACHED();

  return (ret);
}
337 
evnt__debug_num_all(void)338 static unsigned int evnt__debug_num_all(void)
339 {
340   unsigned int num = 0;
341 
342   num += evnt__debug_num_1(q_connect);
343   num += evnt__debug_num_1(q_accept);
344   num += evnt__debug_num_1(q_recv);
345   num += evnt__debug_num_1(q_send_recv);
346   num += evnt__debug_num_1(q_none);
347 
348   return (num);
349 }
350 #endif
351 
/* Return the file descriptor backing this Evnt. */
int evnt_fd(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));
  return (SOCKET_POLL_INDICATOR(evnt->ind)->fd);
}
357 
/* Default connect callback: accept the completed connection, do nothing. */
int evnt_cb_func_connect(struct Evnt *EVNT__ATTR_UNUSED(evnt))
{
  return (TRUE);
}
362 
/* Default accept callback: returns no new Evnt, i.e. the incoming
 * connection is not taken over by anyone. */
struct Evnt *evnt_cb_func_accept(struct Evnt *EVNT__ATTR_UNUSED(evnt),
                                 int EVNT__ATTR_UNUSED(fd),
                                 struct sockaddr *EVNT__ATTR_UNUSED(sa),
                                 socklen_t EVNT__ATTR_UNUSED(len))
{
  return (NULL);
}
370 
evnt_cb_func_recv(struct Evnt * evnt)371 int evnt_cb_func_recv(struct Evnt *evnt)
372 {
373   unsigned int ern = 0;
374   int ret = evnt_recv(evnt, &ern);
375 
376   if (ret)
377     return (TRUE);
378 
379   if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len)
380     return (evnt_shutdown_r(evnt, TRUE));
381 
382   return (FALSE);
383 }
384 
evnt_cb_func_send(struct Evnt * evnt)385 int evnt_cb_func_send(struct Evnt *evnt)
386 {
387   int ret = -1;
388 
389   evnt_fd_set_cork(evnt, TRUE);
390   ret = evnt_send(evnt);
391   if (!evnt->io_w->len)
392     evnt_fd_set_cork(evnt, FALSE);
393 
394   return (ret);
395 }
396 
/* Free callback for heap-allocated Evnts: scrub (debug aid) then free(). */
void evnt_cb_func_free(struct Evnt *evnt)
{
  MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt));
  free(evnt);
}
402 
/* Free callback using the project's F() macro (from mk.h) -- presumably a
 * checked free(); confirm against mk.h. */
void evnt_cb_func_F(struct Evnt *evnt)
{
  F(evnt);
}
407 
evnt_cb_func_shutdown_r(struct Evnt * evnt)408 int evnt_cb_func_shutdown_r(struct Evnt *evnt)
409 {
410   vlg_dbg2(vlg, "SHUTDOWN CB from[$<sa:%p>]\n", EVNT_SA(evnt));
411 
412   if (!evnt_shutdown_r(evnt, FALSE))
413     return (FALSE);
414 
415   /* called from outside read, and read'll never get called again ...
416    * so quit if we have nothing to send */
417   return (!!evnt->io_w->len);
418 }
419 
/* Initialise the common parts of an Evnt: state flags, accounting, default
 * callbacks, io_r/io_w Vstr buffers, poll registration and timestamps; the
 * fd is marked close-on-exec and non-blocking, and a reference to the
 * sockaddr ref is taken.
 *
 * Returns TRUE on success.  On failure everything acquired here is
 * released, the fd is left open for the caller to close, and errno is set:
 * ENOMEM for allocation/poll-add failures, or the real errno if fcntl()
 * failed (the original code clobbered that with ENOMEM too). */
static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref)
{
  int saved_errno = 0;

  ASSERT(ref);

  /* not on any state queue yet -- the evnt_make_*() callers pick one */
  evnt->flag_q_accept    = FALSE;
  evnt->flag_q_connect   = FALSE;
  evnt->flag_q_recv      = FALSE;
  evnt->flag_q_send_recv = FALSE;
  evnt->flag_q_none      = FALSE;

  evnt->flag_q_send_now  = FALSE;
  evnt->flag_q_closed    = FALSE;

  evnt->flag_q_pkt_move  = FALSE;

  /* FIXME: need group settings, default no nagle but cork */
  evnt->flag_io_nagle    = evnt_opt_nagle;
  evnt->flag_io_cork     = FALSE;

  evnt->flag_io_filter   = FALSE;

  evnt->flag_fully_acpt  = FALSE;

  evnt->io_r_shutdown    = FALSE;
  evnt->io_w_shutdown    = FALSE;

  evnt->prev_bytes_r = 0;

  /* request/byte accounting */
  evnt->acct.req_put = 0;
  evnt->acct.req_got = 0;
  evnt->acct.bytes_r = 0;
  evnt->acct.bytes_w = 0;

  /* default callbacks; users override after init */
  evnt->cbs->cb_func_accept     = evnt_cb_func_accept;
  evnt->cbs->cb_func_connect    = evnt_cb_func_connect;
  evnt->cbs->cb_func_recv       = evnt_cb_func_recv;
  evnt->cbs->cb_func_send       = evnt_cb_func_send;
  evnt->cbs->cb_func_free       = evnt_cb_func_F;
  evnt->cbs->cb_func_shutdown_r = evnt_cb_func_shutdown_r;

  /* NULL both first so the cleanup path can free unconditionally */
  evnt->io_r = NULL;
  evnt->io_w = NULL;
  if (!(evnt->io_r = vstr_make_base(NULL)))
    goto make_vstr_fail;

  if (!(evnt->io_w = vstr_make_base(NULL)))
    goto make_vstr_fail;

  evnt->tm_o = NULL;

  if (!(evnt->ind = evnt_poll_add(evnt, fd)))
    goto poll_add_fail;

  EVNT__UPDATE_TV();
  EVNT__COPY_TV(&evnt->ctime);
  EVNT__COPY_TV(&evnt->mtime);

  evnt->msecs_tm_mtime = 0;

  /* close-on-exec: spawned children must not inherit this fd */
  if (fcntl(fd, F_SETFD, TRUE) == -1)
    goto fcntl_fail;

  evnt->sa_ref = vstr_ref_add(ref);

  evnt_fd__set_nonblock(fd, TRUE);

  return (TRUE);

 fcntl_fail:
  saved_errno = errno; /* don't let cleanup clobber the real error */
  evnt_poll_del(evnt);
  vstr_free_base(evnt->io_w); evnt->io_w = NULL;
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  errno = saved_errno;
  return (FALSE);

 poll_add_fail:
 make_vstr_fail:
  vstr_free_base(evnt->io_w); evnt->io_w = NULL;
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  errno = ENOMEM;
  return (FALSE);
}
496 
/* First stage of teardown: drop off the send-now list, log any data being
 * discarded, free the IO buffers and deregister from the poll set.  The
 * Evnt struct itself is NOT freed here. */
static void evnt__free1(struct Evnt *evnt)
{
  evnt_send_del(evnt);

  if (evnt->io_r && evnt->io_r->len)
    vlg_dbg2(vlg, "evnt__free1(%p) io_r len = %zu\n", evnt, evnt->io_r->len);
  if (evnt->io_w && evnt->io_w->len)
    vlg_dbg2(vlg, "evnt__free1(%p) io_w len = %zu\n", evnt, evnt->io_w->len);

  vstr_free_base(evnt->io_w); evnt->io_w = NULL;
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;

  evnt_poll_del(evnt);
}
511 
/* Second stage of teardown, run after the free callback: drop the sockaddr
 * reference and cancel any pending timer node. */
static void evnt__free2(Vstr_ref *sa, Timer_q_node *tm_o)
{ /* post callbacks, evnt no longer exists */
  vstr_ref_del(sa);

  if (!tm_o)
    return;

  /* clear the timer's back-pointer before deleting it */
  timer_q_cntl_node(tm_o, TIMER_Q_CNTL_NODE_SET_DATA, NULL);
  timer_q_quick_del_node(tm_o);
}
522 
/* Full teardown of an Evnt that has already been removed from its state
 * queue: free IO/poll state, adjust the live count, invoke the user free
 * callback (which releases the struct), then clean up the saved sockaddr
 * ref and timer.  sa/tm_o are captured first because the callback may
 * destroy the struct. */
static void evnt__free(struct Evnt *evnt)
{
  if (evnt)
  {
    Vstr_ref *sa = evnt->sa_ref;
    Timer_q_node *tm_o  = evnt->tm_o;

    evnt__free1(evnt);

    ASSERT(evnt__num >= 1); /* in case they come back in via. the cb */
    --evnt__num;
    ASSERT(evnt__num == evnt__debug_num_all());

    evnt->cbs->cb_func_free(evnt);
    evnt__free2(sa, tm_o);
  }
}
540 
/* Public destructor: detach evnt from its state queue and free it.
 * NULL is accepted and ignored. */
void evnt_free(struct Evnt *evnt)
{
  if (!evnt)
    return;

  evnt__del_whatever(evnt);
  evnt__free(evnt);
}
549 
/* Undo evnt_init() for an Evnt that was never put on any state queue
 * (used by the evnt_make_*() failure paths). */
static void evnt__uninit(struct Evnt *evnt)
{
  ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
          evnt->flag_q_send_recv + evnt->flag_q_none) == 0);

  evnt__free1(evnt);
  evnt__free2(evnt->sa_ref, evnt->tm_o);
}
558 
/* Toggle TCP_NODELAY (disable/enable Nagle) on a TCP socket.
 * Returns non-zero on success, 0 if setsockopt() failed. */
static int evnt_fd__set_nodelay(int fd, int val)
{
  int err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));

  return (err != -1);
}
563 
/* Toggle SO_REUSEADDR so listeners can rebind without TIME_WAIT delays.
 * Returns non-zero on success, 0 if setsockopt() failed. */
static int evnt_fd__set_reuse(int fd, int val)
{
  int err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));

  return (err != -1);
}
568 
/* close() the fd without disturbing errno -- used on error-cleanup paths
 * where the original failure's errno must survive. */
static void evnt__fd_close_noerrno(int fd)
{
  const int old_errno = errno;

  close(fd);
  errno = old_errno;
}
575 
/* Final step shared by the evnt_make_*() constructors: put the Evnt on its
 * chosen queue, bump the live count and arm the requested poll events.
 * Always returns TRUE (callers return it directly). */
static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags)
{
  evnt_add(que, evnt);

  ++evnt__num;

  evnt_wait_cntl_add(evnt, flags);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
588 
/* Create a non-blocking IPv4 TCP client connection to ipv4_string:port.
 * ipv4_string must be a dotted quad (no DNS here).  On EINPROGRESS the
 * Evnt goes on q_connect awaiting POLLOUT; on immediate success it goes on
 * q_none.  Returns FALSE on any failure (fd closed, Evnt uninitialised). */
int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port)
{
  int fd = -1;
  socklen_t alloc_len = sizeof(struct sockaddr_in);
  Vstr_ref *ref = NULL;

  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;
  evnt->flag_q_pkt_move = TRUE; /* track request counts to idle the queue */

  if (!evnt->flag_io_nagle)
    evnt_fd__set_nodelay(fd, TRUE);

  EVNT_SA_IN4(evnt)->sin_family = AF_INET;
  EVNT_SA_IN4(evnt)->sin_port = htons(port);
  EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(ipv4_string);

  ASSERT(port && (EVNT_SA_IN4(evnt)->sin_addr.s_addr != htonl(INADDR_ANY)));

  if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
  {
    if (errno == EINPROGRESS)
    { /* The connection needs more time....*/
      vstr_ref_del(ref); /* evnt_init() took its own reference */
      evnt->flag_q_connect = TRUE;
      return (evnt__make_end(&q_connect, evnt, POLLOUT));
    }

    goto connect_fail;
  }

  vstr_ref_del(ref);
  evnt->flag_q_none = TRUE;
  return (evnt__make_end(&q_none, evnt, 0));

 connect_fail:
  evnt__uninit(evnt);
 init_fail:
  /* NOTE(review): ref may be NULL here when vstr_ref_make_malloc() failed
   * -- assumes vstr_ref_del(NULL) is a no-op; confirm against vstr docs */
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
637 
/* Create a non-blocking AF_LOCAL (Unix-domain) client connection to the
 * socket at path fname.  Queue placement and failure semantics mirror
 * evnt_make_con_ipv4(). */
int evnt_make_con_local(struct Evnt *evnt, const char *fname)
{
  int fd = -1;
  size_t len = strlen(fname) + 1; /* include the NUL in sun_path */
  struct sockaddr_un tmp_sun;
  socklen_t alloc_len = 0;
  Vstr_ref *ref = NULL;

  tmp_sun.sun_path[0] = 0;
  alloc_len = SUN_LEN(&tmp_sun) + len; /* header + path incl. NUL */

  if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;
  evnt->flag_q_pkt_move = TRUE;

  EVNT_SA_UN(evnt)->sun_family = AF_LOCAL;
  memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len);

  if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
  {
    if (errno == EINPROGRESS)
    { /* The connection needs more time....*/
      vstr_ref_del(ref);
      evnt->flag_q_connect = TRUE;
      return (evnt__make_end(&q_connect, evnt, POLLOUT));
    }

    goto connect_fail;
  }

  vstr_ref_del(ref);
  evnt->flag_q_none = TRUE;
  return (evnt__make_end(&q_none, evnt, 0));

 connect_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
685 
/* Wrap an already-accept()ed fd in an Evnt (sa held via the given ref) and
 * put it on q_recv polling for input.  Returns FALSE if init fails; the
 * caller keeps ownership of fd in that case. */
int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa)
{
  if (!evnt_init(evnt, fd, sa))
    return (FALSE);

  if (!evnt->flag_io_nagle)
    evnt_fd__set_nodelay(fd, TRUE);

  evnt->flag_q_recv = TRUE;
  return (evnt__make_end(&q_recv, evnt, POLLIN));
}
697 
evnt_make_acpt_dup(struct Evnt * evnt,int fd,struct sockaddr * sa,socklen_t len)698 int evnt_make_acpt_dup(struct Evnt *evnt, int fd,
699                        struct sockaddr *sa, socklen_t len)
700 {
701   Vstr_ref *ref = vstr_ref_make_memdup(sa, len);
702   int ret = FALSE;
703 
704   if (!ref)
705   {
706     errno = ENOMEM;
707     return (FALSE);
708   }
709 
710   ret = evnt_make_acpt_ref(evnt, fd, ref);
711   vstr_ref_del(ref);
712   return (ret);
713 }
714 
/* Final step for listening sockets: listeners never do IO themselves, so
 * the io_r/io_w buffers are dropped, the Evnt goes on q_accept polling for
 * POLLIN, and stale revents are cleared.  Always returns TRUE. */
static int evnt__make_bind_end(struct Evnt *evnt)
{
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  vstr_free_base(evnt->io_w); evnt->io_w = NULL;

  evnt->flag_q_accept = TRUE;
  evnt__make_end(&q_accept, evnt, POLLIN);
  SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0;
  return (TRUE);
}
725 
/* Create an IPv4 TCP listening socket bound to acpt_addr:server_port (an
 * empty/NULL or unresolvable address silently becomes INADDR_ANY; port 0
 * means "pick one", re-read via getsockname()).  SO_REUSEADDR is set.  On
 * success the Evnt lands on q_accept; on failure FALSE is returned with
 * the fd closed and the Evnt uninitialised. */
int evnt_make_bind_ipv4(struct Evnt *evnt,
                        const char *acpt_addr, short server_port,
                        unsigned int listen_len)
{
  int fd = -1;
  int saved_errno = 0;
  socklen_t alloc_len = sizeof(struct sockaddr_in);
  Vstr_ref *ref = NULL;

  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;

  EVNT_SA_IN4(evnt)->sin_family = AF_INET;

  EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY);
  if (acpt_addr && *acpt_addr) /* silent error becomes <any> */
    EVNT__RESOLVE_NAME(evnt, acpt_addr);
  if (EVNT_SA_IN4(evnt)->sin_addr.s_addr == htonl(INADDR_ANY))
    acpt_addr = "any"; /* for the warning message below */

  EVNT_SA_IN4(evnt)->sin_port = htons(server_port);

  if (!evnt_fd__set_reuse(fd, TRUE))
    goto reuse_fail;

  if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
    goto bind_fail;

  if (!server_port) /* kernel chose a port -- read it back */
    if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1)
      vlg_err(vlg, EXIT_FAILURE, "getsockname: %m\n");

  if (listen(fd, listen_len) == -1)
    goto listen_fail;

  vstr_ref_del(ref);
  return (evnt__make_bind_end(evnt));

 bind_fail:
  saved_errno = errno; /* vlg_warn() may change errno */
  vlg_warn(vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port);
  errno = saved_errno;
 listen_fail:
 reuse_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
782 
/* Create an AF_LOCAL (Unix-domain) listening socket at path fname.  Any
 * existing socket file is unlink()ed first.  On success the Evnt lands on
 * q_accept; on failure FALSE (fd closed, Evnt uninitialised). */
int evnt_make_bind_local(struct Evnt *evnt, const char *fname,
                         unsigned int listen_len)
{
  int fd = -1;
  int saved_errno = 0;
  size_t len = strlen(fname) + 1; /* include the NUL in sun_path */
  struct sockaddr_un tmp_sun;
  socklen_t alloc_len = 0;
  Vstr_ref *ref = NULL;

  tmp_sun.sun_path[0] = 0;
  alloc_len = SUN_LEN(&tmp_sun) + len;

  if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
    goto sock_fail;

  if (!(ref = vstr_ref_make_malloc(alloc_len)))
    goto init_fail;
  if (!evnt_init(evnt, fd, ref))
    goto init_fail;

  EVNT_SA_UN(evnt)->sun_family = AF_LOCAL;
  memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len);

  unlink(fname); /* remove a stale socket file; errors ignored */

  if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
    goto bind_fail;

  /* NOTE(review): fchmod() on a socket fd does not reliably restrict the
   * bound path on all systems -- presumably intended as chmod(fname, 0600);
   * confirm the intent before changing */
  if (fchmod(fd, 0600) == -1)
    goto fchmod_fail;

  if (listen(fd, listen_len) == -1)
    goto listen_fail;

  vstr_ref_del(ref);
  return (evnt__make_bind_end(evnt));

 bind_fail:
  saved_errno = errno; /* vlg_warn() may change errno */
  vlg_warn(vlg, "bind(%s): %m\n", fname);
  errno = saved_errno;
 fchmod_fail:
 listen_fail:
  evnt__uninit(evnt);
 init_fail:
  vstr_ref_del(ref);
  evnt__fd_close_noerrno(fd);
 sock_fail:
  return (FALSE);
}
834 
/* Wrap an arbitrary fd (pipe, pre-connected socket, ...) in an Evnt.  A
 * NULL sa gets a shared static dummy ref.  flags & POLLIN selects q_recv
 * (polling for input); otherwise the Evnt idles on q_none.  Unlike the
 * other constructors, the fd IS closed here on init failure. */
int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags)
{
  /* static ref with a no-op free callback; refcount starts at 1 so the
   * add/del pair in evnt_init()/teardown never drops it to zero */
  static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1};

  if (!sa)
    sa = &dummy_sa;

  if (!evnt_init(evnt, fd, sa))
  {
    evnt__fd_close_noerrno(fd);
    return (FALSE);
  }

  if (flags & POLLIN)
  {
    evnt->flag_q_recv = TRUE;
    return (evnt__make_end(&q_recv, evnt, POLLIN));
  }

  evnt->flag_q_none = TRUE;
  return (evnt__make_end(&q_none, evnt, 0));
}
857 
/* Request deferred close of evnt: it is pushed onto the q_closed list and
 * actually destroyed later by the scan code.  Idempotent; NULL accepted. */
void evnt_close(struct Evnt *evnt)
{
  if (!evnt)
    return;

  ASSERT(evnt__valid(evnt));

  if (evnt->flag_q_closed)
    return; /* already queued for close */

  /* can't close at this point or we'll race with:
   * socket_poll_add()/socket_poll_del() when using _DIRECT mapping */

  /* queue for deletion ... as the bt might still be using the ptr */
  evnt->flag_q_closed = TRUE;
  evnt->c_next = q_closed;
  q_closed = evnt;

  ASSERT(evnt__valid(evnt));
}
878 
/* Account one request handed to this connection.  For pkt-move
 * connections idling on q_none, the first outstanding request moves them
 * back to q_recv and re-arms POLLIN so the response can be read. */
void evnt_put_pkt(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  if (evnt->flag_q_pkt_move && evnt->flag_q_none)
  {
    ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
    evnt_del(&q_none, evnt); evnt->flag_q_none = FALSE;
    evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
    evnt_wait_cntl_add(evnt, POLLIN);
  }

  ++evnt->acct.req_put;

  ASSERT(evnt__valid(evnt));
}
895 
/* Account one answered request.  When a pkt-move connection has no
 * requests outstanding (and isn't mid-send), it moves from q_recv back to
 * the idle q_none queue and stops polling for input. */
void evnt_got_pkt(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  ++evnt->acct.req_got;

  if (evnt->flag_q_pkt_move &&
      !evnt->flag_q_send_recv && (evnt->acct.req_put == evnt->acct.req_got))
  {
    ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
    evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
    evnt_add(&q_none, evnt), evnt->flag_q_none = TRUE;
    evnt_wait_cntl_del(evnt, POLLIN);
  }

  ASSERT(evnt__valid(evnt));
}
913 
evnt__call_send(struct Evnt * evnt,unsigned int * ern)914 static int evnt__call_send(struct Evnt *evnt, unsigned int *ern)
915 {
916   size_t tmp = evnt->io_w->len;
917   int fd = evnt_fd(evnt);
918 
919   if (!vstr_sc_write_fd(evnt->io_w, 1, tmp, fd, ern) && (errno != EAGAIN))
920     return (FALSE);
921 
922   tmp -= evnt->io_w->len;
923   vlg_dbg3(vlg, "write(%p) = %zu\n", evnt, tmp);
924 
925   evnt->acct.bytes_w += tmp;
926 
927   return (TRUE);
928 }
929 
/* Queue data for sending.  If more than max_sz is buffered (and we aren't
 * already in the send_recv state) try an immediate flush first; then, if
 * data remains or force_q is set, put the Evnt on the q_send_now list for
 * the next scan.  Returns FALSE only on a hard send error. */
int evnt_send_add(struct Evnt *evnt, int force_q, size_t max_sz)
{
  ASSERT(evnt__valid(evnt));

  vlg_dbg3(vlg, "q now = %u, q send recv = %u, force = %u\n",
           evnt->flag_q_send_now, evnt->flag_q_send_recv, force_q);

  if (!evnt->flag_q_send_recv && (evnt->io_w->len > max_sz))
  {
    if (!evnt__call_send(evnt, NULL))
    {
      ASSERT(evnt__valid(evnt));
      return (FALSE);
    }
    if (!evnt->io_w->len && !force_q)
    { /* everything went out straight away */
      ASSERT(evnt__valid(evnt));
      return (TRUE);
    }
  }

  /* already on send_q -- or already polling (and not forcing) */
  if (evnt->flag_q_send_now || (evnt->flag_q_send_recv && !force_q))
  {
    ASSERT(evnt__valid(evnt));
    return (TRUE);
  }

  evnt->s_next = q_send_now;
  q_send_now = evnt;
  evnt->flag_q_send_now = TRUE;

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
966 
967 /* if a connection is on the send now q, then remove them ... this is only
968  * done when the client gets killed, so it doesn't matter if it's slow */
evnt_send_del(struct Evnt * evnt)969 void evnt_send_del(struct Evnt *evnt)
970 {
971   struct Evnt **scan = &q_send_now;
972 
973   if (!evnt->flag_q_send_now)
974     return;
975 
976   while (*scan && (*scan != evnt))
977     scan = &(*scan)->s_next;
978 
979   ASSERT(*scan);
980 
981   *scan = evnt->s_next;
982 
983   evnt->flag_q_send_now = FALSE;
984 }
985 
/* Shut down the read side of the connection.  got_eof means the peer
 * already sent EOF so no shutdown() syscall is needed.  Returns FALSE if
 * either side was already shut down or shutdown() failed. */
int evnt_shutdown_r(struct Evnt *evnt, int got_eof)
{
  ASSERT(evnt__valid(evnt));

  if (evnt->io_r_shutdown || evnt->io_w_shutdown)
    return (FALSE);

  /* NOTE(review): POLLIN is dropped before shutdown() and not re-armed on
   * the failure path below -- presumably intentional (connection is dying
   * anyway); confirm */
  evnt_wait_cntl_del(evnt, POLLIN);

  vlg_dbg2(vlg, "shutdown(SHUT_RD, %d) from[$<sa:%p>]\n",
           got_eof, EVNT_SA(evnt));

  if (!got_eof && (shutdown(evnt_fd(evnt), SHUT_RD) == -1))
  {
    if (errno != ENOTCONN)
      vlg_warn(vlg, "shutdown(SHUT_RD): %m\n");
    return (FALSE);
  }

  evnt->io_r_shutdown = TRUE;

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
1011 
/* Reconcile the Evnt's queue with the state of its output buffer after a
 * send: leave send_recv when io_w drained (to q_none if pkt-move and all
 * requests answered, else q_recv), or enter send_recv when data remains
 * and we weren't already there. */
static void evnt__send_fin(struct Evnt *evnt)
{
  if (0)
  { /* nothing */ }
  else if ( evnt->flag_q_send_recv && !evnt->io_w->len)
  { /* output drained: step down from send_recv */
    evnt_del(&q_send_recv, evnt); evnt->flag_q_send_recv = FALSE;
    if (evnt->flag_q_pkt_move && (evnt->acct.req_put == evnt->acct.req_got))
    { /* nothing outstanding: go fully idle */
      evnt_add(&q_none, evnt); evnt->flag_q_none = TRUE;
      evnt_wait_cntl_del(evnt, POLLIN | POLLOUT);
    }
    else
    { /* still expecting input */
      evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
      evnt_wait_cntl_del(evnt, POLLOUT);
    }
  }
  else if (!evnt->flag_q_send_recv &&  evnt->io_w->len)
  { /* output pending: step up to send_recv from none/recv */
    int pflags = POLLOUT;
    ASSERT(evnt->flag_q_none || evnt->flag_q_recv);
    if (evnt->flag_q_none)
      evnt_del(&q_none, evnt), evnt->flag_q_none = FALSE;
    else
      evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
    evnt_add(&q_send_recv, evnt); evnt->flag_q_send_recv = TRUE;
    if (!evnt->io_r_shutdown) pflags |= POLLIN;
    evnt_wait_cntl_add(evnt, pflags);
  }
}
1043 
/* Shut down the write side: shutdown(SHUT_WR), drop any unsent output and
 * requeue via evnt__send_fin().  Returns FALSE if either side was already
 * shut down or shutdown() failed. */
int evnt_shutdown_w(struct Evnt *evnt)
{
  Vstr_base *out = evnt->io_w;

  ASSERT(evnt__valid(evnt));

  vlg_dbg2(vlg, "shutdown(SHUT_WR) from[$<sa:%p>]\n", EVNT_SA(evnt));

  /* evnt_fd_set_cork(evnt, FALSE); eats data in 2.4.22-1.2199.4.legacy.npt */

  if (evnt->io_r_shutdown || evnt->io_w_shutdown)
    return (FALSE);

  if (shutdown(evnt_fd(evnt), SHUT_WR) == -1)
  {
    if (errno != ENOTCONN)
      vlg_warn(vlg, "shutdown(SHUT_WR): %m\n");
    return (FALSE);
  }
  evnt->io_w_shutdown = TRUE;

  /* anything still buffered can never be sent now */
  vstr_del(out, 1, out->len);

  evnt__send_fin(evnt);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}
1073 
/* Read available data into io_r using an iovec read sized from the
 * previous read (more data last time => bigger read this time).  Updates
 * byte accounting and mtime.  Returns TRUE while the connection should be
 * kept (including EAGAIN); FALSE on EOF, memory failure or a real read
 * error -- *ern carries the VSTR_TYPE_SC_READ_FD_ERR_* code. */
int evnt_recv(struct Evnt *evnt, unsigned int *ern)
{
  Vstr_base *data = evnt->io_r;
  size_t tmp = evnt->io_r->len;
  unsigned int num_min = 2;
  unsigned int num_max = 6; /* ave. browser reqs are 500ish, ab is much less */

  ASSERT(evnt__valid(evnt) && ern);

  /* FIXME: this is set for HTTPD's default buf sizes of 120 (128 - 8) */
  if (evnt->prev_bytes_r > (120 * 3))
    num_max =  8;
  if (evnt->prev_bytes_r > (120 * 6))
    num_max = 32;

  vstr_sc_read_iov_fd(data, data->len, evnt_fd(evnt), num_min, num_max, ern);
  evnt->prev_bytes_r = (evnt->io_r->len - tmp);

  vlg_dbg3(vlg, "read(%p) = %ju\n", evnt, evnt->prev_bytes_r);
  evnt->acct.bytes_r += evnt->prev_bytes_r;

  switch (*ern)
  {
    case VSTR_TYPE_SC_READ_FD_ERR_NONE:
      if (evnt->io_w_shutdown) /* doesn't count if we can't respond */
        vstr_del(data, 1, data->len);
      else
        EVNT__COPY_TV(&evnt->mtime);
      return (TRUE);

    case VSTR_TYPE_SC_READ_FD_ERR_MEM:
      vlg_warn(vlg, "%s\n", __func__);
      break;

    case VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO:
      if (errno == EAGAIN)
        return (TRUE); /* nothing ready right now; not an error */
      break;

    case VSTR_TYPE_SC_READ_FD_ERR_EOF:
      break;

    default: /* unknown */
      vlg_warn(vlg, "read_iov() = %d: %m\n", *ern);
  }

  return (FALSE);
}
1122 
evnt_send(struct Evnt * evnt)1123 int evnt_send(struct Evnt *evnt)
1124 {
1125   unsigned int ern = 0;
1126 
1127   ASSERT(evnt__valid(evnt));
1128 
1129   if (!evnt__call_send(evnt, &ern))
1130     return (FALSE);
1131 
1132   EVNT__COPY_TV(&evnt->mtime);
1133 
1134   evnt__send_fin(evnt);
1135 
1136   ASSERT(evnt__valid(evnt));
1137 
1138   return (TRUE);
1139 }
1140 
1141 #ifdef VSTR_AUTOCONF_lseek64 /* lseek64 doesn't exist */
1142 # define sendfile64 sendfile
1143 #endif
1144 
int evnt_sendfile(struct Evnt *evnt, int ffd, VSTR_AUTOCONF_uintmax_t *f_off,
                  VSTR_AUTOCONF_uintmax_t *f_len, unsigned int *ern)
{ /* Transfer up to *f_len bytes from file ffd at offset *f_off straight
   * to the socket with sendfile64().  On success *f_off and *f_len are
   * advanced/decremented by the amount actually written.  Returns FALSE
   * on error or EOF with *ern set (the VSTR read error codes are reused
   * even though this is a write path). */
  ssize_t ret = 0;
  off64_t tmp_off = *f_off;
  size_t  tmp_len = *f_len;

  *ern = 0;

  ASSERT(evnt__valid(evnt));

  ASSERT(!evnt->io_w->len); /* only valid with no buffered output pending */

  if (*f_len > SSIZE_MAX) /* clamp to what one sendfile() call can return */
    tmp_len =  SSIZE_MAX;

  if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, tmp_len)) == -1)
  {
    if (errno == EAGAIN) /* socket not writable yet; try again later */
      return (TRUE);

    *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
    return (FALSE);
  }

  if (!ret) /* zero bytes: treat as end of file */
  {
    *ern = VSTR_TYPE_SC_READ_FD_ERR_EOF;
    return (FALSE);
  }

  *f_off = tmp_off;

  evnt->acct.bytes_w += ret;
  EVNT__COPY_TV(&evnt->mtime); /* progress: reset the idle timer */

  *f_len -= ret;

  return (TRUE);
}
1185 
int evnt_sc_read_send(struct Evnt *evnt, int fd, VSTR_AUTOCONF_uintmax_t *len)
{ /* Copy up to *len bytes from fd into evnt's output buffer and try to
   * send.  *len is decremented by the bytes consumed (0 when finished).
   * Returns one of the EVNT_IO_* result codes. */
  Vstr_base *out = evnt->io_w;
  size_t orig_len = out->len;
  size_t tmp = 0;
  int ret = IO_OK;

  ASSERT(len && *len);

  if ((ret = io_get(out, fd)) == IO_FAIL)
    return (EVNT_IO_READ_ERR);

  if (ret == IO_EOF)
    return (EVNT_IO_READ_EOF);

  tmp = out->len - orig_len; /* bytes actually read this call */

  if (tmp >= *len)
  { /* we might not be transfering to EOF, so reduce if needed */
    vstr_sc_reduce(out, 1, out->len, tmp - *len);
    ASSERT((out->len - orig_len) == *len);
    *len = 0;
    return (EVNT_IO_READ_FIN);
  }

  *len -= tmp;

  if (!evnt_send(evnt))
    return (EVNT_IO_SEND_ERR);

  return (EVNT_IO_OK);
}
1218 
evnt__get_timeout(void)1219 static int evnt__get_timeout(void)
1220 {
1221   const struct timeval *tv = NULL;
1222   int msecs = -1;
1223 
1224   if (q_send_now)
1225     msecs = 0;
1226   else if ((tv = timer_q_first_timeval()))
1227   {
1228     long diff = 0;
1229     struct timeval now_timeval;
1230 
1231     EVNT__COPY_TV(&now_timeval);
1232 
1233     diff = timer_q_timeval_diff_msecs(tv, &now_timeval);
1234 
1235     if (diff > 0)
1236     {
1237       if (diff >= INT_MAX)
1238         msecs = INT_MAX - 1;
1239       else
1240         msecs = diff;
1241     }
1242     else
1243       msecs = 0;
1244   }
1245 
1246   vlg_dbg2(vlg, "get_timeout = %d\n", msecs);
1247 
1248   return (msecs);
1249 }
1250 
evnt_scan_q_close(void)1251 void evnt_scan_q_close(void)
1252 {
1253   struct Evnt *scan = NULL;
1254 
1255   while ((scan = q_closed))
1256   {
1257     scan->flag_q_closed = FALSE;
1258     q_closed = scan->c_next;
1259 
1260     evnt_free(scan);
1261   }
1262 
1263   ASSERT(!q_closed);
1264 }
1265 
evnt__close_now(struct Evnt * evnt)1266 static void evnt__close_now(struct Evnt *evnt)
1267 {
1268   if (evnt->flag_q_closed)
1269     return;
1270 
1271   evnt_free(evnt);
1272 }
1273 
1274 /* if something goes wrong drop all accept'ing events */
evnt_acpt_close_all(void)1275 void evnt_acpt_close_all(void)
1276 {
1277   struct Evnt *evnt = q_accept; /* struct Evnt *evnt = evnt_queue("accept"); */
1278 
1279   while (evnt)
1280   {
1281     evnt_close(evnt);
1282 
1283     evnt = evnt->next;
1284   }
1285 }
1286 
void evnt_scan_fds(unsigned int ready, size_t max_sz)
{ /* Dispatch poll() results.  Walks each queue in turn (connect, accept,
   * recv, send_recv, none) handing events to the relevant callbacks;
   * "ready" is the number of fds with pending events and each scan stops
   * early once it reaches zero.  max_sz is passed through to
   * evnt_send_add() when flushing writable connections. */
  const int bad_poll_flags = (POLLERR | POLLHUP | POLLNVAL);
  struct Evnt *scan = NULL;

  EVNT__UPDATE_TV();

  scan = q_connect;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_connect;

    assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN));
    /* done as one so we get error code */
    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & (POLLOUT|bad_poll_flags))
    {
      int ern = 0;
      socklen_t len = sizeof(int);
      int ret = 0;

      done = TRUE;

      /* non-blocking connect() completed: the result lives in SO_ERROR */
      ret = getsockopt(evnt_fd(scan), SOL_SOCKET, SO_ERROR, &ern, &len);
      if (ret == -1)
        vlg_err(vlg, EXIT_FAILURE, "getsockopt(SO_ERROR): %m\n");
      else if (ern)
      {
        errno = ern;
        vlg_warn(vlg, "connect(): %m\n");

        evnt__close_now(scan);
      }
      else
      { /* connected: migrate from the connect queue to the idle queue */
        evnt_del(&q_connect, scan); scan->flag_q_connect = FALSE;
        evnt_add(&q_none,    scan); scan->flag_q_none    = TRUE;
        evnt_wait_cntl_del(scan, POLLOUT);

        if (!scan->cbs->cb_func_connect(scan))
          evnt__close_now(scan);
      }
      goto next_connect;
    }
    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_connect:
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
    if (done)
      --ready;

    scan = scan_next;
  }

  scan = q_accept;
  while (scan && ready)
  { /* Papers have suggested that preferring read over accept is better
     * -- edge triggering needs to requeue on non failure */
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_accept;

    assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT));
    if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
    { /* done first as it's an error with the accept fd, whereas accept
       * generates new fds */
      done = TRUE;
      evnt__close_now(scan);
      goto next_accept;
    }

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
    {
      struct sockaddr_in sa;
      socklen_t len = sizeof(struct sockaddr_in);
      int fd = -1;
      struct Evnt *tmp = NULL;
      unsigned int acpt_num = 0;

      done = TRUE;

      /* ignore all accept() errors -- bad_poll_flags fixes here */
      /* FIXME: apache seems to assume certain errors are really bad and we
       * should just kill the listen socket and wait to die. But for instance.
       * we can't just kill the socket on EMFILE, as we might have hit our
       * resource limit */
      while ((SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) &&
             (fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len)) != -1)
      {
        if (!(tmp = scan->cbs->cb_func_accept(scan, fd,
                                              (struct sockaddr *) &sa, len)))
        {
          close(fd);
          goto next_accept;
        }

        if (!tmp->flag_q_closed)
        {
          ++ready; /* give a read event to this new event */
          tmp->flag_fully_acpt = TRUE;
        }
        assert(SOCKET_POLL_INDICATOR(tmp->ind)->events  == POLLIN);
        assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
        assert(tmp == q_recv);

        if (++acpt_num >= evnt__accept_limit)
          break; /* bound the work done per scan */
      }

      goto next_accept;
    }
    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_accept:
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
    if (done)
      --ready;

    scan = scan_next;
  }

  scan = q_recv;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_recv;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
    {
      done = TRUE;
      if (!scan->cbs->cb_func_recv(scan))
        evnt__close_now(scan);
    }

    if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
    { /* error/hangup: give the shutdown callback a chance, else close */
      done = TRUE;
      if (scan->io_r_shutdown || scan->io_w_shutdown ||
          !scan->cbs->cb_func_shutdown_r(scan))
        evnt__close_now(scan);
    }

    if (!done && evnt_poll_direct_enabled()) break;

   next_recv:
    if (done)
      --ready;

    scan = scan_next;
  }

  scan = q_send_recv;
  while (scan && ready)
  {
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_send;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
    {
      done = TRUE;
      if (!scan->cbs->cb_func_recv(scan))
      {
        evnt__close_now(scan);
        goto next_send;
      }
    }

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT)
    {
      done = TRUE; /* need groups so we can do direct send here */
      if (!evnt_send_add(scan, TRUE, max_sz))
        evnt__close_now(scan);
    }

    if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
    {
      done = TRUE;
      if (scan->io_r_shutdown || !scan->cbs->cb_func_shutdown_r(scan))
        evnt__close_now(scan);
    }

    if (!done && evnt_poll_direct_enabled()) break;

   next_send:
    if (done)
      --ready;

    scan = scan_next;
  }

  scan = q_none;
  while (scan && ready)
  { /* idle queue: any event at all is unexpected and means trouble */
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (scan->flag_q_closed)
      goto next_none;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents)
    { /* POLLIN == EOF ? */
      /* FIXME: failure cb */
      done = TRUE;

      evnt__close_now(scan);
      goto next_none;
    }
    else
      assert(!SOCKET_POLL_INDICATOR(scan->ind)->revents);

    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_none:
    if (done)
      --ready;

    scan = scan_next;
  }

  if (q_closed)
    evnt_scan_q_close();
  else if (ready) /* every reported event should have been consumed */
    vlg_abort(vlg, "ready = %d\n", ready);
}
1535 
void evnt_scan_send_fds(void)
{ /* Run the deferred-send queue, calling each event's send callback once.
   * A pointer-to-pointer walk is used so entries can unlink themselves,
   * close, or requeue while we iterate. */
  struct Evnt **scan = NULL;

  evnt_scan_q_close();

  scan = &q_send_now;
  while (*scan)
  {
    struct Evnt *tmp = *scan;

    tmp->flag_q_send_now = FALSE;
    *scan = tmp->s_next; /* unlink before the callback runs */
    if (!tmp->cbs->cb_func_send(tmp))
    {
      evnt__close_now(tmp);
      continue;
    }
    if (tmp == *scan) /* added back to q */
    {
      ASSERT(tmp->flag_q_send_now == TRUE);
      scan = &tmp->s_next; /* step past it so we don't spin on one entry */
    }
  }

  evnt_scan_q_close();
}
1563 
static void evnt__close_1(struct Evnt **root)
{ /* Destroy every event on the given queue: free IO buffers, close the
   * fd / poll slot, release the address ref and timer node, and invoke
   * the user free callback.  The queue head is reset to NULL first. */
  struct Evnt *scan = *root;

  *root = NULL;

  while (scan)
  {
    struct Evnt *scan_next = scan->next;
    Vstr_ref *sa = scan->sa_ref;
    Timer_q_node *tm_o  = scan->tm_o;

    vstr_free_base(scan->io_w); scan->io_w = NULL;
    vstr_free_base(scan->io_r); scan->io_r = NULL;

    evnt_poll_del(scan); /* closes the fd and drops the poll slot */

    --evnt__num;
    evnt__free2(sa, tm_o); /* saved above: scan may be gone after cb */
    scan->cbs->cb_func_free(scan);

    scan = scan_next;
  }
}
1588 
evnt_close_all(void)1589 void evnt_close_all(void)
1590 {
1591   q_send_now = NULL;
1592   q_closed   = NULL;
1593 
1594   evnt__close_1(&q_connect);
1595   evnt__close_1(&q_accept);
1596   evnt__close_1(&q_recv);
1597   evnt__close_1(&q_send_recv);
1598   evnt__close_1(&q_none);
1599   ASSERT(evnt__num == evnt__debug_num_all());
1600 }
1601 
evnt_out_dbg3(const char * prefix)1602 void evnt_out_dbg3(const char *prefix)
1603 {
1604   if (vlg->out_dbg < 3)
1605     return;
1606 
1607   vlg_dbg3(vlg, "%s T=%u c=%u a=%u r=%u s=%u n=%u [SN=%u]\n",
1608            prefix, evnt_num_all(),
1609            evnt__debug_num_1(q_connect),
1610            evnt__debug_num_1(q_accept),
1611            evnt__debug_num_1(q_recv),
1612            evnt__debug_num_1(q_send_recv),
1613            evnt__debug_num_1(q_none),
1614            evnt__debug_num_1(q_send_now));
1615 }
1616 
evnt_stats_add(struct Evnt * dst,const struct Evnt * src)1617 void evnt_stats_add(struct Evnt *dst, const struct Evnt *src)
1618 {
1619   dst->acct.req_put += src->acct.req_put;
1620   dst->acct.req_put += src->acct.req_got;
1621 
1622   dst->acct.bytes_r += src->acct.bytes_r;
1623   dst->acct.bytes_w += src->acct.bytes_w;
1624 }
1625 
evnt_num_all(void)1626 unsigned int evnt_num_all(void)
1627 {
1628   ASSERT(evnt__num == evnt__debug_num_all());
1629   return (evnt__num);
1630 }
1631 
evnt_waiting(void)1632 int evnt_waiting(void)
1633 {
1634   return (q_connect || q_accept || q_recv || q_send_recv || q_send_now);
1635 }
1636 
evnt_find_least_used(void)1637 struct Evnt *evnt_find_least_used(void)
1638 {
1639   struct Evnt *con     = NULL;
1640   struct Evnt *con_min = NULL;
1641 
1642   /*  Find a usable connection, tries to find the least used connection
1643    * preferring ones not blocking on send IO */
1644   if (!(con = q_none) &&
1645       !(con = q_recv) &&
1646       !(con = q_send_recv))
1647     return (NULL);
1648 
1649   /* FIXME: not optimal, only want to change after a certain level */
1650   con_min = con;
1651   while (con)
1652   {
1653     if (con_min->io_w->len > con->io_w->len)
1654       con_min = con;
1655     con = con->next;
1656   }
1657 
1658   return (con_min);
1659 }
1660 
1661 #define MATCH_Q_NAME(x)                         \
1662     if (CSTREQ(qname, #x ))                     \
1663       return ( q_ ## x )                        \
1664 
evnt_queue(const char * qname)1665 struct Evnt *evnt_queue(const char *qname)
1666 {
1667   MATCH_Q_NAME(connect);
1668   MATCH_Q_NAME(accept);
1669   MATCH_Q_NAME(send_recv);
1670   MATCH_Q_NAME(recv);
1671   MATCH_Q_NAME(none);
1672   MATCH_Q_NAME(send_now);
1673 
1674   return (NULL);
1675 }
1676 
1677 #ifdef VSTR_AUTOCONF_HAVE_TCP_CORK
1678 # define USE_TCP_CORK 1
1679 #else
1680 # define USE_TCP_CORK 0
1681 # define TCP_CORK 0
1682 #endif
1683 
void evnt_fd_set_cork(struct Evnt *evnt, int val)
{ /* assume it can't work for set and fail for unset */
  /* Toggle TCP_CORK on the socket; no-op when the platform lacks it or
   * when the cached flag already matches val. */
  ASSERT(evnt__valid(evnt));

  if (!USE_TCP_CORK)
    return;

  if (!evnt->flag_io_cork == !val) /* compare as booleans: no change */
    return;

  if (!evnt->flag_io_nagle) /* flags can't be combined ... stupid */
  {
    evnt_fd__set_nodelay(evnt_fd(evnt), FALSE);
    evnt->flag_io_nagle = TRUE;
  }

  if (setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_CORK, &val, sizeof(val)) == -1)
    return; /* leave the cached flag untouched on failure */

  evnt->flag_io_cork = !!val;
}
1705 
static void evnt__free_base_noerrno(Vstr_base *s1)
{ /* vstr_free_base() may clobber errno; preserve it across the call
   * so callers can still report the original failure with %m. */
  int ern = errno;

  vstr_free_base(s1);
  errno = ern;
}
1712 
evnt_fd_set_filter(struct Evnt * evnt,const char * fname)1713 int evnt_fd_set_filter(struct Evnt *evnt, const char *fname)
1714 {
1715   int fd = evnt_fd(evnt);
1716   Vstr_base *s1 = NULL;
1717   unsigned int ern = 0;
1718 
1719   if (!CONF_USE_SOCKET_FILTERS)
1720     return (TRUE);
1721 
1722   if (!(s1 = vstr_make_base(NULL)))
1723     VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach0: %m\n"));
1724 
1725   vstr_sc_read_len_file(s1, 0, fname, 0, 0, &ern);
1726 
1727   if (ern &&
1728       ((ern != VSTR_TYPE_SC_READ_FILE_ERR_OPEN_ERRNO) || (errno != ENOENT)))
1729   {
1730     evnt__free_base_noerrno(s1);
1731     VLG_WARN_RET(FALSE, (vlg, "filter_attach1(%s): %m\n", fname));
1732   }
1733   else if ((s1->len / sizeof(struct sock_filter)) > USHRT_MAX)
1734   {
1735     vstr_free_base(s1);
1736     errno = E2BIG;
1737     VLG_WARN_RET(FALSE, (vlg, "filter_attach2(%s): %m\n", fname));
1738   }
1739   else if (!s1->len)
1740   {
1741     vstr_free_base(s1);
1742     if (!evnt->flag_io_filter)
1743       vlg_warn(vlg, "filter_attach3(%s): Empty file\n", fname);
1744     else if (setsockopt(fd, SOL_SOCKET, SO_DETACH_FILTER, NULL, 0) == -1)
1745     {
1746       evnt__free_base_noerrno(s1);
1747       VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, DETACH_FILTER, %s): %m\n",
1748                            fname));
1749     }
1750 
1751     evnt->flag_io_filter = FALSE;
1752     return (TRUE);
1753   }
1754   else
1755   {
1756     struct sock_fprog filter[1];
1757     socklen_t len = sizeof(filter);
1758 
1759     filter->len    = s1->len / sizeof(struct sock_filter);
1760     filter->filter = (void *)vstr_export_cstr_ptr(s1, 1, s1->len);
1761 
1762     if (!filter->filter)
1763       VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach4: %m\n"));
1764 
1765     if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_FILTER, filter, len) == -1)
1766     {
1767       evnt__free_base_noerrno(s1);
1768       VLG_WARN_RET(FALSE, (vlg,  "setsockopt(SOCKET, ATTACH_FILTER, %s): %m\n",
1769                            fname));
1770     }
1771   }
1772 
1773   evnt->flag_io_filter = TRUE;
1774 
1775   vstr_free_base(s1);
1776 
1777   return (TRUE);
1778 }
1779 
1780 static Timer_q_base *evnt__timeout_1   = NULL;
1781 static Timer_q_base *evnt__timeout_10  = NULL;
1782 static Timer_q_base *evnt__timeout_100 = NULL;
1783 
evnt__timeout_mtime_make(struct Evnt * evnt,struct timeval * tv,unsigned long msecs)1784 static Timer_q_node *evnt__timeout_mtime_make(struct Evnt *evnt,
1785                                               struct timeval *tv,
1786                                               unsigned long msecs)
1787 {
1788   Timer_q_node *tm_o = NULL;
1789 
1790   vlg_dbg2(vlg, "mtime_make(%p, %lu)\n", evnt, msecs);
1791 
1792   if (0) { }
1793   else if (msecs >= ( 99 * 1000))
1794   {
1795     TIMER_Q_TIMEVAL_ADD_SECS(tv, 100, 0);
1796     tm_o = timer_q_add_node(evnt__timeout_100, evnt, tv,
1797                             TIMER_Q_FLAG_NODE_DEFAULT);
1798   }
1799   else if (msecs >= (  9 * 1000))
1800   {
1801     TIMER_Q_TIMEVAL_ADD_SECS(tv,  10, 0);
1802     tm_o = timer_q_add_node(evnt__timeout_10,  evnt, tv,
1803                             TIMER_Q_FLAG_NODE_DEFAULT);
1804   }
1805   else
1806   {
1807     TIMER_Q_TIMEVAL_ADD_SECS(tv,   1, 0);
1808     tm_o = timer_q_add_node(evnt__timeout_1,   evnt, tv,
1809                             TIMER_Q_FLAG_NODE_DEFAULT);
1810   }
1811 
1812   return (tm_o);
1813 }
1814 
static void evnt__timer_cb_mtime(int type, void *data)
{ /* Timer-queue callback: close connections idle longer than their
   * msecs_tm_mtime, otherwise re-arm the timer for the remainder. */
  struct Evnt *evnt = data;
  struct timeval tv[1];
  unsigned long diff = 0;

  if (!evnt) /* deleted */
    return;

  ASSERT(evnt__valid(evnt));

  if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
    return;

  evnt->tm_o = NULL; /* this node is consumed whatever happens next */

  if (type == TIMER_Q_TYPE_CALL_DEL)
    return;

  EVNT__COPY_TV(tv);

  /* find out time elapsed */
  diff = timer_q_timeval_udiff_msecs(tv, &evnt->mtime);
  if (diff < evnt->msecs_tm_mtime)
    diff = evnt->msecs_tm_mtime - diff; /* msecs left until timeout */
  else
  { /* timed out: try a write-side shutdown first, then linger-close */
    vlg_dbg2(vlg, "timeout = %p[$<sa:%p>] (%lu, %lu)\n",
             evnt, EVNT_SA(evnt), diff, evnt->msecs_tm_mtime);
    if (!evnt_shutdown_w(evnt))
    {
      evnt_close(evnt);
      return;
    }

    /* FIXME: linger close time configurable? */
    EVNT__COPY_TV(&evnt->mtime);
    diff = evnt->msecs_tm_mtime;
  }

  if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, diff)))
  { /* couldn't re-arm: drop the connection rather than leave it untimed */
    errno = ENOMEM;
    vlg_warn(vlg, "%s: %m\n", "timer reinit");
    evnt_close(evnt);
  }
}
1862 
evnt_timeout_init(void)1863 void evnt_timeout_init(void)
1864 {
1865   ASSERT(!evnt__timeout_1);
1866 
1867   EVNT__UPDATE_TV();
1868 
1869   /* move when empty is buggy, *sigh* */
1870   evnt__timeout_1   = timer_q_add_base(evnt__timer_cb_mtime,
1871                                        TIMER_Q_FLAG_BASE_INSERT_FROM_END);
1872   evnt__timeout_10  = timer_q_add_base(evnt__timer_cb_mtime,
1873                                        TIMER_Q_FLAG_BASE_INSERT_FROM_END);
1874   evnt__timeout_100 = timer_q_add_base(evnt__timer_cb_mtime,
1875                                        TIMER_Q_FLAG_BASE_INSERT_FROM_END);
1876 
1877   if (!evnt__timeout_1 || !evnt__timeout_10 || !evnt__timeout_100)
1878     VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));
1879 
1880   /* FIXME: massive hack 1.0.5 is broken */
1881   timer_q_cntl_base(evnt__timeout_1,
1882                     TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
1883   timer_q_cntl_base(evnt__timeout_10,
1884                     TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
1885   timer_q_cntl_base(evnt__timeout_100,
1886                     TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
1887 }
1888 
evnt_timeout_exit(void)1889 void evnt_timeout_exit(void)
1890 {
1891   ASSERT(evnt__timeout_1);
1892 
1893   timer_q_del_base(evnt__timeout_1);   evnt__timeout_1   = NULL;
1894   timer_q_del_base(evnt__timeout_10);  evnt__timeout_10  = NULL;
1895   timer_q_del_base(evnt__timeout_100); evnt__timeout_100 = NULL;
1896 }
1897 
evnt_sc_timeout_via_mtime(struct Evnt * evnt,unsigned long msecs)1898 int evnt_sc_timeout_via_mtime(struct Evnt *evnt, unsigned long msecs)
1899 {
1900   struct timeval tv[1];
1901 
1902   evnt->msecs_tm_mtime = msecs;
1903 
1904   EVNT__COPY_TV(tv);
1905 
1906   if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, msecs)))
1907     return (FALSE);
1908 
1909   return (TRUE);
1910 }
1911 
evnt_sc_main_loop(size_t max_sz)1912 void evnt_sc_main_loop(size_t max_sz)
1913 {
1914   int ready = 0;
1915   struct timeval tv[1];
1916 
1917   ready = evnt_poll();
1918   if ((ready == -1) && (errno != EINTR))
1919     vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", "poll");
1920   if (ready == -1)
1921     return;
1922 
1923   evnt_out_dbg3("1");
1924   evnt_scan_fds(ready, max_sz);
1925   evnt_out_dbg3("2");
1926   evnt_scan_send_fds();
1927   evnt_out_dbg3("3");
1928 
1929   EVNT__COPY_TV(tv);
1930   timer_q_run_norm(tv);
1931 
1932   evnt_out_dbg3("4");
1933   evnt_scan_send_fds();
1934   evnt_out_dbg3("5");
1935 }
1936 
evnt_sc_time(void)1937 time_t evnt_sc_time(void)
1938 {
1939   time_t ret = -1;
1940 
1941   if (!CONF_GETTIMEOFDAY_TIME)
1942     ret = time(NULL);
1943   else
1944   {
1945     struct timeval tv[1];
1946 
1947     EVNT__COPY_TV(tv);
1948     ret = tv->tv_sec;
1949   }
1950 
1951   return (ret);
1952 }
1953 
evnt_sc_serv_cb_func_acpt_free(struct Evnt * evnt)1954 void evnt_sc_serv_cb_func_acpt_free(struct Evnt *evnt)
1955 {
1956   struct Acpt_listener *acpt_listener = (struct Acpt_listener *)evnt;
1957   struct Acpt_data *acpt_data = acpt_listener->ref->ptr;
1958 
1959   evnt_vlg_stats_info(acpt_listener->evnt, "ACCEPT FREE");
1960 
1961   acpt_data->evnt = NULL;
1962 
1963   vstr_ref_del(acpt_listener->ref);
1964 
1965   F(acpt_listener);
1966 }
1967 
static void evnt__sc_serv_make_acpt_data_cb(Vstr_ref *ref)
{ /* Vstr_ref destructor for the shared Acpt_data. */
  struct Acpt_data *data = NULL;

  if (!ref)
    return;

  data = ref->ptr;
  vstr_ref_del(data->sa); /* drop the saved sockaddr ref first */
  F(data);
  free(ref);
}
1980 
struct Evnt *evnt_sc_serv_make_bind(const char *acpt_addr,
                                    unsigned short acpt_port,
                                    unsigned int q_listen_len,
                                    unsigned int max_connections,
                                    unsigned int defer_accept,
                                    const char *acpt_filter_file)
{ /* Create a listening IPv4 TCP socket wrapped in an Acpt_listener,
   * optionally with TCP_DEFER_ACCEPT and a socket filter.  Any failure
   * aborts the process (VLG_ERRNOMEM / vlg_err); on success returns the
   * listener's Evnt with its free callback pointing at
   * evnt_sc_serv_cb_func_acpt_free. */
  struct sockaddr_in *sinv4 = NULL;
  Acpt_listener *acpt_listener = NULL;
  Acpt_data *acpt_data = NULL;
  Vstr_ref *ref = NULL;

  if (!(acpt_listener = MK(sizeof(Acpt_listener))))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_listener->max_connections = max_connections;

  if (!(acpt_data = MK(sizeof(Acpt_data))))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_data->evnt = NULL;
  acpt_data->sa   = NULL;

  /* ref-counted so accepted connections can share the listener data */
  if (!(ref = vstr_ref_make_ptr(acpt_data, evnt__sc_serv_make_acpt_data_cb)))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_listener->ref = ref;

  if (!evnt_make_bind_ipv4(acpt_listener->evnt, acpt_addr, acpt_port,
                           q_listen_len))
    vlg_err(vlg, 2, "%s: %m\n", __func__);
  acpt_data->evnt = acpt_listener->evnt;
  acpt_data->sa   = vstr_ref_add(acpt_data->evnt->sa_ref);

  /* acpt_port == 0 lets the kernel pick; otherwise it must match */
  sinv4 = EVNT_SA_IN4(acpt_listener->evnt);
  ASSERT(!acpt_port || (acpt_port == ntohs(sinv4->sin_port)));

  if (defer_accept)
    evnt_fd_set_defer_accept(acpt_listener->evnt, defer_accept);

  if (acpt_filter_file &&
      !evnt_fd_set_filter(acpt_listener->evnt, acpt_filter_file))
    vlg_err(vlg, 3, "set_filter(%s): %m\n", acpt_filter_file);

  acpt_listener->evnt->cbs->cb_func_free = evnt_sc_serv_cb_func_acpt_free;

  return (acpt_listener->evnt);
}
2029 
void evnt_vlg_stats_info(struct Evnt *evnt, const char *prefix)
{ /* Log the connection's accounting totals (requests got/put, bytes
   * recv/send -- each printed formatted and raw), tagged with prefix. */
  vlg_info(vlg, "%s from[$<sa:%p>] req_got[%'u:%u] req_put[%'u:%u]"
           " recv[${BKMG.ju:%ju}:%ju] send[${BKMG.ju:%ju}:%ju]\n",
           prefix, EVNT_SA(evnt),
           evnt->acct.req_got, evnt->acct.req_got,
           evnt->acct.req_put, evnt->acct.req_put,
           evnt->acct.bytes_r, evnt->acct.bytes_r,
           evnt->acct.bytes_w, evnt->acct.bytes_w);
}
2040 
2041 #ifdef TCP_DEFER_ACCEPT
2042 # define USE_TCP_DEFER_ACCEPT 1
2043 #else
2044 # define USE_TCP_DEFER_ACCEPT 0
2045 # define TCP_DEFER_ACCEPT 0
2046 #endif
2047 
evnt_fd_set_defer_accept(struct Evnt * evnt,int val)2048 void evnt_fd_set_defer_accept(struct Evnt *evnt, int val)
2049 {
2050   socklen_t len = sizeof(int);
2051 
2052   ASSERT(evnt__valid(evnt));
2053 
2054   if (!USE_TCP_DEFER_ACCEPT)
2055     return;
2056 
2057   /* ignore return val */
2058   setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
2059 }
2060 
evnt_make_child(void)2061 pid_t evnt_make_child(void)
2062 {
2063   pid_t ret = fork();
2064 
2065   if (!ret)
2066   {
2067     evnt__is_child = TRUE;
2068     evnt_poll_child_init();
2069   }
2070 
2071   return (ret);
2072 }
2073 
evnt_is_child(void)2074 int evnt_is_child(void)
2075 {
2076   return (evnt__is_child);
2077 }
2078 
evnt_child_block_beg(void)2079 int evnt_child_block_beg(void)
2080 {
2081   if (!evnt_is_child())
2082     return (TRUE);
2083 
2084   if (PROC_CNTL_PDEATHSIG(SIGTERM) == -1)
2085     vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n");
2086 
2087   if (evnt_child_exited)
2088     return (FALSE);
2089 
2090   return (TRUE);
2091 }
2092 
evnt_child_block_end(void)2093 int evnt_child_block_end(void)
2094 {
2095   if (!evnt_is_child())
2096     return (TRUE);
2097 
2098   if (PROC_CNTL_PDEATHSIG(SIGCHLD) == -1)
2099     vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n");
2100 
2101   return (TRUE);
2102 }
2103 
2104 /* if we are blocking forever, and the only thing we are waiting for is
2105  * a single accept() fd, just call accept() */
evnt__poll_tst_accept(int msecs)2106 static int evnt__poll_tst_accept(int msecs)
2107 {
2108   return ((msecs == -1) && (evnt_num_all() == 1) && q_accept);
2109 }
2110 
static int evnt__poll_accept(void)
{ /* Blocking-accept shortcut for the "single listening fd, infinite
   * timeout" case: temporarily make the fd blocking, accept one
   * connection and hand it to the accept callback.  Returns 1 (one
   * ready event) on success, -1 with errno = EINTR on any failure so
   * the caller treats it like an interrupted poll(). */
  int fd = -1;
  struct Evnt *scan = q_accept;
  struct sockaddr_in sa;
  socklen_t len = sizeof(struct sockaddr_in);
  struct Evnt *tmp = NULL;

  /* need to make sure we die if the parent does */
  evnt_fd__set_nonblock(evnt_fd(scan), FALSE);
  if (!evnt_child_block_beg())
    goto block_beg_fail;

  fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len);
  evnt_child_block_end();
  evnt_fd__set_nonblock(evnt_fd(scan), TRUE); /* restore non-blocking mode */

  if (fd == -1)
    goto accept_fail;

  if (!(tmp = scan->cbs->cb_func_accept(scan,
                                        fd, (struct sockaddr *) &sa, len)))
    goto cb_accept_fail;

  if (!tmp->flag_q_closed)
    tmp->flag_fully_acpt = TRUE;
  assert(SOCKET_POLL_INDICATOR(tmp->ind)->events  == POLLIN);
  assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
  assert(tmp == q_recv);

  return (1);

 block_beg_fail:
  evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
  goto accept_fail;

 cb_accept_fail:
  close(fd);
 accept_fail:
  errno = EINTR;
  return (-1);
}
2153 
2154 
2155 #ifndef  EVNT_USE_EPOLL
2156 # ifdef  VSTR_AUTOCONF_HAVE_SYS_EPOLL_H
2157 #  define EVNT_USE_EPOLL 1
2158 # else
2159 #  define EVNT_USE_EPOLL 0
2160 # endif
2161 #endif
2162 
2163 #if !EVNT_USE_EPOLL
evnt_poll_init(void)2164 int evnt_poll_init(void)
2165 {
2166   return (TRUE);
2167 }
2168 
evnt_poll_direct_enabled(void)2169 int evnt_poll_direct_enabled(void)
2170 {
2171   return (FALSE);
2172 }
2173 
evnt_poll_child_init(void)2174 int evnt_poll_child_init(void)
2175 {
2176   return (TRUE);
2177 }
2178 
evnt_wait_cntl_add(struct Evnt * evnt,int flags)2179 void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
2180 {
2181   SOCKET_POLL_INDICATOR(evnt->ind)->events  |= flags;
2182   SOCKET_POLL_INDICATOR(evnt->ind)->revents |= flags;
2183 }
2184 
evnt_wait_cntl_del(struct Evnt * evnt,int flags)2185 void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
2186 {
2187   SOCKET_POLL_INDICATOR(evnt->ind)->events  &= ~flags;
2188   SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
2189 }
2190 
/* register fd with socket_poll; returns the indicator index (0 on failure) */
unsigned int evnt_poll_add(struct Evnt *EVNT__ATTR_UNUSED(evnt), int fd)
{
  unsigned int ind = socket_poll_add(fd);

  return (ind);
}
2195 
evnt_poll_del(struct Evnt * evnt)2196 void evnt_poll_del(struct Evnt *evnt)
2197 {
2198   if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
2199     close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
2200   socket_poll_del(evnt->ind);
2201 }
2202 
2203 /* NOTE: that because of socket_poll direct mapping etc. we can't be "clever" */
/* Convert an accept-queue Evnt into a recv-queue Evnt on a new fd: allocate a
 * fresh socket_poll slot plus read/write Vstr bases, then move the Evnt from
 * q_accept to q_recv.  Returns TRUE on success; on failure the Evnt is left
 * exactly as it was (old indicator restored, no bases).
 * NOTE: that because of socket_poll direct mapping etc. we can't be "clever" */
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
  unsigned int old_ind = evnt->ind;

  assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);

  ASSERT(evnt__valid(evnt));

  /* grab the new slot first so failure leaves the old one untouched */
  if (!(evnt->ind = socket_poll_add(fd)))
    goto poll_add_fail;

  if (!(evnt->io_r = vstr_make_base(NULL)) ||
      !(evnt->io_w = vstr_make_base(NULL)))
    goto malloc_base_fail;

  /* revents gets POLLIN too so a pending read is serviced immediately */
  SOCKET_POLL_INDICATOR(evnt->ind)->events  |= POLLIN;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;

  socket_poll_del(old_ind);

  evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
  evnt_add(&q_recv, evnt);   evnt->flag_q_recv   = TRUE;

  evnt_fd__set_nonblock(fd, TRUE);

  ASSERT(evnt__valid(evnt));

  return (TRUE);

 malloc_base_fail:
  /* roll back: drop the new slot, restore the old one, free whichever base
   * was allocated (io_w is only set when io_r succeeded, so io_w is NULL) */
  socket_poll_del(evnt->ind);
  evnt->ind = old_ind;
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  ASSERT(!evnt->io_r && !evnt->io_w);
 poll_add_fail:
  return (FALSE);
}
2241 
evnt_poll(void)2242 int evnt_poll(void)
2243 {
2244   int msecs = evnt__get_timeout();
2245 
2246   if (evnt_child_exited)
2247     return (errno = EINTR, -1);
2248 
2249   if (evnt__poll_tst_accept(msecs))
2250     return (evnt__poll_accept());
2251 
2252   return (socket_poll_update_all(msecs));
2253 }
2254 #else
2255 
2256 #include <sys/epoll.h>
2257 
2258 static int evnt__epoll_fd = -1;
2259 
evnt_poll_init(void)2260 int evnt_poll_init(void)
2261 {
2262   assert(POLLIN  == EPOLLIN);
2263   assert(POLLOUT == EPOLLOUT);
2264   assert(POLLHUP == EPOLLHUP);
2265   assert(POLLERR == EPOLLERR);
2266 
2267   if (!CONF_EVNT_NO_EPOLL)
2268   {
2269     evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ);
2270 
2271     vlg_dbg2(vlg, "epoll_create(%d): %m\n", evnt__epoll_fd);
2272   }
2273 
2274   return (evnt__epoll_fd != -1);
2275 }
2276 
evnt_poll_direct_enabled(void)2277 int evnt_poll_direct_enabled(void)
2278 {
2279   return (evnt__epoll_fd != -1);
2280 }
2281 
evnt__epoll_readd(struct Evnt * evnt)2282 static int evnt__epoll_readd(struct Evnt *evnt)
2283 {
2284   struct epoll_event epevent[1];
2285 
2286   while (evnt)
2287   {
2288     int flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
2289     vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags);
2290     epevent->events   = flags;
2291     epevent->data.ptr = evnt;
2292     if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, evnt_fd(evnt), epevent) == -1)
2293       vlg_err(vlg, EXIT_FAILURE, "epoll_readd: %m\n");
2294 
2295     evnt = evnt->next;
2296   }
2297 
2298   return (TRUE);
2299 }
2300 
evnt_poll_child_init(void)2301 int evnt_poll_child_init(void)
2302 { /* Can't share epoll() fd's between tasks ... */
2303   if (CONF_EVNT_DUP_EPOLL && evnt_poll_direct_enabled())
2304   {
2305     close(evnt__epoll_fd);
2306     evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); /* size does nothing */
2307     if (evnt__epoll_fd == -1)
2308       VLG_WARN_RET(FALSE, (vlg, "epoll_recreate(): %m\n"));
2309 
2310     evnt__epoll_readd(q_connect);
2311     evnt__epoll_readd(q_accept);
2312     evnt__epoll_readd(q_recv);
2313     evnt__epoll_readd(q_send_recv);
2314     evnt__epoll_readd(q_none);
2315   }
2316 
2317   return (TRUE);
2318 }
2319 
/* Enable extra poll event bits on evnt and mirror the full mask into the
 * epoll interest list.  No-op when every requested bit is already set. */
void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
{
  if ((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags)
    return;

  SOCKET_POLL_INDICATOR(evnt->ind)->events  |=  flags;
  /* only POLLIN is speculatively marked pending in revents -- presumably so
   * buffered input is serviced without another epoll_wait(); POLLOUT is
   * deliberately excluded.  NOTE(review): confirm intent. */
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN);

  if (evnt_poll_direct_enabled())
  { /* push the updated (cumulative) event mask to the kernel */
    struct epoll_event epevent[1];

    flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
    vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags);
    epevent->events   = flags;
    epevent->data.ptr = evnt;
    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
      vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
  }
}
2340 
/* Disable poll event bits on evnt (clearing matching pending revents) and
 * mirror the reduced mask into the epoll interest list.
 * No-op when none of the requested bits are currently set. */
void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
{
  if (!(SOCKET_POLL_INDICATOR(evnt->ind)->events & flags))
    return;

  SOCKET_POLL_INDICATOR(evnt->ind)->events  &= ~flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;

  /* `flags' is always non-zero here (the early return above guarantees at
   * least one bit was set), so this test is effectively just the
   * direct-enabled check */
  if (flags && evnt_poll_direct_enabled())
  { /* push the updated (reduced) event mask to the kernel */
    struct epoll_event epevent[1];

    flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
    vlg_dbg2(vlg, "epoll_mod_del(%p,%d)\n", evnt, flags);
    epevent->events   = flags;
    epevent->data.ptr = evnt;
    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
      vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
  }
}
2361 
evnt_poll_add(struct Evnt * evnt,int fd)2362 unsigned int evnt_poll_add(struct Evnt *evnt, int fd)
2363 {
2364   unsigned int ind = socket_poll_add(fd);
2365 
2366   if (ind && evnt_poll_direct_enabled())
2367   {
2368     struct epoll_event epevent[1];
2369     int flags = 0;
2370 
2371     vlg_dbg2(vlg, "epoll_add(%p,%d)\n", evnt, flags);
2372     epevent->events   = flags;
2373     epevent->data.ptr = evnt;
2374 
2375     if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
2376     {
2377       vlg_warn(vlg, "epoll: %m\n");
2378       socket_poll_del(fd);
2379     }
2380   }
2381 
2382   return (ind);
2383 }
2384 
/* Remove evnt from polling: close its fd (the kernel automatically drops a
 * closed fd from the epoll interest list) and free its socket_poll slot. */
void evnt_poll_del(struct Evnt *evnt)
{
  if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
    close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
  socket_poll_del(evnt->ind);

  /* done via. the close() */
  /* NOTE(review): this branch is deliberately disabled (FALSE &&) -- the
   * explicit EPOLL_CTL_DEL is redundant with the close() above.  It also
   * reads the indicator *after* socket_poll_del(), so it must stay dead
   * unless reordered. */
  if (FALSE && evnt_poll_direct_enabled())
  {
    int fd = SOCKET_POLL_INDICATOR(evnt->ind)->fd;
    struct epoll_event epevent[1];

    vlg_dbg2(vlg, "epoll_del(%p,%d)\n", evnt, 0);
    epevent->events   = 0;
    epevent->data.ptr = evnt;

    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, fd, epevent) == -1)
      vlg_abort(vlg, "epoll: %m\n");
  }
}
2405 
/* epoll variant of the accept->read swap: same as the poll() version (new
 * socket_poll slot + io bases, move q_accept -> q_recv) but also swaps the
 * fd registration in the epoll interest list.  Returns TRUE on success; on
 * allocation failure the Evnt is rolled back untouched.  epoll_ctl failures
 * after the point of no return abort the process. */
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
  unsigned int old_ind = evnt->ind;
  int old_fd = SOCKET_POLL_INDICATOR(old_ind)->fd;

  assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);

  ASSERT(evnt__valid(evnt));

  /* grab the new slot first so failure leaves the old one untouched */
  if (!(evnt->ind = socket_poll_add(fd)))
    goto poll_add_fail;

  if (!(evnt->io_r = vstr_make_base(NULL)) ||
      !(evnt->io_w = vstr_make_base(NULL)))
    goto malloc_base_fail;

  /* revents gets POLLIN too so a pending read is serviced immediately */
  SOCKET_POLL_INDICATOR(evnt->ind)->events  |= POLLIN;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;

  socket_poll_del(old_ind);

  evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
  evnt_add(&q_recv, evnt);   evnt->flag_q_recv   = TRUE;

  evnt_fd__set_nonblock(fd, TRUE);

  if (evnt_poll_direct_enabled())
  { /* past the point of no return: epoll_ctl failure here is fatal */
    struct epoll_event epevent[1];

    vlg_dbg2(vlg, "epoll_swap(%p,%d,%d)\n", evnt, old_fd, fd);

    /* events/data are ignored by EPOLL_CTL_DEL, set only for tidiness */
    epevent->events   = POLLIN;
    epevent->data.ptr = evnt;

    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, old_fd, epevent) == -1)
      vlg_abort(vlg, "epoll: %m\n");

    epevent->events   = SOCKET_POLL_INDICATOR(evnt->ind)->events;
    epevent->data.ptr = evnt;

    if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
      vlg_abort(vlg, "epoll: %m\n");
  }

  ASSERT(evnt__valid(evnt));

  return (TRUE);

 malloc_base_fail:
  /* roll back: drop the new slot, restore the old one, free whichever base
   * was allocated (io_w is only set when io_r succeeded, so io_w is NULL) */
  socket_poll_del(evnt->ind);
  evnt->ind = old_ind;
  vstr_free_base(evnt->io_r); evnt->io_r = NULL;
  ASSERT(!evnt->io_r && !evnt->io_w);
 poll_add_fail:
  return (FALSE);
}
2463 
/* max events harvested per epoll_wait() call (stack-allocated array below) */
#define EVNT__EPOLL_EVENTS 128
/* Run one iteration of the epoll backend.  Returns the number of ready
 * events, or -1 on error (errno set).  Ready Evnts get their revents filled
 * in and are moved to the front of their queue for the dispatch pass. */
int evnt_poll(void)
{
  struct epoll_event events[EVNT__EPOLL_EVENTS];
  int msecs = evnt__get_timeout();
  int ret = -1;
  unsigned int scan = 0;

  if (evnt_child_exited)
    return (errno = EINTR, -1);

  /* single-accept fast path: skip epoll_wait() and accept() directly */
  if (evnt__poll_tst_accept(msecs))
    return (evnt__poll_accept());

  /* fall back to plain poll() when epoll wasn't enabled */
  if (!evnt_poll_direct_enabled())
    return (socket_poll_update_all(msecs));

  ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, msecs);
  if (ret == -1)
    return (ret);

  scan = ret;
  ASSERT(scan <= EVNT__EPOLL_EVENTS);
  /* walk the harvested events in reverse order */
  while (scan-- > 0)
  {
    struct Evnt *evnt  = NULL;
    unsigned int flags = 0;

    flags = events[scan].events;
    evnt  = events[scan].data.ptr;   /* we stored the Evnt ptr at CTL_ADD */

    ASSERT(evnt__valid(evnt));

    vlg_dbg2(vlg, "epoll_wait(%p,%u)\n", evnt, flags);
    vlg_dbg2(vlg, "epoll_wait[flags]=a=%u|r=%u\n",
             evnt->flag_q_accept, evnt->flag_q_recv);

    /* kernel should only report bits we asked for, plus HUP/ERR which are
     * always delivered regardless of the interest mask */
    assert(((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) ||
           ((POLLHUP|POLLERR) & flags));

    SOCKET_POLL_INDICATOR(evnt->ind)->revents = flags;

    evnt__del_whatever(evnt); /* move to front of queue */
    evnt__add_whatever(evnt);
  }

  return (ret);
}
2512 #endif
2513