1 /*
2 * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc.
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to
6 * deal in the Software without restriction, including without limitation the
7 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8 * sell copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 * IN THE SOFTWARE.
21 */
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #include <stdlib.h>
25 #include <sys/socket.h>
26 #include <sys/time.h>
27 #include <sys/uio.h>
28 #include <unistd.h>
29 #include "cloexec.h"
30 #include "h2o/linklist.h"
31
32 #if !defined(H2O_USE_ACCEPT4)
33 #ifdef __linux__
34 #if defined(__ANDROID__) && __ANDROID_API__ < 21
35 #define H2O_USE_ACCEPT4 0
36 #else
37 #define H2O_USE_ACCEPT4 1
38 #endif
39 #elif __FreeBSD__ >= 10
40 #define H2O_USE_ACCEPT4 1
41 #elif defined(__DragonFly__)
42 #define H2O_USE_ACCEPT4 1
43 #else
44 #define H2O_USE_ACCEPT4 0
45 #endif
46 #endif
47
struct st_h2o_evloop_socket_t {
    h2o_socket_t super;          /* base; must be first so that the two pointer types can be cast to one another */
    int fd;                      /* underlying file descriptor; -1 once closed or exported */
    int _flags;                  /* bitmask of H2O_SOCKET_FLAG_* values */
    h2o_evloop_t *loop;          /* event loop that owns this socket */
    size_t max_read_size;        /* upper bound of bytes consumed per read attempt (default 1MB, see create_socket) */
    struct st_h2o_evloop_socket_t *_next_pending;      /* pending-list link; points to self when not on a list */
    struct st_h2o_evloop_socket_t *_next_statechanged; /* statechanged-list link; points to self when not on the list */
};
57
58 static void link_to_pending(struct st_h2o_evloop_socket_t *sock);
59 static void write_pending(struct st_h2o_evloop_socket_t *sock);
60 static h2o_evloop_t *create_evloop(size_t sz);
61 static void update_now(h2o_evloop_t *loop);
62 static int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait);
63
64 /* functions to be defined in the backends */
65 static int evloop_do_proceed(h2o_evloop_t *loop, int32_t max_wait);
66 static void evloop_do_dispose(h2o_evloop_t *loop);
67 static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock);
68 static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock);
69 static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock);
70
71 #if H2O_USE_POLL || H2O_USE_EPOLL || H2O_USE_KQUEUE
72 /* explicitly specified */
73 #else
74 #if defined(__APPLE__) || defined(__DragonFly__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
75 #define H2O_USE_KQUEUE 1
76 #elif defined(__linux)
77 #define H2O_USE_EPOLL 1
78 #else
79 #define H2O_USE_POLL 1
80 #endif
81 #endif
82
83 #if H2O_USE_POLL
84 #include "evloop/poll.c.h"
85 #elif H2O_USE_EPOLL
86 #include "evloop/epoll.c.h"
87 #elif H2O_USE_KQUEUE
88 #include "evloop/kqueue.c.h"
89 #else
90 #error "poller not specified"
91 #endif
92
link_to_pending(struct st_h2o_evloop_socket_t * sock)93 void link_to_pending(struct st_h2o_evloop_socket_t *sock)
94 {
95 if (sock->_next_pending == sock) {
96 struct st_h2o_evloop_socket_t **slot = (sock->_flags & H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION) != 0
97 ? &sock->loop->_pending_as_server
98 : &sock->loop->_pending_as_client;
99 sock->_next_pending = *slot;
100 *slot = sock;
101 }
102 }
103
link_to_statechanged(struct st_h2o_evloop_socket_t * sock)104 static void link_to_statechanged(struct st_h2o_evloop_socket_t *sock)
105 {
106 if (sock->_next_statechanged == sock) {
107 sock->_next_statechanged = NULL;
108 *sock->loop->_statechanged.tail_ref = sock;
109 sock->loop->_statechanged.tail_ref = &sock->_next_statechanged;
110 }
111 }
112
/**
 * Reads from `fd` into `*input` until the descriptor would block, EOF is reached, or roughly `max_bytes` have been consumed.
 * Returns NULL on success, or one of the `h2o_socket_error_*` strings on failure. EOF counts as an error only when no bytes
 * were read by this invocation.
 */
