1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20
21 #define NXT_SHARED_PORT_ID 0xFFFFu
22
23 typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
30 nxt_conf_value_t *limits_value;
31 nxt_conf_value_t *processes_value;
32 nxt_conf_value_t *targets_value;
33 } nxt_router_app_conf_t;
34
35
36 typedef struct {
37 nxt_str_t pass;
38 nxt_str_t application;
39 } nxt_router_listener_conf_t;
40
41
42 #if (NXT_TLS)
43
44 typedef struct {
45 nxt_str_t name;
46 nxt_socket_conf_t *socket_conf;
47 nxt_router_temp_conf_t *temp_conf;
48 nxt_tls_init_t *tls_init;
49 nxt_bool_t last;
50
51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53
54 #endif
55
56
57 typedef struct {
58 nxt_str_t *name;
59 nxt_socket_conf_t *socket_conf;
60 nxt_router_temp_conf_t *temp_conf;
61 nxt_bool_t last;
62 } nxt_socket_rpc_t;
63
64
65 typedef struct {
66 nxt_app_t *app;
67 nxt_router_temp_conf_t *temp_conf;
68 uint8_t proto; /* 1 bit */
69 } nxt_app_rpc_t;
70
71
72 typedef struct {
73 nxt_app_joint_t *app_joint;
74 uint32_t generation;
75 uint8_t proto; /* 1 bit */
76 } nxt_app_joint_rpc_t;
77
78
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80 nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83 nxt_port_t *controller_port);
84
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88 nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90 nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92 nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94 nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96 nxt_port_recv_msg_t *msg);
97
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101 nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103 nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113 nxt_conf_value_t *conf);
114
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118 nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120 nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122 int i);
123
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125 nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127 nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129 nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133 nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135 nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138 nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141 nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144 nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146 nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148 nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156 const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158 nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160 nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162 nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165 nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167 nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172 nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174 nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176 nxt_router_temp_conf_t *tmcf);
177
178 static void nxt_router_engines_post(nxt_router_t *router,
179 nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181 nxt_work_t *jobs);
182
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185 void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187 void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189 void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191 void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193 void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195 void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197 void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201 nxt_socket_conf_t *skcf);
202
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204 nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206 struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208 nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210 nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212 nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214 nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218 void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220 nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222 nxt_port_recv_msg_t *msg, void *data);
223
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225 nxt_port_recv_msg_t *msg, void *data);
226 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
227 nxt_port_t *app_port);
228 static void nxt_router_app_port_error(nxt_task_t *task,
229 nxt_port_recv_msg_t *msg, void *data);
230
231 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
232 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
233
234 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
235 nxt_port_t *port, nxt_apr_action_t action);
236 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
237 nxt_request_rpc_data_t *req_rpc_data);
238 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
239 void *data);
240 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
241 void *data);
242
243 static void nxt_router_app_prepare_request(nxt_task_t *task,
244 nxt_request_rpc_data_t *req_rpc_data);
245 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
246 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
247
248 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
249 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
250 void *data);
251 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
252 void *data);
253 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
254 void *data);
255 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
256
257 static const nxt_http_request_state_t nxt_http_request_send_state;
258 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
259
260 static void nxt_router_app_joint_use(nxt_task_t *task,
261 nxt_app_joint_t *app_joint, int i);
262
263 static void nxt_router_http_request_release_post(nxt_task_t *task,
264 nxt_http_request_t *r);
265 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
266 void *data);
267 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_port_handler(nxt_task_t *task,
269 nxt_port_recv_msg_t *msg);
270 static void nxt_router_get_mmap_handler(nxt_task_t *task,
271 nxt_port_recv_msg_t *msg);
272
273 extern const nxt_http_request_state_t nxt_http_websocket;
274
275 static nxt_router_t *nxt_router;
276
277 static const nxt_str_t http_prefix = nxt_string("HTTP_");
278 static const nxt_str_t empty_prefix = nxt_string("");
279
280 static const nxt_str_t *nxt_app_msg_prefix[] = {
281 &empty_prefix,
282 &empty_prefix,
283 &http_prefix,
284 &http_prefix,
285 &http_prefix,
286 &empty_prefix,
287 };
288
289
290 static const nxt_port_handlers_t nxt_router_process_port_handlers = {
291 .quit = nxt_signal_quit_handler,
292 .new_port = nxt_router_new_port_handler,
293 .get_port = nxt_router_get_port_handler,
294 .change_file = nxt_port_change_log_file_handler,
295 .mmap = nxt_port_mmap_handler,
296 .get_mmap = nxt_router_get_mmap_handler,
297 .data = nxt_router_conf_data_handler,
298 .app_restart = nxt_router_app_restart_handler,
299 .remove_pid = nxt_router_remove_pid_handler,
300 .access_log = nxt_router_access_log_reopen_handler,
301 .rpc_ready = nxt_port_rpc_handler,
302 .rpc_error = nxt_port_rpc_handler,
303 .oosm = nxt_router_oosm_handler,
304 };
305
306
307 const nxt_process_init_t nxt_router_process = {
308 .name = "router",
309 .type = NXT_PROCESS_ROUTER,
310 .prefork = nxt_router_prefork,
311 .restart = 1,
312 .setup = nxt_process_core_setup,
313 .start = nxt_router_start,
314 .port_handlers = &nxt_router_process_port_handlers,
315 .signals = nxt_process_signals,
316 };
317
318
319 /* Queues of nxt_socket_conf_t */
320 nxt_queue_t creating_sockets;
321 nxt_queue_t pending_sockets;
322 nxt_queue_t updating_sockets;
323 nxt_queue_t keeping_sockets;
324 nxt_queue_t deleting_sockets;
325
326
327 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)328 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
329 {
330 nxt_runtime_stop_app_processes(task, task->thread->runtime);
331
332 return NXT_OK;
333 }
334
335
336 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)337 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
338 {
339 nxt_int_t ret;
340 nxt_port_t *controller_port;
341 nxt_router_t *router;
342 nxt_runtime_t *rt;
343
344 rt = task->thread->runtime;
345
346 nxt_log(task, NXT_LOG_INFO, "router started");
347
348 #if (NXT_TLS)
349 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
350 if (nxt_slow_path(rt->tls == NULL)) {
351 return NXT_ERROR;
352 }
353
354 ret = rt->tls->library_init(task);
355 if (nxt_slow_path(ret != NXT_OK)) {
356 return ret;
357 }
358 #endif
359
360 ret = nxt_http_init(task);
361 if (nxt_slow_path(ret != NXT_OK)) {
362 return ret;
363 }
364
365 router = nxt_zalloc(sizeof(nxt_router_t));
366 if (nxt_slow_path(router == NULL)) {
367 return NXT_ERROR;
368 }
369
370 nxt_queue_init(&router->engines);
371 nxt_queue_init(&router->sockets);
372 nxt_queue_init(&router->apps);
373
374 nxt_router = router;
375
376 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
377 if (controller_port != NULL) {
378 nxt_router_greet_controller(task, controller_port);
379 }
380
381 return NXT_OK;
382 }
383
384
385 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)386 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
387 {
388 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
389 -1, 0, 0, NULL);
390 }
391
392
393 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)394 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
395 void *data)
396 {
397 size_t size;
398 uint32_t stream;
399 nxt_int_t ret;
400 nxt_app_t *app;
401 nxt_buf_t *b;
402 nxt_port_t *dport;
403 nxt_runtime_t *rt;
404 nxt_app_joint_rpc_t *app_joint_rpc;
405
406 app = data;
407
408 nxt_thread_mutex_lock(&app->mutex);
409
410 dport = app->proto_port;
411
412 nxt_thread_mutex_unlock(&app->mutex);
413
414 if (dport != NULL) {
415 nxt_debug(task, "app '%V' %p start process", &app->name, app);
416
417 b = NULL;
418
419 } else {
420 if (app->proto_port_requests > 0) {
421 nxt_debug(task, "app '%V' %p wait for prototype process",
422 &app->name, app);
423
424 app->proto_port_requests++;
425
426 goto skip;
427 }
428
429 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
430
431 rt = task->thread->runtime;
432 dport = rt->port_by_type[NXT_PROCESS_MAIN];
433
434 size = app->name.length + 1 + app->conf.length;
435
436 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
437 if (nxt_slow_path(b == NULL)) {
438 goto failed;
439 }
440
441 nxt_buf_cpystr(b, &app->name);
442 *b->mem.free++ = '\0';
443 nxt_buf_cpystr(b, &app->conf);
444 }
445
446 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
447 nxt_router_app_port_ready,
448 nxt_router_app_port_error,
449 sizeof(nxt_app_joint_rpc_t));
450 if (nxt_slow_path(app_joint_rpc == NULL)) {
451 goto failed;
452 }
453
454 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
455
456 ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
457 -1, stream, port->id, b);
458 if (nxt_slow_path(ret != NXT_OK)) {
459 nxt_port_rpc_cancel(task, port, stream);
460
461 goto failed;
462 }
463
464 app_joint_rpc->app_joint = app->joint;
465 app_joint_rpc->generation = app->generation;
466 app_joint_rpc->proto = (b != NULL);
467
468 if (b != NULL) {
469 app->proto_port_requests++;
470
471 b = NULL;
472 }
473
474 nxt_router_app_joint_use(task, app->joint, 1);
475
476 failed:
477
478 if (b != NULL) {
479 nxt_mp_free(b->data, b);
480 }
481
482 skip:
483
484 nxt_router_app_use(task, app, -1);
485 }
486
487
488 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)489 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
490 {
491 app_joint->use_count += i;
492
493 if (app_joint->use_count == 0) {
494 nxt_assert(app_joint->app == NULL);
495
496 nxt_free(app_joint);
497 }
498 }
499
500
501 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)502 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
503 {
504 nxt_int_t res;
505 nxt_port_t *router_port;
506 nxt_runtime_t *rt;
507
508 nxt_debug(task, "app '%V' start process", &app->name);
509
510 rt = task->thread->runtime;
511 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
512
513 nxt_router_app_use(task, app, 1);
514
515 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
516 app);
517
518 if (res == NXT_OK) {
519 return res;
520 }
521
522 nxt_thread_mutex_lock(&app->mutex);
523
524 app->pending_processes--;
525
526 nxt_thread_mutex_unlock(&app->mutex);
527
528 nxt_router_app_use(task, app, -1);
529
530 return NXT_ERROR;
531 }
532
533
534 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)535 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
536 {
537 nxt_buf_t *b, *next;
538 nxt_bool_t cancelled;
539 nxt_port_t *app_port;
540 nxt_msg_info_t *msg_info;
541
542 msg_info = &req_rpc_data->msg_info;
543
544 if (msg_info->buf == NULL) {
545 return 0;
546 }
547
548 app_port = req_rpc_data->app_port;
549
550 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
551 cancelled = nxt_app_queue_cancel(app_port->queue,
552 msg_info->tracking_cookie,
553 req_rpc_data->stream);
554
555 if (cancelled) {
556 nxt_debug(task, "stream #%uD: cancelled by router",
557 req_rpc_data->stream);
558 }
559
560 } else {
561 cancelled = 0;
562 }
563
564 for (b = msg_info->buf; b != NULL; b = next) {
565 next = b->next;
566 b->next = NULL;
567
568 if (b->is_port_mmap_sent) {
569 b->is_port_mmap_sent = cancelled == 0;
570 }
571
572 b->completion_handler(task, b, b->parent);
573 }
574
575 msg_info->buf = NULL;
576
577 return cancelled;
578 }
579
580
581 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)582 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
583 {
584 if (lnk->next != NULL) {
585 nxt_queue_remove(lnk);
586
587 lnk->next = NULL;
588
589 return 1;
590 }
591
592 return 0;
593 }
594
595
596 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)597 nxt_request_rpc_data_unlink(nxt_task_t *task,
598 nxt_request_rpc_data_t *req_rpc_data)
599 {
600 nxt_app_t *app;
601 nxt_bool_t unlinked;
602 nxt_http_request_t *r;
603
604 nxt_router_msg_cancel(task, req_rpc_data);
605
606 app = req_rpc_data->app;
607
608 if (req_rpc_data->app_port != NULL) {
609 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
610 req_rpc_data->apr_action);
611
612 req_rpc_data->app_port = NULL;
613 }
614
615 r = req_rpc_data->request;
616
617 if (r != NULL) {
618 r->timer_data = NULL;
619
620 nxt_router_http_request_release_post(task, r);
621
622 r->req_rpc_data = NULL;
623 req_rpc_data->request = NULL;
624
625 if (app != NULL) {
626 unlinked = 0;
627
628 nxt_thread_mutex_lock(&app->mutex);
629
630 if (r->app_link.next != NULL) {
631 nxt_queue_remove(&r->app_link);
632 r->app_link.next = NULL;
633
634 unlinked = 1;
635 }
636
637 nxt_thread_mutex_unlock(&app->mutex);
638
639 if (unlinked) {
640 nxt_mp_release(r->mem_pool);
641 }
642 }
643 }
644
645 if (app != NULL) {
646 nxt_router_app_use(task, app, -1);
647
648 req_rpc_data->app = NULL;
649 }
650
651 if (req_rpc_data->msg_info.body_fd != -1) {
652 nxt_fd_close(req_rpc_data->msg_info.body_fd);
653
654 req_rpc_data->msg_info.body_fd = -1;
655 }
656
657 if (req_rpc_data->rpc_cancel) {
658 req_rpc_data->rpc_cancel = 0;
659
660 nxt_port_rpc_cancel(task, task->thread->engine->port,
661 req_rpc_data->stream);
662 }
663 }
664
665
666 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)667 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
668 {
669 nxt_int_t res;
670 nxt_app_t *app;
671 nxt_port_t *port, *main_app_port;
672 nxt_runtime_t *rt;
673
674 nxt_port_new_port_handler(task, msg);
675
676 port = msg->u.new_port;
677
678 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
679 nxt_router_greet_controller(task, msg->u.new_port);
680 }
681
682 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
683 nxt_port_rpc_handler(task, msg);
684
685 return;
686 }
687
688 if (port == NULL || port->type != NXT_PROCESS_APP) {
689
690 if (msg->port_msg.stream == 0) {
691 return;
692 }
693
694 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
695
696 } else {
697 if (msg->fd[1] != -1) {
698 res = nxt_router_port_queue_map(task, port, msg->fd[1]);
699 if (nxt_slow_path(res != NXT_OK)) {
700 return;
701 }
702
703 nxt_fd_close(msg->fd[1]);
704 msg->fd[1] = -1;
705 }
706 }
707
708 if (msg->port_msg.stream != 0) {
709 nxt_port_rpc_handler(task, msg);
710 return;
711 }
712
713 nxt_debug(task, "new port id %d (%d)", port->id, port->type);
714
715 /*
716 * Port with "id == 0" is application 'main' port and it always
717 * should come with non-zero stream.
718 */
719 nxt_assert(port->id != 0);
720
721 /* Find 'main' app port and get app reference. */
722 rt = task->thread->runtime;
723
724 /*
725 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
726 * sent to main port (with id == 0) and processed in main thread.
727 */
728 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
729 nxt_assert(main_app_port != NULL);
730
731 app = main_app_port->app;
732
733 if (nxt_fast_path(app != NULL)) {
734 nxt_thread_mutex_lock(&app->mutex);
735
736 /* TODO here should be find-and-add code because there can be
737 port waiters in port_hash */
738 nxt_port_hash_add(&app->port_hash, port);
739 app->port_hash_count++;
740
741 nxt_thread_mutex_unlock(&app->mutex);
742
743 port->app = app;
744 }
745
746 port->main_app_port = main_app_port;
747
748 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
749 }
750
751
752 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)753 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
754 {
755 void *p;
756 size_t size;
757 nxt_int_t ret;
758 nxt_port_t *port;
759 nxt_router_temp_conf_t *tmcf;
760
761 port = nxt_runtime_port_find(task->thread->runtime,
762 msg->port_msg.pid,
763 msg->port_msg.reply_port);
764 if (nxt_slow_path(port == NULL)) {
765 nxt_alert(task, "conf_data_handler: reply port not found");
766 return;
767 }
768
769 p = MAP_FAILED;
770
771 /*
772 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
773 * initialized in 'cleanup' section.
774 */
775 size = 0;
776
777 tmcf = nxt_router_temp_conf(task);
778 if (nxt_slow_path(tmcf == NULL)) {
779 goto fail;
780 }
781
782 if (nxt_slow_path(msg->fd[0] == -1)) {
783 nxt_alert(task, "conf_data_handler: invalid shm fd");
784 goto fail;
785 }
786
787 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
788 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
789 (int) nxt_buf_mem_used_size(&msg->buf->mem));
790 goto fail;
791 }
792
793 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
794
795 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
796
797 nxt_fd_close(msg->fd[0]);
798 msg->fd[0] = -1;
799
800 if (nxt_slow_path(p == MAP_FAILED)) {
801 goto fail;
802 }
803
804 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
805
806 tmcf->router_conf->router = nxt_router;
807 tmcf->stream = msg->port_msg.stream;
808 tmcf->port = port;
809
810 nxt_port_use(task, tmcf->port, 1);
811
812 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
813
814 if (nxt_fast_path(ret == NXT_OK)) {
815 nxt_router_conf_apply(task, tmcf, NULL);
816
817 } else {
818 nxt_router_conf_error(task, tmcf);
819 }
820
821 goto cleanup;
822
823 fail:
824
825 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
826 msg->port_msg.stream, 0, NULL);
827
828 if (tmcf != NULL) {
829 nxt_mp_release(tmcf->mem_pool);
830 }
831
832 cleanup:
833
834 if (p != MAP_FAILED) {
835 nxt_mem_munmap(p, size);
836 }
837
838 if (msg->fd[0] != -1) {
839 nxt_fd_close(msg->fd[0]);
840 msg->fd[0] = -1;
841 }
842 }
843
844
845 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)846 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
847 {
848 nxt_app_t *app;
849 nxt_int_t ret;
850 nxt_str_t app_name;
851 nxt_port_t *reply_port, *shared_port, *old_shared_port;
852 nxt_port_t *proto_port;
853 nxt_port_msg_type_t reply;
854
855 reply_port = nxt_runtime_port_find(task->thread->runtime,
856 msg->port_msg.pid,
857 msg->port_msg.reply_port);
858 if (nxt_slow_path(reply_port == NULL)) {
859 nxt_alert(task, "app_restart_handler: reply port not found");
860 return;
861 }
862
863 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
864 app_name.start = msg->buf->mem.pos;
865
866 nxt_debug(task, "app_restart_handler: %V", &app_name);
867
868 app = nxt_router_app_find(&nxt_router->apps, &app_name);
869
870 if (nxt_fast_path(app != NULL)) {
871 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
872 NXT_PROCESS_APP);
873 if (nxt_slow_path(shared_port == NULL)) {
874 goto fail;
875 }
876
877 ret = nxt_port_socket_init(task, shared_port, 0);
878 if (nxt_slow_path(ret != NXT_OK)) {
879 nxt_port_use(task, shared_port, -1);
880 goto fail;
881 }
882
883 ret = nxt_router_app_queue_init(task, shared_port);
884 if (nxt_slow_path(ret != NXT_OK)) {
885 nxt_port_write_close(shared_port);
886 nxt_port_read_close(shared_port);
887 nxt_port_use(task, shared_port, -1);
888 goto fail;
889 }
890
891 nxt_port_write_enable(task, shared_port);
892
893 nxt_thread_mutex_lock(&app->mutex);
894
895 proto_port = app->proto_port;
896
897 if (proto_port != NULL) {
898 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
899 proto_port->pid);
900
901 app->proto_port = NULL;
902 proto_port->app = NULL;
903 }
904
905 app->generation++;
906
907 shared_port->app = app;
908
909 old_shared_port = app->shared_port;
910 old_shared_port->app = NULL;
911
912 app->shared_port = shared_port;
913
914 nxt_thread_mutex_unlock(&app->mutex);
915
916 nxt_port_close(task, old_shared_port);
917 nxt_port_use(task, old_shared_port, -1);
918
919 if (proto_port != NULL) {
920 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
921 -1, 0, 0, NULL);
922
923 nxt_port_close(task, proto_port);
924
925 nxt_port_use(task, proto_port, -1);
926 }
927
928 reply = NXT_PORT_MSG_RPC_READY_LAST;
929
930 } else {
931
932 fail:
933
934 reply = NXT_PORT_MSG_RPC_ERROR;
935 }
936
937 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
938 0, NULL);
939 }
940
941
942 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)943 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
944 void *data)
945 {
946 union {
947 nxt_pid_t removed_pid;
948 void *data;
949 } u;
950
951 u.data = data;
952
953 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
954 }
955
956
957 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)958 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
959 {
960 nxt_event_engine_t *engine;
961
962 nxt_port_remove_pid_handler(task, msg);
963
964 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
965 {
966 if (nxt_fast_path(engine->port != NULL)) {
967 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
968 msg->u.data);
969 }
970 }
971 nxt_queue_loop;
972
973 if (msg->port_msg.stream == 0) {
974 return;
975 }
976
977 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
978
979 nxt_port_rpc_handler(task, msg);
980 }
981
982
983 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)984 nxt_router_temp_conf(nxt_task_t *task)
985 {
986 nxt_mp_t *mp, *tmp;
987 nxt_router_conf_t *rtcf;
988 nxt_router_temp_conf_t *tmcf;
989
990 mp = nxt_mp_create(1024, 128, 256, 32);
991 if (nxt_slow_path(mp == NULL)) {
992 return NULL;
993 }
994
995 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
996 if (nxt_slow_path(rtcf == NULL)) {
997 goto fail;
998 }
999
1000 rtcf->mem_pool = mp;
1001
1002 tmp = nxt_mp_create(1024, 128, 256, 32);
1003 if (nxt_slow_path(tmp == NULL)) {
1004 goto fail;
1005 }
1006
1007 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1008 if (nxt_slow_path(tmcf == NULL)) {
1009 goto temp_fail;
1010 }
1011
1012 tmcf->mem_pool = tmp;
1013 tmcf->router_conf = rtcf;
1014 tmcf->count = 1;
1015 tmcf->engine = task->thread->engine;
1016
1017 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1018 sizeof(nxt_router_engine_conf_t));
1019 if (nxt_slow_path(tmcf->engines == NULL)) {
1020 goto temp_fail;
1021 }
1022
1023 nxt_queue_init(&creating_sockets);
1024 nxt_queue_init(&pending_sockets);
1025 nxt_queue_init(&updating_sockets);
1026 nxt_queue_init(&keeping_sockets);
1027 nxt_queue_init(&deleting_sockets);
1028
1029 #if (NXT_TLS)
1030 nxt_queue_init(&tmcf->tls);
1031 #endif
1032
1033 nxt_queue_init(&tmcf->apps);
1034 nxt_queue_init(&tmcf->previous);
1035
1036 return tmcf;
1037
1038 temp_fail:
1039
1040 nxt_mp_destroy(tmp);
1041
1042 fail:
1043
1044 nxt_mp_destroy(mp);
1045
1046 return NULL;
1047 }
1048
1049
1050 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1051 nxt_router_app_can_start(nxt_app_t *app)
1052 {
1053 return app->processes + app->pending_processes < app->max_processes
1054 && app->pending_processes < app->max_pending_processes;
1055 }
1056
1057
1058 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1059 nxt_router_app_need_start(nxt_app_t *app)
1060 {
1061 return (app->active_requests
1062 > app->port_hash_count + app->pending_processes)
1063 || (app->spare_processes
1064 > app->idle_processes + app->pending_processes);
1065 }
1066
1067
1068 static void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1069 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1070 {
1071 nxt_int_t ret;
1072 nxt_app_t *app;
1073 nxt_router_t *router;
1074 nxt_runtime_t *rt;
1075 nxt_queue_link_t *qlk;
1076 nxt_socket_conf_t *skcf;
1077 nxt_router_conf_t *rtcf;
1078 nxt_router_temp_conf_t *tmcf;
1079 const nxt_event_interface_t *interface;
1080 #if (NXT_TLS)
1081 nxt_router_tlssock_t *tls;
1082 #endif
1083
1084 tmcf = obj;
1085
1086 qlk = nxt_queue_first(&pending_sockets);
1087
1088 if (qlk != nxt_queue_tail(&pending_sockets)) {
1089 nxt_queue_remove(qlk);
1090 nxt_queue_insert_tail(&creating_sockets, qlk);
1091
1092 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1093
1094 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1095
1096 return;
1097 }
1098
1099 #if (NXT_TLS)
1100 qlk = nxt_queue_last(&tmcf->tls);
1101
1102 if (qlk != nxt_queue_head(&tmcf->tls)) {
1103 nxt_queue_remove(qlk);
1104
1105 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1106
1107 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1108 nxt_router_tls_rpc_handler, tls);
1109 return;
1110 }
1111 #endif
1112
1113 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1114
1115 if (nxt_router_app_need_start(app)) {
1116 nxt_router_app_rpc_create(task, tmcf, app);
1117 return;
1118 }
1119
1120 } nxt_queue_loop;
1121
1122 rtcf = tmcf->router_conf;
1123
1124 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1125 nxt_router_access_log_open(task, tmcf);
1126 return;
1127 }
1128
1129 rt = task->thread->runtime;
1130
1131 interface = nxt_service_get(rt->services, "engine", NULL);
1132
1133 router = rtcf->router;
1134
1135 ret = nxt_router_engines_create(task, router, tmcf, interface);
1136 if (nxt_slow_path(ret != NXT_OK)) {
1137 goto fail;
1138 }
1139
1140 ret = nxt_router_threads_create(task, rt, tmcf);
1141 if (nxt_slow_path(ret != NXT_OK)) {
1142 goto fail;
1143 }
1144
1145 nxt_router_apps_sort(task, router, tmcf);
1146
1147 nxt_router_apps_hash_use(task, rtcf, 1);
1148
1149 nxt_router_engines_post(router, tmcf);
1150
1151 nxt_queue_add(&router->sockets, &updating_sockets);
1152 nxt_queue_add(&router->sockets, &creating_sockets);
1153
1154 if (router->access_log != rtcf->access_log) {
1155 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1156
1157 nxt_router_access_log_release(task, &router->lock, router->access_log);
1158
1159 router->access_log = rtcf->access_log;
1160 }
1161
1162 nxt_router_conf_ready(task, tmcf);
1163
1164 return;
1165
1166 fail:
1167
1168 nxt_router_conf_error(task, tmcf);
1169
1170 return;
1171 }
1172
1173
1174 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1175 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1176 {
1177 nxt_joint_job_t *job;
1178
1179 job = obj;
1180
1181 nxt_router_conf_ready(task, job->tmcf);
1182 }
1183
1184
1185 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1186 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1187 {
1188 uint32_t count;
1189 nxt_router_conf_t *rtcf;
1190 nxt_thread_spinlock_t *lock;
1191
1192 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1193
1194 if (--tmcf->count > 0) {
1195 return;
1196 }
1197
1198 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1199
1200 rtcf = tmcf->router_conf;
1201
1202 lock = &rtcf->router->lock;
1203
1204 nxt_thread_spin_lock(lock);
1205
1206 count = rtcf->count;
1207
1208 nxt_thread_spin_unlock(lock);
1209
1210 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1211
1212 if (count == 0) {
1213 nxt_router_apps_hash_use(task, rtcf, -1);
1214
1215 nxt_router_access_log_release(task, lock, rtcf->access_log);
1216
1217 nxt_mp_destroy(rtcf->mem_pool);
1218 }
1219
1220 nxt_mp_release(tmcf->mem_pool);
1221 }
1222
1223
1224 static void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1225 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1226 {
1227 nxt_app_t *app;
1228 nxt_queue_t new_socket_confs;
1229 nxt_socket_t s;
1230 nxt_router_t *router;
1231 nxt_queue_link_t *qlk;
1232 nxt_socket_conf_t *skcf;
1233 nxt_router_conf_t *rtcf;
1234
1235 nxt_alert(task, "failed to apply new conf");
1236
1237 for (qlk = nxt_queue_first(&creating_sockets);
1238 qlk != nxt_queue_tail(&creating_sockets);
1239 qlk = nxt_queue_next(qlk))
1240 {
1241 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1242 s = skcf->listen->socket;
1243
1244 if (s != -1) {
1245 nxt_socket_close(task, s);
1246 }
1247
1248 nxt_free(skcf->listen);
1249 }
1250
1251 nxt_queue_init(&new_socket_confs);
1252 nxt_queue_add(&new_socket_confs, &updating_sockets);
1253 nxt_queue_add(&new_socket_confs, &pending_sockets);
1254 nxt_queue_add(&new_socket_confs, &creating_sockets);
1255
1256 rtcf = tmcf->router_conf;
1257
1258 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1259
1260 nxt_router_app_unlink(task, app);
1261
1262 } nxt_queue_loop;
1263
1264 router = rtcf->router;
1265
1266 nxt_queue_add(&router->sockets, &keeping_sockets);
1267 nxt_queue_add(&router->sockets, &deleting_sockets);
1268
1269 nxt_queue_add(&router->apps, &tmcf->previous);
1270
1271 // TODO: new engines and threads
1272
1273 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1274
1275 nxt_mp_destroy(rtcf->mem_pool);
1276
1277 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1278
1279 nxt_mp_release(tmcf->mem_pool);
1280 }
1281
1282
1283 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1284 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1285 nxt_port_msg_type_t type)
1286 {
1287 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1288
1289 nxt_port_use(task, tmcf->port, -1);
1290
1291 tmcf->port = NULL;
1292 }
1293
1294
1295 static nxt_conf_map_t nxt_router_conf[] = {
1296 {
1297 nxt_string("listeners_threads"),
1298 NXT_CONF_MAP_INT32,
1299 offsetof(nxt_router_conf_t, threads),
1300 },
1301 };
1302
1303
1304 static nxt_conf_map_t nxt_router_app_conf[] = {
1305 {
1306 nxt_string("type"),
1307 NXT_CONF_MAP_STR,
1308 offsetof(nxt_router_app_conf_t, type),
1309 },
1310
1311 {
1312 nxt_string("limits"),
1313 NXT_CONF_MAP_PTR,
1314 offsetof(nxt_router_app_conf_t, limits_value),
1315 },
1316
1317 {
1318 nxt_string("processes"),
1319 NXT_CONF_MAP_INT32,
1320 offsetof(nxt_router_app_conf_t, processes),
1321 },
1322
1323 {
1324 nxt_string("processes"),
1325 NXT_CONF_MAP_PTR,
1326 offsetof(nxt_router_app_conf_t, processes_value),
1327 },
1328
1329 {
1330 nxt_string("targets"),
1331 NXT_CONF_MAP_PTR,
1332 offsetof(nxt_router_app_conf_t, targets_value),
1333 },
1334 };
1335
1336
1337 static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1338 {
1339 nxt_string("timeout"),
1340 NXT_CONF_MAP_MSEC,
1341 offsetof(nxt_router_app_conf_t, timeout),
1342 },
1343 };
1344
1345
1346 static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1347 {
1348 nxt_string("spare"),
1349 NXT_CONF_MAP_INT32,
1350 offsetof(nxt_router_app_conf_t, spare_processes),
1351 },
1352
1353 {
1354 nxt_string("max"),
1355 NXT_CONF_MAP_INT32,
1356 offsetof(nxt_router_app_conf_t, max_processes),
1357 },
1358
1359 {
1360 nxt_string("idle_timeout"),
1361 NXT_CONF_MAP_MSEC,
1362 offsetof(nxt_router_app_conf_t, idle_timeout),
1363 },
1364 };
1365
1366
1367 static nxt_conf_map_t nxt_router_listener_conf[] = {
1368 {
1369 nxt_string("pass"),
1370 NXT_CONF_MAP_STR_COPY,
1371 offsetof(nxt_router_listener_conf_t, pass),
1372 },
1373
1374 {
1375 nxt_string("application"),
1376 NXT_CONF_MAP_STR_COPY,
1377 offsetof(nxt_router_listener_conf_t, application),
1378 },
1379 };
1380
1381
1382 static nxt_conf_map_t nxt_router_http_conf[] = {
1383 {
1384 nxt_string("header_buffer_size"),
1385 NXT_CONF_MAP_SIZE,
1386 offsetof(nxt_socket_conf_t, header_buffer_size),
1387 },
1388
1389 {
1390 nxt_string("large_header_buffer_size"),
1391 NXT_CONF_MAP_SIZE,
1392 offsetof(nxt_socket_conf_t, large_header_buffer_size),
1393 },
1394
1395 {
1396 nxt_string("large_header_buffers"),
1397 NXT_CONF_MAP_SIZE,
1398 offsetof(nxt_socket_conf_t, large_header_buffers),
1399 },
1400
1401 {
1402 nxt_string("body_buffer_size"),
1403 NXT_CONF_MAP_SIZE,
1404 offsetof(nxt_socket_conf_t, body_buffer_size),
1405 },
1406
1407 {
1408 nxt_string("max_body_size"),
1409 NXT_CONF_MAP_SIZE,
1410 offsetof(nxt_socket_conf_t, max_body_size),
1411 },
1412
1413 {
1414 nxt_string("idle_timeout"),
1415 NXT_CONF_MAP_MSEC,
1416 offsetof(nxt_socket_conf_t, idle_timeout),
1417 },
1418
1419 {
1420 nxt_string("header_read_timeout"),
1421 NXT_CONF_MAP_MSEC,
1422 offsetof(nxt_socket_conf_t, header_read_timeout),
1423 },
1424
1425 {
1426 nxt_string("body_read_timeout"),
1427 NXT_CONF_MAP_MSEC,
1428 offsetof(nxt_socket_conf_t, body_read_timeout),
1429 },
1430
1431 {
1432 nxt_string("send_timeout"),
1433 NXT_CONF_MAP_MSEC,
1434 offsetof(nxt_socket_conf_t, send_timeout),
1435 },
1436
1437 {
1438 nxt_string("body_temp_path"),
1439 NXT_CONF_MAP_STR,
1440 offsetof(nxt_socket_conf_t, body_temp_path),
1441 },
1442
1443 {
1444 nxt_string("discard_unsafe_fields"),
1445 NXT_CONF_MAP_INT8,
1446 offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1447 },
1448 };
1449
1450
1451 static nxt_conf_map_t nxt_router_websocket_conf[] = {
1452 {
1453 nxt_string("max_frame_size"),
1454 NXT_CONF_MAP_SIZE,
1455 offsetof(nxt_websocket_conf_t, max_frame_size),
1456 },
1457
1458 {
1459 nxt_string("read_timeout"),
1460 NXT_CONF_MAP_MSEC,
1461 offsetof(nxt_websocket_conf_t, read_timeout),
1462 },
1463
1464 {
1465 nxt_string("keepalive_interval"),
1466 NXT_CONF_MAP_MSEC,
1467 offsetof(nxt_websocket_conf_t, keepalive_interval),
1468 },
1469
1470 };
1471
1472
1473 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1474 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1475 u_char *start, u_char *end)
1476 {
1477 u_char *p;
1478 size_t size;
1479 nxt_mp_t *mp, *app_mp;
1480 uint32_t next, next_target;
1481 nxt_int_t ret;
1482 nxt_str_t name, path, target;
1483 nxt_app_t *app, *prev;
1484 nxt_str_t *t, *s, *targets;
1485 nxt_uint_t n, i;
1486 nxt_port_t *port;
1487 nxt_router_t *router;
1488 nxt_app_joint_t *app_joint;
1489 #if (NXT_TLS)
1490 nxt_tls_init_t *tls_init;
1491 nxt_conf_value_t *certificate;
1492 #endif
1493 nxt_conf_value_t *conf, *http, *value, *websocket;
1494 nxt_conf_value_t *applications, *application;
1495 nxt_conf_value_t *listeners, *listener;
1496 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf;
1497 nxt_socket_conf_t *skcf;
1498 nxt_http_routes_t *routes;
1499 nxt_event_engine_t *engine;
1500 nxt_app_lang_module_t *lang;
1501 nxt_router_app_conf_t apcf;
1502 nxt_router_access_log_t *access_log;
1503 nxt_router_listener_conf_t lscf;
1504
1505 static nxt_str_t http_path = nxt_string("/settings/http");
1506 static nxt_str_t applications_path = nxt_string("/applications");
1507 static nxt_str_t listeners_path = nxt_string("/listeners");
1508 static nxt_str_t routes_path = nxt_string("/routes");
1509 static nxt_str_t access_log_path = nxt_string("/access_log");
1510 #if (NXT_TLS)
1511 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1512 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1513 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1514 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1515 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1516 #endif
1517 static nxt_str_t static_path = nxt_string("/settings/http/static");
1518 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1519 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1520
1521 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1522 if (conf == NULL) {
1523 nxt_alert(task, "configuration parsing error");
1524 return NXT_ERROR;
1525 }
1526
1527 mp = tmcf->router_conf->mem_pool;
1528
1529 ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1530 nxt_nitems(nxt_router_conf), tmcf->router_conf);
1531 if (ret != NXT_OK) {
1532 nxt_alert(task, "root map error");
1533 return NXT_ERROR;
1534 }
1535
1536 if (tmcf->router_conf->threads == 0) {
1537 tmcf->router_conf->threads = nxt_ncpu;
1538 }
1539
1540 static_conf = nxt_conf_get_path(conf, &static_path);
1541
1542 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1543 if (nxt_slow_path(ret != NXT_OK)) {
1544 return NXT_ERROR;
1545 }
1546
1547 router = tmcf->router_conf->router;
1548
1549 applications = nxt_conf_get_path(conf, &applications_path);
1550
1551 if (applications != NULL) {
1552 next = 0;
1553
1554 for ( ;; ) {
1555 application = nxt_conf_next_object_member(applications,
1556 &name, &next);
1557 if (application == NULL) {
1558 break;
1559 }
1560
1561 nxt_debug(task, "application \"%V\"", &name);
1562
1563 size = nxt_conf_json_length(application, NULL);
1564
1565 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1566 if (nxt_slow_path(app_mp == NULL)) {
1567 goto fail;
1568 }
1569
1570 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1571 if (app == NULL) {
1572 goto app_fail;
1573 }
1574
1575 nxt_memzero(app, sizeof(nxt_app_t));
1576
1577 app->mem_pool = app_mp;
1578
1579 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1580 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1581 + name.length);
1582
1583 p = nxt_conf_json_print(app->conf.start, application, NULL);
1584 app->conf.length = p - app->conf.start;
1585
1586 nxt_assert(app->conf.length <= size);
1587
1588 nxt_debug(task, "application conf \"%V\"", &app->conf);
1589
1590 prev = nxt_router_app_find(&router->apps, &name);
1591
1592 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1593 nxt_mp_destroy(app_mp);
1594
1595 nxt_queue_remove(&prev->link);
1596 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1597
1598 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1599 if (nxt_slow_path(ret != NXT_OK)) {
1600 goto fail;
1601 }
1602
1603 continue;
1604 }
1605
1606 apcf.processes = 1;
1607 apcf.max_processes = 1;
1608 apcf.spare_processes = 0;
1609 apcf.timeout = 0;
1610 apcf.idle_timeout = 15000;
1611 apcf.limits_value = NULL;
1612 apcf.processes_value = NULL;
1613 apcf.targets_value = NULL;
1614
1615 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1616 if (nxt_slow_path(app_joint == NULL)) {
1617 goto app_fail;
1618 }
1619
1620 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1621
1622 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1623 nxt_nitems(nxt_router_app_conf), &apcf);
1624 if (ret != NXT_OK) {
1625 nxt_alert(task, "application map error");
1626 goto app_fail;
1627 }
1628
1629 if (apcf.limits_value != NULL) {
1630
1631 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1632 nxt_alert(task, "application limits is not object");
1633 goto app_fail;
1634 }
1635
1636 ret = nxt_conf_map_object(mp, apcf.limits_value,
1637 nxt_router_app_limits_conf,
1638 nxt_nitems(nxt_router_app_limits_conf),
1639 &apcf);
1640 if (ret != NXT_OK) {
1641 nxt_alert(task, "application limits map error");
1642 goto app_fail;
1643 }
1644 }
1645
1646 if (apcf.processes_value != NULL
1647 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1648 {
1649 ret = nxt_conf_map_object(mp, apcf.processes_value,
1650 nxt_router_app_processes_conf,
1651 nxt_nitems(nxt_router_app_processes_conf),
1652 &apcf);
1653 if (ret != NXT_OK) {
1654 nxt_alert(task, "application processes map error");
1655 goto app_fail;
1656 }
1657
1658 } else {
1659 apcf.max_processes = apcf.processes;
1660 apcf.spare_processes = apcf.processes;
1661 }
1662
1663 if (apcf.targets_value != NULL) {
1664 n = nxt_conf_object_members_count(apcf.targets_value);
1665
1666 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1667 if (nxt_slow_path(targets == NULL)) {
1668 goto app_fail;
1669 }
1670
1671 next_target = 0;
1672
1673 for (i = 0; i < n; i++) {
1674 (void) nxt_conf_next_object_member(apcf.targets_value,
1675 &target, &next_target);
1676
1677 s = nxt_str_dup(app_mp, &targets[i], &target);
1678 if (nxt_slow_path(s == NULL)) {
1679 goto app_fail;
1680 }
1681 }
1682
1683 } else {
1684 targets = NULL;
1685 }
1686
1687 nxt_debug(task, "application type: %V", &apcf.type);
1688 nxt_debug(task, "application processes: %D", apcf.processes);
1689 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1690
1691 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1692
1693 if (lang == NULL) {
1694 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1695 goto app_fail;
1696 }
1697
1698 nxt_debug(task, "application language module: \"%s\"", lang->file);
1699
1700 ret = nxt_thread_mutex_create(&app->mutex);
1701 if (ret != NXT_OK) {
1702 goto app_fail;
1703 }
1704
1705 nxt_queue_init(&app->ports);
1706 nxt_queue_init(&app->spare_ports);
1707 nxt_queue_init(&app->idle_ports);
1708 nxt_queue_init(&app->ack_waiting_req);
1709
1710 app->name.length = name.length;
1711 nxt_memcpy(app->name.start, name.start, name.length);
1712
1713 app->type = lang->type;
1714 app->max_processes = apcf.max_processes;
1715 app->spare_processes = apcf.spare_processes;
1716 app->max_pending_processes = apcf.spare_processes
1717 ? apcf.spare_processes : 1;
1718 app->timeout = apcf.timeout;
1719 app->idle_timeout = apcf.idle_timeout;
1720
1721 app->targets = targets;
1722
1723 engine = task->thread->engine;
1724
1725 app->engine = engine;
1726
1727 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1728 app->adjust_idle_work.task = &engine->task;
1729 app->adjust_idle_work.obj = app;
1730
1731 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1732
1733 ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1734 if (nxt_slow_path(ret != NXT_OK)) {
1735 goto app_fail;
1736 }
1737
1738 nxt_router_app_use(task, app, 1);
1739
1740 app->joint = app_joint;
1741
1742 app_joint->use_count = 1;
1743 app_joint->app = app;
1744
1745 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1746 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1747 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1748 app_joint->idle_timer.task = &engine->task;
1749 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1750
1751 app_joint->free_app_work.handler = nxt_router_free_app;
1752 app_joint->free_app_work.task = &engine->task;
1753 app_joint->free_app_work.obj = app_joint;
1754
1755 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1756 NXT_PROCESS_APP);
1757 if (nxt_slow_path(port == NULL)) {
1758 return NXT_ERROR;
1759 }
1760
1761 ret = nxt_port_socket_init(task, port, 0);
1762 if (nxt_slow_path(ret != NXT_OK)) {
1763 nxt_port_use(task, port, -1);
1764 return NXT_ERROR;
1765 }
1766
1767 ret = nxt_router_app_queue_init(task, port);
1768 if (nxt_slow_path(ret != NXT_OK)) {
1769 nxt_port_write_close(port);
1770 nxt_port_read_close(port);
1771 nxt_port_use(task, port, -1);
1772 return NXT_ERROR;
1773 }
1774
1775 nxt_port_write_enable(task, port);
1776 port->app = app;
1777
1778 app->shared_port = port;
1779
1780 nxt_thread_mutex_create(&app->outgoing.mutex);
1781 }
1782 }
1783
1784 routes_conf = nxt_conf_get_path(conf, &routes_path);
1785 if (nxt_fast_path(routes_conf != NULL)) {
1786 routes = nxt_http_routes_create(task, tmcf, routes_conf);
1787 if (nxt_slow_path(routes == NULL)) {
1788 return NXT_ERROR;
1789 }
1790 tmcf->router_conf->routes = routes;
1791 }
1792
1793 ret = nxt_upstreams_create(task, tmcf, conf);
1794 if (nxt_slow_path(ret != NXT_OK)) {
1795 return ret;
1796 }
1797
1798 http = nxt_conf_get_path(conf, &http_path);
1799 #if 0
1800 if (http == NULL) {
1801 nxt_alert(task, "no \"http\" block");
1802 return NXT_ERROR;
1803 }
1804 #endif
1805
1806 websocket = nxt_conf_get_path(conf, &websocket_path);
1807
1808 listeners = nxt_conf_get_path(conf, &listeners_path);
1809
1810 if (listeners != NULL) {
1811 next = 0;
1812
1813 for ( ;; ) {
1814 listener = nxt_conf_next_object_member(listeners, &name, &next);
1815 if (listener == NULL) {
1816 break;
1817 }
1818
1819 skcf = nxt_router_socket_conf(task, tmcf, &name);
1820 if (skcf == NULL) {
1821 goto fail;
1822 }
1823
1824 nxt_memzero(&lscf, sizeof(lscf));
1825
1826 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1827 nxt_nitems(nxt_router_listener_conf),
1828 &lscf);
1829 if (ret != NXT_OK) {
1830 nxt_alert(task, "listener map error");
1831 goto fail;
1832 }
1833
1834 nxt_debug(task, "application: %V", &lscf.application);
1835
1836 // STUB, default values if http block is not defined.
1837 skcf->header_buffer_size = 2048;
1838 skcf->large_header_buffer_size = 8192;
1839 skcf->large_header_buffers = 4;
1840 skcf->discard_unsafe_fields = 1;
1841 skcf->body_buffer_size = 16 * 1024;
1842 skcf->max_body_size = 8 * 1024 * 1024;
1843 skcf->proxy_header_buffer_size = 64 * 1024;
1844 skcf->proxy_buffer_size = 4096;
1845 skcf->proxy_buffers = 256;
1846 skcf->idle_timeout = 180 * 1000;
1847 skcf->header_read_timeout = 30 * 1000;
1848 skcf->body_read_timeout = 30 * 1000;
1849 skcf->send_timeout = 30 * 1000;
1850 skcf->proxy_timeout = 60 * 1000;
1851 skcf->proxy_send_timeout = 30 * 1000;
1852 skcf->proxy_read_timeout = 30 * 1000;
1853
1854 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1855 skcf->websocket_conf.read_timeout = 60 * 1000;
1856 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1857
1858 nxt_str_null(&skcf->body_temp_path);
1859
1860 if (http != NULL) {
1861 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1862 nxt_nitems(nxt_router_http_conf),
1863 skcf);
1864 if (ret != NXT_OK) {
1865 nxt_alert(task, "http map error");
1866 goto fail;
1867 }
1868 }
1869
1870 if (websocket != NULL) {
1871 ret = nxt_conf_map_object(mp, websocket,
1872 nxt_router_websocket_conf,
1873 nxt_nitems(nxt_router_websocket_conf),
1874 &skcf->websocket_conf);
1875 if (ret != NXT_OK) {
1876 nxt_alert(task, "websocket map error");
1877 goto fail;
1878 }
1879 }
1880
1881 t = &skcf->body_temp_path;
1882
1883 if (t->length == 0) {
1884 t->start = (u_char *) task->thread->runtime->tmp;
1885 t->length = nxt_strlen(t->start);
1886 }
1887
1888 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1889 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1890 client_ip_conf);
1891 if (nxt_slow_path(ret != NXT_OK)) {
1892 return NXT_ERROR;
1893 }
1894
1895 #if (NXT_TLS)
1896 certificate = nxt_conf_get_path(listener, &certificate_path);
1897
1898 if (certificate != NULL) {
1899 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1900 if (nxt_slow_path(tls_init == NULL)) {
1901 return NXT_ERROR;
1902 }
1903
1904 tls_init->cache_size = 0;
1905 tls_init->timeout = 300;
1906
1907 value = nxt_conf_get_path(listener, &conf_cache_path);
1908 if (value != NULL) {
1909 tls_init->cache_size = nxt_conf_get_number(value);
1910 }
1911
1912 value = nxt_conf_get_path(listener, &conf_timeout_path);
1913 if (value != NULL) {
1914 tls_init->timeout = nxt_conf_get_number(value);
1915 }
1916
1917 tls_init->conf_cmds = nxt_conf_get_path(listener,
1918 &conf_commands_path);
1919
1920 tls_init->tickets_conf = nxt_conf_get_path(listener,
1921 &conf_tickets);
1922
1923 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1924 n = nxt_conf_array_elements_count(certificate);
1925
1926 for (i = 0; i < n; i++) {
1927 value = nxt_conf_get_array_element(certificate, i);
1928
1929 nxt_assert(value != NULL);
1930
1931 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1932 tls_init, i == 0);
1933 if (nxt_slow_path(ret != NXT_OK)) {
1934 goto fail;
1935 }
1936 }
1937
1938 } else {
1939 /* NXT_CONF_STRING */
1940 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1941 tls_init, 1);
1942 if (nxt_slow_path(ret != NXT_OK)) {
1943 goto fail;
1944 }
1945 }
1946 }
1947 #endif
1948
1949 skcf->listen->handler = nxt_http_conn_init;
1950 skcf->router_conf = tmcf->router_conf;
1951 skcf->router_conf->count++;
1952
1953 if (lscf.pass.length != 0) {
1954 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1955
1956 /* COMPATIBILITY: listener application. */
1957 } else if (lscf.application.length > 0) {
1958 skcf->action = nxt_http_pass_application(task,
1959 tmcf->router_conf,
1960 &lscf.application);
1961 }
1962
1963 if (nxt_slow_path(skcf->action == NULL)) {
1964 goto fail;
1965 }
1966 }
1967 }
1968
1969 ret = nxt_http_routes_resolve(task, tmcf);
1970 if (nxt_slow_path(ret != NXT_OK)) {
1971 goto fail;
1972 }
1973
1974 value = nxt_conf_get_path(conf, &access_log_path);
1975
1976 if (value != NULL) {
1977 nxt_conf_get_string(value, &path);
1978
1979 access_log = router->access_log;
1980
1981 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1982 nxt_router_access_log_use(&router->lock, access_log);
1983
1984 } else {
1985 access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1986 + path.length);
1987 if (access_log == NULL) {
1988 nxt_alert(task, "failed to allocate access log structure");
1989 goto fail;
1990 }
1991
1992 access_log->fd = -1;
1993 access_log->handler = &nxt_router_access_log_writer;
1994 access_log->count = 1;
1995
1996 access_log->path.length = path.length;
1997 access_log->path.start = (u_char *) access_log
1998 + sizeof(nxt_router_access_log_t);
1999
2000 nxt_memcpy(access_log->path.start, path.start, path.length);
2001 }
2002
2003 tmcf->router_conf->access_log = access_log;
2004 }
2005
2006 nxt_queue_add(&deleting_sockets, &router->sockets);
2007 nxt_queue_init(&router->sockets);
2008
2009 return NXT_OK;
2010
2011 app_fail:
2012
2013 nxt_mp_destroy(app_mp);
2014
2015 fail:
2016
2017 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2018
2019 nxt_queue_remove(&app->link);
2020 nxt_thread_mutex_destroy(&app->mutex);
2021 nxt_mp_destroy(app->mem_pool);
2022
2023 } nxt_queue_loop;
2024
2025 return NXT_ERROR;
2026 }
2027
2028
2029 #if (NXT_TLS)
2030
2031 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2032 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2033 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2034 nxt_tls_init_t *tls_init, nxt_bool_t last)
2035 {
2036 nxt_router_tlssock_t *tls;
2037
2038 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2039 if (nxt_slow_path(tls == NULL)) {
2040 return NXT_ERROR;
2041 }
2042
2043 tls->tls_init = tls_init;
2044 tls->socket_conf = skcf;
2045 tls->temp_conf = tmcf;
2046 tls->last = last;
2047 nxt_conf_get_string(value, &tls->name);
2048
2049 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2050
2051 return NXT_OK;
2052 }
2053
2054 #endif
2055
2056
2057 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2058 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2059 nxt_conf_value_t *conf)
2060 {
2061 uint32_t next, i;
2062 nxt_mp_t *mp;
2063 nxt_str_t *type, exten, str;
2064 nxt_int_t ret;
2065 nxt_uint_t exts;
2066 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2067
2068 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2069
2070 mp = rtcf->mem_pool;
2071
2072 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2073 if (nxt_slow_path(ret != NXT_OK)) {
2074 return NXT_ERROR;
2075 }
2076
2077 if (conf == NULL) {
2078 return NXT_OK;
2079 }
2080
2081 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2082
2083 if (mtypes_conf != NULL) {
2084 next = 0;
2085
2086 for ( ;; ) {
2087 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2088
2089 if (ext_conf == NULL) {
2090 break;
2091 }
2092
2093 type = nxt_str_dup(mp, NULL, &str);
2094 if (nxt_slow_path(type == NULL)) {
2095 return NXT_ERROR;
2096 }
2097
2098 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2099 nxt_conf_get_string(ext_conf, &str);
2100
2101 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2102 return NXT_ERROR;
2103 }
2104
2105 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2106 &exten, type);
2107 if (nxt_slow_path(ret != NXT_OK)) {
2108 return NXT_ERROR;
2109 }
2110
2111 continue;
2112 }
2113
2114 exts = nxt_conf_array_elements_count(ext_conf);
2115
2116 for (i = 0; i < exts; i++) {
2117 value = nxt_conf_get_array_element(ext_conf, i);
2118
2119 nxt_conf_get_string(value, &str);
2120
2121 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2122 return NXT_ERROR;
2123 }
2124
2125 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2126 &exten, type);
2127 if (nxt_slow_path(ret != NXT_OK)) {
2128 return NXT_ERROR;
2129 }
2130 }
2131 }
2132 }
2133
2134 return NXT_OK;
2135 }
2136
2137
2138 static nxt_int_t
nxt_router_conf_process_client_ip(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf,nxt_conf_value_t * conf)2139 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2140 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2141 {
2142 char c;
2143 size_t i;
2144 nxt_mp_t *mp;
2145 uint32_t hash;
2146 nxt_str_t header;
2147 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf;
2148 nxt_http_client_ip_t *client_ip;
2149 nxt_http_route_addr_rule_t *source;
2150
2151 static nxt_str_t header_path = nxt_string("/header");
2152 static nxt_str_t source_path = nxt_string("/source");
2153 static nxt_str_t recursive_path = nxt_string("/recursive");
2154
2155 if (conf == NULL) {
2156 skcf->client_ip = NULL;
2157
2158 return NXT_OK;
2159 }
2160
2161 mp = tmcf->router_conf->mem_pool;
2162
2163 source_conf = nxt_conf_get_path(conf, &source_path);
2164 header_conf = nxt_conf_get_path(conf, &header_path);
2165 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2166
2167 if (source_conf == NULL || header_conf == NULL) {
2168 return NXT_ERROR;
2169 }
2170
2171 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2172 if (nxt_slow_path(client_ip == NULL)) {
2173 return NXT_ERROR;
2174 }
2175
2176 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2177 if (nxt_slow_path(source == NULL)) {
2178 return NXT_ERROR;
2179 }
2180
2181 client_ip->source = source;
2182
2183 nxt_conf_get_string(header_conf, &header);
2184
2185 if (recursive_conf != NULL) {
2186 client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2187 }
2188
2189 client_ip->header = nxt_str_dup(mp, NULL, &header);
2190 if (nxt_slow_path(client_ip->header == NULL)) {
2191 return NXT_ERROR;
2192 }
2193
2194 hash = NXT_HTTP_FIELD_HASH_INIT;
2195
2196 for (i = 0; i < client_ip->header->length; i++) {
2197 c = client_ip->header->start[i];
2198 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2199 }
2200
2201 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2202
2203 client_ip->header_hash = hash;
2204
2205 skcf->client_ip = client_ip;
2206
2207 return NXT_OK;
2208 }
2209
2210
2211 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2212 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2213 {
2214 nxt_app_t *app;
2215
2216 nxt_queue_each(app, queue, nxt_app_t, link) {
2217
2218 if (nxt_strstr_eq(name, &app->name)) {
2219 return app;
2220 }
2221
2222 } nxt_queue_loop;
2223
2224 return NULL;
2225 }
2226
2227
2228 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2229 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2230 {
2231 void *mem;
2232 nxt_int_t fd;
2233
2234 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2235 if (nxt_slow_path(fd == -1)) {
2236 return NXT_ERROR;
2237 }
2238
2239 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2240 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2241 if (nxt_slow_path(mem == MAP_FAILED)) {
2242 nxt_fd_close(fd);
2243
2244 return NXT_ERROR;
2245 }
2246
2247 nxt_app_queue_init(mem);
2248
2249 port->queue_fd = fd;
2250 port->queue = mem;
2251
2252 return NXT_OK;
2253 }
2254
2255
2256 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2257 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2258 {
2259 void *mem;
2260 nxt_int_t fd;
2261
2262 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2263 if (nxt_slow_path(fd == -1)) {
2264 return NXT_ERROR;
2265 }
2266
2267 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2268 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2269 if (nxt_slow_path(mem == MAP_FAILED)) {
2270 nxt_fd_close(fd);
2271
2272 return NXT_ERROR;
2273 }
2274
2275 nxt_port_queue_init(mem);
2276
2277 port->queue_fd = fd;
2278 port->queue = mem;
2279
2280 return NXT_OK;
2281 }
2282
2283
2284 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2285 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2286 {
2287 void *mem;
2288
2289 nxt_assert(fd != -1);
2290
2291 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2292 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2293 if (nxt_slow_path(mem == MAP_FAILED)) {
2294
2295 return NXT_ERROR;
2296 }
2297
2298 port->queue = mem;
2299
2300 return NXT_OK;
2301 }
2302
2303
2304 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
2305 NXT_LVLHSH_DEFAULT,
2306 nxt_router_apps_hash_test,
2307 nxt_mp_lvlhsh_alloc,
2308 nxt_mp_lvlhsh_free,
2309 };
2310
2311
2312 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2313 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2314 {
2315 nxt_app_t *app;
2316
2317 app = data;
2318
2319 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2320 }
2321
2322
2323 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2324 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2325 {
2326 nxt_lvlhsh_query_t lhq;
2327
2328 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2329 lhq.replace = 0;
2330 lhq.key = app->name;
2331 lhq.value = app;
2332 lhq.proto = &nxt_router_apps_hash_proto;
2333 lhq.pool = rtcf->mem_pool;
2334
2335 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2336
2337 case NXT_OK:
2338 return NXT_OK;
2339
2340 case NXT_DECLINED:
2341 nxt_thread_log_alert("router app hash adding failed: "
2342 "\"%V\" is already in hash", &lhq.key);
2343 /* Fall through. */
2344 default:
2345 return NXT_ERROR;
2346 }
2347 }
2348
2349
2350 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2351 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2352 {
2353 nxt_lvlhsh_query_t lhq;
2354
2355 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2356 lhq.key = *name;
2357 lhq.proto = &nxt_router_apps_hash_proto;
2358
2359 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2360 return NULL;
2361 }
2362
2363 return lhq.value;
2364 }
2365
2366
2367 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2368 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2369 {
2370 nxt_app_t *app;
2371 nxt_lvlhsh_each_t lhe;
2372
2373 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2374
2375 for ( ;; ) {
2376 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2377
2378 if (app == NULL) {
2379 break;
2380 }
2381
2382 nxt_router_app_use(task, app, i);
2383 }
2384 }
2385
2386
2387 typedef struct {
2388 nxt_app_t *app;
2389 nxt_int_t target;
2390 } nxt_http_app_conf_t;
2391
2392
2393 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2394 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2395 nxt_str_t *target, nxt_http_action_t *action)
2396 {
2397 nxt_app_t *app;
2398 nxt_str_t *targets;
2399 nxt_uint_t i;
2400 nxt_http_app_conf_t *conf;
2401
2402 app = nxt_router_apps_hash_get(rtcf, name);
2403 if (app == NULL) {
2404 return NXT_DECLINED;
2405 }
2406
2407 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2408 if (nxt_slow_path(conf == NULL)) {
2409 return NXT_ERROR;
2410 }
2411
2412 action->handler = nxt_http_application_handler;
2413 action->u.conf = conf;
2414
2415 conf->app = app;
2416
2417 if (target != NULL && target->length != 0) {
2418 targets = app->targets;
2419
2420 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2421
2422 conf->target = i;
2423
2424 } else {
2425 conf->target = 0;
2426 }
2427
2428 return NXT_OK;
2429 }
2430
2431
2432 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2433 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2434 nxt_str_t *name)
2435 {
2436 size_t size;
2437 nxt_int_t ret;
2438 nxt_bool_t wildcard;
2439 nxt_sockaddr_t *sa;
2440 nxt_socket_conf_t *skcf;
2441 nxt_listen_socket_t *ls;
2442
2443 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2444 if (nxt_slow_path(sa == NULL)) {
2445 nxt_alert(task, "invalid listener \"%V\"", name);
2446 return NULL;
2447 }
2448
2449 sa->type = SOCK_STREAM;
2450
2451 nxt_debug(task, "router listener: \"%*s\"",
2452 (size_t) sa->length, nxt_sockaddr_start(sa));
2453
2454 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2455 if (nxt_slow_path(skcf == NULL)) {
2456 return NULL;
2457 }
2458
2459 size = nxt_sockaddr_size(sa);
2460
2461 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2462
2463 if (ret != NXT_OK) {
2464
2465 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2466 if (nxt_slow_path(ls == NULL)) {
2467 return NULL;
2468 }
2469
2470 skcf->listen = ls;
2471
2472 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2473 nxt_memcpy(ls->sockaddr, sa, size);
2474
2475 nxt_listen_socket_remote_size(ls);
2476
2477 ls->socket = -1;
2478 ls->backlog = NXT_LISTEN_BACKLOG;
2479 ls->flags = NXT_NONBLOCK;
2480 ls->read_after_accept = 1;
2481 }
2482
2483 switch (sa->u.sockaddr.sa_family) {
2484 #if (NXT_HAVE_UNIX_DOMAIN)
2485 case AF_UNIX:
2486 wildcard = 0;
2487 break;
2488 #endif
2489 #if (NXT_INET6)
2490 case AF_INET6:
2491 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2492 break;
2493 #endif
2494 case AF_INET:
2495 default:
2496 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2497 break;
2498 }
2499
2500 if (!wildcard) {
2501 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2502 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2503 return NULL;
2504 }
2505
2506 nxt_memcpy(skcf->sockaddr, sa, size);
2507 }
2508
2509 return skcf;
2510 }
2511
2512
2513 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2514 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2515 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2516 {
2517 nxt_router_t *router;
2518 nxt_queue_link_t *qlk;
2519 nxt_socket_conf_t *skcf;
2520
2521 router = tmcf->router_conf->router;
2522
2523 for (qlk = nxt_queue_first(&router->sockets);
2524 qlk != nxt_queue_tail(&router->sockets);
2525 qlk = nxt_queue_next(qlk))
2526 {
2527 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2528
2529 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2530 nskcf->listen = skcf->listen;
2531
2532 nxt_queue_remove(qlk);
2533 nxt_queue_insert_tail(&keeping_sockets, qlk);
2534
2535 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2536
2537 return NXT_OK;
2538 }
2539 }
2540
2541 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2542
2543 return NXT_DECLINED;
2544 }
2545
2546
2547 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2548 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2549 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2550 {
2551 size_t size;
2552 uint32_t stream;
2553 nxt_int_t ret;
2554 nxt_buf_t *b;
2555 nxt_port_t *main_port, *router_port;
2556 nxt_runtime_t *rt;
2557 nxt_socket_rpc_t *rpc;
2558
2559 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2560 if (rpc == NULL) {
2561 goto fail;
2562 }
2563
2564 rpc->socket_conf = skcf;
2565 rpc->temp_conf = tmcf;
2566
2567 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2568
2569 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2570 if (b == NULL) {
2571 goto fail;
2572 }
2573
2574 b->completion_handler = nxt_buf_dummy_completion;
2575
2576 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2577
2578 rt = task->thread->runtime;
2579 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2580 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2581
2582 stream = nxt_port_rpc_register_handler(task, router_port,
2583 nxt_router_listen_socket_ready,
2584 nxt_router_listen_socket_error,
2585 main_port->pid, rpc);
2586 if (nxt_slow_path(stream == 0)) {
2587 goto fail;
2588 }
2589
2590 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2591 stream, router_port->id, b);
2592
2593 if (nxt_slow_path(ret != NXT_OK)) {
2594 nxt_port_rpc_cancel(task, router_port, stream);
2595 goto fail;
2596 }
2597
2598 return;
2599
2600 fail:
2601
2602 nxt_router_conf_error(task, tmcf);
2603 }
2604
2605
2606 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2607 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2608 void *data)
2609 {
2610 nxt_int_t ret;
2611 nxt_socket_t s;
2612 nxt_socket_rpc_t *rpc;
2613
2614 rpc = data;
2615
2616 s = msg->fd[0];
2617
2618 ret = nxt_socket_nonblocking(task, s);
2619 if (nxt_slow_path(ret != NXT_OK)) {
2620 goto fail;
2621 }
2622
2623 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2624
2625 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2626 if (nxt_slow_path(ret != NXT_OK)) {
2627 goto fail;
2628 }
2629
2630 rpc->socket_conf->listen->socket = s;
2631
2632 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2633 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2634
2635 return;
2636
2637 fail:
2638
2639 nxt_socket_close(task, s);
2640
2641 nxt_router_conf_error(task, rpc->temp_conf);
2642 }
2643
2644
2645 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2646 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2647 void *data)
2648 {
2649 nxt_socket_rpc_t *rpc;
2650 nxt_router_temp_conf_t *tmcf;
2651
2652 rpc = data;
2653 tmcf = rpc->temp_conf;
2654
2655 #if 0
2656 u_char *p;
2657 size_t size;
2658 uint8_t error;
2659 nxt_buf_t *in, *out;
2660 nxt_sockaddr_t *sa;
2661
2662 static nxt_str_t socket_errors[] = {
2663 nxt_string("ListenerSystem"),
2664 nxt_string("ListenerNoIPv6"),
2665 nxt_string("ListenerPort"),
2666 nxt_string("ListenerInUse"),
2667 nxt_string("ListenerNoAddress"),
2668 nxt_string("ListenerNoAccess"),
2669 nxt_string("ListenerPath"),
2670 };
2671
2672 sa = rpc->socket_conf->listen->sockaddr;
2673
2674 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2675
2676 if (nxt_slow_path(in == NULL)) {
2677 return;
2678 }
2679
2680 p = in->mem.pos;
2681
2682 error = *p++;
2683
2684 size = nxt_length("listen socket error: ")
2685 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2686 + sa->length + socket_errors[error].length + (in->mem.free - p);
2687
2688 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2689 if (nxt_slow_path(out == NULL)) {
2690 return;
2691 }
2692
2693 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2694 "listen socket error: "
2695 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2696 (size_t) sa->length, nxt_sockaddr_start(sa),
2697 &socket_errors[error], in->mem.free - p, p);
2698
2699 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2700 #endif
2701
2702 nxt_router_conf_error(task, tmcf);
2703 }
2704
2705
2706 #if (NXT_TLS)
2707
2708 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2709 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2710 void *data)
2711 {
2712 nxt_mp_t *mp;
2713 nxt_int_t ret;
2714 nxt_tls_conf_t *tlscf;
2715 nxt_router_tlssock_t *tls;
2716 nxt_tls_bundle_conf_t *bundle;
2717 nxt_router_temp_conf_t *tmcf;
2718
2719 nxt_debug(task, "tls rpc handler");
2720
2721 tls = data;
2722 tmcf = tls->temp_conf;
2723
2724 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2725 goto fail;
2726 }
2727
2728 mp = tmcf->router_conf->mem_pool;
2729
2730 if (tls->socket_conf->tls == NULL){
2731 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2732 if (nxt_slow_path(tlscf == NULL)) {
2733 goto fail;
2734 }
2735
2736 tlscf->no_wait_shutdown = 1;
2737 tls->socket_conf->tls = tlscf;
2738
2739 } else {
2740 tlscf = tls->socket_conf->tls;
2741 }
2742
2743 tls->tls_init->conf = tlscf;
2744
2745 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2746 if (nxt_slow_path(bundle == NULL)) {
2747 goto fail;
2748 }
2749
2750 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2751 goto fail;
2752 }
2753
2754 bundle->chain_file = msg->fd[0];
2755 bundle->next = tlscf->bundle;
2756 tlscf->bundle = bundle;
2757
2758 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2759 tls->last);
2760 if (nxt_slow_path(ret != NXT_OK)) {
2761 goto fail;
2762 }
2763
2764 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2765 nxt_router_conf_apply, task, tmcf, NULL);
2766 return;
2767
2768 fail:
2769
2770 nxt_router_conf_error(task, tmcf);
2771 }
2772
2773 #endif
2774
2775
2776 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2777 nxt_router_app_rpc_create(nxt_task_t *task,
2778 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2779 {
2780 size_t size;
2781 uint32_t stream;
2782 nxt_int_t ret;
2783 nxt_buf_t *b;
2784 nxt_port_t *router_port, *dport;
2785 nxt_runtime_t *rt;
2786 nxt_app_rpc_t *rpc;
2787
2788 rt = task->thread->runtime;
2789
2790 dport = app->proto_port;
2791
2792 if (dport == NULL) {
2793 nxt_debug(task, "app '%V' prototype prefork", &app->name);
2794
2795 size = app->name.length + 1 + app->conf.length;
2796
2797 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2798 if (nxt_slow_path(b == NULL)) {
2799 goto fail;
2800 }
2801
2802 b->completion_handler = nxt_buf_dummy_completion;
2803
2804 nxt_buf_cpystr(b, &app->name);
2805 *b->mem.free++ = '\0';
2806 nxt_buf_cpystr(b, &app->conf);
2807
2808 dport = rt->port_by_type[NXT_PROCESS_MAIN];
2809
2810 } else {
2811 nxt_debug(task, "app '%V' prefork", &app->name);
2812
2813 b = NULL;
2814 }
2815
2816 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2817
2818 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2819 nxt_router_app_prefork_ready,
2820 nxt_router_app_prefork_error,
2821 sizeof(nxt_app_rpc_t));
2822 if (nxt_slow_path(rpc == NULL)) {
2823 goto fail;
2824 }
2825
2826 rpc->app = app;
2827 rpc->temp_conf = tmcf;
2828 rpc->proto = (b != NULL);
2829
2830 stream = nxt_port_rpc_ex_stream(rpc);
2831
2832 ret = nxt_port_socket_write(task, dport,
2833 NXT_PORT_MSG_START_PROCESS,
2834 -1, stream, router_port->id, b);
2835 if (nxt_slow_path(ret != NXT_OK)) {
2836 nxt_port_rpc_cancel(task, router_port, stream);
2837 goto fail;
2838 }
2839
2840 if (b == NULL) {
2841 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2842
2843 app->pending_processes++;
2844 }
2845
2846 return;
2847
2848 fail:
2849
2850 nxt_router_conf_error(task, tmcf);
2851 }
2852
2853
2854 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2855 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2856 void *data)
2857 {
2858 nxt_app_t *app;
2859 nxt_port_t *port;
2860 nxt_app_rpc_t *rpc;
2861 nxt_event_engine_t *engine;
2862
2863 rpc = data;
2864 app = rpc->app;
2865
2866 port = msg->u.new_port;
2867
2868 nxt_assert(port != NULL);
2869 nxt_assert(port->id == 0);
2870
2871 if (rpc->proto) {
2872 nxt_assert(app->proto_port == NULL);
2873 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2874
2875 nxt_port_inc_use(port);
2876
2877 app->proto_port = port;
2878 port->app = app;
2879
2880 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2881
2882 return;
2883 }
2884
2885 nxt_assert(port->type == NXT_PROCESS_APP);
2886
2887 port->app = app;
2888 port->main_app_port = port;
2889
2890 app->pending_processes--;
2891 app->processes++;
2892 app->idle_processes++;
2893
2894 engine = task->thread->engine;
2895
2896 nxt_queue_insert_tail(&app->ports, &port->app_link);
2897 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2898
2899 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2900 &app->name, port->pid, port->id);
2901
2902 nxt_port_hash_add(&app->port_hash, port);
2903 app->port_hash_count++;
2904
2905 port->idle_start = 0;
2906
2907 nxt_port_inc_use(port);
2908
2909 nxt_router_app_shared_port_send(task, port);
2910
2911 nxt_work_queue_add(&engine->fast_work_queue,
2912 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2913 }
2914
2915
2916 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2917 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2918 void *data)
2919 {
2920 nxt_app_t *app;
2921 nxt_app_rpc_t *rpc;
2922 nxt_router_temp_conf_t *tmcf;
2923
2924 rpc = data;
2925 app = rpc->app;
2926 tmcf = rpc->temp_conf;
2927
2928 if (rpc->proto) {
2929 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2930 &app->name);
2931
2932 } else {
2933 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2934 &app->name);
2935
2936 app->pending_processes--;
2937 }
2938
2939 nxt_router_conf_error(task, tmcf);
2940 }
2941
2942
2943 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2944 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2945 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2946 {
2947 nxt_int_t ret;
2948 nxt_uint_t n, threads;
2949 nxt_queue_link_t *qlk;
2950 nxt_router_engine_conf_t *recf;
2951
2952 threads = tmcf->router_conf->threads;
2953
2954 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2955 sizeof(nxt_router_engine_conf_t));
2956 if (nxt_slow_path(tmcf->engines == NULL)) {
2957 return NXT_ERROR;
2958 }
2959
2960 n = 0;
2961
2962 for (qlk = nxt_queue_first(&router->engines);
2963 qlk != nxt_queue_tail(&router->engines);
2964 qlk = nxt_queue_next(qlk))
2965 {
2966 recf = nxt_array_zero_add(tmcf->engines);
2967 if (nxt_slow_path(recf == NULL)) {
2968 return NXT_ERROR;
2969 }
2970
2971 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2972
2973 if (n < threads) {
2974 recf->action = NXT_ROUTER_ENGINE_KEEP;
2975 ret = nxt_router_engine_conf_update(tmcf, recf);
2976
2977 } else {
2978 recf->action = NXT_ROUTER_ENGINE_DELETE;
2979 ret = nxt_router_engine_conf_delete(tmcf, recf);
2980 }
2981
2982 if (nxt_slow_path(ret != NXT_OK)) {
2983 return ret;
2984 }
2985
2986 n++;
2987 }
2988
2989 tmcf->new_threads = n;
2990
2991 while (n < threads) {
2992 recf = nxt_array_zero_add(tmcf->engines);
2993 if (nxt_slow_path(recf == NULL)) {
2994 return NXT_ERROR;
2995 }
2996
2997 recf->action = NXT_ROUTER_ENGINE_ADD;
2998
2999 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3000 if (nxt_slow_path(recf->engine == NULL)) {
3001 return NXT_ERROR;
3002 }
3003
3004 ret = nxt_router_engine_conf_create(tmcf, recf);
3005 if (nxt_slow_path(ret != NXT_OK)) {
3006 return ret;
3007 }
3008
3009 n++;
3010 }
3011
3012 return NXT_OK;
3013 }
3014
3015
3016 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3017 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3018 nxt_router_engine_conf_t *recf)
3019 {
3020 nxt_int_t ret;
3021
3022 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3023 nxt_router_listen_socket_create);
3024 if (nxt_slow_path(ret != NXT_OK)) {
3025 return ret;
3026 }
3027
3028 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3029 nxt_router_listen_socket_create);
3030 if (nxt_slow_path(ret != NXT_OK)) {
3031 return ret;
3032 }
3033
3034 return ret;
3035 }
3036
3037
3038 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3039 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3040 nxt_router_engine_conf_t *recf)
3041 {
3042 nxt_int_t ret;
3043
3044 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3045 nxt_router_listen_socket_create);
3046 if (nxt_slow_path(ret != NXT_OK)) {
3047 return ret;
3048 }
3049
3050 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3051 nxt_router_listen_socket_update);
3052 if (nxt_slow_path(ret != NXT_OK)) {
3053 return ret;
3054 }
3055
3056 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3057 if (nxt_slow_path(ret != NXT_OK)) {
3058 return ret;
3059 }
3060
3061 return ret;
3062 }
3063
3064
3065 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3066 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3067 nxt_router_engine_conf_t *recf)
3068 {
3069 nxt_int_t ret;
3070
3071 ret = nxt_router_engine_quit(tmcf, recf);
3072 if (nxt_slow_path(ret != NXT_OK)) {
3073 return ret;
3074 }
3075
3076 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3077 if (nxt_slow_path(ret != NXT_OK)) {
3078 return ret;
3079 }
3080
3081 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3082 }
3083
3084
3085 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3086 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3087 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3088 nxt_work_handler_t handler)
3089 {
3090 nxt_int_t ret;
3091 nxt_joint_job_t *job;
3092 nxt_queue_link_t *qlk;
3093 nxt_socket_conf_t *skcf;
3094 nxt_socket_conf_joint_t *joint;
3095
3096 for (qlk = nxt_queue_first(sockets);
3097 qlk != nxt_queue_tail(sockets);
3098 qlk = nxt_queue_next(qlk))
3099 {
3100 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3101 if (nxt_slow_path(job == NULL)) {
3102 return NXT_ERROR;
3103 }
3104
3105 job->work.next = recf->jobs;
3106 recf->jobs = &job->work;
3107
3108 job->task = tmcf->engine->task;
3109 job->work.handler = handler;
3110 job->work.task = &job->task;
3111 job->work.obj = job;
3112 job->tmcf = tmcf;
3113
3114 tmcf->count++;
3115
3116 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3117 sizeof(nxt_socket_conf_joint_t));
3118 if (nxt_slow_path(joint == NULL)) {
3119 return NXT_ERROR;
3120 }
3121
3122 job->work.data = joint;
3123
3124 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3125 if (nxt_slow_path(ret != NXT_OK)) {
3126 return ret;
3127 }
3128
3129 joint->count = 1;
3130
3131 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3132 skcf->count++;
3133 joint->socket_conf = skcf;
3134
3135 joint->engine = recf->engine;
3136 }
3137
3138 return NXT_OK;
3139 }
3140
3141
3142 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3143 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3144 nxt_router_engine_conf_t *recf)
3145 {
3146 nxt_joint_job_t *job;
3147
3148 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3149 if (nxt_slow_path(job == NULL)) {
3150 return NXT_ERROR;
3151 }
3152
3153 job->work.next = recf->jobs;
3154 recf->jobs = &job->work;
3155
3156 job->task = tmcf->engine->task;
3157 job->work.handler = nxt_router_worker_thread_quit;
3158 job->work.task = &job->task;
3159 job->work.obj = NULL;
3160 job->work.data = NULL;
3161 job->tmcf = NULL;
3162
3163 return NXT_OK;
3164 }
3165
3166
3167 static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets)3168 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3169 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3170 {
3171 nxt_joint_job_t *job;
3172 nxt_queue_link_t *qlk;
3173
3174 for (qlk = nxt_queue_first(sockets);
3175 qlk != nxt_queue_tail(sockets);
3176 qlk = nxt_queue_next(qlk))
3177 {
3178 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3179 if (nxt_slow_path(job == NULL)) {
3180 return NXT_ERROR;
3181 }
3182
3183 job->work.next = recf->jobs;
3184 recf->jobs = &job->work;
3185
3186 job->task = tmcf->engine->task;
3187 job->work.handler = nxt_router_listen_socket_delete;
3188 job->work.task = &job->task;
3189 job->work.obj = job;
3190 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3191 job->tmcf = tmcf;
3192
3193 tmcf->count++;
3194 }
3195
3196 return NXT_OK;
3197 }
3198
3199
3200 static nxt_int_t
nxt_router_threads_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_router_temp_conf_t * tmcf)3201 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3202 nxt_router_temp_conf_t *tmcf)
3203 {
3204 nxt_int_t ret;
3205 nxt_uint_t i, threads;
3206 nxt_router_engine_conf_t *recf;
3207
3208 recf = tmcf->engines->elts;
3209 threads = tmcf->router_conf->threads;
3210
3211 for (i = tmcf->new_threads; i < threads; i++) {
3212 ret = nxt_router_thread_create(task, rt, recf[i].engine);
3213 if (nxt_slow_path(ret != NXT_OK)) {
3214 return ret;
3215 }
3216 }
3217
3218 return NXT_OK;
3219 }
3220
3221
3222 static nxt_int_t
nxt_router_thread_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_event_engine_t * engine)3223 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3224 nxt_event_engine_t *engine)
3225 {
3226 nxt_int_t ret;
3227 nxt_thread_link_t *link;
3228 nxt_thread_handle_t handle;
3229
3230 link = nxt_zalloc(sizeof(nxt_thread_link_t));
3231
3232 if (nxt_slow_path(link == NULL)) {
3233 return NXT_ERROR;
3234 }
3235
3236 link->start = nxt_router_thread_start;
3237 link->engine = engine;
3238 link->work.handler = nxt_router_thread_exit_handler;
3239 link->work.task = task;
3240 link->work.data = link;
3241
3242 nxt_queue_insert_tail(&rt->engines, &engine->link);
3243
3244 ret = nxt_thread_create(&handle, link);
3245
3246 if (nxt_slow_path(ret != NXT_OK)) {
3247 nxt_queue_remove(&engine->link);
3248 }
3249
3250 return ret;
3251 }
3252
3253
3254 static void
nxt_router_apps_sort(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3255 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3256 nxt_router_temp_conf_t *tmcf)
3257 {
3258 nxt_app_t *app;
3259
3260 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3261
3262 nxt_router_app_unlink(task, app);
3263
3264 } nxt_queue_loop;
3265
3266 nxt_queue_add(&router->apps, &tmcf->previous);
3267 nxt_queue_add(&router->apps, &tmcf->apps);
3268 }
3269
3270
3271 static void
nxt_router_engines_post(nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3272 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3273 {
3274 nxt_uint_t n;
3275 nxt_event_engine_t *engine;
3276 nxt_router_engine_conf_t *recf;
3277
3278 recf = tmcf->engines->elts;
3279
3280 for (n = tmcf->engines->nelts; n != 0; n--) {
3281 engine = recf->engine;
3282
3283 switch (recf->action) {
3284
3285 case NXT_ROUTER_ENGINE_KEEP:
3286 break;
3287
3288 case NXT_ROUTER_ENGINE_ADD:
3289 nxt_queue_insert_tail(&router->engines, &engine->link0);
3290 break;
3291
3292 case NXT_ROUTER_ENGINE_DELETE:
3293 nxt_queue_remove(&engine->link0);
3294 break;
3295 }
3296
3297 nxt_router_engine_post(engine, recf->jobs);
3298
3299 recf++;
3300 }
3301 }
3302
3303
3304 static void
nxt_router_engine_post(nxt_event_engine_t * engine,nxt_work_t * jobs)3305 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3306 {
3307 nxt_work_t *work, *next;
3308
3309 for (work = jobs; work != NULL; work = next) {
3310 next = work->next;
3311 work->next = NULL;
3312
3313 nxt_event_engine_post(engine, work);
3314 }
3315 }
3316
3317
3318 static nxt_port_handlers_t nxt_router_app_port_handlers = {
3319 .rpc_error = nxt_port_rpc_handler,
3320 .mmap = nxt_port_mmap_handler,
3321 .data = nxt_port_rpc_handler,
3322 .oosm = nxt_router_oosm_handler,
3323 .req_headers_ack = nxt_port_rpc_handler,
3324 };
3325
3326
3327 static void
nxt_router_thread_start(void * data)3328 nxt_router_thread_start(void *data)
3329 {
3330 nxt_int_t ret;
3331 nxt_port_t *port;
3332 nxt_task_t *task;
3333 nxt_work_t *work;
3334 nxt_thread_t *thread;
3335 nxt_thread_link_t *link;
3336 nxt_event_engine_t *engine;
3337
3338 link = data;
3339 engine = link->engine;
3340 task = &engine->task;
3341
3342 thread = nxt_thread();
3343
3344 nxt_event_engine_thread_adopt(engine);
3345
3346 /* STUB */
3347 thread->runtime = engine->task.thread->runtime;
3348
3349 engine->task.thread = thread;
3350 engine->task.log = thread->log;
3351 thread->engine = engine;
3352 thread->task = &engine->task;
3353 #if 0
3354 thread->fiber = &engine->fibers->fiber;
3355 #endif
3356
3357 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3358 if (nxt_slow_path(engine->mem_pool == NULL)) {
3359 return;
3360 }
3361
3362 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3363 NXT_PROCESS_ROUTER);
3364 if (nxt_slow_path(port == NULL)) {
3365 return;
3366 }
3367
3368 ret = nxt_port_socket_init(task, port, 0);
3369 if (nxt_slow_path(ret != NXT_OK)) {
3370 nxt_port_use(task, port, -1);
3371 return;
3372 }
3373
3374 ret = nxt_router_port_queue_init(task, port);
3375 if (nxt_slow_path(ret != NXT_OK)) {
3376 nxt_port_use(task, port, -1);
3377 return;
3378 }
3379
3380 engine->port = port;
3381
3382 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3383
3384 work = nxt_zalloc(sizeof(nxt_work_t));
3385 if (nxt_slow_path(work == NULL)) {
3386 return;
3387 }
3388
3389 work->handler = nxt_router_rt_add_port;
3390 work->task = link->work.task;
3391 work->obj = work;
3392 work->data = port;
3393
3394 nxt_event_engine_post(link->work.task->thread->engine, work);
3395
3396 nxt_event_engine_start(engine);
3397 }
3398
3399
3400 static void
nxt_router_rt_add_port(nxt_task_t * task,void * obj,void * data)3401 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3402 {
3403 nxt_int_t res;
3404 nxt_port_t *port;
3405 nxt_runtime_t *rt;
3406
3407 rt = task->thread->runtime;
3408 port = data;
3409
3410 nxt_free(obj);
3411
3412 res = nxt_port_hash_add(&rt->ports, port);
3413
3414 if (nxt_fast_path(res == NXT_OK)) {
3415 nxt_port_use(task, port, 1);
3416 }
3417 }
3418
3419
3420 static void
nxt_router_listen_socket_create(nxt_task_t * task,void * obj,void * data)3421 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3422 {
3423 nxt_joint_job_t *job;
3424 nxt_socket_conf_t *skcf;
3425 nxt_listen_event_t *lev;
3426 nxt_listen_socket_t *ls;
3427 nxt_thread_spinlock_t *lock;
3428 nxt_socket_conf_joint_t *joint;
3429
3430 job = obj;
3431 joint = data;
3432
3433 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3434
3435 skcf = joint->socket_conf;
3436 ls = skcf->listen;
3437
3438 lev = nxt_listen_event(task, ls);
3439 if (nxt_slow_path(lev == NULL)) {
3440 nxt_router_listen_socket_release(task, skcf);
3441 return;
3442 }
3443
3444 lev->socket.data = joint;
3445
3446 lock = &skcf->router_conf->router->lock;
3447
3448 nxt_thread_spin_lock(lock);
3449 ls->count++;
3450 nxt_thread_spin_unlock(lock);
3451
3452 job->work.next = NULL;
3453 job->work.handler = nxt_router_conf_wait;
3454
3455 nxt_event_engine_post(job->tmcf->engine, &job->work);
3456 }
3457
3458
3459 nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t * listen_connections,nxt_socket_conf_t * skcf)3460 nxt_router_listen_event(nxt_queue_t *listen_connections,
3461 nxt_socket_conf_t *skcf)
3462 {
3463 nxt_socket_t fd;
3464 nxt_queue_link_t *qlk;
3465 nxt_listen_event_t *lev;
3466
3467 fd = skcf->listen->socket;
3468
3469 for (qlk = nxt_queue_first(listen_connections);
3470 qlk != nxt_queue_tail(listen_connections);
3471 qlk = nxt_queue_next(qlk))
3472 {
3473 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3474
3475 if (fd == lev->socket.fd) {
3476 return lev;
3477 }
3478 }
3479
3480 return NULL;
3481 }
3482
3483
3484 static void
nxt_router_listen_socket_update(nxt_task_t * task,void * obj,void * data)3485 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3486 {
3487 nxt_joint_job_t *job;
3488 nxt_event_engine_t *engine;
3489 nxt_listen_event_t *lev;
3490 nxt_socket_conf_joint_t *joint, *old;
3491
3492 job = obj;
3493 joint = data;
3494
3495 engine = task->thread->engine;
3496
3497 nxt_queue_insert_tail(&engine->joints, &joint->link);
3498
3499 lev = nxt_router_listen_event(&engine->listen_connections,
3500 joint->socket_conf);
3501
3502 old = lev->socket.data;
3503 lev->socket.data = joint;
3504 lev->listen = joint->socket_conf->listen;
3505
3506 job->work.next = NULL;
3507 job->work.handler = nxt_router_conf_wait;
3508
3509 nxt_event_engine_post(job->tmcf->engine, &job->work);
3510
3511 /*
3512 * The task is allocated from configuration temporary
3513 * memory pool so it can be freed after engine post operation.
3514 */
3515
3516 nxt_router_conf_release(&engine->task, old);
3517 }
3518
3519
3520 static void
nxt_router_listen_socket_delete(nxt_task_t * task,void * obj,void * data)3521 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3522 {
3523 nxt_socket_conf_t *skcf;
3524 nxt_listen_event_t *lev;
3525 nxt_event_engine_t *engine;
3526 nxt_socket_conf_joint_t *joint;
3527
3528 skcf = data;
3529
3530 engine = task->thread->engine;
3531
3532 lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3533
3534 nxt_fd_event_delete(engine, &lev->socket);
3535
3536 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3537 lev->socket.fd);
3538
3539 joint = lev->socket.data;
3540 joint->close_job = obj;
3541
3542 lev->timer.handler = nxt_router_listen_socket_close;
3543 lev->timer.work_queue = &engine->fast_work_queue;
3544
3545 nxt_timer_add(engine, &lev->timer, 0);
3546 }
3547
3548
3549 static void
nxt_router_worker_thread_quit(nxt_task_t * task,void * obj,void * data)3550 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3551 {
3552 nxt_event_engine_t *engine;
3553
3554 nxt_debug(task, "router worker thread quit");
3555
3556 engine = task->thread->engine;
3557
3558 engine->shutdown = 1;
3559
3560 if (nxt_queue_is_empty(&engine->joints)) {
3561 nxt_thread_exit(task->thread);
3562 }
3563 }
3564
3565
3566 static void
nxt_router_listen_socket_close(nxt_task_t * task,void * obj,void * data)3567 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3568 {
3569 nxt_timer_t *timer;
3570 nxt_joint_job_t *job;
3571 nxt_listen_event_t *lev;
3572 nxt_socket_conf_joint_t *joint;
3573
3574 timer = obj;
3575 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3576
3577 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3578 lev->socket.fd);
3579
3580 nxt_queue_remove(&lev->link);
3581
3582 joint = lev->socket.data;
3583 lev->socket.data = NULL;
3584
3585 /* 'task' refers to lev->task and we cannot use after nxt_free() */
3586 task = &task->thread->engine->task;
3587
3588 nxt_router_listen_socket_release(task, joint->socket_conf);
3589
3590 job = joint->close_job;
3591 job->work.next = NULL;
3592 job->work.handler = nxt_router_conf_wait;
3593
3594 nxt_event_engine_post(job->tmcf->engine, &job->work);
3595
3596 nxt_router_listen_event_release(task, lev, joint);
3597 }
3598
3599
3600 static void
nxt_router_listen_socket_release(nxt_task_t * task,nxt_socket_conf_t * skcf)3601 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3602 {
3603 nxt_listen_socket_t *ls;
3604 nxt_thread_spinlock_t *lock;
3605
3606 ls = skcf->listen;
3607 lock = &skcf->router_conf->router->lock;
3608
3609 nxt_thread_spin_lock(lock);
3610
3611 nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3612 task->thread->engine, ls->count);
3613
3614 if (--ls->count != 0) {
3615 ls = NULL;
3616 }
3617
3618 nxt_thread_spin_unlock(lock);
3619
3620 if (ls != NULL) {
3621 nxt_socket_close(task, ls->socket);
3622 nxt_free(ls);
3623 }
3624 }
3625
3626
3627 void
nxt_router_listen_event_release(nxt_task_t * task,nxt_listen_event_t * lev,nxt_socket_conf_joint_t * joint)3628 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3629 nxt_socket_conf_joint_t *joint)
3630 {
3631 nxt_event_engine_t *engine;
3632
3633 nxt_debug(task, "listen event count: %D", lev->count);
3634
3635 engine = task->thread->engine;
3636
3637 if (--lev->count == 0) {
3638 if (lev->next != NULL) {
3639 nxt_sockaddr_cache_free(engine, lev->next);
3640
3641 nxt_conn_free(task, lev->next);
3642 }
3643
3644 nxt_free(lev);
3645 }
3646
3647 if (joint != NULL) {
3648 nxt_router_conf_release(task, joint);
3649 }
3650
3651 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3652 nxt_thread_exit(task->thread);
3653 }
3654 }
3655
3656
3657 void
nxt_router_conf_release(nxt_task_t * task,nxt_socket_conf_joint_t * joint)3658 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3659 {
3660 nxt_socket_conf_t *skcf;
3661 nxt_router_conf_t *rtcf;
3662 nxt_thread_spinlock_t *lock;
3663
3664 nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3665
3666 if (--joint->count != 0) {
3667 return;
3668 }
3669
3670 nxt_queue_remove(&joint->link);
3671
3672 /*
3673 * The joint content can not be safely used after the critical
3674 * section protected by the spinlock because its memory pool may
3675 * be already destroyed by another thread.
3676 */
3677 skcf = joint->socket_conf;
3678 rtcf = skcf->router_conf;
3679 lock = &rtcf->router->lock;
3680
3681 nxt_thread_spin_lock(lock);
3682
3683 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3684 rtcf, rtcf->count);
3685
3686 if (--skcf->count != 0) {
3687 skcf = NULL;
3688 rtcf = NULL;
3689
3690 } else {
3691 nxt_queue_remove(&skcf->link);
3692
3693 if (--rtcf->count != 0) {
3694 rtcf = NULL;
3695 }
3696 }
3697
3698 nxt_thread_spin_unlock(lock);
3699
3700 #if (NXT_TLS)
3701 if (skcf != NULL && skcf->tls != NULL) {
3702 task->thread->runtime->tls->server_free(task, skcf->tls);
3703 }
3704 #endif
3705
3706 /* TODO remove engine->port */
3707
3708 if (rtcf != NULL) {
3709 nxt_debug(task, "old router conf is destroyed");
3710
3711 nxt_router_apps_hash_use(task, rtcf, -1);
3712
3713 nxt_router_access_log_release(task, lock, rtcf->access_log);
3714
3715 nxt_mp_thread_adopt(rtcf->mem_pool);
3716
3717 nxt_mp_destroy(rtcf->mem_pool);
3718 }
3719 }
3720
3721
3722 static void
nxt_router_access_log_writer(nxt_task_t * task,nxt_http_request_t * r,nxt_router_access_log_t * access_log)3723 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3724 nxt_router_access_log_t *access_log)
3725 {
3726 size_t size;
3727 u_char *buf, *p;
3728 nxt_off_t bytes;
3729
3730 static nxt_time_string_t date_cache = {
3731 (nxt_atomic_uint_t) -1,
3732 nxt_router_access_log_date,
3733 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3734 nxt_length("31/Dec/1986:19:40:00 +0300"),
3735 NXT_THREAD_TIME_LOCAL,
3736 NXT_THREAD_TIME_SEC,
3737 };
3738
3739 size = r->remote->address_length
3740 + 6 /* ' - - [' */
3741 + date_cache.size
3742 + 3 /* '] "' */
3743 + r->method->length
3744 + 1 /* space */
3745 + r->target.length
3746 + 1 /* space */
3747 + r->version.length
3748 + 2 /* '" ' */
3749 + 3 /* status */
3750 + 1 /* space */
3751 + NXT_OFF_T_LEN
3752 + 2 /* ' "' */
3753 + (r->referer != NULL ? r->referer->value_length : 1)
3754 + 3 /* '" "' */
3755 + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3756 + 2 /* '"\n' */
3757 ;
3758
3759 buf = nxt_mp_nget(r->mem_pool, size);
3760 if (nxt_slow_path(buf == NULL)) {
3761 return;
3762 }
3763
3764 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3765 r->remote->address_length);
3766
3767 p = nxt_cpymem(p, " - - [", 6);
3768
3769 p = nxt_thread_time_string(task->thread, &date_cache, p);
3770
3771 p = nxt_cpymem(p, "] \"", 3);
3772
3773 if (r->method->length != 0) {
3774 p = nxt_cpymem(p, r->method->start, r->method->length);
3775
3776 if (r->target.length != 0) {
3777 *p++ = ' ';
3778 p = nxt_cpymem(p, r->target.start, r->target.length);
3779
3780 if (r->version.length != 0) {
3781 *p++ = ' ';
3782 p = nxt_cpymem(p, r->version.start, r->version.length);
3783 }
3784 }
3785
3786 } else {
3787 *p++ = '-';
3788 }
3789
3790 p = nxt_cpymem(p, "\" ", 2);
3791
3792 p = nxt_sprintf(p, p + 3, "%03d", r->status);
3793
3794 *p++ = ' ';
3795
3796 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3797
3798 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3799
3800 p = nxt_cpymem(p, " \"", 2);
3801
3802 if (r->referer != NULL) {
3803 p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3804
3805 } else {
3806 *p++ = '-';
3807 }
3808
3809 p = nxt_cpymem(p, "\" \"", 3);
3810
3811 if (r->user_agent != NULL) {
3812 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3813
3814 } else {
3815 *p++ = '-';
3816 }
3817
3818 p = nxt_cpymem(p, "\"\n", 2);
3819
3820 nxt_fd_write(access_log->fd, buf, p - buf);
3821 }
3822
3823
3824 static u_char *
nxt_router_access_log_date(u_char * buf,nxt_realtime_t * now,struct tm * tm,size_t size,const char * format)3825 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3826 size_t size, const char *format)
3827 {
3828 u_char sign;
3829 time_t gmtoff;
3830
3831 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3832 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3833
3834 gmtoff = nxt_timezone(tm) / 60;
3835
3836 if (gmtoff < 0) {
3837 gmtoff = -gmtoff;
3838 sign = '-';
3839
3840 } else {
3841 sign = '+';
3842 }
3843
3844 return nxt_sprintf(buf, buf + size, format,
3845 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3846 tm->tm_hour, tm->tm_min, tm->tm_sec,
3847 sign, gmtoff / 60, gmtoff % 60);
3848 }
3849
3850
3851 static void
nxt_router_access_log_open(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)3852 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3853 {
3854 uint32_t stream;
3855 nxt_int_t ret;
3856 nxt_buf_t *b;
3857 nxt_port_t *main_port, *router_port;
3858 nxt_runtime_t *rt;
3859 nxt_router_access_log_t *access_log;
3860
3861 access_log = tmcf->router_conf->access_log;
3862
3863 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3864 if (nxt_slow_path(b == NULL)) {
3865 goto fail;
3866 }
3867
3868 b->completion_handler = nxt_buf_dummy_completion;
3869
3870 nxt_buf_cpystr(b, &access_log->path);
3871 *b->mem.free++ = '\0';
3872
3873 rt = task->thread->runtime;
3874 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3875 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3876
3877 stream = nxt_port_rpc_register_handler(task, router_port,
3878 nxt_router_access_log_ready,
3879 nxt_router_access_log_error,
3880 -1, tmcf);
3881 if (nxt_slow_path(stream == 0)) {
3882 goto fail;
3883 }
3884
3885 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3886 stream, router_port->id, b);
3887
3888 if (nxt_slow_path(ret != NXT_OK)) {
3889 nxt_port_rpc_cancel(task, router_port, stream);
3890 goto fail;
3891 }
3892
3893 return;
3894
3895 fail:
3896
3897 nxt_router_conf_error(task, tmcf);
3898 }
3899
3900
3901 static void
nxt_router_access_log_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3902 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3903 void *data)
3904 {
3905 nxt_router_temp_conf_t *tmcf;
3906 nxt_router_access_log_t *access_log;
3907
3908 tmcf = data;
3909
3910 access_log = tmcf->router_conf->access_log;
3911
3912 access_log->fd = msg->fd[0];
3913
3914 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3915 nxt_router_conf_apply, task, tmcf, NULL);
3916 }
3917
3918
3919 static void
nxt_router_access_log_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3920 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3921 void *data)
3922 {
3923 nxt_router_temp_conf_t *tmcf;
3924
3925 tmcf = data;
3926
3927 nxt_router_conf_error(task, tmcf);
3928 }
3929
3930
3931 static void
nxt_router_access_log_use(nxt_thread_spinlock_t * lock,nxt_router_access_log_t * access_log)3932 nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
3933 nxt_router_access_log_t *access_log)
3934 {
3935 if (access_log == NULL) {
3936 return;
3937 }
3938
3939 nxt_thread_spin_lock(lock);
3940
3941 access_log->count++;
3942
3943 nxt_thread_spin_unlock(lock);
3944 }
3945
3946
3947 static void
nxt_router_access_log_release(nxt_task_t * task,nxt_thread_spinlock_t * lock,nxt_router_access_log_t * access_log)3948 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3949 nxt_router_access_log_t *access_log)
3950 {
3951 if (access_log == NULL) {
3952 return;
3953 }
3954
3955 nxt_thread_spin_lock(lock);
3956
3957 if (--access_log->count != 0) {
3958 access_log = NULL;
3959 }
3960
3961 nxt_thread_spin_unlock(lock);
3962
3963 if (access_log != NULL) {
3964
3965 if (access_log->fd != -1) {
3966 nxt_fd_close(access_log->fd);
3967 }
3968
3969 nxt_free(access_log);
3970 }
3971 }
3972
3973
3974 typedef struct {
3975 nxt_mp_t *mem_pool;
3976 nxt_router_access_log_t *access_log;
3977 } nxt_router_access_log_reopen_t;
3978
3979
3980 static void
nxt_router_access_log_reopen_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)3981 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3982 {
3983 nxt_mp_t *mp;
3984 uint32_t stream;
3985 nxt_int_t ret;
3986 nxt_buf_t *b;
3987 nxt_port_t *main_port, *router_port;
3988 nxt_runtime_t *rt;
3989 nxt_router_access_log_t *access_log;
3990 nxt_router_access_log_reopen_t *reopen;
3991
3992 access_log = nxt_router->access_log;
3993
3994 if (access_log == NULL) {
3995 return;
3996 }
3997
3998 mp = nxt_mp_create(1024, 128, 256, 32);
3999 if (nxt_slow_path(mp == NULL)) {
4000 return;
4001 }
4002
4003 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
4004 if (nxt_slow_path(reopen == NULL)) {
4005 goto fail;
4006 }
4007
4008 reopen->mem_pool = mp;
4009 reopen->access_log = access_log;
4010
4011 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
4012 if (nxt_slow_path(b == NULL)) {
4013 goto fail;
4014 }
4015
4016 b->completion_handler = nxt_router_access_log_reopen_completion;
4017
4018 nxt_buf_cpystr(b, &access_log->path);
4019 *b->mem.free++ = '\0';
4020
4021 rt = task->thread->runtime;
4022 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
4023 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
4024
4025 stream = nxt_port_rpc_register_handler(task, router_port,
4026 nxt_router_access_log_reopen_ready,
4027 nxt_router_access_log_reopen_error,
4028 -1, reopen);
4029 if (nxt_slow_path(stream == 0)) {
4030 goto fail;
4031 }
4032
4033 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
4034 stream, router_port->id, b);
4035
4036 if (nxt_slow_path(ret != NXT_OK)) {
4037 nxt_port_rpc_cancel(task, router_port, stream);
4038 goto fail;
4039 }
4040
4041 nxt_mp_retain(mp);
4042
4043 return;
4044
4045 fail:
4046
4047 nxt_mp_destroy(mp);
4048 }
4049
4050
4051 static void
nxt_router_access_log_reopen_completion(nxt_task_t * task,void * obj,void * data)4052 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
4053 {
4054 nxt_mp_t *mp;
4055 nxt_buf_t *b;
4056
4057 b = obj;
4058 mp = b->data;
4059
4060 nxt_mp_release(mp);
4061 }
4062
4063
4064 static void
nxt_router_access_log_reopen_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4065 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4066 void *data)
4067 {
4068 nxt_router_access_log_t *access_log;
4069 nxt_router_access_log_reopen_t *reopen;
4070
4071 reopen = data;
4072
4073 access_log = reopen->access_log;
4074
4075 if (access_log == nxt_router->access_log) {
4076
4077 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
4078 nxt_alert(task, "dup2(%FD, %FD) failed %E",
4079 msg->fd[0], access_log->fd, nxt_errno);
4080 }
4081 }
4082
4083 nxt_fd_close(msg->fd[0]);
4084 nxt_mp_release(reopen->mem_pool);
4085 }
4086
4087
4088 static void
nxt_router_access_log_reopen_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4089 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4090 void *data)
4091 {
4092 nxt_router_access_log_reopen_t *reopen;
4093
4094 reopen = data;
4095
4096 nxt_mp_release(reopen->mem_pool);
4097 }
4098
4099
4100 static void
nxt_router_thread_exit_handler(nxt_task_t * task,void * obj,void * data)4101 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4102 {
4103 nxt_port_t *port;
4104 nxt_thread_link_t *link;
4105 nxt_event_engine_t *engine;
4106 nxt_thread_handle_t handle;
4107
4108 handle = (nxt_thread_handle_t) (uintptr_t) obj;
4109 link = data;
4110
4111 nxt_thread_wait(handle);
4112
4113 engine = link->engine;
4114
4115 nxt_queue_remove(&engine->link);
4116
4117 port = engine->port;
4118
4119 // TODO notify all apps
4120
4121 port->engine = task->thread->engine;
4122 nxt_mp_thread_adopt(port->mem_pool);
4123 nxt_port_use(task, port, -1);
4124
4125 nxt_mp_thread_adopt(engine->mem_pool);
4126 nxt_mp_destroy(engine->mem_pool);
4127
4128 nxt_event_engine_free(engine);
4129
4130 nxt_free(link);
4131 }
4132
4133
4134 static void
nxt_router_response_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4135 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4136 void *data)
4137 {
4138 size_t b_size, count;
4139 nxt_int_t ret;
4140 nxt_app_t *app;
4141 nxt_buf_t *b, *next;
4142 nxt_port_t *app_port;
4143 nxt_unit_field_t *f;
4144 nxt_http_field_t *field;
4145 nxt_http_request_t *r;
4146 nxt_unit_response_t *resp;
4147 nxt_request_rpc_data_t *req_rpc_data;
4148
4149 req_rpc_data = data;
4150
4151 r = req_rpc_data->request;
4152 if (nxt_slow_path(r == NULL)) {
4153 return;
4154 }
4155
4156 if (r->error) {
4157 nxt_request_rpc_data_unlink(task, req_rpc_data);
4158 return;
4159 }
4160
4161 app = req_rpc_data->app;
4162 nxt_assert(app != NULL);
4163
4164 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4165 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4166
4167 return;
4168 }
4169
4170 b = (msg->size == 0) ? NULL : msg->buf;
4171
4172 if (msg->port_msg.last != 0) {
4173 nxt_debug(task, "router data create last buf");
4174
4175 nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4176
4177 req_rpc_data->rpc_cancel = 0;
4178
4179 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4180 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4181 }
4182
4183 nxt_request_rpc_data_unlink(task, req_rpc_data);
4184
4185 } else {
4186 if (app->timeout != 0) {
4187 r->timer.handler = nxt_router_app_timeout;
4188 r->timer_data = req_rpc_data;
4189 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4190 }
4191 }
4192
4193 if (b == NULL) {
4194 return;
4195 }
4196
4197 if (msg->buf == b) {
4198 /* Disable instant buffer completion/re-using by port. */
4199 msg->buf = NULL;
4200 }
4201
4202 if (r->header_sent) {
4203 nxt_buf_chain_add(&r->out, b);
4204 nxt_http_request_send_body(task, r, NULL);
4205
4206 } else {
4207 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4208
4209 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4210 nxt_alert(task, "response buffer too small: %z", b_size);
4211 goto fail;
4212 }
4213
4214 resp = (void *) b->mem.pos;
4215 count = (b_size - sizeof(nxt_unit_response_t))
4216 / sizeof(nxt_unit_field_t);
4217
4218 if (nxt_slow_path(count < resp->fields_count)) {
4219 nxt_alert(task, "response buffer too small for fields count: %D",
4220 resp->fields_count);
4221 goto fail;
4222 }
4223
4224 field = NULL;
4225
4226 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4227 if (f->skip) {
4228 continue;
4229 }
4230
4231 field = nxt_list_add(r->resp.fields);
4232
4233 if (nxt_slow_path(field == NULL)) {
4234 goto fail;
4235 }
4236
4237 field->hash = f->hash;
4238 field->skip = 0;
4239 field->hopbyhop = 0;
4240
4241 field->name_length = f->name_length;
4242 field->value_length = f->value_length;
4243 field->name = nxt_unit_sptr_get(&f->name);
4244 field->value = nxt_unit_sptr_get(&f->value);
4245
4246 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4247 if (nxt_slow_path(ret != NXT_OK)) {
4248 goto fail;
4249 }
4250
4251 nxt_debug(task, "header%s: %*s: %*s",
4252 (field->skip ? " skipped" : ""),
4253 (size_t) field->name_length, field->name,
4254 (size_t) field->value_length, field->value);
4255
4256 if (field->skip) {
4257 r->resp.fields->last->nelts--;
4258 }
4259 }
4260
4261 r->status = resp->status;
4262
4263 if (resp->piggyback_content_length != 0) {
4264 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4265 b->mem.free = b->mem.pos + resp->piggyback_content_length;
4266
4267 } else {
4268 b->mem.pos = b->mem.free;
4269 }
4270
4271 if (nxt_buf_mem_used_size(&b->mem) == 0) {
4272 next = b->next;
4273 b->next = NULL;
4274
4275 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4276 b->completion_handler, task, b, b->parent);
4277
4278 b = next;
4279 }
4280
4281 if (b != NULL) {
4282 nxt_buf_chain_add(&r->out, b);
4283 }
4284
4285 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4286
4287 if (r->websocket_handshake
4288 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4289 {
4290 app_port = req_rpc_data->app_port;
4291 if (nxt_slow_path(app_port == NULL)) {
4292 goto fail;
4293 }
4294
4295 nxt_thread_mutex_lock(&app->mutex);
4296
4297 app_port->main_app_port->active_websockets++;
4298
4299 nxt_thread_mutex_unlock(&app->mutex);
4300
4301 nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4302 req_rpc_data->apr_action = NXT_APR_CLOSE;
4303
4304 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4305
4306 r->state = &nxt_http_websocket;
4307
4308 } else {
4309 r->state = &nxt_http_request_send_state;
4310 }
4311 }
4312
4313 return;
4314
4315 fail:
4316
4317 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4318
4319 nxt_request_rpc_data_unlink(task, req_rpc_data);
4320 }
4321
4322
4323 static void
nxt_router_req_headers_ack_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,nxt_request_rpc_data_t * req_rpc_data)4324 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4325 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4326 {
4327 int res;
4328 nxt_app_t *app;
4329 nxt_buf_t *b;
4330 nxt_bool_t start_process, unlinked;
4331 nxt_port_t *app_port, *main_app_port, *idle_port;
4332 nxt_queue_link_t *idle_lnk;
4333 nxt_http_request_t *r;
4334
4335 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4336 req_rpc_data->stream,
4337 msg->port_msg.pid, msg->port_msg.reply_port);
4338
4339 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4340 msg->port_msg.pid);
4341
4342 app = req_rpc_data->app;
4343 r = req_rpc_data->request;
4344
4345 start_process = 0;
4346 unlinked = 0;
4347
4348 nxt_thread_mutex_lock(&app->mutex);
4349
4350 if (r->app_link.next != NULL) {
4351 nxt_queue_remove(&r->app_link);
4352 r->app_link.next = NULL;
4353
4354 unlinked = 1;
4355 }
4356
4357 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4358 msg->port_msg.reply_port);
4359 if (nxt_slow_path(app_port == NULL)) {
4360 nxt_thread_mutex_unlock(&app->mutex);
4361
4362 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4363
4364 if (unlinked) {
4365 nxt_mp_release(r->mem_pool);
4366 }
4367
4368 return;
4369 }
4370
4371 main_app_port = app_port->main_app_port;
4372
4373 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4374 app->idle_processes--;
4375
4376 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4377 &app->name, main_app_port->pid, main_app_port->id,
4378 (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4379
4380 /* Check port was in 'spare_ports' using idle_start field. */
4381 if (main_app_port->idle_start == 0
4382 && app->idle_processes >= app->spare_processes)
4383 {
4384 /*
4385 * If there is a vacant space in spare ports,
4386 * move the last idle to spare_ports.
4387 */
4388 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4389
4390 idle_lnk = nxt_queue_last(&app->idle_ports);
4391 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4392 nxt_queue_remove(idle_lnk);
4393
4394 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4395
4396 idle_port->idle_start = 0;
4397
4398 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4399 "to spare_ports",
4400 &app->name, idle_port->pid, idle_port->id);
4401 }
4402
4403 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4404 app->pending_processes++;
4405 start_process = 1;
4406 }
4407 }
4408
4409 main_app_port->active_requests++;
4410
4411 nxt_port_inc_use(app_port);
4412
4413 nxt_thread_mutex_unlock(&app->mutex);
4414
4415 if (unlinked) {
4416 nxt_mp_release(r->mem_pool);
4417 }
4418
4419 if (start_process) {
4420 nxt_router_start_app_process(task, app);
4421 }
4422
4423 nxt_port_use(task, req_rpc_data->app_port, -1);
4424
4425 req_rpc_data->app_port = app_port;
4426
4427 b = req_rpc_data->msg_info.buf;
4428
4429 if (b != NULL) {
4430 /* First buffer is already sent. Start from second. */
4431 b = b->next;
4432
4433 req_rpc_data->msg_info.buf->next = NULL;
4434 }
4435
4436 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4437 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4438 req_rpc_data->msg_info.body_fd);
4439
4440 if (req_rpc_data->msg_info.body_fd != -1) {
4441 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4442 }
4443
4444 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4445 req_rpc_data->msg_info.body_fd,
4446 req_rpc_data->stream,
4447 task->thread->engine->port->id, b);
4448
4449 if (nxt_slow_path(res != NXT_OK)) {
4450 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4451 }
4452 }
4453
4454 if (app->timeout != 0) {
4455 r->timer.handler = nxt_router_app_timeout;
4456 r->timer_data = req_rpc_data;
4457 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4458 }
4459 }
4460
4461
4462 static const nxt_http_request_state_t nxt_http_request_send_state
4463 nxt_aligned(64) =
4464 {
4465 .error_handler = nxt_http_request_error_handler,
4466 };
4467
4468
4469 static void
nxt_http_request_send_body(nxt_task_t * task,void * obj,void * data)4470 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4471 {
4472 nxt_buf_t *out;
4473 nxt_http_request_t *r;
4474
4475 r = obj;
4476
4477 out = r->out;
4478
4479 if (out != NULL) {
4480 r->out = NULL;
4481 nxt_http_request_send(task, r, out);
4482 }
4483 }
4484
4485
4486 static void
nxt_router_response_error_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4487 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4488 void *data)
4489 {
4490 nxt_request_rpc_data_t *req_rpc_data;
4491
4492 req_rpc_data = data;
4493
4494 req_rpc_data->rpc_cancel = 0;
4495
4496 /* TODO cancel message and return if cancelled. */
4497 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4498
4499 if (req_rpc_data->request != NULL) {
4500 nxt_http_request_error(task, req_rpc_data->request,
4501 NXT_HTTP_SERVICE_UNAVAILABLE);
4502 }
4503
4504 nxt_request_rpc_data_unlink(task, req_rpc_data);
4505 }
4506
4507
4508 static void
nxt_router_app_port_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4509 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4510 void *data)
4511 {
4512 uint32_t n;
4513 nxt_app_t *app;
4514 nxt_bool_t start_process, restarted;
4515 nxt_port_t *port;
4516 nxt_app_joint_t *app_joint;
4517 nxt_app_joint_rpc_t *app_joint_rpc;
4518
4519 nxt_assert(data != NULL);
4520
4521 app_joint_rpc = data;
4522 app_joint = app_joint_rpc->app_joint;
4523 port = msg->u.new_port;
4524
4525 nxt_assert(app_joint != NULL);
4526 nxt_assert(port != NULL);
4527 nxt_assert(port->id == 0);
4528
4529 app = app_joint->app;
4530
4531 nxt_router_app_joint_use(task, app_joint, -1);
4532
4533 if (nxt_slow_path(app == NULL)) {
4534 nxt_debug(task, "new port ready for released app, send QUIT");
4535
4536 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4537
4538 return;
4539 }
4540
4541 nxt_thread_mutex_lock(&app->mutex);
4542
4543 restarted = (app->generation != app_joint_rpc->generation);
4544
4545 if (app_joint_rpc->proto) {
4546 nxt_assert(app->proto_port == NULL);
4547 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4548
4549 n = app->proto_port_requests;
4550 app->proto_port_requests = 0;
4551
4552 if (nxt_slow_path(restarted)) {
4553 nxt_thread_mutex_unlock(&app->mutex);
4554
4555 nxt_debug(task, "proto port ready for restarted app, send QUIT");
4556
4557 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4558 NULL);
4559
4560 } else {
4561 port->app = app;
4562 app->proto_port = port;
4563
4564 nxt_thread_mutex_unlock(&app->mutex);
4565
4566 nxt_port_use(task, port, 1);
4567 }
4568
4569 port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4570
4571 while (n > 0) {
4572 nxt_router_app_use(task, app, 1);
4573
4574 nxt_router_start_app_process_handler(task, port, app);
4575
4576 n--;
4577 }
4578
4579 return;
4580 }
4581
4582 nxt_assert(port->type == NXT_PROCESS_APP);
4583 nxt_assert(app->pending_processes != 0);
4584
4585 app->pending_processes--;
4586
4587 if (nxt_slow_path(restarted)) {
4588 nxt_debug(task, "new port ready for restarted app, send QUIT");
4589
4590 start_process = !task->thread->engine->shutdown
4591 && nxt_router_app_can_start(app)
4592 && nxt_router_app_need_start(app);
4593
4594 if (start_process) {
4595 app->pending_processes++;
4596 }
4597
4598 nxt_thread_mutex_unlock(&app->mutex);
4599
4600 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4601
4602 if (start_process) {
4603 nxt_router_start_app_process(task, app);
4604 }
4605
4606 return;
4607 }
4608
4609 port->app = app;
4610 port->main_app_port = port;
4611
4612 app->processes++;
4613 nxt_port_hash_add(&app->port_hash, port);
4614 app->port_hash_count++;
4615
4616 nxt_thread_mutex_unlock(&app->mutex);
4617
4618 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4619 &app->name, port->pid, app->processes, app->pending_processes);
4620
4621 nxt_router_app_shared_port_send(task, port);
4622
4623 nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4624 }
4625
4626
4627 static nxt_int_t
nxt_router_app_shared_port_send(nxt_task_t * task,nxt_port_t * app_port)4628 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4629 {
4630 nxt_buf_t *b;
4631 nxt_port_t *port;
4632 nxt_port_msg_new_port_t *msg;
4633
4634 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4635 sizeof(nxt_port_data_t));
4636 if (nxt_slow_path(b == NULL)) {
4637 return NXT_ERROR;
4638 }
4639
4640 port = app_port->app->shared_port;
4641
4642 nxt_debug(task, "send port %FD to process %PI",
4643 port->pair[0], app_port->pid);
4644
4645 b->mem.free += sizeof(nxt_port_msg_new_port_t);
4646 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4647
4648 msg->id = port->id;
4649 msg->pid = port->pid;
4650 msg->max_size = port->max_size;
4651 msg->max_share = port->max_share;
4652 msg->type = port->type;
4653
4654 return nxt_port_socket_write2(task, app_port,
4655 NXT_PORT_MSG_NEW_PORT,
4656 port->pair[0], port->queue_fd,
4657 0, 0, b);
4658 }
4659
4660
4661 static void
nxt_router_app_port_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4662 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4663 void *data)
4664 {
4665 nxt_app_t *app;
4666 nxt_app_joint_t *app_joint;
4667 nxt_queue_link_t *link;
4668 nxt_http_request_t *r;
4669 nxt_app_joint_rpc_t *app_joint_rpc;
4670
4671 nxt_assert(data != NULL);
4672
4673 app_joint_rpc = data;
4674 app_joint = app_joint_rpc->app_joint;
4675
4676 nxt_assert(app_joint != NULL);
4677
4678 app = app_joint->app;
4679
4680 nxt_router_app_joint_use(task, app_joint, -1);
4681
4682 if (nxt_slow_path(app == NULL)) {
4683 nxt_debug(task, "start error for released app");
4684
4685 return;
4686 }
4687
4688 nxt_debug(task, "app '%V' %p start error", &app->name, app);
4689
4690 link = NULL;
4691
4692 nxt_thread_mutex_lock(&app->mutex);
4693
4694 nxt_assert(app->pending_processes != 0);
4695
4696 app->pending_processes--;
4697
4698 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4699 link = nxt_queue_first(&app->ack_waiting_req);
4700
4701 nxt_queue_remove(link);
4702 link->next = NULL;
4703 }
4704
4705 nxt_thread_mutex_unlock(&app->mutex);
4706
4707 while (link != NULL) {
4708 r = nxt_container_of(link, nxt_http_request_t, app_link);
4709
4710 nxt_event_engine_post(r->engine, &r->err_work);
4711
4712 link = NULL;
4713
4714 nxt_thread_mutex_lock(&app->mutex);
4715
4716 if (app->processes == 0 && app->pending_processes == 0
4717 && !nxt_queue_is_empty(&app->ack_waiting_req))
4718 {
4719 link = nxt_queue_first(&app->ack_waiting_req);
4720
4721 nxt_queue_remove(link);
4722 link->next = NULL;
4723 }
4724
4725 nxt_thread_mutex_unlock(&app->mutex);
4726 }
4727 }
4728
4729
4730 nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t * task,nxt_app_t * app)4731 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4732 {
4733 nxt_port_t *port;
4734
4735 port = NULL;
4736
4737 nxt_thread_mutex_lock(&app->mutex);
4738
4739 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4740
4741 /* Caller is responsible to decrease port use count. */
4742 nxt_queue_chk_remove(&port->app_link);
4743
4744 if (nxt_queue_chk_remove(&port->idle_link)) {
4745 app->idle_processes--;
4746
4747 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4748 &app->name, port->pid, port->id,
4749 (port->idle_start ? "idle_ports" : "spare_ports"));
4750 }
4751
4752 nxt_port_hash_remove(&app->port_hash, port);
4753 app->port_hash_count--;
4754
4755 port->app = NULL;
4756 app->processes--;
4757
4758 break;
4759
4760 } nxt_queue_loop;
4761
4762 nxt_thread_mutex_unlock(&app->mutex);
4763
4764 return port;
4765 }
4766
4767
4768 static void
nxt_router_app_use(nxt_task_t * task,nxt_app_t * app,int i)4769 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4770 {
4771 int c;
4772
4773 c = nxt_atomic_fetch_add(&app->use_count, i);
4774
4775 if (i < 0 && c == -i) {
4776
4777 if (task->thread->engine != app->engine) {
4778 nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4779
4780 } else {
4781 nxt_router_free_app(task, app->joint, NULL);
4782 }
4783 }
4784 }
4785
4786
4787 static void
nxt_router_app_unlink(nxt_task_t * task,nxt_app_t * app)4788 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4789 {
4790 nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4791
4792 nxt_queue_remove(&app->link);
4793
4794 nxt_router_app_use(task, app, -1);
4795 }
4796
4797
4798 static void
nxt_router_app_port_release(nxt_task_t * task,nxt_app_t * app,nxt_port_t * port,nxt_apr_action_t action)4799 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4800 nxt_apr_action_t action)
4801 {
4802 int inc_use;
4803 uint32_t got_response, dec_requests;
4804 nxt_bool_t adjust_idle_timer;
4805 nxt_port_t *main_app_port;
4806
4807 nxt_assert(port != NULL);
4808
4809 inc_use = 0;
4810 got_response = 0;
4811 dec_requests = 0;
4812
4813 switch (action) {
4814 case NXT_APR_NEW_PORT:
4815 break;
4816 case NXT_APR_REQUEST_FAILED:
4817 dec_requests = 1;
4818 inc_use = -1;
4819 break;
4820 case NXT_APR_GOT_RESPONSE:
4821 got_response = 1;
4822 inc_use = -1;
4823 break;
4824 case NXT_APR_UPGRADE:
4825 got_response = 1;
4826 break;
4827 case NXT_APR_CLOSE:
4828 inc_use = -1;
4829 break;
4830 }
4831
4832 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4833 port->pid, port->id,
4834 (int) inc_use, (int) got_response);
4835
4836 if (port->id == NXT_SHARED_PORT_ID) {
4837 nxt_thread_mutex_lock(&app->mutex);
4838
4839 app->active_requests -= got_response + dec_requests;
4840
4841 nxt_thread_mutex_unlock(&app->mutex);
4842
4843 goto adjust_use;
4844 }
4845
4846 main_app_port = port->main_app_port;
4847
4848 nxt_thread_mutex_lock(&app->mutex);
4849
4850 main_app_port->active_requests -= got_response + dec_requests;
4851 app->active_requests -= got_response + dec_requests;
4852
4853 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4854 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4855
4856 nxt_port_inc_use(main_app_port);
4857 }
4858
4859 adjust_idle_timer = 0;
4860
4861 if (main_app_port->pair[1] != -1
4862 && main_app_port->active_requests == 0
4863 && main_app_port->active_websockets == 0
4864 && main_app_port->idle_link.next == NULL)
4865 {
4866 if (app->idle_processes == app->spare_processes
4867 && app->adjust_idle_work.data == NULL)
4868 {
4869 adjust_idle_timer = 1;
4870 app->adjust_idle_work.data = app;
4871 app->adjust_idle_work.next = NULL;
4872 }
4873
4874 if (app->idle_processes < app->spare_processes) {
4875 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4876
4877 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4878 &app->name, main_app_port->pid, main_app_port->id);
4879 } else {
4880 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4881
4882 main_app_port->idle_start = task->thread->engine->timers.now;
4883
4884 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4885 &app->name, main_app_port->pid, main_app_port->id);
4886 }
4887
4888 app->idle_processes++;
4889 }
4890
4891 nxt_thread_mutex_unlock(&app->mutex);
4892
4893 if (adjust_idle_timer) {
4894 nxt_router_app_use(task, app, 1);
4895 nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4896 }
4897
4898 /* ? */
4899 if (main_app_port->pair[1] == -1) {
4900 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4901 &app->name, app, main_app_port, main_app_port->pid);
4902
4903 goto adjust_use;
4904 }
4905
4906 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4907 &app->name, app);
4908
4909 adjust_use:
4910
4911 nxt_port_use(task, port, inc_use);
4912 }
4913
4914
4915 void
nxt_router_app_port_close(nxt_task_t * task,nxt_port_t * port)4916 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4917 {
4918 nxt_app_t *app;
4919 nxt_bool_t unchain, start_process;
4920 nxt_port_t *idle_port;
4921 nxt_queue_link_t *idle_lnk;
4922
4923 app = port->app;
4924
4925 nxt_assert(app != NULL);
4926
4927 nxt_thread_mutex_lock(&app->mutex);
4928
4929 if (port == app->proto_port) {
4930 app->proto_port = NULL;
4931 port->app = NULL;
4932
4933 nxt_thread_mutex_unlock(&app->mutex);
4934
4935 nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4936 port->pid);
4937
4938 nxt_port_use(task, port, -1);
4939
4940 return;
4941 }
4942
4943 nxt_port_hash_remove(&app->port_hash, port);
4944 app->port_hash_count--;
4945
4946 if (port->id != 0) {
4947 nxt_thread_mutex_unlock(&app->mutex);
4948
4949 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4950 port->pid, port->id);
4951
4952 return;
4953 }
4954
4955 unchain = nxt_queue_chk_remove(&port->app_link);
4956
4957 if (nxt_queue_chk_remove(&port->idle_link)) {
4958 app->idle_processes--;
4959
4960 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4961 &app->name, port->pid, port->id,
4962 (port->idle_start ? "idle_ports" : "spare_ports"));
4963
4964 if (port->idle_start == 0
4965 && app->idle_processes >= app->spare_processes)
4966 {
4967 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4968
4969 idle_lnk = nxt_queue_last(&app->idle_ports);
4970 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4971 nxt_queue_remove(idle_lnk);
4972
4973 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4974
4975 idle_port->idle_start = 0;
4976
4977 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4978 "to spare_ports",
4979 &app->name, idle_port->pid, idle_port->id);
4980 }
4981 }
4982
4983 app->processes--;
4984
4985 start_process = !task->thread->engine->shutdown
4986 && nxt_router_app_can_start(app)
4987 && nxt_router_app_need_start(app);
4988
4989 if (start_process) {
4990 app->pending_processes++;
4991 }
4992
4993 nxt_thread_mutex_unlock(&app->mutex);
4994
4995 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4996
4997 if (unchain) {
4998 nxt_port_use(task, port, -1);
4999 }
5000
5001 if (start_process) {
5002 nxt_router_start_app_process(task, app);
5003 }
5004 }
5005
5006
5007 static void
nxt_router_adjust_idle_timer(nxt_task_t * task,void * obj,void * data)5008 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
5009 {
5010 nxt_app_t *app;
5011 nxt_bool_t queued;
5012 nxt_port_t *port;
5013 nxt_msec_t timeout, threshold;
5014 nxt_queue_link_t *lnk;
5015 nxt_event_engine_t *engine;
5016
5017 app = obj;
5018 queued = (data == app);
5019
5020 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
5021 &app->name, queued);
5022
5023 engine = task->thread->engine;
5024
5025 nxt_assert(app->engine == engine);
5026
5027 threshold = engine->timers.now + app->joint->idle_timer.bias;
5028 timeout = 0;
5029
5030 nxt_thread_mutex_lock(&app->mutex);
5031
5032 if (queued) {
5033 app->adjust_idle_work.data = NULL;
5034 }
5035
5036 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
5037 &app->name,
5038 (int) app->idle_processes, (int) app->spare_processes);
5039
5040 while (app->idle_processes > app->spare_processes) {
5041
5042 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
5043
5044 lnk = nxt_queue_first(&app->idle_ports);
5045 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
5046
5047 timeout = port->idle_start + app->idle_timeout;
5048
5049 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
5050 &app->name, port->pid,
5051 port->idle_start, timeout, threshold);
5052
5053 if (timeout > threshold) {
5054 break;
5055 }
5056
5057 nxt_queue_remove(lnk);
5058 lnk->next = NULL;
5059
5060 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
5061 &app->name, port->pid, port->id);
5062
5063 nxt_queue_chk_remove(&port->app_link);
5064
5065 nxt_port_hash_remove(&app->port_hash, port);
5066 app->port_hash_count--;
5067
5068 app->idle_processes--;
5069 app->processes--;
5070 port->app = NULL;
5071
5072 nxt_thread_mutex_unlock(&app->mutex);
5073
5074 nxt_debug(task, "app '%V' send QUIT to idle port %PI",
5075 &app->name, port->pid);
5076
5077 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5078
5079 nxt_port_use(task, port, -1);
5080
5081 nxt_thread_mutex_lock(&app->mutex);
5082 }
5083
5084 nxt_thread_mutex_unlock(&app->mutex);
5085
5086 if (timeout > threshold) {
5087 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
5088
5089 } else {
5090 nxt_timer_disable(engine, &app->joint->idle_timer);
5091 }
5092
5093 if (queued) {
5094 nxt_router_app_use(task, app, -1);
5095 }
5096 }
5097
5098
5099 static void
nxt_router_app_idle_timeout(nxt_task_t * task,void * obj,void * data)5100 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5101 {
5102 nxt_timer_t *timer;
5103 nxt_app_joint_t *app_joint;
5104
5105 timer = obj;
5106 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5107
5108 if (nxt_fast_path(app_joint->app != NULL)) {
5109 nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5110 }
5111 }
5112
5113
5114 static void
nxt_router_app_joint_release_handler(nxt_task_t * task,void * obj,void * data)5115 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5116 {
5117 nxt_timer_t *timer;
5118 nxt_app_joint_t *app_joint;
5119
5120 timer = obj;
5121 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5122
5123 nxt_router_app_joint_use(task, app_joint, -1);
5124 }
5125
5126
5127 static void
nxt_router_free_app(nxt_task_t * task,void * obj,void * data)5128 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5129 {
5130 nxt_app_t *app;
5131 nxt_port_t *port, *proto_port;
5132 nxt_app_joint_t *app_joint;
5133
5134 app_joint = obj;
5135 app = app_joint->app;
5136
5137 for ( ;; ) {
5138 port = nxt_router_app_get_port_for_quit(task, app);
5139 if (port == NULL) {
5140 break;
5141 }
5142
5143 nxt_port_use(task, port, -1);
5144 }
5145
5146 nxt_thread_mutex_lock(&app->mutex);
5147
5148 for ( ;; ) {
5149 port = nxt_port_hash_retrieve(&app->port_hash);
5150 if (port == NULL) {
5151 break;
5152 }
5153
5154 app->port_hash_count--;
5155
5156 port->app = NULL;
5157
5158 nxt_port_close(task, port);
5159
5160 nxt_port_use(task, port, -1);
5161 }
5162
5163 proto_port = app->proto_port;
5164
5165 if (proto_port != NULL) {
5166 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5167 proto_port->pid);
5168
5169 app->proto_port = NULL;
5170 proto_port->app = NULL;
5171 }
5172
5173 nxt_thread_mutex_unlock(&app->mutex);
5174
5175 if (proto_port != NULL) {
5176 nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5177 -1, 0, 0, NULL);
5178
5179 nxt_port_close(task, proto_port);
5180
5181 nxt_port_use(task, proto_port, -1);
5182 }
5183
5184 nxt_assert(app->proto_port == NULL);
5185 nxt_assert(app->processes == 0);
5186 nxt_assert(app->active_requests == 0);
5187 nxt_assert(app->port_hash_count == 0);
5188 nxt_assert(app->idle_processes == 0);
5189 nxt_assert(nxt_queue_is_empty(&app->ports));
5190 nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5191 nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5192
5193 nxt_port_mmaps_destroy(&app->outgoing, 1);
5194
5195 nxt_thread_mutex_destroy(&app->outgoing.mutex);
5196
5197 if (app->shared_port != NULL) {
5198 app->shared_port->app = NULL;
5199 nxt_port_close(task, app->shared_port);
5200 nxt_port_use(task, app->shared_port, -1);
5201
5202 app->shared_port = NULL;
5203 }
5204
5205 nxt_thread_mutex_destroy(&app->mutex);
5206 nxt_mp_destroy(app->mem_pool);
5207
5208 app_joint->app = NULL;
5209
5210 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5211 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5212 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5213
5214 } else {
5215 nxt_router_app_joint_use(task, app_joint, -1);
5216 }
5217 }
5218
5219
5220 static void
nxt_router_app_port_get(nxt_task_t * task,nxt_app_t * app,nxt_request_rpc_data_t * req_rpc_data)5221 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5222 nxt_request_rpc_data_t *req_rpc_data)
5223 {
5224 nxt_bool_t start_process;
5225 nxt_port_t *port;
5226 nxt_http_request_t *r;
5227
5228 start_process = 0;
5229
5230 nxt_thread_mutex_lock(&app->mutex);
5231
5232 port = app->shared_port;
5233 nxt_port_inc_use(port);
5234
5235 app->active_requests++;
5236
5237 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5238 app->pending_processes++;
5239 start_process = 1;
5240 }
5241
5242 r = req_rpc_data->request;
5243
5244 /*
5245 * Put request into application-wide list to be able to cancel request
5246 * if something goes wrong with application processes.
5247 */
5248 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5249
5250 nxt_thread_mutex_unlock(&app->mutex);
5251
5252 /*
5253 * Retain request memory pool while request is linked in ack_waiting_req
5254 * to guarantee request structure memory is accessble.
5255 */
5256 nxt_mp_retain(r->mem_pool);
5257
5258 req_rpc_data->app_port = port;
5259 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5260
5261 if (start_process) {
5262 nxt_router_start_app_process(task, app);
5263 }
5264 }
5265
5266
5267 void
nxt_router_process_http_request(nxt_task_t * task,nxt_http_request_t * r,nxt_http_action_t * action)5268 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5269 nxt_http_action_t *action)
5270 {
5271 nxt_event_engine_t *engine;
5272 nxt_http_app_conf_t *conf;
5273 nxt_request_rpc_data_t *req_rpc_data;
5274
5275 conf = action->u.conf;
5276 engine = task->thread->engine;
5277
5278 r->app_target = conf->target;
5279
5280 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5281 nxt_router_response_ready_handler,
5282 nxt_router_response_error_handler,
5283 sizeof(nxt_request_rpc_data_t));
5284 if (nxt_slow_path(req_rpc_data == NULL)) {
5285 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5286 return;
5287 }
5288
5289 /*
5290 * At this point we have request req_rpc_data allocated and registered
5291 * in port handlers. Need to fixup request memory pool. Counterpart
5292 * release will be called via following call chain:
5293 * nxt_request_rpc_data_unlink() ->
5294 * nxt_router_http_request_release_post() ->
5295 * nxt_router_http_request_release()
5296 */
5297 nxt_mp_retain(r->mem_pool);
5298
5299 r->timer.task = &engine->task;
5300 r->timer.work_queue = &engine->fast_work_queue;
5301 r->timer.log = engine->task.log;
5302 r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5303
5304 r->engine = engine;
5305 r->err_work.handler = nxt_router_http_request_error;
5306 r->err_work.task = task;
5307 r->err_work.obj = r;
5308
5309 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5310 req_rpc_data->app = conf->app;
5311 req_rpc_data->msg_info.body_fd = -1;
5312 req_rpc_data->rpc_cancel = 1;
5313
5314 nxt_router_app_use(task, conf->app, 1);
5315
5316 req_rpc_data->request = r;
5317 r->req_rpc_data = req_rpc_data;
5318
5319 if (r->last != NULL) {
5320 r->last->completion_handler = nxt_router_http_request_done;
5321 }
5322
5323 nxt_router_app_port_get(task, conf->app, req_rpc_data);
5324 nxt_router_app_prepare_request(task, req_rpc_data);
5325 }
5326
5327
5328 static void
nxt_router_http_request_error(nxt_task_t * task,void * obj,void * data)5329 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5330 {
5331 nxt_http_request_t *r;
5332
5333 r = obj;
5334
5335 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5336
5337 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5338
5339 if (r->req_rpc_data != NULL) {
5340 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5341 }
5342
5343 nxt_mp_release(r->mem_pool);
5344 }
5345
5346
5347 static void
nxt_router_http_request_done(nxt_task_t * task,void * obj,void * data)5348 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5349 {
5350 nxt_http_request_t *r;
5351
5352 r = data;
5353
5354 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5355
5356 if (r->req_rpc_data != NULL) {
5357 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5358 }
5359
5360 nxt_http_request_close_handler(task, r, r->proto.any);
5361 }
5362
5363
5364 static void
nxt_router_app_prepare_request(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)5365 nxt_router_app_prepare_request(nxt_task_t *task,
5366 nxt_request_rpc_data_t *req_rpc_data)
5367 {
5368 nxt_app_t *app;
5369 nxt_buf_t *buf, *body;
5370 nxt_int_t res;
5371 nxt_port_t *port, *reply_port;
5372
5373 int notify;
5374 struct {
5375 nxt_port_msg_t pm;
5376 nxt_port_mmap_msg_t mm;
5377 } msg;
5378
5379
5380 app = req_rpc_data->app;
5381
5382 nxt_assert(app != NULL);
5383
5384 port = req_rpc_data->app_port;
5385
5386 nxt_assert(port != NULL);
5387 nxt_assert(port->queue != NULL);
5388
5389 reply_port = task->thread->engine->port;
5390
5391 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5392 nxt_app_msg_prefix[app->type]);
5393 if (nxt_slow_path(buf == NULL)) {
5394 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5395 req_rpc_data->stream, &app->name);
5396
5397 nxt_http_request_error(task, req_rpc_data->request,
5398 NXT_HTTP_INTERNAL_SERVER_ERROR);
5399
5400 return;
5401 }
5402
5403 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5404 nxt_buf_used_size(buf),
5405 port->socket.fd);
5406
5407 req_rpc_data->msg_info.buf = buf;
5408
5409 body = req_rpc_data->request->body;
5410
5411 if (body != NULL && nxt_buf_is_file(body)) {
5412 req_rpc_data->msg_info.body_fd = body->file->fd;
5413
5414 body->file->fd = -1;
5415
5416 } else {
5417 req_rpc_data->msg_info.body_fd = -1;
5418 }
5419
5420 msg.pm.stream = req_rpc_data->stream;
5421 msg.pm.pid = reply_port->pid;
5422 msg.pm.reply_port = reply_port->id;
5423 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5424 msg.pm.last = 0;
5425 msg.pm.mmap = 1;
5426 msg.pm.nf = 0;
5427 msg.pm.mf = 0;
5428 msg.pm.tracking = 0;
5429
5430 nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5431 nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5432
5433 msg.mm.mmap_id = hdr->id;
5434 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5435 msg.mm.size = nxt_buf_used_size(buf);
5436
5437 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5438 req_rpc_data->stream, ¬ify,
5439 &req_rpc_data->msg_info.tracking_cookie);
5440 if (nxt_fast_path(res == NXT_OK)) {
5441 if (notify != 0) {
5442 (void) nxt_port_socket_write(task, port,
5443 NXT_PORT_MSG_READ_QUEUE,
5444 -1, req_rpc_data->stream,
5445 reply_port->id, NULL);
5446
5447 } else {
5448 nxt_debug(task, "queue is not empty");
5449 }
5450
5451 buf->is_port_mmap_sent = 1;
5452 buf->mem.pos = buf->mem.free;
5453
5454 } else {
5455 nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5456 req_rpc_data->stream, &app->name);
5457
5458 nxt_http_request_error(task, req_rpc_data->request,
5459 NXT_HTTP_INTERNAL_SERVER_ERROR);
5460 }
5461 }
5462
5463
5464 struct nxt_fields_iter_s {
5465 nxt_list_part_t *part;
5466 nxt_http_field_t *field;
5467 };
5468
5469 typedef struct nxt_fields_iter_s nxt_fields_iter_t;
5470
5471
5472 static nxt_http_field_t *
nxt_fields_part_first(nxt_list_part_t * part,nxt_fields_iter_t * i)5473 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5474 {
5475 if (part == NULL) {
5476 return NULL;
5477 }
5478
5479 while (part->nelts == 0) {
5480 part = part->next;
5481 if (part == NULL) {
5482 return NULL;
5483 }
5484 }
5485
5486 i->part = part;
5487 i->field = nxt_list_data(i->part);
5488
5489 return i->field;
5490 }
5491
5492
5493 static nxt_http_field_t *
nxt_fields_first(nxt_list_t * fields,nxt_fields_iter_t * i)5494 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5495 {
5496 return nxt_fields_part_first(nxt_list_part(fields), i);
5497 }
5498
5499
5500 static nxt_http_field_t *
nxt_fields_next(nxt_fields_iter_t * i)5501 nxt_fields_next(nxt_fields_iter_t *i)
5502 {
5503 nxt_http_field_t *end = nxt_list_data(i->part);
5504
5505 end += i->part->nelts;
5506 i->field++;
5507
5508 if (i->field < end) {
5509 return i->field;
5510 }
5511
5512 return nxt_fields_part_first(i->part->next, i);
5513 }
5514
5515
5516 static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t * task,nxt_http_request_t * r,nxt_app_t * app,const nxt_str_t * prefix)5517 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5518 nxt_app_t *app, const nxt_str_t *prefix)
5519 {
5520 void *target_pos, *query_pos;
5521 u_char *pos, *end, *p, c;
5522 size_t fields_count, req_size, size, free_size;
5523 size_t copy_size;
5524 nxt_off_t content_length;
5525 nxt_buf_t *b, *buf, *out, **tail;
5526 nxt_http_field_t *field, *dup;
5527 nxt_unit_field_t *dst_field;
5528 nxt_fields_iter_t iter, dup_iter;
5529 nxt_unit_request_t *req;
5530
5531 req_size = sizeof(nxt_unit_request_t)
5532 + r->method->length + 1
5533 + r->version.length + 1
5534 + r->remote->length + 1
5535 + r->local->length + 1
5536 + r->server_name.length + 1
5537 + r->target.length + 1
5538 + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5539
5540 content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5541 fields_count = 0;
5542
5543 nxt_list_each(field, r->fields) {
5544 fields_count++;
5545
5546 req_size += field->name_length + prefix->length + 1
5547 + field->value_length + 1;
5548 } nxt_list_loop;
5549
5550 req_size += fields_count * sizeof(nxt_unit_field_t);
5551
5552 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5553 nxt_alert(task, "headers to big to fit in shared memory (%d)",
5554 (int) req_size);
5555
5556 return NULL;
5557 }
5558
5559 out = nxt_port_mmap_get_buf(task, &app->outgoing,
5560 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5561 if (nxt_slow_path(out == NULL)) {
5562 return NULL;
5563 }
5564
5565 req = (nxt_unit_request_t *) out->mem.free;
5566 out->mem.free += req_size;
5567
5568 req->app_target = r->app_target;
5569
5570 req->content_length = content_length;
5571
5572 p = (u_char *) (req->fields + fields_count);
5573
5574 nxt_debug(task, "fields_count=%d", (int) fields_count);
5575
5576 req->method_length = r->method->length;
5577 nxt_unit_sptr_set(&req->method, p);
5578 p = nxt_cpymem(p, r->method->start, r->method->length);
5579 *p++ = '\0';
5580
5581 req->version_length = r->version.length;
5582 nxt_unit_sptr_set(&req->version, p);
5583 p = nxt_cpymem(p, r->version.start, r->version.length);
5584 *p++ = '\0';
5585
5586 req->remote_length = r->remote->address_length;
5587 nxt_unit_sptr_set(&req->remote, p);
5588 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5589 r->remote->address_length);
5590 *p++ = '\0';
5591
5592 req->local_length = r->local->address_length;
5593 nxt_unit_sptr_set(&req->local, p);
5594 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5595 *p++ = '\0';
5596
5597 req->tls = (r->tls != NULL);
5598 req->websocket_handshake = r->websocket_handshake;
5599
5600 req->server_name_length = r->server_name.length;
5601 nxt_unit_sptr_set(&req->server_name, p);
5602 p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5603 *p++ = '\0';
5604
5605 target_pos = p;
5606 req->target_length = (uint32_t) r->target.length;
5607 nxt_unit_sptr_set(&req->target, p);
5608 p = nxt_cpymem(p, r->target.start, r->target.length);
5609 *p++ = '\0';
5610
5611 req->path_length = (uint32_t) r->path->length;
5612 if (r->path->start == r->target.start) {
5613 nxt_unit_sptr_set(&req->path, target_pos);
5614
5615 } else {
5616 nxt_unit_sptr_set(&req->path, p);
5617 p = nxt_cpymem(p, r->path->start, r->path->length);
5618 *p++ = '\0';
5619 }
5620
5621 req->query_length = (uint32_t) r->args->length;
5622 if (r->args->start != NULL) {
5623 query_pos = nxt_pointer_to(target_pos,
5624 r->args->start - r->target.start);
5625
5626 nxt_unit_sptr_set(&req->query, query_pos);
5627
5628 } else {
5629 req->query.offset = 0;
5630 }
5631
5632 req->content_length_field = NXT_UNIT_NONE_FIELD;
5633 req->content_type_field = NXT_UNIT_NONE_FIELD;
5634 req->cookie_field = NXT_UNIT_NONE_FIELD;
5635 req->authorization_field = NXT_UNIT_NONE_FIELD;
5636
5637 dst_field = req->fields;
5638
5639 for (field = nxt_fields_first(r->fields, &iter);
5640 field != NULL;
5641 field = nxt_fields_next(&iter))
5642 {
5643 if (field->skip) {
5644 continue;
5645 }
5646
5647 dst_field->hash = field->hash;
5648 dst_field->skip = 0;
5649 dst_field->name_length = field->name_length + prefix->length;
5650 dst_field->value_length = field->value_length;
5651
5652 if (field == r->content_length) {
5653 req->content_length_field = dst_field - req->fields;
5654
5655 } else if (field == r->content_type) {
5656 req->content_type_field = dst_field - req->fields;
5657
5658 } else if (field == r->cookie) {
5659 req->cookie_field = dst_field - req->fields;
5660
5661 } else if (field == r->authorization) {
5662 req->authorization_field = dst_field - req->fields;
5663 }
5664
5665 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5666 (int) field->hash, (int) field->skip,
5667 (int) field->name_length, field->name,
5668 (int) field->value_length, field->value);
5669
5670 if (prefix->length != 0) {
5671 nxt_unit_sptr_set(&dst_field->name, p);
5672 p = nxt_cpymem(p, prefix->start, prefix->length);
5673
5674 end = field->name + field->name_length;
5675 for (pos = field->name; pos < end; pos++) {
5676 c = *pos;
5677
5678 if (c >= 'a' && c <= 'z') {
5679 *p++ = (c & ~0x20);
5680 continue;
5681 }
5682
5683 if (c == '-') {
5684 *p++ = '_';
5685 continue;
5686 }
5687
5688 *p++ = c;
5689 }
5690
5691 } else {
5692 nxt_unit_sptr_set(&dst_field->name, p);
5693 p = nxt_cpymem(p, field->name, field->name_length);
5694 }
5695
5696 *p++ = '\0';
5697
5698 nxt_unit_sptr_set(&dst_field->value, p);
5699 p = nxt_cpymem(p, field->value, field->value_length);
5700
5701 if (prefix->length != 0) {
5702 dup_iter = iter;
5703
5704 for (dup = nxt_fields_next(&dup_iter);
5705 dup != NULL;
5706 dup = nxt_fields_next(&dup_iter))
5707 {
5708 if (dup->name_length != field->name_length
5709 || dup->skip
5710 || dup->hash != field->hash
5711 || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5712 {
5713 continue;
5714 }
5715
5716 p = nxt_cpymem(p, ", ", 2);
5717 p = nxt_cpymem(p, dup->value, dup->value_length);
5718
5719 dst_field->value_length += 2 + dup->value_length;
5720
5721 dup->skip = 1;
5722 }
5723 }
5724
5725 *p++ = '\0';
5726
5727 dst_field++;
5728 }
5729
5730 req->fields_count = (uint32_t) (dst_field - req->fields);
5731
5732 nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5733
5734 buf = out;
5735 tail = &buf->next;
5736
5737 for (b = r->body; b != NULL; b = b->next) {
5738 size = nxt_buf_mem_used_size(&b->mem);
5739 pos = b->mem.pos;
5740
5741 while (size > 0) {
5742 if (buf == NULL) {
5743 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5744
5745 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5746 if (nxt_slow_path(buf == NULL)) {
5747 while (out != NULL) {
5748 buf = out->next;
5749 out->next = NULL;
5750 out->completion_handler(task, out, out->parent);
5751 out = buf;
5752 }
5753 return NULL;
5754 }
5755
5756 *tail = buf;
5757 tail = &buf->next;
5758
5759 } else {
5760 free_size = nxt_buf_mem_free_size(&buf->mem);
5761 if (free_size < size
5762 && nxt_port_mmap_increase_buf(task, buf, size, 1)
5763 == NXT_OK)
5764 {
5765 free_size = nxt_buf_mem_free_size(&buf->mem);
5766 }
5767 }
5768
5769 if (free_size > 0) {
5770 copy_size = nxt_min(free_size, size);
5771
5772 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5773
5774 size -= copy_size;
5775 pos += copy_size;
5776
5777 if (size == 0) {
5778 break;
5779 }
5780 }
5781
5782 buf = NULL;
5783 }
5784 }
5785
5786 return out;
5787 }
5788
5789
5790 static void
nxt_router_app_timeout(nxt_task_t * task,void * obj,void * data)5791 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5792 {
5793 nxt_timer_t *timer;
5794 nxt_http_request_t *r;
5795 nxt_request_rpc_data_t *req_rpc_data;
5796
5797 timer = obj;
5798
5799 nxt_debug(task, "router app timeout");
5800
5801 r = nxt_timer_data(timer, nxt_http_request_t, timer);
5802 req_rpc_data = r->timer_data;
5803
5804 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5805
5806 nxt_request_rpc_data_unlink(task, req_rpc_data);
5807 }
5808
5809
5810 static void
nxt_router_http_request_release_post(nxt_task_t * task,nxt_http_request_t * r)5811 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5812 {
5813 r->timer.handler = nxt_router_http_request_release;
5814 nxt_timer_add(task->thread->engine, &r->timer, 0);
5815 }
5816
5817
5818 static void
nxt_router_http_request_release(nxt_task_t * task,void * obj,void * data)5819 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5820 {
5821 nxt_http_request_t *r;
5822
5823 nxt_debug(task, "http request pool release");
5824
5825 r = nxt_timer_data(obj, nxt_http_request_t, timer);
5826
5827 nxt_mp_release(r->mem_pool);
5828 }
5829
5830
5831 static void
nxt_router_oosm_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5832 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5833 {
5834 size_t mi;
5835 uint32_t i;
5836 nxt_bool_t ack;
5837 nxt_process_t *process;
5838 nxt_free_map_t *m;
5839 nxt_port_mmap_handler_t *mmap_handler;
5840
5841 nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5842
5843 process = nxt_runtime_process_find(task->thread->runtime,
5844 msg->port_msg.pid);
5845 if (nxt_slow_path(process == NULL)) {
5846 return;
5847 }
5848
5849 ack = 0;
5850
5851 /*
5852 * To mitigate possible racing condition (when OOSM message received
5853 * after some of the memory was already freed), need to try to find
5854 * first free segment in shared memory and send ACK if found.
5855 */
5856
5857 nxt_thread_mutex_lock(&process->incoming.mutex);
5858
5859 for (i = 0; i < process->incoming.size; i++) {
5860 mmap_handler = process->incoming.elts[i].mmap_handler;
5861
5862 if (nxt_slow_path(mmap_handler == NULL)) {
5863 continue;
5864 }
5865
5866 m = mmap_handler->hdr->free_map;
5867
5868 for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5869 if (m[mi] != 0) {
5870 ack = 1;
5871
5872 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5873 i, mi, m[mi]);
5874
5875 break;
5876 }
5877 }
5878 }
5879
5880 nxt_thread_mutex_unlock(&process->incoming.mutex);
5881
5882 if (ack) {
5883 nxt_process_broadcast_shm_ack(task, process);
5884 }
5885 }
5886
5887
5888 static void
nxt_router_get_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5889 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5890 {
5891 nxt_fd_t fd;
5892 nxt_port_t *port;
5893 nxt_runtime_t *rt;
5894 nxt_port_mmaps_t *mmaps;
5895 nxt_port_msg_get_mmap_t *get_mmap_msg;
5896 nxt_port_mmap_handler_t *mmap_handler;
5897
5898 rt = task->thread->runtime;
5899
5900 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5901 msg->port_msg.reply_port);
5902 if (nxt_slow_path(port == NULL)) {
5903 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5904 msg->port_msg.pid, msg->port_msg.reply_port);
5905
5906 return;
5907 }
5908
5909 if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5910 < (int) sizeof(nxt_port_msg_get_mmap_t)))
5911 {
5912 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5913 (int) nxt_buf_used_size(msg->buf));
5914
5915 return;
5916 }
5917
5918 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5919
5920 nxt_assert(port->type == NXT_PROCESS_APP);
5921
5922 if (nxt_slow_path(port->app == NULL)) {
5923 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5924 port->pid, port->id);
5925
5926 // FIXME
5927 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5928 -1, msg->port_msg.stream, 0, NULL);
5929
5930 return;
5931 }
5932
5933 mmaps = &port->app->outgoing;
5934 nxt_thread_mutex_lock(&mmaps->mutex);
5935
5936 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5937 nxt_thread_mutex_unlock(&mmaps->mutex);
5938
5939 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5940 (int) get_mmap_msg->id);
5941
5942 // FIXME
5943 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5944 -1, msg->port_msg.stream, 0, NULL);
5945 return;
5946 }
5947
5948 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5949
5950 fd = mmap_handler->fd;
5951
5952 nxt_thread_mutex_unlock(&mmaps->mutex);
5953
5954 nxt_debug(task, "get mmap %PI:%d found",
5955 msg->port_msg.pid, (int) get_mmap_msg->id);
5956
5957 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5958 }
5959
5960
5961 static void
nxt_router_get_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5962 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5963 {
5964 nxt_port_t *port, *reply_port;
5965 nxt_runtime_t *rt;
5966 nxt_port_msg_get_port_t *get_port_msg;
5967
5968 rt = task->thread->runtime;
5969
5970 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5971 msg->port_msg.reply_port);
5972 if (nxt_slow_path(reply_port == NULL)) {
5973 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5974 msg->port_msg.pid, msg->port_msg.reply_port);
5975
5976 return;
5977 }
5978
5979 if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5980 < (int) sizeof(nxt_port_msg_get_port_t)))
5981 {
5982 nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5983 (int) nxt_buf_used_size(msg->buf));
5984
5985 return;
5986 }
5987
5988 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5989
5990 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5991 if (nxt_slow_path(port == NULL)) {
5992 nxt_alert(task, "get_port_handler: port %PI:%d not found",
5993 get_port_msg->pid, get_port_msg->id);
5994
5995 return;
5996 }
5997
5998 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5999 get_port_msg->id);
6000
6001 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
6002 }
6003