1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 /*
11 * A listen socket handler calls an event facility specific io_accept()
12 * method. The method accept()s a new connection and then calls
13 * nxt_event_conn_accept() to handle the new connection and to prepare
14 * for a next connection to avoid just dropping next accept()ed socket
15 * if no more connections allowed. If there are no available connections
16 * an idle connection would be closed. If there are no idle connections
17 * then new connections will not be accept()ed for 1 second.
18 */
19
20
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22 nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24 void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26 nxt_listen_event_t *lev);
27 static void nxt_conn_accept_close_idle(nxt_task_t *task,
28 nxt_listen_event_t *lev);
29 static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj,
30 void *data);
31 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
32 void *data);
33 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
34 void *data);
35
36
37 nxt_listen_event_t *
nxt_listen_event(nxt_task_t * task,nxt_listen_socket_t * ls)38 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
39 {
40 nxt_listen_event_t *lev;
41 nxt_event_engine_t *engine;
42
43 lev = nxt_zalloc(sizeof(nxt_listen_event_t));
44
45 if (nxt_fast_path(lev != NULL)) {
46 lev->socket.fd = ls->socket;
47
48 engine = task->thread->engine;
49 lev->batch = engine->batch;
50 lev->count = 1;
51
52 lev->socket.read_work_queue = &engine->accept_work_queue;
53 lev->socket.read_handler = nxt_conn_listen_handler;
54 lev->socket.error_handler = nxt_conn_listen_event_error;
55 lev->socket.log = &nxt_main_log;
56
57 lev->accept = engine->event.io->accept;
58
59 lev->listen = ls;
60 lev->work_queue = &engine->read_work_queue;
61
62 lev->timer.work_queue = &engine->fast_work_queue;
63 lev->timer.handler = nxt_conn_listen_timer_handler;
64 lev->timer.log = &nxt_main_log;
65
66 lev->task.thread = task->thread;
67 lev->task.log = &nxt_main_log;
68 lev->task.ident = nxt_task_next_ident();
69 lev->socket.task = &lev->task;
70 lev->timer.task = &lev->task;
71
72 if (nxt_conn_accept_alloc(task, lev) != NULL) {
73 nxt_fd_event_enable_accept(engine, &lev->socket);
74
75 nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
76 }
77
78 return lev;
79 }
80
81 return NULL;
82 }
83
84
85 static nxt_conn_t *
nxt_conn_accept_alloc(nxt_task_t * task,nxt_listen_event_t * lev)86 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
87 {
88 nxt_mp_t *mp;
89 nxt_conn_t *c;
90 nxt_event_engine_t *engine;
91
92 engine = task->thread->engine;
93
94 if (engine->connections < engine->max_connections) {
95
96 mp = nxt_mp_create(1024, 128, 256, 32);
97
98 if (nxt_fast_path(mp != NULL)) {
99 c = nxt_conn_create(mp, lev->socket.task);
100 if (nxt_slow_path(c == NULL)) {
101 nxt_mp_destroy(mp);
102
103 return NULL;
104 }
105
106 c->socket.read_work_queue = lev->socket.read_work_queue;
107 c->socket.write_ready = 1;
108
109 c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
110 if (nxt_fast_path(c->remote != NULL)) {
111 lev->next = c;
112 return c;
113 }
114
115 nxt_conn_free(task, c);
116 }
117 }
118
119 return NULL;
120 }
121
122
123 static void
nxt_conn_listen_handler(nxt_task_t * task,void * obj,void * data)124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125 {
126 nxt_listen_event_t *lev;
127
128 lev = obj;
129 lev->ready = lev->batch;
130
131 lev->accept(task, lev, data);
132 }
133
134
135 void
nxt_conn_io_accept(nxt_task_t * task,void * obj,void * data)136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137 {
138 socklen_t socklen;
139 nxt_conn_t *c;
140 nxt_socket_t s;
141 struct sockaddr *sa;
142 nxt_listen_event_t *lev;
143
144 lev = obj;
145 c = lev->next;
146
147 lev->ready--;
148 lev->socket.read_ready = (lev->ready != 0);
149
150 sa = &c->remote->u.sockaddr;
151 socklen = c->remote->socklen;
152 /*
153 * The returned socklen is ignored here, because sockaddr_in and
154 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un
155 * it is 3 byte length and already prepared, because old BSDs return zero
156 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157 * socklen and updates only the sa_family part; other systems copy 3 bytes
158 * and truncate surplus zero part. Only bound sockaddr_un will be really
159 * truncated here.
160 */
161 s = accept(lev->socket.fd, sa, &socklen);
162
163 if (s == -1) {
164 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165 return;
166 }
167
168 c->socket.fd = s;
169
170 #if (NXT_LINUX)
171 /*
172 * Linux does not inherit non-blocking mode
173 * from listen socket for accept()ed socket.
174 */
175 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176 nxt_socket_close(task, s);
177 }
178
179 #endif
180
181 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182
183 nxt_conn_accept(task, lev, c);
184 }
185
186
187 void
nxt_conn_accept(nxt_task_t * task,nxt_listen_event_t * lev,nxt_conn_t * c)188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189 {
190 nxt_conn_t *next;
191
192 nxt_sockaddr_text(c->remote);
193
194 nxt_debug(task, "client: %*s",
195 (size_t) c->remote->address_length,
196 nxt_sockaddr_address(c->remote));
197
198 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
199
200 c->listen = lev;
201 lev->count++;
202 lev->next = NULL;
203 c->socket.data = NULL;
204
205 c->read_work_queue = lev->work_queue;
206 c->write_work_queue = lev->work_queue;
207
208 if (lev->listen->read_after_accept) {
209
210 //c->socket.read_ready = 1;
211 // lev->listen->handler(task, c, lev);
212 nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
213 &c->task, c, lev);
214
215 } else {
216 nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
217 &c->task, c, lev);
218 }
219
220 next = nxt_conn_accept_next(task, lev);
221
222 if (next != NULL && lev->socket.read_ready) {
223 nxt_work_queue_add(lev->socket.read_work_queue,
224 lev->accept, task, lev, next);
225 }
226 }
227
228
229 static nxt_conn_t *
nxt_conn_accept_next(nxt_task_t * task,nxt_listen_event_t * lev)230 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
231 {
232 nxt_conn_t *c;
233
234 c = lev->next;
235
236 if (c == NULL) {
237 c = nxt_conn_accept_alloc(task, lev);
238
239 if (nxt_slow_path(c == NULL)) {
240 nxt_conn_accept_close_idle(task, lev);
241 }
242 }
243
244 return c;
245 }
246
247
248 static void
nxt_conn_accept_close_idle(nxt_task_t * task,nxt_listen_event_t * lev)249 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
250 {
251 nxt_event_engine_t *engine;
252
253 engine = task->thread->engine;
254
255 nxt_work_queue_add(&engine->close_work_queue,
256 nxt_conn_accept_close_idle_handler, task, NULL, NULL);
257
258 nxt_timer_add(engine, &lev->timer, 100);
259
260 nxt_fd_event_disable_read(engine, &lev->socket);
261
262 nxt_alert(task, "new connections are not accepted within 100ms");
263 }
264
265
266 static void
nxt_conn_accept_close_idle_handler(nxt_task_t * task,void * obj,void * data)267 nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data)
268 {
269 nxt_uint_t times;
270 nxt_conn_t *c;
271 nxt_queue_t *idle;
272 nxt_queue_link_t *link, *next;
273 nxt_event_engine_t *engine;
274
275 static nxt_log_moderation_t nxt_idle_close_log_moderation = {
276 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
277 };
278
279 times = 10;
280 engine = task->thread->engine;
281 idle = &engine->idle_connections;
282
283 for (link = nxt_queue_last(idle);
284 link != nxt_queue_head(idle);
285 link = next)
286 {
287 next = nxt_queue_next(link);
288
289 c = nxt_queue_link_data(link, nxt_conn_t, link);
290
291 nxt_debug(c->socket.task, "idle connection: %d rdy:%d",
292 c->socket.fd, c->socket.read_ready);
293
294 if (!c->socket.read_ready) {
295 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
296 task->log, "no available connections, "
297 "close idle connection");
298
299 c->read_state->close_handler(c->socket.task, c, c->socket.data);
300
301 times--;
302
303 if (times == 0) {
304 break;
305 }
306 }
307 }
308 }
309
310
311 void
nxt_conn_accept_error(nxt_task_t * task,nxt_listen_event_t * lev,const char * accept_syscall,nxt_err_t err)312 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
313 const char *accept_syscall, nxt_err_t err)
314 {
315 static nxt_log_moderation_t nxt_accept_log_moderation = {
316 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
317 };
318
319 lev->socket.read_ready = 0;
320
321 switch (err) {
322
323 case NXT_EAGAIN:
324 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
325 return;
326
327 case ECONNABORTED:
328 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
329 task->log, "%s(%d) failed %E",
330 accept_syscall, lev->socket.fd, err);
331 return;
332
333 case EMFILE:
334 case ENFILE:
335 case ENOBUFS:
336 case ENOMEM:
337 nxt_alert(task, "%s(%d) failed %E",
338 accept_syscall, lev->socket.fd, err);
339
340 nxt_conn_accept_close_idle(task, lev);
341 return;
342
343 default:
344 nxt_alert(task, "%s(%d) failed %E",
345 accept_syscall, lev->socket.fd, err);
346 return;
347 }
348 }
349
350
351 static void
nxt_conn_listen_timer_handler(nxt_task_t * task,void * obj,void * data)352 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
353 {
354 nxt_conn_t *c;
355 nxt_timer_t *timer;
356 nxt_listen_event_t *lev;
357
358 timer = obj;
359
360 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
361
362 c = nxt_conn_accept_next(task, lev);
363 if (c == NULL) {
364 return;
365 }
366
367 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
368
369 lev->accept(task, lev, c);
370 }
371
372
373 static void
nxt_conn_listen_event_error(nxt_task_t * task,void * obj,void * data)374 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
375 {
376 nxt_fd_event_t *ev;
377
378 ev = obj;
379
380 nxt_alert(task, "accept(%d) event error", ev->fd);
381 }
382