static const char *on_read_core(int fd, h2o_buffer_t **input, size_t max_bytes)
{
    ssize_t read_so_far = 0;

    while (1) {
        ssize_t rret;
        /* grow the input buffer in chunks of at most 4KB */
        h2o_iovec_t buf = h2o_buffer_try_reserve(input, max_bytes < 4096 ? max_bytes : 4096);
        if (buf.base == NULL) {
            /* memory allocation failed */
            return h2o_socket_error_out_of_memory;
        }
        /* clamp the size of a single read call, then honor the caller-supplied limit */
        size_t read_size = buf.len <= INT_MAX / 2 ? buf.len : INT_MAX / 2 + 1;
        if (read_size > max_bytes)
            read_size = max_bytes;
        /* retry the read when interrupted by a signal */
        while ((rret = read(fd, buf.base, read_size)) == -1 && errno == EINTR)
            ;
        if (rret == -1) {
            if (errno == EAGAIN)
                break; /* no more data available right now; not an error */
            else
                return h2o_socket_error_io;
        } else if (rret == 0) {
            /* EOF; report as closed only if this invocation read nothing at all */
            if (read_so_far == 0)
                return h2o_socket_error_closed; /* TODO notify close */
            break;
        }
        (*input)->size += rret;
        /* a short read means the kernel buffer has been drained; stop here */
        if (buf.len != rret)
            break;
        read_so_far += rret;
        if (read_so_far >= max_bytes)
            break;
    }
    return NULL;
}
148
/**
 * Writes `*bufcnt` vectors starting at `*bufs` to the socket, advancing `*bufs` / `*bufcnt` past fully-written vectors.
 * Returns SIZE_MAX on I/O error; otherwise returns the number of bytes written into the first remaining (partially written)
 * vector, or 0 when either everything was written or the socket would block.
 */
static size_t write_vecs(struct st_h2o_evloop_socket_t *sock, h2o_iovec_t **bufs, size_t *bufcnt)
{
    ssize_t wret;

    while (*bufcnt != 0) {
        /* write at most IOV_MAX vectors per call, retrying on signal interruption */
        int iovcnt = *bufcnt < IOV_MAX ? (int)*bufcnt : IOV_MAX;
        while ((wret = writev(sock->fd, (struct iovec *)*bufs, iovcnt)) == -1 && errno == EINTR)
            ;
        SOCKET_PROBE(WRITEV, &sock->super, wret);

        if (wret == -1)
            return errno == EAGAIN ? 0 : SIZE_MAX; /* EAGAIN: report progress, caller retries later */

        /* adjust the buffer, doing the write once again only if all IOV_MAX buffers being supplied were fully written */
        while ((*bufs)->len <= wret) {
            wret -= (*bufs)->len;
            ++*bufs;
            --*bufcnt;
            if (*bufcnt == 0) {
                assert(wret == 0);
                return 0;
            }
        }
        if (wret != 0) {
            /* partial write into the current head vector; report how much of it was consumed */
            return wret;
        } else if (iovcnt < IOV_MAX) {
            /* every supplied vector was passed to writev and written; nothing was held back by the IOV_MAX cap */
            return 0;
        }
    }

    return 0;
}
182
/**
 * Writes the supplied vectors, transparently handling TLS. For cleartext sockets, delegates directly to `write_vecs`. For TLS
 * sockets, first flushes any already-encrypted-but-unsent bytes, then keeps converting cleartext into TLS records and flushing
 * them until the socket blocks, all input is consumed, or a fatal error occurs. Returns SIZE_MAX on fatal error, otherwise the
 * number of bytes of `(*bufs)[0]` that have been consumed (for adjusting the vector before a retry).
 */
