1 /*
2  * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc.
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to
6  * deal in the Software without restriction, including without limitation the
7  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8  * sell copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20  * IN THE SOFTWARE.
21  */
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #include <stdlib.h>
25 #include <sys/socket.h>
26 #include <sys/time.h>
27 #include <sys/uio.h>
28 #include <unistd.h>
29 #include "cloexec.h"
30 #include "h2o/linklist.h"
31 
32 #if !defined(H2O_USE_ACCEPT4)
33 #ifdef __linux__
34 #if defined(__ANDROID__) && __ANDROID_API__ < 21
35 #define H2O_USE_ACCEPT4 0
36 #else
37 #define H2O_USE_ACCEPT4 1
38 #endif
39 #elif __FreeBSD__ >= 10
40 #define H2O_USE_ACCEPT4 1
41 #elif defined(__DragonFly__)
42 #define H2O_USE_ACCEPT4 1
43 #else
44 #define H2O_USE_ACCEPT4 0
45 #endif
46 #endif
47 
struct st_h2o_evloop_socket_t {
    h2o_socket_t super;   /* base object; must be first member so pointers can be cast back and forth */
    int fd;               /* underlying descriptor; -1 once closed (do_dispose_socket) or exported (do_export) */
    int _flags;           /* H2O_SOCKET_FLAG_* bits (read-ready, write-notify, connecting, disposed, ...) */
    h2o_evloop_t *loop;   /* event loop that owns this socket */
    size_t max_read_size; /* cap on bytes read per read_on_ready invocation (default set in create_socket) */
    struct st_h2o_evloop_socket_t *_next_pending;      /* intrusive list link; points to self when not on a pending list */
    struct st_h2o_evloop_socket_t *_next_statechanged; /* intrusive list link; points to self when not on the statechanged list */
};
57 
58 static void link_to_pending(struct st_h2o_evloop_socket_t *sock);
59 static void write_pending(struct st_h2o_evloop_socket_t *sock);
60 static h2o_evloop_t *create_evloop(size_t sz);
61 static void update_now(h2o_evloop_t *loop);
62 static int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait);
63 
64 /* functions to be defined in the backends */
65 static int evloop_do_proceed(h2o_evloop_t *loop, int32_t max_wait);
66 static void evloop_do_dispose(h2o_evloop_t *loop);
67 static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock);
68 static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock);
69 static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock);
70 
71 #if H2O_USE_POLL || H2O_USE_EPOLL || H2O_USE_KQUEUE
72 /* explicitly specified */
73 #else
74 #if defined(__APPLE__) || defined(__DragonFly__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
75 #define H2O_USE_KQUEUE 1
76 #elif defined(__linux)
77 #define H2O_USE_EPOLL 1
78 #else
79 #define H2O_USE_POLL 1
80 #endif
81 #endif
82 
83 #if H2O_USE_POLL
84 #include "evloop/poll.c.h"
85 #elif H2O_USE_EPOLL
86 #include "evloop/epoll.c.h"
87 #elif H2O_USE_KQUEUE
88 #include "evloop/kqueue.c.h"
89 #else
90 #error "poller not specified"
91 #endif
92 
link_to_pending(struct st_h2o_evloop_socket_t * sock)93 void link_to_pending(struct st_h2o_evloop_socket_t *sock)
94 {
95     if (sock->_next_pending == sock) {
96         struct st_h2o_evloop_socket_t **slot = (sock->_flags & H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION) != 0
97                                                    ? &sock->loop->_pending_as_server
98                                                    : &sock->loop->_pending_as_client;
99         sock->_next_pending = *slot;
100         *slot = sock;
101     }
102 }
103 
link_to_statechanged(struct st_h2o_evloop_socket_t * sock)104 static void link_to_statechanged(struct st_h2o_evloop_socket_t *sock)
105 {
106     if (sock->_next_statechanged == sock) {
107         sock->_next_statechanged = NULL;
108         *sock->loop->_statechanged.tail_ref = sock;
109         sock->loop->_statechanged.tail_ref = &sock->_next_statechanged;
110     }
111 }
112 
/**
 * Reads from `fd` into `*input` until the descriptor is drained, roughly `max_bytes` have been consumed, or an error occurs.
 * Returns NULL on success (EAGAIN included), or one of the h2o_socket_error_* strings on failure.
 */
