1 /*
2  * Copyright (c) 2014 DeNA Co., Ltd.
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to
6  * deal in the Software without restriction, including without limitation the
7  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8  * sell copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20  * IN THE SOFTWARE.
21  */
22 #include <assert.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/event.h>
26 #include <sys/time.h>
27 
28 #if 0
29 #define DEBUG_LOG(...) h2o_error_printf(__VA_ARGS__)
30 #else
31 #define DEBUG_LOG(...)
32 #endif
33 
34 struct st_h2o_socket_loop_kqueue_t {
35     h2o_evloop_t super;
36     int kq;
37 };
38 
ev_set(struct kevent * ev,int fd,int filter,int flags,struct st_h2o_evloop_socket_t * sock)39 static void ev_set(struct kevent *ev, int fd, int filter, int flags, struct st_h2o_evloop_socket_t *sock)
40 {
41 #ifdef __NetBSD__
42     EV_SET(ev, fd, filter, flags, 0, 0, (intptr_t)sock);
43 #else
44     EV_SET(ev, fd, filter, flags, 0, 0, sock);
45 #endif
46 }
47 
collect_status(struct st_h2o_socket_loop_kqueue_t * loop,struct kevent * changelist,int changelist_capacity)48 static int collect_status(struct st_h2o_socket_loop_kqueue_t *loop, struct kevent *changelist, int changelist_capacity)
49 {
50     int change_index = 0;
51 
52 #define SET_AND_UPDATE(filter, flags)                                                                                              \
53     do {                                                                                                                           \
54         ev_set(changelist + change_index++, sock->fd, filter, flags, sock);                                                        \
55         if (change_index == changelist_capacity) {                                                                                 \
56             int ret;                                                                                                               \
57             while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR)                       \
58                 ;                                                                                                                  \
59             if (ret == -1)                                                                                                         \
60                 return -1;                                                                                                         \
61             change_index = 0;                                                                                                      \
62         }                                                                                                                          \
63     } while (0)
64 
65     while (loop->super._statechanged.head != NULL) {
66         /* detach the top */
67         struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head;
68         loop->super._statechanged.head = sock->_next_statechanged;
69         sock->_next_statechanged = sock;
70         /* update the state */
71         if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) {
72             free(sock);
73         } else {
74             if (h2o_socket_is_reading(&sock->super)) {
75                 if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) {
76                     sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ;
77                     SET_AND_UPDATE(EVFILT_READ, EV_ADD);
78                 }
79             } else {
80                 if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) {
81                     sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ;
82                     SET_AND_UPDATE(EVFILT_READ, EV_DELETE);
83                 }
84             }
85             if (h2o_socket_is_writing(&sock->super)) {
86                 if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) {
87                     sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE;
88                     SET_AND_UPDATE(EVFILT_WRITE, EV_ADD);
89                 }
90             } else {
91                 if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) {
92                     sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE;
93                     SET_AND_UPDATE(EVFILT_WRITE, EV_DELETE);
94                 }
95             }
96         }
97     }
98     loop->super._statechanged.tail_ref = &loop->super._statechanged.head;
99 
100     return change_index;
101 
102 #undef SET_AND_UPDATE
103 }
104 
evloop_do_proceed(h2o_evloop_t * _loop,int32_t max_wait)105 int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait)
106 {
107     struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)_loop;
108     struct kevent changelist[64], events[128];
109     int nchanges, nevents, i;
110     struct timespec ts;
111 
112     /* collect (and update) status */
113     if ((nchanges = collect_status(loop, changelist, sizeof(changelist) / sizeof(changelist[0]))) == -1)
114         return -1;
115 
116     /* poll */
117     max_wait = adjust_max_wait(&loop->super, max_wait);
118     ts.tv_sec = max_wait / 1000;
119     ts.tv_nsec = max_wait % 1000 * 1000 * 1000;
120     nevents = kevent(loop->kq, changelist, nchanges, events, sizeof(events) / sizeof(events[0]), &ts);
121 
122     update_now(&loop->super);
123     if (nevents == -1)
124         return -1;
125 
126     if (nevents != 0) {
127         h2o_sliding_counter_start(&loop->super.exec_time_nanosec_counter, loop->super._now_nanosec);
128     }
129 
130     /* update readable flags, perform writes */
131     for (i = 0; i != nevents; ++i) {
132         struct st_h2o_evloop_socket_t *sock = (void *)events[i].udata;
133         assert(sock->fd == events[i].ident);
134         switch (events[i].filter) {
135         case EVFILT_READ:
136             if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) {
137                 sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY;
138                 link_to_pending(sock);
139             }
140             break;
141         case EVFILT_WRITE:
142             if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) {
143                 write_pending(sock);
144             }
145             break;
146         default:
147             break; /* ??? */
148         }
149     }
150 
151     return 0;
152 }
153 
evloop_do_on_socket_create(struct st_h2o_evloop_socket_t * sock)154 static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock)
155 {
156 }
157 
evloop_do_on_socket_close(struct st_h2o_evloop_socket_t * sock)158 static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock)
159 {
160 }
161 
evloop_do_on_socket_export(struct st_h2o_evloop_socket_t * sock)162 static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock)
163 {
164     struct st_h2o_socket_loop_kqueue_t *loop = (void *)sock->loop;
165     struct kevent changelist[2];
166     int change_index = 0, ret;
167 
168     if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0)
169         ev_set(changelist + change_index++, sock->fd, EVFILT_READ, EV_DELETE, 0);
170     if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0)
171         ev_set(changelist + change_index++, sock->fd, EVFILT_WRITE, EV_DELETE, 0);
172     if (change_index == 0)
173         return;
174     while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR)
175         ;
176     if (ret == -1)
177         h2o_error_printf("kevent returned error %d (fd=%d)", errno, sock->fd);
178 }
179 
evloop_do_dispose(h2o_evloop_t * _loop)180 static void evloop_do_dispose(h2o_evloop_t *_loop)
181 {
182     struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)_loop;
183     close(loop->kq);
184 }
185 
h2o_evloop_create(void)186 h2o_evloop_t *h2o_evloop_create(void)
187 {
188     struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)create_evloop(sizeof(*loop));
189 
190     loop->kq = kqueue();
191 
192     return &loop->super;
193 }
194