static size_t write_core(struct st_h2o_evloop_socket_t *sock, h2o_iovec_t **bufs, size_t *bufcnt)
{
    if (sock->super.ssl == NULL)
        return write_vecs(sock, bufs, bufcnt);

    /* SSL */
    size_t first_buf_written = 0;
    do {
        /* write bytes already encrypted, if any */
        if (has_pending_ssl_bytes(sock->super.ssl)) {
            h2o_iovec_t encbuf = h2o_iovec_init(sock->super.ssl->output.buf.base + sock->super.ssl->output.pending_off,
                                                sock->super.ssl->output.buf.off - sock->super.ssl->output.pending_off);
            h2o_iovec_t *encbufs = &encbuf;
            size_t encbufcnt = 1, enc_written;
            if ((enc_written = write_vecs(sock, &encbufs, &encbufcnt)) == SIZE_MAX) {
                /* fatal error; drop the encrypted output and propagate */
                dispose_ssl_output_buffer(sock->super.ssl);
                return SIZE_MAX;
            }
            /* if write is incomplete, record the advance and bail out */
            if (encbufcnt != 0) {
                sock->super.ssl->output.pending_off += enc_written;
                break;
            }
            /* succeeded in writing all the encrypted data; free the buffer */
            dispose_ssl_output_buffer(sock->super.ssl);
        }
        /* bail out if complete */
        if (*bufcnt == 0)
            break;
        /* convert more cleartext to TLS records if possible, or bail out on fatal error */
    } while ((first_buf_written = generate_tls_records(&sock->super, bufs, bufcnt, first_buf_written)) != SIZE_MAX);

    return first_buf_written;
}
217
/**
 * Invoked when polling reports the socket is writable: flushes as much of the pending write buffer (and pending TLS output) as
 * possible. Once everything is written — or an error occurs — the socket is queued so that the write callback gets invoked.
 */
void write_pending(struct st_h2o_evloop_socket_t *sock)
{

    assert(sock->super._cb.write != NULL);

    /* DONT_WRITE poll */
    if (sock->super._write_buf.cnt == 0 && !has_pending_ssl_bytes(sock->super.ssl))
        goto Complete;

    { /* write */
        size_t first_buf_written;
        if ((first_buf_written = write_core(sock, &sock->super._write_buf.bufs, &sock->super._write_buf.cnt)) != SIZE_MAX) {
            /* return if there's still pending data, adjusting buf[0] if necessary */
            if (sock->super._write_buf.cnt != 0) {
                sock->super._write_buf.bufs[0].base += first_buf_written;
                sock->super._write_buf.bufs[0].len -= first_buf_written;
                return;
            } else if (has_pending_ssl_bytes(sock->super.ssl)) {
                return;
            }
        }
    }

    /* either completed or failed */
    dispose_write_buf(&sock->super);

Complete:
    SOCKET_PROBE(WRITE_COMPLETE, &sock->super, sock->super._write_buf.cnt == 0 && !has_pending_ssl_bytes(sock->super.ssl));
    sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
    link_to_pending(sock);
    link_to_statechanged(sock); /* might need to disable the write polling */
}
250
/**
 * Invoked when the poller reports incoming data: reads from the socket (unless DONT_READ is set), decodes TLS input when the
 * handshake has completed, then invokes the application's read callback.
 */