static const char *on_read_core(int fd, h2o_buffer_t **input, size_t max_bytes)
{
    ssize_t read_so_far = 0;

    while (1) {
        ssize_t rret;
        /* reserve space in the buffer; request is capped at 4KB per iteration */
        h2o_iovec_t buf = h2o_buffer_try_reserve(input, max_bytes < 4096 ? max_bytes : 4096);
        if (buf.base == NULL) {
            /* memory allocation failed */
            return h2o_socket_error_out_of_memory;
        }
        /* cap the read size so that `rret` cannot exceed the positive range of the return type, then apply the caller's limit */
        size_t read_size = buf.len <= INT_MAX / 2 ? buf.len : INT_MAX / 2 + 1;
        if (read_size > max_bytes)
            read_size = max_bytes;
        /* retry when interrupted by a signal */
        while ((rret = read(fd, buf.base, read_size)) == -1 && errno == EINTR)
            ;
        if (rret == -1) {
            if (errno == EAGAIN)
                break; /* no more data available right now; not an error */
            else
                return h2o_socket_error_io;
        } else if (rret == 0) {
            if (read_so_far == 0)
                return h2o_socket_error_closed; /* TODO notify close */
            break;
        }
        (*input)->size += rret;
        /* stop once a read does not fill the reserved space (socket likely drained) */
        if (buf.len != rret)
            break;
        read_so_far += rret;
        if (read_so_far >= max_bytes)
            break;
    }
    return NULL;
}
148 
/**
 * Writes the vectors pointed to by `*bufs` (count `*bufcnt`) to the socket using writev(2), submitting at most IOV_MAX vectors
 * per call. On return, `*bufs` / `*bufcnt` have been advanced past fully-written vectors. Returns SIZE_MAX on fatal error;
 * otherwise the number of bytes written into the first remaining (partially-written) vector.
 */
static size_t write_vecs(struct st_h2o_evloop_socket_t *sock, h2o_iovec_t **bufs, size_t *bufcnt)
{
    ssize_t wret;

    while (*bufcnt != 0) {
        /* write */
        int iovcnt = *bufcnt < IOV_MAX ? (int)*bufcnt : IOV_MAX;
        /* retry when interrupted by a signal */
        while ((wret = writev(sock->fd, (struct iovec *)*bufs, iovcnt)) == -1 && errno == EINTR)
            ;
        SOCKET_PROBE(WRITEV, &sock->super, wret);

        if (wret == -1)
            return errno == EAGAIN ? 0 : SIZE_MAX; /* EAGAIN is not fatal; caller resumes when the socket becomes writable */

        /* adjust the buffer, doing the write once again only if all IOV_MAX buffers being supplied were fully written */
        while ((*bufs)->len <= wret) {
            wret -= (*bufs)->len;
            ++*bufs;
            --*bufcnt;
            if (*bufcnt == 0) {
                assert(wret == 0);
                return 0;
            }
        }
        if (wret != 0) {
            /* partial write into the current head vector; report the offset to the caller */
            return wret;
        } else if (iovcnt < IOV_MAX) {
            /* every vector submitted was written and none were held back; done */
            return 0;
        }
    }

    return 0;
}
182 
/**
 * Writes the supplied vectors to the socket, handling TLS transparently. Cleartext sockets delegate directly to `write_vecs`.
 * For TLS sockets, bytes that were already encrypted but not yet sent are flushed first, then more cleartext is converted into
 * TLS records and flushed, repeating until all input is consumed or the socket stops accepting data. Returns SIZE_MAX on fatal
 * error, otherwise the number of bytes of the first buffer consumed so far.
 */