static void read_on_ready(struct st_h2o_evloop_socket_t *sock)
{
    const char *err = 0;
    size_t prev_size = sock->super.input->size;

    if ((sock->_flags & H2O_SOCKET_FLAG_DONT_READ) != 0)
        goto Notify;

    /* read into the cleartext buffer directly, or into the encrypted-input buffer for TLS connections */
    if ((err = on_read_core(sock->fd, sock->super.ssl == NULL ? &sock->super.input : &sock->super.ssl->input.encrypted,
                            sock->max_read_size)) != NULL)
        goto Notify;

    /* decrypt only after the handshake is done (handshake.cb == NULL) */
    if (sock->super.ssl != NULL && sock->super.ssl->handshake.cb == NULL)
        err = decode_ssl_input(&sock->super);

Notify:
    /* the application may get notified even if no new data is available. The
     * behavior is intentional; it is designed as such so that the applications
     * can update their timeout counters when a partial SSL record arrives.
     */
    sock->super.bytes_read += sock->super.input->size - prev_size;
    sock->super._cb.read(&sock->super, err);
}
274
/* Detaches the socket from the poller, releases its write buffer, closes the fd, and marks the structure as disposed so that
 * the statechanged phase can free it. */
void do_dispose_socket(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *s = (struct st_h2o_evloop_socket_t *)_sock;

    evloop_do_on_socket_close(s);
    dispose_write_buf(&s->super);

    if (s->fd != -1) {
        close(s->fd);
        s->fd = -1; /* prevent double close */
    }

    s->_flags = H2O_SOCKET_FLAG_IS_DISPOSED;
    link_to_statechanged(s); /* actual free happens when the statechanged list is processed */
}
288
/**
 * Queues `bufs` (an array of `bufcnt` vectors) for writing, attempting an immediate write first. `cb` is invoked once the write
 * completes or fails; even synchronously-detected completion / failure is reported through the pending list so that the
 * callback always runs asynchronously.
 */
void do_write(h2o_socket_t *_sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;
    size_t first_buf_written;

    sock->super._cb.write = cb;

    /* try to write now */
    if ((first_buf_written = write_core(sock, &bufs, &bufcnt)) == SIZE_MAX) {
        /* fill in _wreq.bufs with fake data to indicate error (run_socket treats leftover data at notify time as failure) */
        sock->super._write_buf.bufs = sock->super._write_buf.smallbufs;
        sock->super._write_buf.cnt = 1;
        *sock->super._write_buf.bufs = h2o_iovec_init(H2O_STRLIT("deadbeef"));
        sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        link_to_pending(sock);
        return;
    }
    if (bufcnt == 0 && !has_pending_ssl_bytes(sock->super.ssl)) {
        /* write complete, schedule the callback */
        sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        link_to_pending(sock);
        return;
    }

    /* setup the buffer to send pending data */
    init_write_buf(&sock->super, bufs, bufcnt, first_buf_written);

    /* schedule the write */
    link_to_statechanged(sock);
}
319
/* Returns the file descriptor backing the socket (-1 once closed or exported). */
int h2o_socket_get_fd(h2o_socket_t *_sock)
{
    return ((struct st_h2o_evloop_socket_t *)_sock)->fd;
}
325
/* Requests that read polling be enabled; the actual poller update happens in the statechanged phase. */
void do_read_start(h2o_socket_t *_sock)
{
    link_to_statechanged((struct st_h2o_evloop_socket_t *)_sock);
}
332
/* Requests that read polling be disabled, dropping any not-yet-consumed read-ready flag so a stale event does not fire. */
void do_read_stop(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *s = (struct st_h2o_evloop_socket_t *)_sock;

    s->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY;
    link_to_statechanged(s);
}
340
/* Toggles the DONT_READ flag; while set, read_on_ready skips reading from the fd but still notifies the application. */
void h2o_socket_dont_read(h2o_socket_t *_sock, int dont_read)
{
    struct st_h2o_evloop_socket_t *s = (struct st_h2o_evloop_socket_t *)_sock;

    if (dont_read)
        s->_flags |= H2O_SOCKET_FLAG_DONT_READ;
    else
        s->_flags &= ~H2O_SOCKET_FLAG_DONT_READ;
}
351
/* Detaches the socket from the event loop and hands its fd over to the caller via `info`. Always returns 0. */
int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info)
{
    struct st_h2o_evloop_socket_t *s = (void *)_sock;

    assert((s->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0);

    /* unregister from the poller, then mark the wrapper as disposed so that it gets freed in the statechanged phase */
    evloop_do_on_socket_export(s);
    s->_flags = H2O_SOCKET_FLAG_IS_DISPOSED;

    /* ownership of the fd moves to the caller */
    info->fd = s->fd;
    s->fd = -1;

    return 0;
}
365
do_import(h2o_loop_t * loop,h2o_socket_export_t * info)366 h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info)
367 {
368 return h2o_evloop_socket_create(loop, info->fd, 0);
369 }
370
h2o_socket_get_loop(h2o_socket_t * _sock)371 h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock)
372 {
373 struct st_h2o_evloop_socket_t *sock = (void *)_sock;
374 return sock->loop;
375 }
376
/* Fetches the local address of the socket into `sa` (which must be at least sockaddr_storage-sized); returns the address
 * length, or 0 on failure. */
socklen_t get_sockname_uncached(h2o_socket_t *_sock, struct sockaddr *sa)
{
    struct st_h2o_evloop_socket_t *s = (void *)_sock;
    socklen_t salen = sizeof(struct sockaddr_storage);

    return getsockname(s->fd, sa, &salen) == 0 ? salen : 0;
}
385
/* Fetches the peer address of the socket into `sa` (which must be at least sockaddr_storage-sized); returns the address
 * length, or 0 on failure. */
socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa)
{
    struct st_h2o_evloop_socket_t *s = (void *)_sock;
    socklen_t salen = sizeof(struct sockaddr_storage);

    return getpeername(s->fd, sa, &salen) == 0 ? salen : 0;
}
394
/* Allocates and initializes a socket structure wrapping `fd`, registering it with the backend poller. */
static struct st_h2o_evloop_socket_t *create_socket(h2o_evloop_t *loop, int fd, int flags)
{
    struct st_h2o_evloop_socket_t *sock = h2o_mem_alloc(sizeof(*sock));
    memset(sock, 0, sizeof(*sock));

    h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype);
    sock->loop = loop;
    sock->fd = fd;
    sock->_flags = flags;
    sock->max_read_size = 1024 * 1024; /* by default, we read up to 1MB at once */
    /* self-pointers mark the socket as not being linked onto the pending / statechanged lists */
    sock->_next_pending = sock;
    sock->_next_statechanged = sock;

    evloop_do_on_socket_create(sock);

    return sock;
}
413
/**
 * Sets TCP_NODELAY when the given file descriptor might refer to a TCP socket. This is a syscall-saving heuristic: when the
 * supplied address proves the socket cannot be TCP (its family is neither AF_INET nor AF_INET6), setsockopt is skipped
 * entirely; otherwise — including when `sa` is NULL and the family is unknown — the option is applied.
 */
static void set_nodelay_if_likely_tcp(int fd, struct sockaddr *sa)
{
    if (sa != NULL) {
        int maybe_tcp = sa->sa_family == AF_INET || sa->sa_family == AF_INET6;
        if (!maybe_tcp)
            return;
    }

    int one = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
}
427
h2o_evloop_socket_create(h2o_evloop_t * loop,int fd,int flags)428 h2o_socket_t *h2o_evloop_socket_create(h2o_evloop_t *loop, int fd, int flags)
429 {
430 /* It is the reponsibility of the event loop to modify the properties of a socket for its use (e.g., set O_NONBLOCK). */
431 fcntl(fd, F_SETFL, O_NONBLOCK);
432 set_nodelay_if_likely_tcp(fd, NULL);
433
434 return &create_socket(loop, fd, flags)->super;
435 }
436
/**
 * Accepts a connection on the given listening socket, returning the new socket or NULL on failure. The accepted fd is made
 * non-blocking and close-on-exec (via accept4 where available, otherwise via cloexec_accept + fcntl), TCP_NODELAY is applied
 * when the peer may be TCP, and the peer address is cached when the eBPF map lookup needs it.
 */