static size_t write_core(struct st_h2o_evloop_socket_t *sock, h2o_iovec_t **bufs, size_t *bufcnt)
{
    if (sock->super.ssl == NULL)
        return write_vecs(sock, bufs, bufcnt);

    /* SSL */
    size_t first_buf_written = 0;
    do {
        /* write bytes already encrypted, if any */
        if (has_pending_ssl_bytes(sock->super.ssl)) {
            h2o_iovec_t encbuf = h2o_iovec_init(sock->super.ssl->output.buf.base + sock->super.ssl->output.pending_off,
                                                sock->super.ssl->output.buf.off - sock->super.ssl->output.pending_off);
            h2o_iovec_t *encbufs = &encbuf;
            size_t encbufcnt = 1, enc_written;
            if ((enc_written = write_vecs(sock, &encbufs, &encbufcnt)) == SIZE_MAX) {
                /* fatal error; release the encrypted output buffer before bailing out */
                dispose_ssl_output_buffer(sock->super.ssl);
                return SIZE_MAX;
            }
            /* if write is incomplete, record the advance and bail out */
            if (encbufcnt != 0) {
                sock->super.ssl->output.pending_off += enc_written;
                break;
            }
            /* succeeded in writing all the encrypted data; free the buffer */
            dispose_ssl_output_buffer(sock->super.ssl);
        }
        /* bail out if complete */
        if (*bufcnt == 0)
            break;
        /* convert more cleartext to TLS records if possible, or bail out on fatal error */
    } while ((first_buf_written = generate_tls_records(&sock->super, bufs, bufcnt, first_buf_written)) != SIZE_MAX);

    return first_buf_written;
}
217 
/**
 * Flushes buffered write data when the socket becomes writable (or when only a write-notify was requested). Once everything has
 * been written — or on error — the write callback is scheduled by linking the socket to the pending list.
 */
void write_pending(struct st_h2o_evloop_socket_t *sock)
{

    assert(sock->super._cb.write != NULL);

    /* DONT_WRITE poll */
    if (sock->super._write_buf.cnt == 0 && !has_pending_ssl_bytes(sock->super.ssl))
        goto Complete;

    { /* write */
        size_t first_buf_written;
        if ((first_buf_written = write_core(sock, &sock->super._write_buf.bufs, &sock->super._write_buf.cnt)) != SIZE_MAX) {
            /* return if there's still pending data, adjusting buf[0] if necessary */
            if (sock->super._write_buf.cnt != 0) {
                sock->super._write_buf.bufs[0].base += first_buf_written;
                sock->super._write_buf.bufs[0].len -= first_buf_written;
                return;
            } else if (has_pending_ssl_bytes(sock->super.ssl)) {
                return;
            }
        }
    }

    /* either completed or failed */
    dispose_write_buf(&sock->super);

Complete:
    /* success is signalled by an empty write buffer; run_socket treats leftover data as an error indication */
    SOCKET_PROBE(WRITE_COMPLETE, &sock->super, sock->super._write_buf.cnt == 0 && !has_pending_ssl_bytes(sock->super.ssl));
    sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
    link_to_pending(sock);
    link_to_statechanged(sock); /* might need to disable the write polling */
}
250 
/**
 * Invoked when the poller reports the socket readable: pulls data off the descriptor (unless DONT_READ is set), decrypts it when
 * TLS is active and the handshake is done, then always invokes the application's read callback.
 */