h2o_socket_t *h2o_evloop_socket_accept(h2o_socket_t *_listener)
{
    struct st_h2o_evloop_socket_t *listener = (struct st_h2o_evloop_socket_t *)_listener;
    int fd;
    h2o_socket_t *sock;

    /* cache the remote address, if we know that we are going to use the value (in h2o_socket_ebpf_lookup_flags) */
#if H2O_USE_EBPF_MAP
    struct {
        struct sockaddr_storage storage;
        socklen_t len;
    } _peeraddr;
    _peeraddr.len = sizeof(_peeraddr.storage);
    struct sockaddr_storage *peeraddr = &_peeraddr.storage;
    socklen_t *peeraddrlen = &_peeraddr.len;
#else
    struct sockaddr_storage *peeraddr = NULL;
    socklen_t *peeraddrlen = NULL;
#endif

#if H2O_USE_ACCEPT4
    /* the anticipation here is that a socket returned by `accept4` will inherit the TCP_NODELAY flag from the listening socket */
    if ((fd = accept4(listener->fd, (struct sockaddr *)peeraddr, peeraddrlen, SOCK_NONBLOCK | SOCK_CLOEXEC)) == -1)
        return NULL;
    sock = &create_socket(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super;
#else
    if ((fd = cloexec_accept(listener->fd, (struct sockaddr *)peeraddr, peeraddrlen)) == -1)
        return NULL;
    fcntl(fd, F_SETFL, O_NONBLOCK);
    sock = &create_socket(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super;
#endif
    set_nodelay_if_likely_tcp(fd, (struct sockaddr *)peeraddr);

    /* record the peer address obtained by accept, unless it was truncated */
    if (peeraddr != NULL && *peeraddrlen <= sizeof(*peeraddr))
        h2o_socket_setpeername(sock, (struct sockaddr *)peeraddr, *peeraddrlen);
    uint64_t flags = h2o_socket_ebpf_lookup_flags(listener->loop, h2o_socket_ebpf_init_key, sock);
    if ((flags & H2O_EBPF_FLAGS_SKIP_TRACING_BIT) != 0)
        sock->_skip_tracing = 1;
    return sock;
}
477
/**
 * Initiates a non-blocking stream connection to `addr`, returning a socket in connecting state, or NULL on failure (in which
 * case `*err` is set when `err` is non-NULL). The callback `cb` is invoked through the write-notify path once the connection
 * is established or fails.
 */
h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb, const char **err)
{
    int fd;
    struct st_h2o_evloop_socket_t *sock;

    if ((fd = cloexec_socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
        if (err != NULL) {
            *err = h2o_socket_error_socket_fail;
        }
        return NULL;
    }
    fcntl(fd, F_SETFL, O_NONBLOCK);
    /* EINPROGRESS means the connection is being established asynchronously; any other failure is fatal */
    if (!(connect(fd, addr, addrlen) == 0 || errno == EINPROGRESS)) {
        if (err != NULL)
            *err = h2o_socket_get_error_string(errno, h2o_socket_error_conn_fail);
        close(fd);
        return NULL;
    }

    sock = create_socket(loop, fd, H2O_SOCKET_FLAG_IS_CONNECTING);
    set_nodelay_if_likely_tcp(fd, addr);

    /* connection completion is detected as writability and reported through the write callback */
    h2o_socket_notify_write(&sock->super, cb);
    return &sock->super;
}
503
/* Overrides the per-read-attempt byte limit (initialized to 1MB when the socket is created). */
void h2o_evloop_socket_set_max_read_size(h2o_socket_t *_sock, size_t max_size)
{
    ((struct st_h2o_evloop_socket_t *)_sock)->max_read_size = max_size;
}
509
create_evloop(size_t sz)510 h2o_evloop_t *create_evloop(size_t sz)
511 {
512 h2o_evloop_t *loop = h2o_mem_alloc(sz);
513
514 memset(loop, 0, sz);
515 loop->_statechanged.tail_ref = &loop->_statechanged.head;
516 update_now(loop);
517 /* 3 levels * 32-slots => 1 second goes into 2nd, becomes O(N) above approx. 31 seconds */
518 loop->_timeouts = h2o_timerwheel_create(3, loop->_now_millisec);
519
520 return loop;
521 }
522
/* Refreshes the loop's cached wall-clock time, in timeval, nanosecond, and millisecond granularities. */
void update_now(h2o_evloop_t *loop)
{
    gettimeofday(&loop->_tv_at, NULL);
    uint64_t usec = (uint64_t)loop->_tv_at.tv_sec * 1000000 + loop->_tv_at.tv_usec;
    loop->_now_nanosec = usec * 1000;
    loop->_now_millisec = loop->_now_nanosec / 1000000;
}
529
/**
 * Returns the poll timeout (milliseconds) clamped so that it does not sleep past the earliest registered timer. Also refreshes
 * the loop's cached notion of "now" as a side effect.
 */
int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait)
{
    uint64_t wake_at = h2o_timerwheel_get_wake_at(loop->_timeouts);

    update_now(loop);

    if (wake_at <= loop->_now_millisec) {
        /* a timer is already due; do not block in the poller */
        max_wait = 0;
    } else {
        uint64_t delta = wake_at - loop->_now_millisec;
        /* NOTE(review): the comparison promotes max_wait to uint64_t; assumes callers pass a non-negative max_wait — confirm */
        if (delta < max_wait)
            max_wait = (int32_t)delta;
    }

    return max_wait;
}
546
/* Registers `cb` to be invoked when the socket becomes writable; must not be called while a write is already in flight. */
void h2o_socket_notify_write(h2o_socket_t *_sock, h2o_socket_cb cb)
{
    struct st_h2o_evloop_socket_t *s = (struct st_h2o_evloop_socket_t *)_sock;

    /* no write callback, no buffered write data, no pending TLS output may exist */
    assert(s->super._cb.write == NULL);
    assert(s->super._write_buf.cnt == 0);
    assert(!has_pending_ssl_bytes(s->super.ssl));

    s->super._cb.write = cb;
    link_to_statechanged(s); /* write polling gets enabled in the statechanged phase */
}
557
run_socket(struct st_h2o_evloop_socket_t * sock)558 static void run_socket(struct st_h2o_evloop_socket_t *sock)
559 {
560 if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) {
561 /* is freed in updatestates phase */
562 return;
563 }
564
565 if ((sock->_flags & H2O_SOCKET_FLAG_IS_READ_READY) != 0) {
566 sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY;
567 read_on_ready(sock);
568 }
569
570 if ((sock->_flags & H2O_SOCKET_FLAG_IS_WRITE_NOTIFY) != 0) {
571 const char *err = NULL;
572 assert(sock->super._cb.write != NULL);
573 sock->_flags &= ~H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
574 if (sock->super._write_buf.cnt != 0 || has_pending_ssl_bytes(sock->super.ssl)) {
575 /* error */
576 err = h2o_socket_error_io;
577 sock->super._write_buf.cnt = 0;
578 if (has_pending_ssl_bytes(sock->super.ssl))
579 dispose_ssl_output_buffer(sock->super.ssl);
580 } else if ((sock->_flags & H2O_SOCKET_FLAG_IS_CONNECTING) != 0) {
581 sock->_flags &= ~H2O_SOCKET_FLAG_IS_CONNECTING;
582 int so_err = 0;
583 socklen_t l = sizeof(so_err);
584 so_err = 0;
585 if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &so_err, &l) != 0 || so_err != 0)
586 err = h2o_socket_get_error_string(so_err, h2o_socket_error_conn_fail);
587 }
588 on_write_complete(&sock->super, err);
589 }
590 }
591
/**
 * Runs the callbacks of all sockets on the pending lists. The client-side list is fully drained first, and only one
 * server-side (accepted-connection) socket is processed before the client-side list is re-checked.
 */
static void run_pending(h2o_evloop_t *loop)
{
    struct st_h2o_evloop_socket_t *sock;

    while (loop->_pending_as_server != NULL || loop->_pending_as_client != NULL) {
        /* drain all client-side sockets */
        while ((sock = loop->_pending_as_client) != NULL) {
            loop->_pending_as_client = sock->_next_pending;
            sock->_next_pending = sock; /* self-pointer marks the socket as unlinked */
            run_socket(sock);
        }
        /* then run at most one server-side socket */
        if ((sock = loop->_pending_as_server) != NULL) {
            loop->_pending_as_server = sock->_next_pending;
            sock->_next_pending = sock;
            run_socket(sock);
        }
    }
}
609
/**
 * Destroys the loop, closing and freeing every socket still linked to it. All application timers MUST have been removed
 * before this function is called.
 */
void h2o_evloop_destroy(h2o_evloop_t *loop)
{
    struct st_h2o_evloop_socket_t *sock;

    /* timeouts are governed by the application and MUST be destroyed prior to destroying the loop */
    assert(h2o_timerwheel_get_wake_at(loop->_timeouts) == UINT64_MAX);

    /* dispose all sockets remaining on the pending lists */
    while ((sock = loop->_pending_as_client) != NULL) {
        loop->_pending_as_client = sock->_next_pending;
        sock->_next_pending = sock;
        h2o_socket_close((h2o_socket_t *)sock);
    }
    while ((sock = loop->_pending_as_server) != NULL) {
        loop->_pending_as_server = sock->_next_pending;
        sock->_next_pending = sock;
        h2o_socket_close((h2o_socket_t *)sock);
    }

    /* all sockets are now disposed and sit on the statechanged list; free their memory by walking that list */
    while ((sock = loop->_statechanged.head) != NULL) {
        loop->_statechanged.head = sock->_next_statechanged;
        free(sock);
    }

    /* dispose backend-specific data */
    evloop_do_dispose(loop);

    /* lastly we need to free loop memory */
    h2o_timerwheel_destroy(loop->_timeouts);
    free(loop);
}
644
/**
 * Runs one iteration of the event loop: polls (blocking for at most `max_wait` milliseconds), runs pending socket callbacks,
 * then runs expired timers. Returns 0 on success, -1 when the backend poll step fails.
 */