static void read_on_ready(struct st_h2o_evloop_socket_t *sock)
{
    const char *err = 0;
    size_t prev_size = sock->super.input->size;

    /* when DONT_READ is set, skip the read(2) and just notify the application */
    if ((sock->_flags & H2O_SOCKET_FLAG_DONT_READ) != 0)
        goto Notify;

    /* read into the cleartext buffer directly, or into the encrypted-input buffer when TLS is in use */
    if ((err = on_read_core(sock->fd, sock->super.ssl == NULL ? &sock->super.input : &sock->super.ssl->input.encrypted,
                            sock->max_read_size)) != NULL)
        goto Notify;

    /* decrypt, unless the TLS handshake is still in progress */
    if (sock->super.ssl != NULL && sock->super.ssl->handshake.cb == NULL)
        err = decode_ssl_input(&sock->super);

Notify:
    /* the application may get notified even if no new data is available.  The
     * behavior is intentional; it is designed as such so that the applications
     * can update their timeout counters when a partial SSL record arrives.
     */
    sock->super.bytes_read += sock->super.input->size - prev_size;
    sock->super._cb.read(&sock->super, err);
}
274 
/**
 * Releases the resources held by the socket: unregisters it from the poller backend, frees buffered write data, closes the
 * descriptor (if still open), and marks the object for deallocation during the update-states phase.
 */
void do_dispose_socket(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;
    int fd = sock->fd;

    /* let the backend drop its references first, then release any buffered write data */
    evloop_do_on_socket_close(sock);
    dispose_write_buf(&sock->super);

    /* close the descriptor unless it was already closed or exported */
    sock->fd = -1;
    if (fd != -1)
        close(fd);

    /* the actual free happens when the statechanged list is processed */
    sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED;
    link_to_statechanged(sock);
}
288 
/**
 * Starts writing `bufs` to the socket, attempting an immediate write first. `cb` is invoked (via run_socket) once the write
 * completes or fails; data that could not be written immediately is buffered and flushed by write_pending.
 */
void do_write(h2o_socket_t *_sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;
    size_t first_buf_written;

    sock->super._cb.write = cb;

    /* try to write now */
    if ((first_buf_written = write_core(sock, &bufs, &bufcnt)) == SIZE_MAX) {
        /* fill in _wreq.bufs with fake data to indicate error; run_socket reports an I/O error when it finds leftover data */
        sock->super._write_buf.bufs = sock->super._write_buf.smallbufs;
        sock->super._write_buf.cnt = 1;
        *sock->super._write_buf.bufs = h2o_iovec_init(H2O_STRLIT("deadbeef"));
        sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        link_to_pending(sock);
        return;
    }
    if (bufcnt == 0 && !has_pending_ssl_bytes(sock->super.ssl)) {
        /* write complete, schedule the callback */
        sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        link_to_pending(sock);
        return;
    }

    /* setup the buffer to send pending data */
    init_write_buf(&sock->super, bufs, bufcnt, first_buf_written);

    /* schedule the write */
    link_to_statechanged(sock);
}
319 
/**
 * Returns the file descriptor backing the socket (-1 once closed or exported).
 */
int h2o_socket_get_fd(h2o_socket_t *_sock)
{
    return ((struct st_h2o_evloop_socket_t *)_sock)->fd;
}
325 
/**
 * Schedules a state update so that the poller backend starts watching the socket for readability.
 */
void do_read_start(h2o_socket_t *_sock)
{
    link_to_statechanged((struct st_h2o_evloop_socket_t *)_sock);
}
332 
/**
 * Stops read notification: clears any latched read-ready flag and schedules a state update so the poller backend stops watching
 * the socket for readability.
 */
void do_read_stop(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *s = (struct st_h2o_evloop_socket_t *)_sock;

    s->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY;
    link_to_statechanged(s);
}
340 
/**
 * Toggles the DONT_READ flag; while set, read_on_ready skips the actual read(2) but still notifies the application.
 */
void h2o_socket_dont_read(h2o_socket_t *_sock, int dont_read)
{
    struct st_h2o_evloop_socket_t *s = (struct st_h2o_evloop_socket_t *)_sock;

    if (dont_read)
        s->_flags |= H2O_SOCKET_FLAG_DONT_READ;
    else
        s->_flags &= ~H2O_SOCKET_FLAG_DONT_READ;
}
351 
/**
 * Detaches the file descriptor from the event loop, transferring its ownership to `info`. The socket object itself is marked
 * disposed. Always returns 0.
 */
int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;

    assert((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0);

    /* unregister from the poller backend, then hand the descriptor over */
    evloop_do_on_socket_export(sock);
    sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED;
    info->fd = sock->fd;
    sock->fd = -1;

    return 0;
}
365 
do_import(h2o_loop_t * loop,h2o_socket_export_t * info)366 h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info)
367 {
368     return h2o_evloop_socket_create(loop, info->fd, 0);
369 }
370 
h2o_socket_get_loop(h2o_socket_t * _sock)371 h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock)
372 {
373     struct st_h2o_evloop_socket_t *sock = (void *)_sock;
374     return sock->loop;
375 }
376 
/**
 * Fetches the local address of the socket into `sa` (which must provide sockaddr_storage-sized space).
 * Returns the address length, or 0 on failure.
 */
socklen_t get_sockname_uncached(h2o_socket_t *_sock, struct sockaddr *sa)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;
    socklen_t len = sizeof(struct sockaddr_storage);

    return getsockname(sock->fd, sa, &len) == 0 ? len : 0;
}
385 
/**
 * Fetches the remote address of the socket into `sa` (which must provide sockaddr_storage-sized space).
 * Returns the address length, or 0 on failure.
 */
socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;
    socklen_t len = sizeof(struct sockaddr_storage);

    return getpeername(sock->fd, sa, &len) == 0 ? len : 0;
}
394 
/**
 * Allocates and initializes a socket object wrapping `fd`, registering it with the poller backend.
 */
static struct st_h2o_evloop_socket_t *create_socket(h2o_evloop_t *loop, int fd, int flags)
{
    struct st_h2o_evloop_socket_t *sock = h2o_mem_alloc(sizeof(*sock));

    memset(sock, 0, sizeof(*sock));
    h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype);
    sock->loop = loop;
    sock->fd = fd;
    sock->_flags = flags;
    sock->max_read_size = 1024 * 1024; /* by default, we read up to 1MB at once */
    /* self-referencing next-pointers mean "not linked to any list" */
    sock->_next_pending = sock;
    sock->_next_statechanged = sock;

    /* let the poller backend register the socket */
    evloop_do_on_socket_create(sock);

    return sock;
}
413 
/**
 * Sets TCP_NODELAY if the given file descriptor is likely to be a TCP socket. The intent of this function is to reduce the number
 * of unnecessary system calls. Therefore, we skip setting TCP_NODELAY when it is certain that the socket is not a TCP socket,
 * otherwise call setsockopt.
 */
static void set_nodelay_if_likely_tcp(int fd, struct sockaddr *sa)
{
    /* when the address family is known and is neither IPv4 nor IPv6, the socket cannot be TCP */
    if (sa != NULL && sa->sa_family != AF_INET && sa->sa_family != AF_INET6)
        return;

    int one = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); /* best-effort; failure is ignored */
}
427 
h2o_evloop_socket_create(h2o_evloop_t * loop,int fd,int flags)428 h2o_socket_t *h2o_evloop_socket_create(h2o_evloop_t *loop, int fd, int flags)
429 {
430     /* It is the reponsibility of the event loop to modify the properties of a socket for its use (e.g., set O_NONBLOCK). */
431     fcntl(fd, F_SETFL, O_NONBLOCK);
432     set_nodelay_if_likely_tcp(fd, NULL);
433 
434     return &create_socket(loop, fd, flags)->super;
435 }
436 
/**
 * Accepts a connection on the given listener, returning a non-blocking, close-on-exec socket registered to the listener's loop,
 * or NULL on failure. The peer address is captured only when the eBPF-map build requires it.
 */