int h2o_evloop_run(h2o_evloop_t *loop, int32_t max_wait)
{
    /* update socket states, poll, set readable flags, perform pending writes */
    if (evloop_do_proceed(loop, max_wait) != 0)
        return -1;

    /* run the pending callbacks */
    run_pending(loop);

    /* run the expired timers, invoking pending callbacks after every timer callback. This is a locality
     * optimization; handles things like timeout -> write -> on_write_complete for each object. */
    while (1) {
        h2o_linklist_t expired;
        h2o_linklist_init_anchor(&expired);
        h2o_timerwheel_get_expired(loop->_timeouts, loop->_now_millisec, &expired);
        if (h2o_linklist_is_empty(&expired))
            break;
        do {
            h2o_timerwheel_entry_t *timer = H2O_STRUCT_FROM_MEMBER(h2o_timerwheel_entry_t, _link, expired.next);
            h2o_linklist_unlink(&timer->_link);
            timer->cb(timer);
            run_pending(loop);
        } while (!h2o_linklist_is_empty(&expired));
    }

    /* by now, every pending socket must have been served */
    assert(loop->_pending_as_client == NULL);
    assert(loop->_pending_as_server == NULL);

    /* if the execution-time sliding counter is active, stop it with the current time */
    if (h2o_sliding_counter_is_running(&loop->exec_time_nanosec_counter)) {
        update_now(loop);
        h2o_sliding_counter_stop(&loop->exec_time_nanosec_counter, loop->_now_nanosec);
    }

    return 0;
}
680