h2o_socket_t *h2o_evloop_socket_accept(h2o_socket_t *_listener)
{
    struct st_h2o_evloop_socket_t *listener = (struct st_h2o_evloop_socket_t *)_listener;
    int fd;
    h2o_socket_t *sock;

    /* cache the remote address, if we know that we are going to use the value (in h2o_socket_ebpf_lookup_flags) */
#if H2O_USE_EBPF_MAP
    struct {
        struct sockaddr_storage storage;
        socklen_t len;
    } _peeraddr;
    _peeraddr.len = sizeof(_peeraddr.storage);
    struct sockaddr_storage *peeraddr = &_peeraddr.storage;
    socklen_t *peeraddrlen = &_peeraddr.len;
#else
    struct sockaddr_storage *peeraddr = NULL;
    socklen_t *peeraddrlen = NULL;
#endif

#if H2O_USE_ACCEPT4
    /* the anticipation here is that a socket returned by `accept4` will inherit the TCP_NODELAY flag from the listening socket */
    if ((fd = accept4(listener->fd, (struct sockaddr *)peeraddr, peeraddrlen, SOCK_NONBLOCK | SOCK_CLOEXEC)) == -1)
        return NULL;
    sock = &create_socket(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super;
#else
    /* no accept4(2); accept through the cloexec helper, then make the descriptor non-blocking */
    if ((fd = cloexec_accept(listener->fd, (struct sockaddr *)peeraddr, peeraddrlen)) == -1)
        return NULL;
    fcntl(fd, F_SETFL, O_NONBLOCK);
    sock = &create_socket(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super;
#endif
    set_nodelay_if_likely_tcp(fd, (struct sockaddr *)peeraddr);

    /* record the peer address when it was captured above and fits in sockaddr_storage */
    if (peeraddr != NULL && *peeraddrlen <= sizeof(*peeraddr))
        h2o_socket_setpeername(sock, (struct sockaddr *)peeraddr, *peeraddrlen);
    uint64_t flags = h2o_socket_ebpf_lookup_flags(listener->loop, h2o_socket_ebpf_init_key, sock);
    if ((flags & H2O_EBPF_FLAGS_SKIP_TRACING_BIT) != 0)
        sock->_skip_tracing = 1;
    return sock;
}
477 
/**
 * Creates a non-blocking stream socket and starts connecting to `addr`. `cb` is invoked once the connection attempt completes
 * (its outcome is retrieved via SO_ERROR in run_socket). Returns NULL on immediate failure, setting `*err` when `err` is
 * non-NULL.
 */
h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb, const char **err)
{
    int fd;
    struct st_h2o_evloop_socket_t *sock;

    /* create the socket through the cloexec helper (see cloexec.h) */
    if ((fd = cloexec_socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
        if (err != NULL) {
            *err = h2o_socket_error_socket_fail;
        }
        return NULL;
    }
    fcntl(fd, F_SETFL, O_NONBLOCK);
    /* a non-blocking connect either succeeds immediately or returns EINPROGRESS; anything else is a failure */
    if (!(connect(fd, addr, addrlen) == 0 || errno == EINPROGRESS)) {
        if (err != NULL)
            *err = h2o_socket_get_error_string(errno, h2o_socket_error_conn_fail);
        close(fd);
        return NULL;
    }

    sock = create_socket(loop, fd, H2O_SOCKET_FLAG_IS_CONNECTING);
    set_nodelay_if_likely_tcp(fd, addr);

    /* invoke `cb` when the socket becomes writable, i.e. when the connect has completed */
    h2o_socket_notify_write(&sock->super, cb);
    return &sock->super;
}
503 
/**
 * Adjusts the cap applied by on_read_core when pulling bytes off the socket.
 */
void h2o_evloop_socket_set_max_read_size(h2o_socket_t *_sock, size_t max_size)
{
    ((struct st_h2o_evloop_socket_t *)_sock)->max_read_size = max_size;
}
509 
create_evloop(size_t sz)510 h2o_evloop_t *create_evloop(size_t sz)
511 {
512     h2o_evloop_t *loop = h2o_mem_alloc(sz);
513 
514     memset(loop, 0, sz);
515     loop->_statechanged.tail_ref = &loop->_statechanged.head;
516     update_now(loop);
517     /* 3 levels * 32-slots => 1 second goes into 2nd, becomes O(N) above approx. 31 seconds */
518     loop->_timeouts = h2o_timerwheel_create(3, loop->_now_millisec);
519 
520     return loop;
521 }
522 
/**
 * Refreshes the loop's cached clock values. The nanosecond value is derived from gettimeofday and therefore carries only
 * microsecond precision.
 */
void update_now(h2o_evloop_t *loop)
{
    gettimeofday(&loop->_tv_at, NULL);
    uint64_t usec = (uint64_t)loop->_tv_at.tv_sec * 1000000 + loop->_tv_at.tv_usec;
    loop->_now_nanosec = usec * 1000;
    loop->_now_millisec = usec / 1000;
}
529 
/**
 * Clamps the poll timeout (milliseconds) so that the loop wakes no later than the earliest pending timer.
 * Also refreshes the cached clock as a side effect.
 */
int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait)
{
    uint64_t wake_at = h2o_timerwheel_get_wake_at(loop->_timeouts);

    update_now(loop);

    if (wake_at <= loop->_now_millisec)
        return 0; /* a timer is already due; do not block */

    uint64_t until_wake = wake_at - loop->_now_millisec;
    if (until_wake < (uint64_t)max_wait)
        max_wait = (int32_t)until_wake;
    return max_wait;
}
546 
/**
 * Registers `cb` to be invoked once the socket becomes writable, without writing any data. The socket must be idle on the write
 * side (no callback registered, nothing buffered).
 */
void h2o_socket_notify_write(h2o_socket_t *_sock, h2o_socket_cb cb)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;

    assert(sock->super._cb.write == NULL);
    assert(sock->super._write_buf.cnt == 0);
    assert(!has_pending_ssl_bytes(sock->super.ssl));

    /* store the callback and let the poller backend start watching for writability */
    sock->super._cb.write = cb;
    link_to_statechanged(sock);
}
557 
/**
 * Invokes the read / write callbacks of a socket whose ready flags were set, clearing those flags first. Called from
 * run_pending.
 */
static void run_socket(struct st_h2o_evloop_socket_t *sock)
{
    if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) {
        /* is freed in updatestates phase */
        return;
    }

    if ((sock->_flags & H2O_SOCKET_FLAG_IS_READ_READY) != 0) {
        sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY;
        read_on_ready(sock);
    }

    if ((sock->_flags & H2O_SOCKET_FLAG_IS_WRITE_NOTIFY) != 0) {
        const char *err = NULL;
        assert(sock->super._cb.write != NULL);
        sock->_flags &= ~H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        if (sock->super._write_buf.cnt != 0 || has_pending_ssl_bytes(sock->super.ssl)) {
            /* error: leftover data in the write buffer signals failure (do_write stores fake data there on error) */
            err = h2o_socket_error_io;
            sock->super._write_buf.cnt = 0;
            if (has_pending_ssl_bytes(sock->super.ssl))
                dispose_ssl_output_buffer(sock->super.ssl);
        } else if ((sock->_flags & H2O_SOCKET_FLAG_IS_CONNECTING) != 0) {
            /* the non-blocking connect has finished; fetch its outcome via SO_ERROR */
            sock->_flags &= ~H2O_SOCKET_FLAG_IS_CONNECTING;
            int so_err = 0;
            socklen_t l = sizeof(so_err);
            so_err = 0;
            if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &so_err, &l) != 0 || so_err != 0)
                err = h2o_socket_get_error_string(so_err, h2o_socket_error_conn_fail);
        }
        on_write_complete(&sock->super, err);
    }
}
591 
/**
 * Runs the callbacks of all sockets linked to the pending lists. The client list is fully drained first; server-side sockets are
 * then processed one at a time, re-draining the client list after each (the asymmetry gives client-side work priority).
 */
static void run_pending(h2o_evloop_t *loop)
{
    struct st_h2o_evloop_socket_t *sock;

    while (loop->_pending_as_server != NULL || loop->_pending_as_client != NULL) {
        while ((sock = loop->_pending_as_client) != NULL) {
            loop->_pending_as_client = sock->_next_pending;
            sock->_next_pending = sock; /* restore the "not linked" marker before running callbacks */
            run_socket(sock);
        }
        if ((sock = loop->_pending_as_server) != NULL) {
            loop->_pending_as_server = sock->_next_pending;
            sock->_next_pending = sock;
            run_socket(sock);
        }
    }
}
609 
/**
 * Destroys the event loop, closing and freeing all sockets still associated with it. All timers must have been stopped by the
 * application before this function is called.
 */
void h2o_evloop_destroy(h2o_evloop_t *loop)
{
    struct st_h2o_evloop_socket_t *sock;

    /* timeouts are governed by the application and MUST be destroyed prior to destroying the loop */
    assert(h2o_timerwheel_get_wake_at(loop->_timeouts) == UINT64_MAX);

    /* dispose all sockets remaining on the pending lists */
    while ((sock = loop->_pending_as_client) != NULL) {
        loop->_pending_as_client = sock->_next_pending;
        sock->_next_pending = sock;
        h2o_socket_close((h2o_socket_t *)sock);
    }
    while ((sock = loop->_pending_as_server) != NULL) {
        loop->_pending_as_server = sock->_next_pending;
        sock->_next_pending = sock;
        h2o_socket_close((h2o_socket_t *)sock);
    }

    /* now that all the sockets are disposed and linked on the statechanged list, free their memory by walking that list */
    while ((sock = loop->_statechanged.head) != NULL) {
        loop->_statechanged.head = sock->_next_statechanged;
        free(sock);
    }

    /* dispose backend-specific data */
    evloop_do_dispose(loop);

    /* lastly we need to free loop memory */
    h2o_timerwheel_destroy(loop->_timeouts);
    free(loop);
}
644 
/**
 * Runs one iteration of the event loop: polls for I/O (waiting at most `max_wait` milliseconds), dispatches the resulting socket
 * callbacks, then runs expired timers. Returns 0 on success, -1 if the poller backend failed.
 */
int h2o_evloop_run(h2o_evloop_t *loop, int32_t max_wait)
{
    /* update socket states, poll, set readable flags, perform pending writes */
    if (evloop_do_proceed(loop, max_wait) != 0)
        return -1;

    /* run the pending callbacks */
    run_pending(loop);

    /* run the expired timers at the same time invoking pending callbacks for every timer callback. This is a locality
     * optimization; handles things like timeout -> write -> on_write_complete for each object. */
    while (1) {
        h2o_linklist_t expired;
        h2o_linklist_init_anchor(&expired);
        h2o_timerwheel_get_expired(loop->_timeouts, loop->_now_millisec, &expired);
        if (h2o_linklist_is_empty(&expired))
            break;
        do {
            h2o_timerwheel_entry_t *timer = H2O_STRUCT_FROM_MEMBER(h2o_timerwheel_entry_t, _link, expired.next);
            h2o_linklist_unlink(&timer->_link);
            timer->cb(timer);
            run_pending(loop);
        } while (!h2o_linklist_is_empty(&expired));
    }

    /* all pending work must have been consumed by the loops above */
    assert(loop->_pending_as_client == NULL);
    assert(loop->_pending_as_server == NULL);

    /* stop the execution-time counter using a freshly updated clock, if it is running */
    if (h2o_sliding_counter_is_running(&loop->exec_time_nanosec_counter)) {
        update_now(loop);
        h2o_sliding_counter_stop(&loop->exec_time_nanosec_counter, loop->_now_nanosec);
    }

    return 0;
}
680