1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
#if (NXT_TLS)

/*
 * Per-certificate TLS bookkeeping, compiled only with NXT_TLS.
 * NOTE(review): presumably one entry per certificate requested for a
 * listening socket; the users of this type are outside this view.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    /* Queue membership: for nxt_socket_conf_t.tls. */
    nxt_queue_link_t        link;
} nxt_router_tlssock_t;

#endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
227     nxt_port_t *app_port);
228 static void nxt_router_app_port_error(nxt_task_t *task,
229     nxt_port_recv_msg_t *msg, void *data);
230 
231 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
232 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
233 
234 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
235     nxt_port_t *port, nxt_apr_action_t action);
236 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
237     nxt_request_rpc_data_t *req_rpc_data);
238 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
239     void *data);
240 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
241     void *data);
242 
243 static void nxt_router_app_prepare_request(nxt_task_t *task,
244     nxt_request_rpc_data_t *req_rpc_data);
245 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
246     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
247 
248 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
249 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
254     void *data);
255 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
256 
257 static const nxt_http_request_state_t  nxt_http_request_send_state;
258 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
259 
260 static void nxt_router_app_joint_use(nxt_task_t *task,
261     nxt_app_joint_t *app_joint, int i);
262 
263 static void nxt_router_http_request_release_post(nxt_task_t *task,
264     nxt_http_request_t *r);
265 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
266     void *data);
267 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_port_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 static void nxt_router_get_mmap_handler(nxt_task_t *task,
271     nxt_port_recv_msg_t *msg);
272 
273 extern const nxt_http_request_state_t  nxt_http_websocket;
274 
275 static nxt_router_t  *nxt_router;
276 
277 static const nxt_str_t http_prefix = nxt_string("HTTP_");
278 static const nxt_str_t empty_prefix = nxt_string("");
279 
280 static const nxt_str_t  *nxt_app_msg_prefix[] = {
281     &empty_prefix,
282     &empty_prefix,
283     &http_prefix,
284     &http_prefix,
285     &http_prefix,
286     &empty_prefix,
287 };
288 
289 
290 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
291     .quit         = nxt_signal_quit_handler,
292     .new_port     = nxt_router_new_port_handler,
293     .get_port     = nxt_router_get_port_handler,
294     .change_file  = nxt_port_change_log_file_handler,
295     .mmap         = nxt_port_mmap_handler,
296     .get_mmap     = nxt_router_get_mmap_handler,
297     .data         = nxt_router_conf_data_handler,
298     .app_restart  = nxt_router_app_restart_handler,
299     .remove_pid   = nxt_router_remove_pid_handler,
300     .access_log   = nxt_router_access_log_reopen_handler,
301     .rpc_ready    = nxt_port_rpc_handler,
302     .rpc_error    = nxt_port_rpc_handler,
303     .oosm         = nxt_router_oosm_handler,
304 };
305 
306 
307 const nxt_process_init_t  nxt_router_process = {
308     .name           = "router",
309     .type           = NXT_PROCESS_ROUTER,
310     .prefork        = nxt_router_prefork,
311     .restart        = 1,
312     .setup          = nxt_process_core_setup,
313     .start          = nxt_router_start,
314     .port_handlers  = &nxt_router_process_port_handlers,
315     .signals        = nxt_process_signals,
316 };
317 
318 
319 /* Queues of nxt_socket_conf_t */
320 nxt_queue_t  creating_sockets;
321 nxt_queue_t  pending_sockets;
322 nxt_queue_t  updating_sockets;
323 nxt_queue_t  keeping_sockets;
324 nxt_queue_t  deleting_sockets;
325 
326 
327 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)328 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
329 {
330     nxt_runtime_stop_app_processes(task, task->thread->runtime);
331 
332     return NXT_OK;
333 }
334 
335 
336 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)337 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
338 {
339     nxt_int_t      ret;
340     nxt_port_t     *controller_port;
341     nxt_router_t   *router;
342     nxt_runtime_t  *rt;
343 
344     rt = task->thread->runtime;
345 
346     nxt_log(task, NXT_LOG_INFO, "router started");
347 
348 #if (NXT_TLS)
349     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
350     if (nxt_slow_path(rt->tls == NULL)) {
351         return NXT_ERROR;
352     }
353 
354     ret = rt->tls->library_init(task);
355     if (nxt_slow_path(ret != NXT_OK)) {
356         return ret;
357     }
358 #endif
359 
360     ret = nxt_http_init(task);
361     if (nxt_slow_path(ret != NXT_OK)) {
362         return ret;
363     }
364 
365     router = nxt_zalloc(sizeof(nxt_router_t));
366     if (nxt_slow_path(router == NULL)) {
367         return NXT_ERROR;
368     }
369 
370     nxt_queue_init(&router->engines);
371     nxt_queue_init(&router->sockets);
372     nxt_queue_init(&router->apps);
373 
374     nxt_router = router;
375 
376     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
377     if (controller_port != NULL) {
378         nxt_router_greet_controller(task, controller_port);
379     }
380 
381     return NXT_OK;
382 }
383 
384 
385 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)386 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
387 {
388     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
389                           -1, 0, 0, NULL);
390 }
391 
392 
393 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)394 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
395     void *data)
396 {
397     size_t               size;
398     uint32_t             stream;
399     nxt_int_t            ret;
400     nxt_app_t            *app;
401     nxt_buf_t            *b;
402     nxt_port_t           *dport;
403     nxt_runtime_t        *rt;
404     nxt_app_joint_rpc_t  *app_joint_rpc;
405 
406     app = data;
407 
408     nxt_thread_mutex_lock(&app->mutex);
409 
410     dport = app->proto_port;
411 
412     nxt_thread_mutex_unlock(&app->mutex);
413 
414     if (dport != NULL) {
415         nxt_debug(task, "app '%V' %p start process", &app->name, app);
416 
417         b = NULL;
418 
419     } else {
420         if (app->proto_port_requests > 0) {
421             nxt_debug(task, "app '%V' %p wait for prototype process",
422                       &app->name, app);
423 
424             app->proto_port_requests++;
425 
426             goto skip;
427         }
428 
429         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
430 
431         rt = task->thread->runtime;
432         dport = rt->port_by_type[NXT_PROCESS_MAIN];
433 
434         size = app->name.length + 1 + app->conf.length;
435 
436         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
437         if (nxt_slow_path(b == NULL)) {
438             goto failed;
439         }
440 
441         nxt_buf_cpystr(b, &app->name);
442         *b->mem.free++ = '\0';
443         nxt_buf_cpystr(b, &app->conf);
444     }
445 
446     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
447                                                      nxt_router_app_port_ready,
448                                                      nxt_router_app_port_error,
449                                                    sizeof(nxt_app_joint_rpc_t));
450     if (nxt_slow_path(app_joint_rpc == NULL)) {
451         goto failed;
452     }
453 
454     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
455 
456     ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
457                                 -1, stream, port->id, b);
458     if (nxt_slow_path(ret != NXT_OK)) {
459         nxt_port_rpc_cancel(task, port, stream);
460 
461         goto failed;
462     }
463 
464     app_joint_rpc->app_joint = app->joint;
465     app_joint_rpc->generation = app->generation;
466     app_joint_rpc->proto = (b != NULL);
467 
468     if (b != NULL) {
469         app->proto_port_requests++;
470 
471         b = NULL;
472     }
473 
474     nxt_router_app_joint_use(task, app->joint, 1);
475 
476 failed:
477 
478     if (b != NULL) {
479         nxt_mp_free(b->data, b);
480     }
481 
482 skip:
483 
484     nxt_router_app_use(task, app, -1);
485 }
486 
487 
488 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)489 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
490 {
491     app_joint->use_count += i;
492 
493     if (app_joint->use_count == 0) {
494         nxt_assert(app_joint->app == NULL);
495 
496         nxt_free(app_joint);
497     }
498 }
499 
500 
501 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)502 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
503 {
504     nxt_int_t      res;
505     nxt_port_t     *router_port;
506     nxt_runtime_t  *rt;
507 
508     nxt_debug(task, "app '%V' start process", &app->name);
509 
510     rt = task->thread->runtime;
511     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
512 
513     nxt_router_app_use(task, app, 1);
514 
515     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
516                         app);
517 
518     if (res == NXT_OK) {
519         return res;
520     }
521 
522     nxt_thread_mutex_lock(&app->mutex);
523 
524     app->pending_processes--;
525 
526     nxt_thread_mutex_unlock(&app->mutex);
527 
528     nxt_router_app_use(task, app, -1);
529 
530     return NXT_ERROR;
531 }
532 
533 
534 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)535 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
536 {
537     nxt_buf_t       *b, *next;
538     nxt_bool_t      cancelled;
539     nxt_port_t      *app_port;
540     nxt_msg_info_t  *msg_info;
541 
542     msg_info = &req_rpc_data->msg_info;
543 
544     if (msg_info->buf == NULL) {
545         return 0;
546     }
547 
548     app_port = req_rpc_data->app_port;
549 
550     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
551         cancelled = nxt_app_queue_cancel(app_port->queue,
552                                          msg_info->tracking_cookie,
553                                          req_rpc_data->stream);
554 
555         if (cancelled) {
556             nxt_debug(task, "stream #%uD: cancelled by router",
557                       req_rpc_data->stream);
558         }
559 
560     } else {
561         cancelled = 0;
562     }
563 
564     for (b = msg_info->buf; b != NULL; b = next) {
565         next = b->next;
566         b->next = NULL;
567 
568         if (b->is_port_mmap_sent) {
569             b->is_port_mmap_sent = cancelled == 0;
570         }
571 
572         b->completion_handler(task, b, b->parent);
573     }
574 
575     msg_info->buf = NULL;
576 
577     return cancelled;
578 }
579 
580 
581 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)582 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
583 {
584     if (lnk->next != NULL) {
585         nxt_queue_remove(lnk);
586 
587         lnk->next = NULL;
588 
589         return 1;
590     }
591 
592     return 0;
593 }
594 
595 
596 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)597 nxt_request_rpc_data_unlink(nxt_task_t *task,
598     nxt_request_rpc_data_t *req_rpc_data)
599 {
600     nxt_app_t           *app;
601     nxt_bool_t          unlinked;
602     nxt_http_request_t  *r;
603 
604     nxt_router_msg_cancel(task, req_rpc_data);
605 
606     app = req_rpc_data->app;
607 
608     if (req_rpc_data->app_port != NULL) {
609         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
610                                     req_rpc_data->apr_action);
611 
612         req_rpc_data->app_port = NULL;
613     }
614 
615     r = req_rpc_data->request;
616 
617     if (r != NULL) {
618         r->timer_data = NULL;
619 
620         nxt_router_http_request_release_post(task, r);
621 
622         r->req_rpc_data = NULL;
623         req_rpc_data->request = NULL;
624 
625         if (app != NULL) {
626             unlinked = 0;
627 
628             nxt_thread_mutex_lock(&app->mutex);
629 
630             if (r->app_link.next != NULL) {
631                 nxt_queue_remove(&r->app_link);
632                 r->app_link.next = NULL;
633 
634                 unlinked = 1;
635             }
636 
637             nxt_thread_mutex_unlock(&app->mutex);
638 
639             if (unlinked) {
640                 nxt_mp_release(r->mem_pool);
641             }
642         }
643     }
644 
645     if (app != NULL) {
646         nxt_router_app_use(task, app, -1);
647 
648         req_rpc_data->app = NULL;
649     }
650 
651     if (req_rpc_data->msg_info.body_fd != -1) {
652         nxt_fd_close(req_rpc_data->msg_info.body_fd);
653 
654         req_rpc_data->msg_info.body_fd = -1;
655     }
656 
657     if (req_rpc_data->rpc_cancel) {
658         req_rpc_data->rpc_cancel = 0;
659 
660         nxt_port_rpc_cancel(task, task->thread->engine->port,
661                             req_rpc_data->stream);
662     }
663 }
664 
665 
666 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)667 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
668 {
669     nxt_int_t      res;
670     nxt_app_t      *app;
671     nxt_port_t     *port, *main_app_port;
672     nxt_runtime_t  *rt;
673 
674     nxt_port_new_port_handler(task, msg);
675 
676     port = msg->u.new_port;
677 
678     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
679         nxt_router_greet_controller(task, msg->u.new_port);
680     }
681 
682     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
683         nxt_port_rpc_handler(task, msg);
684 
685         return;
686     }
687 
688     if (port == NULL || port->type != NXT_PROCESS_APP) {
689 
690         if (msg->port_msg.stream == 0) {
691             return;
692         }
693 
694         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
695 
696     } else {
697         if (msg->fd[1] != -1) {
698             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
699             if (nxt_slow_path(res != NXT_OK)) {
700                 return;
701             }
702 
703             nxt_fd_close(msg->fd[1]);
704             msg->fd[1] = -1;
705         }
706     }
707 
708     if (msg->port_msg.stream != 0) {
709         nxt_port_rpc_handler(task, msg);
710         return;
711     }
712 
713     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
714 
715     /*
716      * Port with "id == 0" is application 'main' port and it always
717      * should come with non-zero stream.
718      */
719     nxt_assert(port->id != 0);
720 
721     /* Find 'main' app port and get app reference. */
722     rt = task->thread->runtime;
723 
724     /*
725      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
726      * sent to main port (with id == 0) and processed in main thread.
727      */
728     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
729     nxt_assert(main_app_port != NULL);
730 
731     app = main_app_port->app;
732 
733     if (nxt_fast_path(app != NULL)) {
734         nxt_thread_mutex_lock(&app->mutex);
735 
736         /* TODO here should be find-and-add code because there can be
737            port waiters in port_hash */
738         nxt_port_hash_add(&app->port_hash, port);
739         app->port_hash_count++;
740 
741         nxt_thread_mutex_unlock(&app->mutex);
742 
743         port->app = app;
744     }
745 
746     port->main_app_port = main_app_port;
747 
748     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
749 }
750 
751 
752 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)753 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
754 {
755     void                    *p;
756     size_t                  size;
757     nxt_int_t               ret;
758     nxt_port_t              *port;
759     nxt_router_temp_conf_t  *tmcf;
760 
761     port = nxt_runtime_port_find(task->thread->runtime,
762                                  msg->port_msg.pid,
763                                  msg->port_msg.reply_port);
764     if (nxt_slow_path(port == NULL)) {
765         nxt_alert(task, "conf_data_handler: reply port not found");
766         return;
767     }
768 
769     p = MAP_FAILED;
770 
771     /*
772      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
773      * initialized in 'cleanup' section.
774      */
775     size = 0;
776 
777     tmcf = nxt_router_temp_conf(task);
778     if (nxt_slow_path(tmcf == NULL)) {
779         goto fail;
780     }
781 
782     if (nxt_slow_path(msg->fd[0] == -1)) {
783         nxt_alert(task, "conf_data_handler: invalid shm fd");
784         goto fail;
785     }
786 
787     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
788         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
789                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
790         goto fail;
791     }
792 
793     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
794 
795     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
796 
797     nxt_fd_close(msg->fd[0]);
798     msg->fd[0] = -1;
799 
800     if (nxt_slow_path(p == MAP_FAILED)) {
801         goto fail;
802     }
803 
804     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
805 
806     tmcf->router_conf->router = nxt_router;
807     tmcf->stream = msg->port_msg.stream;
808     tmcf->port = port;
809 
810     nxt_port_use(task, tmcf->port, 1);
811 
812     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
813 
814     if (nxt_fast_path(ret == NXT_OK)) {
815         nxt_router_conf_apply(task, tmcf, NULL);
816 
817     } else {
818         nxt_router_conf_error(task, tmcf);
819     }
820 
821     goto cleanup;
822 
823 fail:
824 
825     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
826                           msg->port_msg.stream, 0, NULL);
827 
828     if (tmcf != NULL) {
829         nxt_mp_release(tmcf->mem_pool);
830     }
831 
832 cleanup:
833 
834     if (p != MAP_FAILED) {
835         nxt_mem_munmap(p, size);
836     }
837 
838     if (msg->fd[0] != -1) {
839         nxt_fd_close(msg->fd[0]);
840         msg->fd[0] = -1;
841     }
842 }
843 
844 
845 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)846 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
847 {
848     nxt_app_t            *app;
849     nxt_int_t            ret;
850     nxt_str_t            app_name;
851     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
852     nxt_port_t           *proto_port;
853     nxt_port_msg_type_t  reply;
854 
855     reply_port = nxt_runtime_port_find(task->thread->runtime,
856                                        msg->port_msg.pid,
857                                        msg->port_msg.reply_port);
858     if (nxt_slow_path(reply_port == NULL)) {
859         nxt_alert(task, "app_restart_handler: reply port not found");
860         return;
861     }
862 
863     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
864     app_name.start = msg->buf->mem.pos;
865 
866     nxt_debug(task, "app_restart_handler: %V", &app_name);
867 
868     app = nxt_router_app_find(&nxt_router->apps, &app_name);
869 
870     if (nxt_fast_path(app != NULL)) {
871         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
872                                    NXT_PROCESS_APP);
873         if (nxt_slow_path(shared_port == NULL)) {
874             goto fail;
875         }
876 
877         ret = nxt_port_socket_init(task, shared_port, 0);
878         if (nxt_slow_path(ret != NXT_OK)) {
879             nxt_port_use(task, shared_port, -1);
880             goto fail;
881         }
882 
883         ret = nxt_router_app_queue_init(task, shared_port);
884         if (nxt_slow_path(ret != NXT_OK)) {
885             nxt_port_write_close(shared_port);
886             nxt_port_read_close(shared_port);
887             nxt_port_use(task, shared_port, -1);
888             goto fail;
889         }
890 
891         nxt_port_write_enable(task, shared_port);
892 
893         nxt_thread_mutex_lock(&app->mutex);
894 
895         proto_port = app->proto_port;
896 
897         if (proto_port != NULL) {
898             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
899                       proto_port->pid);
900 
901             app->proto_port = NULL;
902             proto_port->app = NULL;
903         }
904 
905         app->generation++;
906 
907         shared_port->app = app;
908 
909         old_shared_port = app->shared_port;
910         old_shared_port->app = NULL;
911 
912         app->shared_port = shared_port;
913 
914         nxt_thread_mutex_unlock(&app->mutex);
915 
916         nxt_port_close(task, old_shared_port);
917         nxt_port_use(task, old_shared_port, -1);
918 
919         if (proto_port != NULL) {
920             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
921                                          -1, 0, 0, NULL);
922 
923             nxt_port_close(task, proto_port);
924 
925             nxt_port_use(task, proto_port, -1);
926         }
927 
928         reply = NXT_PORT_MSG_RPC_READY_LAST;
929 
930     } else {
931 
932 fail:
933 
934         reply = NXT_PORT_MSG_RPC_ERROR;
935     }
936 
937     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
938                           0, NULL);
939 }
940 
941 
942 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)943 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
944     void *data)
945 {
946     union {
947         nxt_pid_t  removed_pid;
948         void       *data;
949     } u;
950 
951     u.data = data;
952 
953     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
954 }
955 
956 
957 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)958 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
959 {
960     nxt_event_engine_t  *engine;
961 
962     nxt_port_remove_pid_handler(task, msg);
963 
964     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
965     {
966         if (nxt_fast_path(engine->port != NULL)) {
967             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
968                           msg->u.data);
969         }
970     }
971     nxt_queue_loop;
972 
973     if (msg->port_msg.stream == 0) {
974         return;
975     }
976 
977     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
978 
979     nxt_port_rpc_handler(task, msg);
980 }
981 
982 
983 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)984 nxt_router_temp_conf(nxt_task_t *task)
985 {
986     nxt_mp_t                *mp, *tmp;
987     nxt_router_conf_t       *rtcf;
988     nxt_router_temp_conf_t  *tmcf;
989 
990     mp = nxt_mp_create(1024, 128, 256, 32);
991     if (nxt_slow_path(mp == NULL)) {
992         return NULL;
993     }
994 
995     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
996     if (nxt_slow_path(rtcf == NULL)) {
997         goto fail;
998     }
999 
1000     rtcf->mem_pool = mp;
1001 
1002     tmp = nxt_mp_create(1024, 128, 256, 32);
1003     if (nxt_slow_path(tmp == NULL)) {
1004         goto fail;
1005     }
1006 
1007     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1008     if (nxt_slow_path(tmcf == NULL)) {
1009         goto temp_fail;
1010     }
1011 
1012     tmcf->mem_pool = tmp;
1013     tmcf->router_conf = rtcf;
1014     tmcf->count = 1;
1015     tmcf->engine = task->thread->engine;
1016 
1017     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1018                                      sizeof(nxt_router_engine_conf_t));
1019     if (nxt_slow_path(tmcf->engines == NULL)) {
1020         goto temp_fail;
1021     }
1022 
1023     nxt_queue_init(&creating_sockets);
1024     nxt_queue_init(&pending_sockets);
1025     nxt_queue_init(&updating_sockets);
1026     nxt_queue_init(&keeping_sockets);
1027     nxt_queue_init(&deleting_sockets);
1028 
1029 #if (NXT_TLS)
1030     nxt_queue_init(&tmcf->tls);
1031 #endif
1032 
1033     nxt_queue_init(&tmcf->apps);
1034     nxt_queue_init(&tmcf->previous);
1035 
1036     return tmcf;
1037 
1038 temp_fail:
1039 
1040     nxt_mp_destroy(tmp);
1041 
1042 fail:
1043 
1044     nxt_mp_destroy(mp);
1045 
1046     return NULL;
1047 }
1048 
1049 
1050 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1051 nxt_router_app_can_start(nxt_app_t *app)
1052 {
1053     return app->processes + app->pending_processes < app->max_processes
1054             && app->pending_processes < app->max_pending_processes;
1055 }
1056 
1057 
1058 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1059 nxt_router_app_need_start(nxt_app_t *app)
1060 {
1061     return (app->active_requests
1062               > app->port_hash_count + app->pending_processes)
1063            || (app->spare_processes
1064                 > app->idle_processes + app->pending_processes);
1065 }
1066 
1067 
1068 static void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1069 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1070 {
1071     nxt_int_t                    ret;
1072     nxt_app_t                    *app;
1073     nxt_router_t                 *router;
1074     nxt_runtime_t                *rt;
1075     nxt_queue_link_t             *qlk;
1076     nxt_socket_conf_t            *skcf;
1077     nxt_router_conf_t            *rtcf;
1078     nxt_router_temp_conf_t       *tmcf;
1079     const nxt_event_interface_t  *interface;
1080 #if (NXT_TLS)
1081     nxt_router_tlssock_t         *tls;
1082 #endif
1083 
1084     tmcf = obj;
1085 
1086     qlk = nxt_queue_first(&pending_sockets);
1087 
1088     if (qlk != nxt_queue_tail(&pending_sockets)) {
1089         nxt_queue_remove(qlk);
1090         nxt_queue_insert_tail(&creating_sockets, qlk);
1091 
1092         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1093 
1094         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1095 
1096         return;
1097     }
1098 
1099 #if (NXT_TLS)
1100     qlk = nxt_queue_last(&tmcf->tls);
1101 
1102     if (qlk != nxt_queue_head(&tmcf->tls)) {
1103         nxt_queue_remove(qlk);
1104 
1105         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1106 
1107         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1108                            nxt_router_tls_rpc_handler, tls);
1109         return;
1110     }
1111 #endif
1112 
1113     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1114 
1115         if (nxt_router_app_need_start(app)) {
1116             nxt_router_app_rpc_create(task, tmcf, app);
1117             return;
1118         }
1119 
1120     } nxt_queue_loop;
1121 
1122     rtcf = tmcf->router_conf;
1123 
1124     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1125         nxt_router_access_log_open(task, tmcf);
1126         return;
1127     }
1128 
1129     rt = task->thread->runtime;
1130 
1131     interface = nxt_service_get(rt->services, "engine", NULL);
1132 
1133     router = rtcf->router;
1134 
1135     ret = nxt_router_engines_create(task, router, tmcf, interface);
1136     if (nxt_slow_path(ret != NXT_OK)) {
1137         goto fail;
1138     }
1139 
1140     ret = nxt_router_threads_create(task, rt, tmcf);
1141     if (nxt_slow_path(ret != NXT_OK)) {
1142         goto fail;
1143     }
1144 
1145     nxt_router_apps_sort(task, router, tmcf);
1146 
1147     nxt_router_apps_hash_use(task, rtcf, 1);
1148 
1149     nxt_router_engines_post(router, tmcf);
1150 
1151     nxt_queue_add(&router->sockets, &updating_sockets);
1152     nxt_queue_add(&router->sockets, &creating_sockets);
1153 
1154     if (router->access_log != rtcf->access_log) {
1155         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1156 
1157         nxt_router_access_log_release(task, &router->lock, router->access_log);
1158 
1159         router->access_log = rtcf->access_log;
1160     }
1161 
1162     nxt_router_conf_ready(task, tmcf);
1163 
1164     return;
1165 
1166 fail:
1167 
1168     nxt_router_conf_error(task, tmcf);
1169 
1170     return;
1171 }
1172 
1173 
1174 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1175 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1176 {
1177     nxt_joint_job_t  *job;
1178 
1179     job = obj;
1180 
1181     nxt_router_conf_ready(task, job->tmcf);
1182 }
1183 
1184 
1185 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1186 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1187 {
1188     uint32_t               count;
1189     nxt_router_conf_t      *rtcf;
1190     nxt_thread_spinlock_t  *lock;
1191 
1192     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1193 
1194     if (--tmcf->count > 0) {
1195         return;
1196     }
1197 
1198     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1199 
1200     rtcf = tmcf->router_conf;
1201 
1202     lock = &rtcf->router->lock;
1203 
1204     nxt_thread_spin_lock(lock);
1205 
1206     count = rtcf->count;
1207 
1208     nxt_thread_spin_unlock(lock);
1209 
1210     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1211 
1212     if (count == 0) {
1213         nxt_router_apps_hash_use(task, rtcf, -1);
1214 
1215         nxt_router_access_log_release(task, lock, rtcf->access_log);
1216 
1217         nxt_mp_destroy(rtcf->mem_pool);
1218     }
1219 
1220     nxt_mp_release(tmcf->mem_pool);
1221 }
1222 
1223 
1224 static void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1225 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1226 {
1227     nxt_app_t          *app;
1228     nxt_queue_t        new_socket_confs;
1229     nxt_socket_t       s;
1230     nxt_router_t       *router;
1231     nxt_queue_link_t   *qlk;
1232     nxt_socket_conf_t  *skcf;
1233     nxt_router_conf_t  *rtcf;
1234 
1235     nxt_alert(task, "failed to apply new conf");
1236 
1237     for (qlk = nxt_queue_first(&creating_sockets);
1238          qlk != nxt_queue_tail(&creating_sockets);
1239          qlk = nxt_queue_next(qlk))
1240     {
1241         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1242         s = skcf->listen->socket;
1243 
1244         if (s != -1) {
1245             nxt_socket_close(task, s);
1246         }
1247 
1248         nxt_free(skcf->listen);
1249     }
1250 
1251     nxt_queue_init(&new_socket_confs);
1252     nxt_queue_add(&new_socket_confs, &updating_sockets);
1253     nxt_queue_add(&new_socket_confs, &pending_sockets);
1254     nxt_queue_add(&new_socket_confs, &creating_sockets);
1255 
1256     rtcf = tmcf->router_conf;
1257 
1258     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1259 
1260         nxt_router_app_unlink(task, app);
1261 
1262     } nxt_queue_loop;
1263 
1264     router = rtcf->router;
1265 
1266     nxt_queue_add(&router->sockets, &keeping_sockets);
1267     nxt_queue_add(&router->sockets, &deleting_sockets);
1268 
1269     nxt_queue_add(&router->apps, &tmcf->previous);
1270 
1271     // TODO: new engines and threads
1272 
1273     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1274 
1275     nxt_mp_destroy(rtcf->mem_pool);
1276 
1277     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1278 
1279     nxt_mp_release(tmcf->mem_pool);
1280 }
1281 
1282 
1283 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1284 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1285     nxt_port_msg_type_t type)
1286 {
1287     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1288 
1289     nxt_port_use(task, tmcf->port, -1);
1290 
1291     tmcf->port = NULL;
1292 }
1293 
1294 
/*
 * Root-level configuration members mapped into nxt_router_conf_t.
 * nxt_router_conf_create() applies this map to the parsed configuration
 * root; a resulting "threads" value of 0 is replaced there with nxt_ncpu.
 */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};
1302 
1303 
/*
 * Per-application members mapped into nxt_router_app_conf_t.
 *
 * "limits", "processes" and "targets" are captured as raw conf value
 * pointers so that nxt_router_conf_create() can inspect them further.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    /*
     * "processes" is intentionally listed twice: once as an integer and
     * once as a raw value pointer.  nxt_router_conf_create() checks whether
     * the raw value is an object; if so it is mapped with
     * nxt_router_app_processes_conf, otherwise the integer is used for both
     * the max and the spare process counts.
     */
    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};
1335 
1336 
/*
 * Members of an application's "limits" object, mapped into
 * nxt_router_app_conf_t.  Applied in nxt_router_conf_create() only when
 * "limits" is present and is an object.
 */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },
};
1344 
1345 
/*
 * Members of an application's "processes" object, mapped into
 * nxt_router_app_conf_t.  Applied in nxt_router_conf_create() only when
 * "processes" is given as an object rather than a plain number.
 */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};
1365 
1366 
/*
 * Per-listener members mapped into nxt_router_listener_conf_t.
 * nxt_router_conf_create() applies this map to each member of the
 * "/listeners" object.
 */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};
1380 
1381 
/*
 * Members of the "/settings/http" object, mapped into nxt_socket_conf_t.
 * nxt_router_conf_create() first fills each socket conf with built-in
 * defaults and then applies this map on top when the object is present.
 */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    /* Buffer and body size settings. */
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    /* Timeouts, mapped as milliseconds. */
    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    /*
     * When left empty, nxt_router_conf_create() falls back to the
     * runtime's tmp directory.
     */
    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },
};
1449 
1450 
/*
 * Members of the "/settings/http/websocket" object, mapped into
 * nxt_websocket_conf_t.  nxt_router_conf_create() applies this map to the
 * websocket_conf member of each socket conf when the object is present.
 */
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
    {
        nxt_string("max_frame_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_websocket_conf_t, max_frame_size),
    },

    {
        nxt_string("read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, read_timeout),
    },

    {
        nxt_string("keepalive_interval"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, keepalive_interval),
    },

};
1471 
1472 
1473 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1474 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1475     u_char *start, u_char *end)
1476 {
1477     u_char                      *p;
1478     size_t                      size;
1479     nxt_mp_t                    *mp, *app_mp;
1480     uint32_t                    next, next_target;
1481     nxt_int_t                   ret;
1482     nxt_str_t                   name, path, target;
1483     nxt_app_t                   *app, *prev;
1484     nxt_str_t                   *t, *s, *targets;
1485     nxt_uint_t                  n, i;
1486     nxt_port_t                  *port;
1487     nxt_router_t                *router;
1488     nxt_app_joint_t             *app_joint;
1489 #if (NXT_TLS)
1490     nxt_tls_init_t              *tls_init;
1491     nxt_conf_value_t            *certificate;
1492 #endif
1493     nxt_conf_value_t            *conf, *http, *value, *websocket;
1494     nxt_conf_value_t            *applications, *application;
1495     nxt_conf_value_t            *listeners, *listener;
1496     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1497     nxt_socket_conf_t           *skcf;
1498     nxt_http_routes_t           *routes;
1499     nxt_event_engine_t          *engine;
1500     nxt_app_lang_module_t       *lang;
1501     nxt_router_app_conf_t       apcf;
1502     nxt_router_access_log_t     *access_log;
1503     nxt_router_listener_conf_t  lscf;
1504 
1505     static nxt_str_t  http_path = nxt_string("/settings/http");
1506     static nxt_str_t  applications_path = nxt_string("/applications");
1507     static nxt_str_t  listeners_path = nxt_string("/listeners");
1508     static nxt_str_t  routes_path = nxt_string("/routes");
1509     static nxt_str_t  access_log_path = nxt_string("/access_log");
1510 #if (NXT_TLS)
1511     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1512     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1513     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1514     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1515     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1516 #endif
1517     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1518     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1519     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1520 
1521     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1522     if (conf == NULL) {
1523         nxt_alert(task, "configuration parsing error");
1524         return NXT_ERROR;
1525     }
1526 
1527     mp = tmcf->router_conf->mem_pool;
1528 
1529     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1530                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1531     if (ret != NXT_OK) {
1532         nxt_alert(task, "root map error");
1533         return NXT_ERROR;
1534     }
1535 
1536     if (tmcf->router_conf->threads == 0) {
1537         tmcf->router_conf->threads = nxt_ncpu;
1538     }
1539 
1540     static_conf = nxt_conf_get_path(conf, &static_path);
1541 
1542     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1543     if (nxt_slow_path(ret != NXT_OK)) {
1544         return NXT_ERROR;
1545     }
1546 
1547     router = tmcf->router_conf->router;
1548 
1549     applications = nxt_conf_get_path(conf, &applications_path);
1550 
1551     if (applications != NULL) {
1552         next = 0;
1553 
1554         for ( ;; ) {
1555             application = nxt_conf_next_object_member(applications,
1556                                                       &name, &next);
1557             if (application == NULL) {
1558                 break;
1559             }
1560 
1561             nxt_debug(task, "application \"%V\"", &name);
1562 
1563             size = nxt_conf_json_length(application, NULL);
1564 
1565             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1566             if (nxt_slow_path(app_mp == NULL)) {
1567                 goto fail;
1568             }
1569 
1570             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1571             if (app == NULL) {
1572                 goto app_fail;
1573             }
1574 
1575             nxt_memzero(app, sizeof(nxt_app_t));
1576 
1577             app->mem_pool = app_mp;
1578 
1579             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1580             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1581                                                   + name.length);
1582 
1583             p = nxt_conf_json_print(app->conf.start, application, NULL);
1584             app->conf.length = p - app->conf.start;
1585 
1586             nxt_assert(app->conf.length <= size);
1587 
1588             nxt_debug(task, "application conf \"%V\"", &app->conf);
1589 
1590             prev = nxt_router_app_find(&router->apps, &name);
1591 
1592             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1593                 nxt_mp_destroy(app_mp);
1594 
1595                 nxt_queue_remove(&prev->link);
1596                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1597 
1598                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1599                 if (nxt_slow_path(ret != NXT_OK)) {
1600                     goto fail;
1601                 }
1602 
1603                 continue;
1604             }
1605 
1606             apcf.processes = 1;
1607             apcf.max_processes = 1;
1608             apcf.spare_processes = 0;
1609             apcf.timeout = 0;
1610             apcf.idle_timeout = 15000;
1611             apcf.limits_value = NULL;
1612             apcf.processes_value = NULL;
1613             apcf.targets_value = NULL;
1614 
1615             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1616             if (nxt_slow_path(app_joint == NULL)) {
1617                 goto app_fail;
1618             }
1619 
1620             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1621 
1622             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1623                                       nxt_nitems(nxt_router_app_conf), &apcf);
1624             if (ret != NXT_OK) {
1625                 nxt_alert(task, "application map error");
1626                 goto app_fail;
1627             }
1628 
1629             if (apcf.limits_value != NULL) {
1630 
1631                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1632                     nxt_alert(task, "application limits is not object");
1633                     goto app_fail;
1634                 }
1635 
1636                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1637                                         nxt_router_app_limits_conf,
1638                                         nxt_nitems(nxt_router_app_limits_conf),
1639                                         &apcf);
1640                 if (ret != NXT_OK) {
1641                     nxt_alert(task, "application limits map error");
1642                     goto app_fail;
1643                 }
1644             }
1645 
1646             if (apcf.processes_value != NULL
1647                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1648             {
1649                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1650                                      nxt_router_app_processes_conf,
1651                                      nxt_nitems(nxt_router_app_processes_conf),
1652                                      &apcf);
1653                 if (ret != NXT_OK) {
1654                     nxt_alert(task, "application processes map error");
1655                     goto app_fail;
1656                 }
1657 
1658             } else {
1659                 apcf.max_processes = apcf.processes;
1660                 apcf.spare_processes = apcf.processes;
1661             }
1662 
1663             if (apcf.targets_value != NULL) {
1664                 n = nxt_conf_object_members_count(apcf.targets_value);
1665 
1666                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1667                 if (nxt_slow_path(targets == NULL)) {
1668                     goto app_fail;
1669                 }
1670 
1671                 next_target = 0;
1672 
1673                 for (i = 0; i < n; i++) {
1674                     (void) nxt_conf_next_object_member(apcf.targets_value,
1675                                                        &target, &next_target);
1676 
1677                     s = nxt_str_dup(app_mp, &targets[i], &target);
1678                     if (nxt_slow_path(s == NULL)) {
1679                         goto app_fail;
1680                     }
1681                 }
1682 
1683             } else {
1684                 targets = NULL;
1685             }
1686 
1687             nxt_debug(task, "application type: %V", &apcf.type);
1688             nxt_debug(task, "application processes: %D", apcf.processes);
1689             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1690 
1691             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1692 
1693             if (lang == NULL) {
1694                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1695                 goto app_fail;
1696             }
1697 
1698             nxt_debug(task, "application language module: \"%s\"", lang->file);
1699 
1700             ret = nxt_thread_mutex_create(&app->mutex);
1701             if (ret != NXT_OK) {
1702                 goto app_fail;
1703             }
1704 
1705             nxt_queue_init(&app->ports);
1706             nxt_queue_init(&app->spare_ports);
1707             nxt_queue_init(&app->idle_ports);
1708             nxt_queue_init(&app->ack_waiting_req);
1709 
1710             app->name.length = name.length;
1711             nxt_memcpy(app->name.start, name.start, name.length);
1712 
1713             app->type = lang->type;
1714             app->max_processes = apcf.max_processes;
1715             app->spare_processes = apcf.spare_processes;
1716             app->max_pending_processes = apcf.spare_processes
1717                                          ? apcf.spare_processes : 1;
1718             app->timeout = apcf.timeout;
1719             app->idle_timeout = apcf.idle_timeout;
1720 
1721             app->targets = targets;
1722 
1723             engine = task->thread->engine;
1724 
1725             app->engine = engine;
1726 
1727             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1728             app->adjust_idle_work.task = &engine->task;
1729             app->adjust_idle_work.obj = app;
1730 
1731             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1732 
1733             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1734             if (nxt_slow_path(ret != NXT_OK)) {
1735                 goto app_fail;
1736             }
1737 
1738             nxt_router_app_use(task, app, 1);
1739 
1740             app->joint = app_joint;
1741 
1742             app_joint->use_count = 1;
1743             app_joint->app = app;
1744 
1745             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1746             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1747             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1748             app_joint->idle_timer.task = &engine->task;
1749             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1750 
1751             app_joint->free_app_work.handler = nxt_router_free_app;
1752             app_joint->free_app_work.task = &engine->task;
1753             app_joint->free_app_work.obj = app_joint;
1754 
1755             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1756                                 NXT_PROCESS_APP);
1757             if (nxt_slow_path(port == NULL)) {
1758                 return NXT_ERROR;
1759             }
1760 
1761             ret = nxt_port_socket_init(task, port, 0);
1762             if (nxt_slow_path(ret != NXT_OK)) {
1763                 nxt_port_use(task, port, -1);
1764                 return NXT_ERROR;
1765             }
1766 
1767             ret = nxt_router_app_queue_init(task, port);
1768             if (nxt_slow_path(ret != NXT_OK)) {
1769                 nxt_port_write_close(port);
1770                 nxt_port_read_close(port);
1771                 nxt_port_use(task, port, -1);
1772                 return NXT_ERROR;
1773             }
1774 
1775             nxt_port_write_enable(task, port);
1776             port->app = app;
1777 
1778             app->shared_port = port;
1779 
1780             nxt_thread_mutex_create(&app->outgoing.mutex);
1781         }
1782     }
1783 
1784     routes_conf = nxt_conf_get_path(conf, &routes_path);
1785     if (nxt_fast_path(routes_conf != NULL)) {
1786         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1787         if (nxt_slow_path(routes == NULL)) {
1788             return NXT_ERROR;
1789         }
1790         tmcf->router_conf->routes = routes;
1791     }
1792 
1793     ret = nxt_upstreams_create(task, tmcf, conf);
1794     if (nxt_slow_path(ret != NXT_OK)) {
1795         return ret;
1796     }
1797 
1798     http = nxt_conf_get_path(conf, &http_path);
1799 #if 0
1800     if (http == NULL) {
1801         nxt_alert(task, "no \"http\" block");
1802         return NXT_ERROR;
1803     }
1804 #endif
1805 
1806     websocket = nxt_conf_get_path(conf, &websocket_path);
1807 
1808     listeners = nxt_conf_get_path(conf, &listeners_path);
1809 
1810     if (listeners != NULL) {
1811         next = 0;
1812 
1813         for ( ;; ) {
1814             listener = nxt_conf_next_object_member(listeners, &name, &next);
1815             if (listener == NULL) {
1816                 break;
1817             }
1818 
1819             skcf = nxt_router_socket_conf(task, tmcf, &name);
1820             if (skcf == NULL) {
1821                 goto fail;
1822             }
1823 
1824             nxt_memzero(&lscf, sizeof(lscf));
1825 
1826             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1827                                       nxt_nitems(nxt_router_listener_conf),
1828                                       &lscf);
1829             if (ret != NXT_OK) {
1830                 nxt_alert(task, "listener map error");
1831                 goto fail;
1832             }
1833 
1834             nxt_debug(task, "application: %V", &lscf.application);
1835 
1836             // STUB, default values if http block is not defined.
1837             skcf->header_buffer_size = 2048;
1838             skcf->large_header_buffer_size = 8192;
1839             skcf->large_header_buffers = 4;
1840             skcf->discard_unsafe_fields = 1;
1841             skcf->body_buffer_size = 16 * 1024;
1842             skcf->max_body_size = 8 * 1024 * 1024;
1843             skcf->proxy_header_buffer_size = 64 * 1024;
1844             skcf->proxy_buffer_size = 4096;
1845             skcf->proxy_buffers = 256;
1846             skcf->idle_timeout = 180 * 1000;
1847             skcf->header_read_timeout = 30 * 1000;
1848             skcf->body_read_timeout = 30 * 1000;
1849             skcf->send_timeout = 30 * 1000;
1850             skcf->proxy_timeout = 60 * 1000;
1851             skcf->proxy_send_timeout = 30 * 1000;
1852             skcf->proxy_read_timeout = 30 * 1000;
1853 
1854             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1855             skcf->websocket_conf.read_timeout = 60 * 1000;
1856             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1857 
1858             nxt_str_null(&skcf->body_temp_path);
1859 
1860             if (http != NULL) {
1861                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1862                                           nxt_nitems(nxt_router_http_conf),
1863                                           skcf);
1864                 if (ret != NXT_OK) {
1865                     nxt_alert(task, "http map error");
1866                     goto fail;
1867                 }
1868             }
1869 
1870             if (websocket != NULL) {
1871                 ret = nxt_conf_map_object(mp, websocket,
1872                                           nxt_router_websocket_conf,
1873                                           nxt_nitems(nxt_router_websocket_conf),
1874                                           &skcf->websocket_conf);
1875                 if (ret != NXT_OK) {
1876                     nxt_alert(task, "websocket map error");
1877                     goto fail;
1878                 }
1879             }
1880 
1881             t = &skcf->body_temp_path;
1882 
1883             if (t->length == 0) {
1884                 t->start = (u_char *) task->thread->runtime->tmp;
1885                 t->length = nxt_strlen(t->start);
1886             }
1887 
1888             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1889             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1890                                                     client_ip_conf);
1891             if (nxt_slow_path(ret != NXT_OK)) {
1892                 return NXT_ERROR;
1893             }
1894 
1895 #if (NXT_TLS)
1896             certificate = nxt_conf_get_path(listener, &certificate_path);
1897 
1898             if (certificate != NULL) {
1899                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1900                 if (nxt_slow_path(tls_init == NULL)) {
1901                     return NXT_ERROR;
1902                 }
1903 
1904                 tls_init->cache_size = 0;
1905                 tls_init->timeout = 300;
1906 
1907                 value = nxt_conf_get_path(listener, &conf_cache_path);
1908                 if (value != NULL) {
1909                     tls_init->cache_size = nxt_conf_get_number(value);
1910                 }
1911 
1912                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1913                 if (value != NULL) {
1914                     tls_init->timeout = nxt_conf_get_number(value);
1915                 }
1916 
1917                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1918                                                         &conf_commands_path);
1919 
1920                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1921                                                            &conf_tickets);
1922 
1923                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1924                     n = nxt_conf_array_elements_count(certificate);
1925 
1926                     for (i = 0; i < n; i++) {
1927                         value = nxt_conf_get_array_element(certificate, i);
1928 
1929                         nxt_assert(value != NULL);
1930 
1931                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1932                                                          tls_init, i == 0);
1933                         if (nxt_slow_path(ret != NXT_OK)) {
1934                             goto fail;
1935                         }
1936                     }
1937 
1938                 } else {
1939                     /* NXT_CONF_STRING */
1940                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1941                                                      tls_init, 1);
1942                     if (nxt_slow_path(ret != NXT_OK)) {
1943                         goto fail;
1944                     }
1945                 }
1946             }
1947 #endif
1948 
1949             skcf->listen->handler = nxt_http_conn_init;
1950             skcf->router_conf = tmcf->router_conf;
1951             skcf->router_conf->count++;
1952 
1953             if (lscf.pass.length != 0) {
1954                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1955 
1956             /* COMPATIBILITY: listener application. */
1957             } else if (lscf.application.length > 0) {
1958                 skcf->action = nxt_http_pass_application(task,
1959                                                          tmcf->router_conf,
1960                                                          &lscf.application);
1961             }
1962 
1963             if (nxt_slow_path(skcf->action == NULL)) {
1964                 goto fail;
1965             }
1966         }
1967     }
1968 
1969     ret = nxt_http_routes_resolve(task, tmcf);
1970     if (nxt_slow_path(ret != NXT_OK)) {
1971         goto fail;
1972     }
1973 
1974     value = nxt_conf_get_path(conf, &access_log_path);
1975 
1976     if (value != NULL) {
1977         nxt_conf_get_string(value, &path);
1978 
1979         access_log = router->access_log;
1980 
1981         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1982             nxt_router_access_log_use(&router->lock, access_log);
1983 
1984         } else {
1985             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1986                                     + path.length);
1987             if (access_log == NULL) {
1988                 nxt_alert(task, "failed to allocate access log structure");
1989                 goto fail;
1990             }
1991 
1992             access_log->fd = -1;
1993             access_log->handler = &nxt_router_access_log_writer;
1994             access_log->count = 1;
1995 
1996             access_log->path.length = path.length;
1997             access_log->path.start = (u_char *) access_log
1998                                      + sizeof(nxt_router_access_log_t);
1999 
2000             nxt_memcpy(access_log->path.start, path.start, path.length);
2001         }
2002 
2003         tmcf->router_conf->access_log = access_log;
2004     }
2005 
2006     nxt_queue_add(&deleting_sockets, &router->sockets);
2007     nxt_queue_init(&router->sockets);
2008 
2009     return NXT_OK;
2010 
2011 app_fail:
2012 
2013     nxt_mp_destroy(app_mp);
2014 
2015 fail:
2016 
2017     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2018 
2019         nxt_queue_remove(&app->link);
2020         nxt_thread_mutex_destroy(&app->mutex);
2021         nxt_mp_destroy(app->mem_pool);
2022 
2023     } nxt_queue_loop;
2024 
2025     return NXT_ERROR;
2026 }
2027 
2028 
2029 #if (NXT_TLS)
2030 
2031 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2032 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2033     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2034     nxt_tls_init_t *tls_init, nxt_bool_t last)
2035 {
2036     nxt_router_tlssock_t  *tls;
2037 
2038     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2039     if (nxt_slow_path(tls == NULL)) {
2040         return NXT_ERROR;
2041     }
2042 
2043     tls->tls_init = tls_init;
2044     tls->socket_conf = skcf;
2045     tls->temp_conf = tmcf;
2046     tls->last = last;
2047     nxt_conf_get_string(value, &tls->name);
2048 
2049     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2050 
2051     return NXT_OK;
2052 }
2053 
2054 #endif
2055 
2056 
2057 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2058 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2059     nxt_conf_value_t *conf)
2060 {
2061     uint32_t          next, i;
2062     nxt_mp_t          *mp;
2063     nxt_str_t         *type, exten, str;
2064     nxt_int_t         ret;
2065     nxt_uint_t        exts;
2066     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2067 
2068     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2069 
2070     mp = rtcf->mem_pool;
2071 
2072     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2073     if (nxt_slow_path(ret != NXT_OK)) {
2074         return NXT_ERROR;
2075     }
2076 
2077     if (conf == NULL) {
2078         return NXT_OK;
2079     }
2080 
2081     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2082 
2083     if (mtypes_conf != NULL) {
2084         next = 0;
2085 
2086         for ( ;; ) {
2087             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2088 
2089             if (ext_conf == NULL) {
2090                 break;
2091             }
2092 
2093             type = nxt_str_dup(mp, NULL, &str);
2094             if (nxt_slow_path(type == NULL)) {
2095                 return NXT_ERROR;
2096             }
2097 
2098             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2099                 nxt_conf_get_string(ext_conf, &str);
2100 
2101                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2102                     return NXT_ERROR;
2103                 }
2104 
2105                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2106                                                       &exten, type);
2107                 if (nxt_slow_path(ret != NXT_OK)) {
2108                     return NXT_ERROR;
2109                 }
2110 
2111                 continue;
2112             }
2113 
2114             exts = nxt_conf_array_elements_count(ext_conf);
2115 
2116             for (i = 0; i < exts; i++) {
2117                 value = nxt_conf_get_array_element(ext_conf, i);
2118 
2119                 nxt_conf_get_string(value, &str);
2120 
2121                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2122                     return NXT_ERROR;
2123                 }
2124 
2125                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2126                                                       &exten, type);
2127                 if (nxt_slow_path(ret != NXT_OK)) {
2128                     return NXT_ERROR;
2129                 }
2130             }
2131         }
2132     }
2133 
2134     return NXT_OK;
2135 }
2136 
2137 
2138 static nxt_int_t
nxt_router_conf_process_client_ip(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf,nxt_conf_value_t * conf)2139 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2140     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2141 {
2142     char                        c;
2143     size_t                      i;
2144     nxt_mp_t                    *mp;
2145     uint32_t                    hash;
2146     nxt_str_t                   header;
2147     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2148     nxt_http_client_ip_t        *client_ip;
2149     nxt_http_route_addr_rule_t  *source;
2150 
2151     static nxt_str_t  header_path = nxt_string("/header");
2152     static nxt_str_t  source_path = nxt_string("/source");
2153     static nxt_str_t  recursive_path = nxt_string("/recursive");
2154 
2155     if (conf == NULL) {
2156         skcf->client_ip = NULL;
2157 
2158         return NXT_OK;
2159     }
2160 
2161     mp = tmcf->router_conf->mem_pool;
2162 
2163     source_conf = nxt_conf_get_path(conf, &source_path);
2164     header_conf = nxt_conf_get_path(conf, &header_path);
2165     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2166 
2167     if (source_conf == NULL || header_conf == NULL) {
2168         return NXT_ERROR;
2169     }
2170 
2171     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2172     if (nxt_slow_path(client_ip == NULL)) {
2173         return NXT_ERROR;
2174     }
2175 
2176     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2177     if (nxt_slow_path(source == NULL)) {
2178         return NXT_ERROR;
2179     }
2180 
2181     client_ip->source = source;
2182 
2183     nxt_conf_get_string(header_conf, &header);
2184 
2185     if (recursive_conf != NULL) {
2186         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2187     }
2188 
2189     client_ip->header = nxt_str_dup(mp, NULL, &header);
2190     if (nxt_slow_path(client_ip->header == NULL)) {
2191         return NXT_ERROR;
2192     }
2193 
2194     hash = NXT_HTTP_FIELD_HASH_INIT;
2195 
2196     for (i = 0; i < client_ip->header->length; i++) {
2197         c = client_ip->header->start[i];
2198         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2199     }
2200 
2201     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2202 
2203     client_ip->header_hash = hash;
2204 
2205     skcf->client_ip = client_ip;
2206 
2207     return NXT_OK;
2208 }
2209 
2210 
2211 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2212 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2213 {
2214     nxt_app_t  *app;
2215 
2216     nxt_queue_each(app, queue, nxt_app_t, link) {
2217 
2218         if (nxt_strstr_eq(name, &app->name)) {
2219             return app;
2220         }
2221 
2222     } nxt_queue_loop;
2223 
2224     return NULL;
2225 }
2226 
2227 
2228 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2229 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2230 {
2231     void       *mem;
2232     nxt_int_t  fd;
2233 
2234     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2235     if (nxt_slow_path(fd == -1)) {
2236         return NXT_ERROR;
2237     }
2238 
2239     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2240                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2241     if (nxt_slow_path(mem == MAP_FAILED)) {
2242         nxt_fd_close(fd);
2243 
2244         return NXT_ERROR;
2245     }
2246 
2247     nxt_app_queue_init(mem);
2248 
2249     port->queue_fd = fd;
2250     port->queue = mem;
2251 
2252     return NXT_OK;
2253 }
2254 
2255 
2256 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2257 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2258 {
2259     void       *mem;
2260     nxt_int_t  fd;
2261 
2262     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2263     if (nxt_slow_path(fd == -1)) {
2264         return NXT_ERROR;
2265     }
2266 
2267     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2268                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2269     if (nxt_slow_path(mem == MAP_FAILED)) {
2270         nxt_fd_close(fd);
2271 
2272         return NXT_ERROR;
2273     }
2274 
2275     nxt_port_queue_init(mem);
2276 
2277     port->queue_fd = fd;
2278     port->queue = mem;
2279 
2280     return NXT_OK;
2281 }
2282 
2283 
2284 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2285 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2286 {
2287     void  *mem;
2288 
2289     nxt_assert(fd != -1);
2290 
2291     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2292                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2293     if (nxt_slow_path(mem == MAP_FAILED)) {
2294 
2295         return NXT_ERROR;
2296     }
2297 
2298     port->queue = mem;
2299 
2300     return NXT_OK;
2301 }
2302 
2303 
/*
 * Level hash prototype used for the router's application hash
 * (rtcf->apps_hash): default lvlhsh parameters, key comparison done by
 * nxt_router_apps_hash_test() and memory provided by the memory pool
 * allocator callbacks.  The structure is aligned on 64 bytes.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2310 
2311 
2312 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2313 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2314 {
2315     nxt_app_t  *app;
2316 
2317     app = data;
2318 
2319     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2320 }
2321 
2322 
2323 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2324 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2325 {
2326     nxt_lvlhsh_query_t  lhq;
2327 
2328     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2329     lhq.replace = 0;
2330     lhq.key = app->name;
2331     lhq.value = app;
2332     lhq.proto = &nxt_router_apps_hash_proto;
2333     lhq.pool = rtcf->mem_pool;
2334 
2335     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2336 
2337     case NXT_OK:
2338         return NXT_OK;
2339 
2340     case NXT_DECLINED:
2341         nxt_thread_log_alert("router app hash adding failed: "
2342                              "\"%V\" is already in hash", &lhq.key);
2343         /* Fall through. */
2344     default:
2345         return NXT_ERROR;
2346     }
2347 }
2348 
2349 
2350 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2351 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2352 {
2353     nxt_lvlhsh_query_t  lhq;
2354 
2355     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2356     lhq.key = *name;
2357     lhq.proto = &nxt_router_apps_hash_proto;
2358 
2359     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2360         return NULL;
2361     }
2362 
2363     return lhq.value;
2364 }
2365 
2366 
2367 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2368 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2369 {
2370     nxt_app_t          *app;
2371     nxt_lvlhsh_each_t  lhe;
2372 
2373     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2374 
2375     for ( ;; ) {
2376         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2377 
2378         if (app == NULL) {
2379             break;
2380         }
2381 
2382         nxt_router_app_use(task, app, i);
2383     }
2384 }
2385 
2386 
/*
 * Per-action application binding created by nxt_router_application_init()
 * and stored in the action's u.conf.
 */
typedef struct {
    nxt_app_t  *app;     /* application the action passes requests to */
    nxt_int_t  target;   /* index into app->targets; 0 if no target given */
} nxt_http_app_conf_t;
2391 
2392 
2393 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2394 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2395     nxt_str_t *target, nxt_http_action_t *action)
2396 {
2397     nxt_app_t            *app;
2398     nxt_str_t            *targets;
2399     nxt_uint_t           i;
2400     nxt_http_app_conf_t  *conf;
2401 
2402     app = nxt_router_apps_hash_get(rtcf, name);
2403     if (app == NULL) {
2404         return NXT_DECLINED;
2405     }
2406 
2407     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2408     if (nxt_slow_path(conf == NULL)) {
2409         return NXT_ERROR;
2410     }
2411 
2412     action->handler = nxt_http_application_handler;
2413     action->u.conf = conf;
2414 
2415     conf->app = app;
2416 
2417     if (target != NULL && target->length != 0) {
2418         targets = app->targets;
2419 
2420         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2421 
2422         conf->target = i;
2423 
2424     } else {
2425         conf->target = 0;
2426     }
2427 
2428     return NXT_OK;
2429 }
2430 
2431 
2432 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2433 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2434     nxt_str_t *name)
2435 {
2436     size_t               size;
2437     nxt_int_t            ret;
2438     nxt_bool_t           wildcard;
2439     nxt_sockaddr_t       *sa;
2440     nxt_socket_conf_t    *skcf;
2441     nxt_listen_socket_t  *ls;
2442 
2443     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2444     if (nxt_slow_path(sa == NULL)) {
2445         nxt_alert(task, "invalid listener \"%V\"", name);
2446         return NULL;
2447     }
2448 
2449     sa->type = SOCK_STREAM;
2450 
2451     nxt_debug(task, "router listener: \"%*s\"",
2452               (size_t) sa->length, nxt_sockaddr_start(sa));
2453 
2454     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2455     if (nxt_slow_path(skcf == NULL)) {
2456         return NULL;
2457     }
2458 
2459     size = nxt_sockaddr_size(sa);
2460 
2461     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2462 
2463     if (ret != NXT_OK) {
2464 
2465         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2466         if (nxt_slow_path(ls == NULL)) {
2467             return NULL;
2468         }
2469 
2470         skcf->listen = ls;
2471 
2472         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2473         nxt_memcpy(ls->sockaddr, sa, size);
2474 
2475         nxt_listen_socket_remote_size(ls);
2476 
2477         ls->socket = -1;
2478         ls->backlog = NXT_LISTEN_BACKLOG;
2479         ls->flags = NXT_NONBLOCK;
2480         ls->read_after_accept = 1;
2481     }
2482 
2483     switch (sa->u.sockaddr.sa_family) {
2484 #if (NXT_HAVE_UNIX_DOMAIN)
2485     case AF_UNIX:
2486         wildcard = 0;
2487         break;
2488 #endif
2489 #if (NXT_INET6)
2490     case AF_INET6:
2491         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2492         break;
2493 #endif
2494     case AF_INET:
2495     default:
2496         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2497         break;
2498     }
2499 
2500     if (!wildcard) {
2501         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2502         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2503             return NULL;
2504         }
2505 
2506         nxt_memcpy(skcf->sockaddr, sa, size);
2507     }
2508 
2509     return skcf;
2510 }
2511 
2512 
2513 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2514 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2515     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2516 {
2517     nxt_router_t       *router;
2518     nxt_queue_link_t   *qlk;
2519     nxt_socket_conf_t  *skcf;
2520 
2521     router = tmcf->router_conf->router;
2522 
2523     for (qlk = nxt_queue_first(&router->sockets);
2524          qlk != nxt_queue_tail(&router->sockets);
2525          qlk = nxt_queue_next(qlk))
2526     {
2527         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2528 
2529         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2530             nskcf->listen = skcf->listen;
2531 
2532             nxt_queue_remove(qlk);
2533             nxt_queue_insert_tail(&keeping_sockets, qlk);
2534 
2535             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2536 
2537             return NXT_OK;
2538         }
2539     }
2540 
2541     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2542 
2543     return NXT_DECLINED;
2544 }
2545 
2546 
2547 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2548 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2549     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2550 {
2551     size_t            size;
2552     uint32_t          stream;
2553     nxt_int_t         ret;
2554     nxt_buf_t         *b;
2555     nxt_port_t        *main_port, *router_port;
2556     nxt_runtime_t     *rt;
2557     nxt_socket_rpc_t  *rpc;
2558 
2559     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2560     if (rpc == NULL) {
2561         goto fail;
2562     }
2563 
2564     rpc->socket_conf = skcf;
2565     rpc->temp_conf = tmcf;
2566 
2567     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2568 
2569     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2570     if (b == NULL) {
2571         goto fail;
2572     }
2573 
2574     b->completion_handler = nxt_buf_dummy_completion;
2575 
2576     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2577 
2578     rt = task->thread->runtime;
2579     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2580     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2581 
2582     stream = nxt_port_rpc_register_handler(task, router_port,
2583                                            nxt_router_listen_socket_ready,
2584                                            nxt_router_listen_socket_error,
2585                                            main_port->pid, rpc);
2586     if (nxt_slow_path(stream == 0)) {
2587         goto fail;
2588     }
2589 
2590     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2591                                 stream, router_port->id, b);
2592 
2593     if (nxt_slow_path(ret != NXT_OK)) {
2594         nxt_port_rpc_cancel(task, router_port, stream);
2595         goto fail;
2596     }
2597 
2598     return;
2599 
2600 fail:
2601 
2602     nxt_router_conf_error(task, tmcf);
2603 }
2604 
2605 
2606 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2607 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2608     void *data)
2609 {
2610     nxt_int_t         ret;
2611     nxt_socket_t      s;
2612     nxt_socket_rpc_t  *rpc;
2613 
2614     rpc = data;
2615 
2616     s = msg->fd[0];
2617 
2618     ret = nxt_socket_nonblocking(task, s);
2619     if (nxt_slow_path(ret != NXT_OK)) {
2620         goto fail;
2621     }
2622 
2623     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2624 
2625     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2626     if (nxt_slow_path(ret != NXT_OK)) {
2627         goto fail;
2628     }
2629 
2630     rpc->socket_conf->listen->socket = s;
2631 
2632     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2633                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2634 
2635     return;
2636 
2637 fail:
2638 
2639     nxt_socket_close(task, s);
2640 
2641     nxt_router_conf_error(task, rpc->temp_conf);
2642 }
2643 
2644 
/*
 * RPC error handler for a listen socket request.  "data" is the
 * nxt_socket_rpc_t registered with the request; the only active effect is
 * to report the pending configuration as failed through
 * nxt_router_conf_error().
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

    /*
     * Disabled code: builds a textual description of the failure from the
     * reply (first byte is an index into socket_errors[], the rest is the
     * message) and only writes it to the debug log.
     *
     * NOTE(review): if this is ever enabled, "error" is used to index
     * socket_errors[] without a range check.
     */
#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    nxt_router_conf_error(task, tmcf);
}
2704 
2705 
2706 #if (NXT_TLS)
2707 
2708 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2709 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2710     void *data)
2711 {
2712     nxt_mp_t                *mp;
2713     nxt_int_t               ret;
2714     nxt_tls_conf_t          *tlscf;
2715     nxt_router_tlssock_t    *tls;
2716     nxt_tls_bundle_conf_t   *bundle;
2717     nxt_router_temp_conf_t  *tmcf;
2718 
2719     nxt_debug(task, "tls rpc handler");
2720 
2721     tls = data;
2722     tmcf = tls->temp_conf;
2723 
2724     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2725         goto fail;
2726     }
2727 
2728     mp = tmcf->router_conf->mem_pool;
2729 
2730     if (tls->socket_conf->tls == NULL){
2731         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2732         if (nxt_slow_path(tlscf == NULL)) {
2733             goto fail;
2734         }
2735 
2736         tlscf->no_wait_shutdown = 1;
2737         tls->socket_conf->tls = tlscf;
2738 
2739     } else {
2740         tlscf = tls->socket_conf->tls;
2741     }
2742 
2743     tls->tls_init->conf = tlscf;
2744 
2745     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2746     if (nxt_slow_path(bundle == NULL)) {
2747         goto fail;
2748     }
2749 
2750     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2751         goto fail;
2752     }
2753 
2754     bundle->chain_file = msg->fd[0];
2755     bundle->next = tlscf->bundle;
2756     tlscf->bundle = bundle;
2757 
2758     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2759                                                   tls->last);
2760     if (nxt_slow_path(ret != NXT_OK)) {
2761         goto fail;
2762     }
2763 
2764     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2765                        nxt_router_conf_apply, task, tmcf, NULL);
2766     return;
2767 
2768 fail:
2769 
2770     nxt_router_conf_error(task, tmcf);
2771 }
2772 
2773 #endif
2774 
2775 
2776 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2777 nxt_router_app_rpc_create(nxt_task_t *task,
2778     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2779 {
2780     size_t         size;
2781     uint32_t       stream;
2782     nxt_int_t      ret;
2783     nxt_buf_t      *b;
2784     nxt_port_t     *router_port, *dport;
2785     nxt_runtime_t  *rt;
2786     nxt_app_rpc_t  *rpc;
2787 
2788     rt = task->thread->runtime;
2789 
2790     dport = app->proto_port;
2791 
2792     if (dport == NULL) {
2793         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2794 
2795         size = app->name.length + 1 + app->conf.length;
2796 
2797         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2798         if (nxt_slow_path(b == NULL)) {
2799             goto fail;
2800         }
2801 
2802         b->completion_handler = nxt_buf_dummy_completion;
2803 
2804         nxt_buf_cpystr(b, &app->name);
2805         *b->mem.free++ = '\0';
2806         nxt_buf_cpystr(b, &app->conf);
2807 
2808         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2809 
2810     } else {
2811         nxt_debug(task, "app '%V' prefork", &app->name);
2812 
2813         b = NULL;
2814     }
2815 
2816     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2817 
2818     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2819                                            nxt_router_app_prefork_ready,
2820                                            nxt_router_app_prefork_error,
2821                                            sizeof(nxt_app_rpc_t));
2822     if (nxt_slow_path(rpc == NULL)) {
2823         goto fail;
2824     }
2825 
2826     rpc->app = app;
2827     rpc->temp_conf = tmcf;
2828     rpc->proto = (b != NULL);
2829 
2830     stream = nxt_port_rpc_ex_stream(rpc);
2831 
2832     ret = nxt_port_socket_write(task, dport,
2833                                 NXT_PORT_MSG_START_PROCESS,
2834                                 -1, stream, router_port->id, b);
2835     if (nxt_slow_path(ret != NXT_OK)) {
2836         nxt_port_rpc_cancel(task, router_port, stream);
2837         goto fail;
2838     }
2839 
2840     if (b == NULL) {
2841         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2842 
2843         app->pending_processes++;
2844     }
2845 
2846     return;
2847 
2848 fail:
2849 
2850     nxt_router_conf_error(task, tmcf);
2851 }
2852 
2853 
2854 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2855 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2856     void *data)
2857 {
2858     nxt_app_t           *app;
2859     nxt_port_t          *port;
2860     nxt_app_rpc_t       *rpc;
2861     nxt_event_engine_t  *engine;
2862 
2863     rpc = data;
2864     app = rpc->app;
2865 
2866     port = msg->u.new_port;
2867 
2868     nxt_assert(port != NULL);
2869     nxt_assert(port->id == 0);
2870 
2871     if (rpc->proto) {
2872         nxt_assert(app->proto_port == NULL);
2873         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2874 
2875         nxt_port_inc_use(port);
2876 
2877         app->proto_port = port;
2878         port->app = app;
2879 
2880         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2881 
2882         return;
2883     }
2884 
2885     nxt_assert(port->type == NXT_PROCESS_APP);
2886 
2887     port->app = app;
2888     port->main_app_port = port;
2889 
2890     app->pending_processes--;
2891     app->processes++;
2892     app->idle_processes++;
2893 
2894     engine = task->thread->engine;
2895 
2896     nxt_queue_insert_tail(&app->ports, &port->app_link);
2897     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2898 
2899     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2900               &app->name, port->pid, port->id);
2901 
2902     nxt_port_hash_add(&app->port_hash, port);
2903     app->port_hash_count++;
2904 
2905     port->idle_start = 0;
2906 
2907     nxt_port_inc_use(port);
2908 
2909     nxt_router_app_shared_port_send(task, port);
2910 
2911     nxt_work_queue_add(&engine->fast_work_queue,
2912                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2913 }
2914 
2915 
2916 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2917 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2918     void *data)
2919 {
2920     nxt_app_t               *app;
2921     nxt_app_rpc_t           *rpc;
2922     nxt_router_temp_conf_t  *tmcf;
2923 
2924     rpc = data;
2925     app = rpc->app;
2926     tmcf = rpc->temp_conf;
2927 
2928     if (rpc->proto) {
2929         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2930                 &app->name);
2931 
2932     } else {
2933         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2934                 &app->name);
2935 
2936         app->pending_processes--;
2937     }
2938 
2939     nxt_router_conf_error(task, tmcf);
2940 }
2941 
2942 
2943 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2944 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2945     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2946 {
2947     nxt_int_t                 ret;
2948     nxt_uint_t                n, threads;
2949     nxt_queue_link_t          *qlk;
2950     nxt_router_engine_conf_t  *recf;
2951 
2952     threads = tmcf->router_conf->threads;
2953 
2954     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2955                                      sizeof(nxt_router_engine_conf_t));
2956     if (nxt_slow_path(tmcf->engines == NULL)) {
2957         return NXT_ERROR;
2958     }
2959 
2960     n = 0;
2961 
2962     for (qlk = nxt_queue_first(&router->engines);
2963          qlk != nxt_queue_tail(&router->engines);
2964          qlk = nxt_queue_next(qlk))
2965     {
2966         recf = nxt_array_zero_add(tmcf->engines);
2967         if (nxt_slow_path(recf == NULL)) {
2968             return NXT_ERROR;
2969         }
2970 
2971         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2972 
2973         if (n < threads) {
2974             recf->action = NXT_ROUTER_ENGINE_KEEP;
2975             ret = nxt_router_engine_conf_update(tmcf, recf);
2976 
2977         } else {
2978             recf->action = NXT_ROUTER_ENGINE_DELETE;
2979             ret = nxt_router_engine_conf_delete(tmcf, recf);
2980         }
2981 
2982         if (nxt_slow_path(ret != NXT_OK)) {
2983             return ret;
2984         }
2985 
2986         n++;
2987     }
2988 
2989     tmcf->new_threads = n;
2990 
2991     while (n < threads) {
2992         recf = nxt_array_zero_add(tmcf->engines);
2993         if (nxt_slow_path(recf == NULL)) {
2994             return NXT_ERROR;
2995         }
2996 
2997         recf->action = NXT_ROUTER_ENGINE_ADD;
2998 
2999         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3000         if (nxt_slow_path(recf->engine == NULL)) {
3001             return NXT_ERROR;
3002         }
3003 
3004         ret = nxt_router_engine_conf_create(tmcf, recf);
3005         if (nxt_slow_path(ret != NXT_OK)) {
3006             return ret;
3007         }
3008 
3009         n++;
3010     }
3011 
3012     return NXT_OK;
3013 }
3014 
3015 
3016 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3017 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3018     nxt_router_engine_conf_t *recf)
3019 {
3020     nxt_int_t  ret;
3021 
3022     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3023                                           nxt_router_listen_socket_create);
3024     if (nxt_slow_path(ret != NXT_OK)) {
3025         return ret;
3026     }
3027 
3028     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3029                                           nxt_router_listen_socket_create);
3030     if (nxt_slow_path(ret != NXT_OK)) {
3031         return ret;
3032     }
3033 
3034     return ret;
3035 }
3036 
3037 
3038 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3039 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3040     nxt_router_engine_conf_t *recf)
3041 {
3042     nxt_int_t  ret;
3043 
3044     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3045                                           nxt_router_listen_socket_create);
3046     if (nxt_slow_path(ret != NXT_OK)) {
3047         return ret;
3048     }
3049 
3050     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3051                                           nxt_router_listen_socket_update);
3052     if (nxt_slow_path(ret != NXT_OK)) {
3053         return ret;
3054     }
3055 
3056     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3057     if (nxt_slow_path(ret != NXT_OK)) {
3058         return ret;
3059     }
3060 
3061     return ret;
3062 }
3063 
3064 
3065 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3066 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3067     nxt_router_engine_conf_t *recf)
3068 {
3069     nxt_int_t  ret;
3070 
3071     ret = nxt_router_engine_quit(tmcf, recf);
3072     if (nxt_slow_path(ret != NXT_OK)) {
3073         return ret;
3074     }
3075 
3076     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3077     if (nxt_slow_path(ret != NXT_OK)) {
3078         return ret;
3079     }
3080 
3081     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3082 }
3083 
3084 
3085 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3086 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3087     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3088     nxt_work_handler_t handler)
3089 {
3090     nxt_int_t                ret;
3091     nxt_joint_job_t          *job;
3092     nxt_queue_link_t         *qlk;
3093     nxt_socket_conf_t        *skcf;
3094     nxt_socket_conf_joint_t  *joint;
3095 
3096     for (qlk = nxt_queue_first(sockets);
3097          qlk != nxt_queue_tail(sockets);
3098          qlk = nxt_queue_next(qlk))
3099     {
3100         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3101         if (nxt_slow_path(job == NULL)) {
3102             return NXT_ERROR;
3103         }
3104 
3105         job->work.next = recf->jobs;
3106         recf->jobs = &job->work;
3107 
3108         job->task = tmcf->engine->task;
3109         job->work.handler = handler;
3110         job->work.task = &job->task;
3111         job->work.obj = job;
3112         job->tmcf = tmcf;
3113 
3114         tmcf->count++;
3115 
3116         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3117                              sizeof(nxt_socket_conf_joint_t));
3118         if (nxt_slow_path(joint == NULL)) {
3119             return NXT_ERROR;
3120         }
3121 
3122         job->work.data = joint;
3123 
3124         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3125         if (nxt_slow_path(ret != NXT_OK)) {
3126             return ret;
3127         }
3128 
3129         joint->count = 1;
3130 
3131         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3132         skcf->count++;
3133         joint->socket_conf = skcf;
3134 
3135         joint->engine = recf->engine;
3136     }
3137 
3138     return NXT_OK;
3139 }
3140 
3141 
3142 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3143 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3144     nxt_router_engine_conf_t *recf)
3145 {
3146     nxt_joint_job_t  *job;
3147 
3148     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3149     if (nxt_slow_path(job == NULL)) {
3150         return NXT_ERROR;
3151     }
3152 
3153     job->work.next = recf->jobs;
3154     recf->jobs = &job->work;
3155 
3156     job->task = tmcf->engine->task;
3157     job->work.handler = nxt_router_worker_thread_quit;
3158     job->work.task = &job->task;
3159     job->work.obj = NULL;
3160     job->work.data = NULL;
3161     job->tmcf = NULL;
3162 
3163     return NXT_OK;
3164 }
3165 
3166 
3167 static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets)3168 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3170 {
3171     nxt_joint_job_t   *job;
3172     nxt_queue_link_t  *qlk;
3173 
3174     for (qlk = nxt_queue_first(sockets);
3175          qlk != nxt_queue_tail(sockets);
3176          qlk = nxt_queue_next(qlk))
3177     {
3178         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3179         if (nxt_slow_path(job == NULL)) {
3180             return NXT_ERROR;
3181         }
3182 
3183         job->work.next = recf->jobs;
3184         recf->jobs = &job->work;
3185 
3186         job->task = tmcf->engine->task;
3187         job->work.handler = nxt_router_listen_socket_delete;
3188         job->work.task = &job->task;
3189         job->work.obj = job;
3190         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3191         job->tmcf = tmcf;
3192 
3193         tmcf->count++;
3194     }
3195 
3196     return NXT_OK;
3197 }
3198 
3199 
3200 static nxt_int_t
nxt_router_threads_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_router_temp_conf_t * tmcf)3201 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3202     nxt_router_temp_conf_t *tmcf)
3203 {
3204     nxt_int_t                 ret;
3205     nxt_uint_t                i, threads;
3206     nxt_router_engine_conf_t  *recf;
3207 
3208     recf = tmcf->engines->elts;
3209     threads = tmcf->router_conf->threads;
3210 
3211     for (i = tmcf->new_threads; i < threads; i++) {
3212         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3213         if (nxt_slow_path(ret != NXT_OK)) {
3214             return ret;
3215         }
3216     }
3217 
3218     return NXT_OK;
3219 }
3220 
3221 
3222 static nxt_int_t
nxt_router_thread_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_event_engine_t * engine)3223 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3224     nxt_event_engine_t *engine)
3225 {
3226     nxt_int_t            ret;
3227     nxt_thread_link_t    *link;
3228     nxt_thread_handle_t  handle;
3229 
3230     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3231 
3232     if (nxt_slow_path(link == NULL)) {
3233         return NXT_ERROR;
3234     }
3235 
3236     link->start = nxt_router_thread_start;
3237     link->engine = engine;
3238     link->work.handler = nxt_router_thread_exit_handler;
3239     link->work.task = task;
3240     link->work.data = link;
3241 
3242     nxt_queue_insert_tail(&rt->engines, &engine->link);
3243 
3244     ret = nxt_thread_create(&handle, link);
3245 
3246     if (nxt_slow_path(ret != NXT_OK)) {
3247         nxt_queue_remove(&engine->link);
3248     }
3249 
3250     return ret;
3251 }
3252 
3253 
3254 static void
nxt_router_apps_sort(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3255 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3256     nxt_router_temp_conf_t *tmcf)
3257 {
3258     nxt_app_t  *app;
3259 
3260     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3261 
3262         nxt_router_app_unlink(task, app);
3263 
3264     } nxt_queue_loop;
3265 
3266     nxt_queue_add(&router->apps, &tmcf->previous);
3267     nxt_queue_add(&router->apps, &tmcf->apps);
3268 }
3269 
3270 
3271 static void
nxt_router_engines_post(nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3272 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3273 {
3274     nxt_uint_t                n;
3275     nxt_event_engine_t        *engine;
3276     nxt_router_engine_conf_t  *recf;
3277 
3278     recf = tmcf->engines->elts;
3279 
3280     for (n = tmcf->engines->nelts; n != 0; n--) {
3281         engine = recf->engine;
3282 
3283         switch (recf->action) {
3284 
3285         case NXT_ROUTER_ENGINE_KEEP:
3286             break;
3287 
3288         case NXT_ROUTER_ENGINE_ADD:
3289             nxt_queue_insert_tail(&router->engines, &engine->link0);
3290             break;
3291 
3292         case NXT_ROUTER_ENGINE_DELETE:
3293             nxt_queue_remove(&engine->link0);
3294             break;
3295         }
3296 
3297         nxt_router_engine_post(engine, recf->jobs);
3298 
3299         recf++;
3300     }
3301 }
3302 
3303 
3304 static void
nxt_router_engine_post(nxt_event_engine_t * engine,nxt_work_t * jobs)3305 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3306 {
3307     nxt_work_t  *work, *next;
3308 
3309     for (work = jobs; work != NULL; work = next) {
3310         next = work->next;
3311         work->next = NULL;
3312 
3313         nxt_event_engine_post(engine, work);
3314     }
3315 }
3316 
3317 
3318 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3319     .rpc_error       = nxt_port_rpc_handler,
3320     .mmap            = nxt_port_mmap_handler,
3321     .data            = nxt_port_rpc_handler,
3322     .oosm            = nxt_router_oosm_handler,
3323     .req_headers_ack = nxt_port_rpc_handler,
3324 };
3325 
3326 
3327 static void
nxt_router_thread_start(void * data)3328 nxt_router_thread_start(void *data)
3329 {
3330     nxt_int_t           ret;
3331     nxt_port_t          *port;
3332     nxt_task_t          *task;
3333     nxt_work_t          *work;
3334     nxt_thread_t        *thread;
3335     nxt_thread_link_t   *link;
3336     nxt_event_engine_t  *engine;
3337 
3338     link = data;
3339     engine = link->engine;
3340     task = &engine->task;
3341 
3342     thread = nxt_thread();
3343 
3344     nxt_event_engine_thread_adopt(engine);
3345 
3346     /* STUB */
3347     thread->runtime = engine->task.thread->runtime;
3348 
3349     engine->task.thread = thread;
3350     engine->task.log = thread->log;
3351     thread->engine = engine;
3352     thread->task = &engine->task;
3353 #if 0
3354     thread->fiber = &engine->fibers->fiber;
3355 #endif
3356 
3357     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3358     if (nxt_slow_path(engine->mem_pool == NULL)) {
3359         return;
3360     }
3361 
3362     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3363                         NXT_PROCESS_ROUTER);
3364     if (nxt_slow_path(port == NULL)) {
3365         return;
3366     }
3367 
3368     ret = nxt_port_socket_init(task, port, 0);
3369     if (nxt_slow_path(ret != NXT_OK)) {
3370         nxt_port_use(task, port, -1);
3371         return;
3372     }
3373 
3374     ret = nxt_router_port_queue_init(task, port);
3375     if (nxt_slow_path(ret != NXT_OK)) {
3376         nxt_port_use(task, port, -1);
3377         return;
3378     }
3379 
3380     engine->port = port;
3381 
3382     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3383 
3384     work = nxt_zalloc(sizeof(nxt_work_t));
3385     if (nxt_slow_path(work == NULL)) {
3386         return;
3387     }
3388 
3389     work->handler = nxt_router_rt_add_port;
3390     work->task = link->work.task;
3391     work->obj = work;
3392     work->data = port;
3393 
3394     nxt_event_engine_post(link->work.task->thread->engine, work);
3395 
3396     nxt_event_engine_start(engine);
3397 }
3398 
3399 
3400 static void
nxt_router_rt_add_port(nxt_task_t * task,void * obj,void * data)3401 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3402 {
3403     nxt_int_t      res;
3404     nxt_port_t     *port;
3405     nxt_runtime_t  *rt;
3406 
3407     rt = task->thread->runtime;
3408     port = data;
3409 
3410     nxt_free(obj);
3411 
3412     res = nxt_port_hash_add(&rt->ports, port);
3413 
3414     if (nxt_fast_path(res == NXT_OK)) {
3415         nxt_port_use(task, port, 1);
3416     }
3417 }
3418 
3419 
3420 static void
nxt_router_listen_socket_create(nxt_task_t * task,void * obj,void * data)3421 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3422 {
3423     nxt_joint_job_t          *job;
3424     nxt_socket_conf_t        *skcf;
3425     nxt_listen_event_t       *lev;
3426     nxt_listen_socket_t      *ls;
3427     nxt_thread_spinlock_t    *lock;
3428     nxt_socket_conf_joint_t  *joint;
3429 
3430     job = obj;
3431     joint = data;
3432 
3433     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3434 
3435     skcf = joint->socket_conf;
3436     ls = skcf->listen;
3437 
3438     lev = nxt_listen_event(task, ls);
3439     if (nxt_slow_path(lev == NULL)) {
3440         nxt_router_listen_socket_release(task, skcf);
3441         return;
3442     }
3443 
3444     lev->socket.data = joint;
3445 
3446     lock = &skcf->router_conf->router->lock;
3447 
3448     nxt_thread_spin_lock(lock);
3449     ls->count++;
3450     nxt_thread_spin_unlock(lock);
3451 
3452     job->work.next = NULL;
3453     job->work.handler = nxt_router_conf_wait;
3454 
3455     nxt_event_engine_post(job->tmcf->engine, &job->work);
3456 }
3457 
3458 
3459 nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t * listen_connections,nxt_socket_conf_t * skcf)3460 nxt_router_listen_event(nxt_queue_t *listen_connections,
3461     nxt_socket_conf_t *skcf)
3462 {
3463     nxt_socket_t        fd;
3464     nxt_queue_link_t    *qlk;
3465     nxt_listen_event_t  *lev;
3466 
3467     fd = skcf->listen->socket;
3468 
3469     for (qlk = nxt_queue_first(listen_connections);
3470          qlk != nxt_queue_tail(listen_connections);
3471          qlk = nxt_queue_next(qlk))
3472     {
3473         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3474 
3475         if (fd == lev->socket.fd) {
3476             return lev;
3477         }
3478     }
3479 
3480     return NULL;
3481 }
3482 
3483 
3484 static void
nxt_router_listen_socket_update(nxt_task_t * task,void * obj,void * data)3485 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3486 {
3487     nxt_joint_job_t          *job;
3488     nxt_event_engine_t       *engine;
3489     nxt_listen_event_t       *lev;
3490     nxt_socket_conf_joint_t  *joint, *old;
3491 
3492     job = obj;
3493     joint = data;
3494 
3495     engine = task->thread->engine;
3496 
3497     nxt_queue_insert_tail(&engine->joints, &joint->link);
3498 
3499     lev = nxt_router_listen_event(&engine->listen_connections,
3500                                   joint->socket_conf);
3501 
3502     old = lev->socket.data;
3503     lev->socket.data = joint;
3504     lev->listen = joint->socket_conf->listen;
3505 
3506     job->work.next = NULL;
3507     job->work.handler = nxt_router_conf_wait;
3508 
3509     nxt_event_engine_post(job->tmcf->engine, &job->work);
3510 
3511     /*
3512      * The task is allocated from configuration temporary
3513      * memory pool so it can be freed after engine post operation.
3514      */
3515 
3516     nxt_router_conf_release(&engine->task, old);
3517 }
3518 
3519 
3520 static void
nxt_router_listen_socket_delete(nxt_task_t * task,void * obj,void * data)3521 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3522 {
3523     nxt_socket_conf_t        *skcf;
3524     nxt_listen_event_t       *lev;
3525     nxt_event_engine_t       *engine;
3526     nxt_socket_conf_joint_t  *joint;
3527 
3528     skcf = data;
3529 
3530     engine = task->thread->engine;
3531 
3532     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3533 
3534     nxt_fd_event_delete(engine, &lev->socket);
3535 
3536     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3537               lev->socket.fd);
3538 
3539     joint = lev->socket.data;
3540     joint->close_job = obj;
3541 
3542     lev->timer.handler = nxt_router_listen_socket_close;
3543     lev->timer.work_queue = &engine->fast_work_queue;
3544 
3545     nxt_timer_add(engine, &lev->timer, 0);
3546 }
3547 
3548 
3549 static void
nxt_router_worker_thread_quit(nxt_task_t * task,void * obj,void * data)3550 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3551 {
3552     nxt_event_engine_t  *engine;
3553 
3554     nxt_debug(task, "router worker thread quit");
3555 
3556     engine = task->thread->engine;
3557 
3558     engine->shutdown = 1;
3559 
3560     if (nxt_queue_is_empty(&engine->joints)) {
3561         nxt_thread_exit(task->thread);
3562     }
3563 }
3564 
3565 
3566 static void
nxt_router_listen_socket_close(nxt_task_t * task,void * obj,void * data)3567 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3568 {
3569     nxt_timer_t              *timer;
3570     nxt_joint_job_t          *job;
3571     nxt_listen_event_t       *lev;
3572     nxt_socket_conf_joint_t  *joint;
3573 
3574     timer = obj;
3575     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3576 
3577     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3578               lev->socket.fd);
3579 
3580     nxt_queue_remove(&lev->link);
3581 
3582     joint = lev->socket.data;
3583     lev->socket.data = NULL;
3584 
3585     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3586     task = &task->thread->engine->task;
3587 
3588     nxt_router_listen_socket_release(task, joint->socket_conf);
3589 
3590     job = joint->close_job;
3591     job->work.next = NULL;
3592     job->work.handler = nxt_router_conf_wait;
3593 
3594     nxt_event_engine_post(job->tmcf->engine, &job->work);
3595 
3596     nxt_router_listen_event_release(task, lev, joint);
3597 }
3598 
3599 
3600 static void
nxt_router_listen_socket_release(nxt_task_t * task,nxt_socket_conf_t * skcf)3601 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3602 {
3603     nxt_listen_socket_t    *ls;
3604     nxt_thread_spinlock_t  *lock;
3605 
3606     ls = skcf->listen;
3607     lock = &skcf->router_conf->router->lock;
3608 
3609     nxt_thread_spin_lock(lock);
3610 
3611     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3612               task->thread->engine, ls->count);
3613 
3614     if (--ls->count != 0) {
3615         ls = NULL;
3616     }
3617 
3618     nxt_thread_spin_unlock(lock);
3619 
3620     if (ls != NULL) {
3621         nxt_socket_close(task, ls->socket);
3622         nxt_free(ls);
3623     }
3624 }
3625 
3626 
3627 void
nxt_router_listen_event_release(nxt_task_t * task,nxt_listen_event_t * lev,nxt_socket_conf_joint_t * joint)3628 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3629     nxt_socket_conf_joint_t *joint)
3630 {
3631     nxt_event_engine_t  *engine;
3632 
3633     nxt_debug(task, "listen event count: %D", lev->count);
3634 
3635     engine = task->thread->engine;
3636 
3637     if (--lev->count == 0) {
3638         if (lev->next != NULL) {
3639             nxt_sockaddr_cache_free(engine, lev->next);
3640 
3641             nxt_conn_free(task, lev->next);
3642         }
3643 
3644         nxt_free(lev);
3645     }
3646 
3647     if (joint != NULL) {
3648         nxt_router_conf_release(task, joint);
3649     }
3650 
3651     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3652         nxt_thread_exit(task->thread);
3653     }
3654 }
3655 
3656 
3657 void
nxt_router_conf_release(nxt_task_t * task,nxt_socket_conf_joint_t * joint)3658 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3659 {
3660     nxt_socket_conf_t      *skcf;
3661     nxt_router_conf_t      *rtcf;
3662     nxt_thread_spinlock_t  *lock;
3663 
3664     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3665 
3666     if (--joint->count != 0) {
3667         return;
3668     }
3669 
3670     nxt_queue_remove(&joint->link);
3671 
3672     /*
3673      * The joint content can not be safely used after the critical
3674      * section protected by the spinlock because its memory pool may
3675      * be already destroyed by another thread.
3676      */
3677     skcf = joint->socket_conf;
3678     rtcf = skcf->router_conf;
3679     lock = &rtcf->router->lock;
3680 
3681     nxt_thread_spin_lock(lock);
3682 
3683     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3684               rtcf, rtcf->count);
3685 
3686     if (--skcf->count != 0) {
3687         skcf = NULL;
3688         rtcf = NULL;
3689 
3690     } else {
3691         nxt_queue_remove(&skcf->link);
3692 
3693         if (--rtcf->count != 0) {
3694             rtcf = NULL;
3695         }
3696     }
3697 
3698     nxt_thread_spin_unlock(lock);
3699 
3700 #if (NXT_TLS)
3701     if (skcf != NULL && skcf->tls != NULL) {
3702         task->thread->runtime->tls->server_free(task, skcf->tls);
3703     }
3704 #endif
3705 
3706     /* TODO remove engine->port */
3707 
3708     if (rtcf != NULL) {
3709         nxt_debug(task, "old router conf is destroyed");
3710 
3711         nxt_router_apps_hash_use(task, rtcf, -1);
3712 
3713         nxt_router_access_log_release(task, lock, rtcf->access_log);
3714 
3715         nxt_mp_thread_adopt(rtcf->mem_pool);
3716 
3717         nxt_mp_destroy(rtcf->mem_pool);
3718     }
3719 }
3720 
3721 
3722 static void
nxt_router_access_log_writer(nxt_task_t * task,nxt_http_request_t * r,nxt_router_access_log_t * access_log)3723 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3724     nxt_router_access_log_t *access_log)
3725 {
3726     size_t     size;
3727     u_char     *buf, *p;
3728     nxt_off_t  bytes;
3729 
3730     static nxt_time_string_t  date_cache = {
3731         (nxt_atomic_uint_t) -1,
3732         nxt_router_access_log_date,
3733         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3734         nxt_length("31/Dec/1986:19:40:00 +0300"),
3735         NXT_THREAD_TIME_LOCAL,
3736         NXT_THREAD_TIME_SEC,
3737     };
3738 
3739     size = r->remote->address_length
3740            + 6                  /* ' - - [' */
3741            + date_cache.size
3742            + 3                  /* '] "' */
3743            + r->method->length
3744            + 1                  /* space */
3745            + r->target.length
3746            + 1                  /* space */
3747            + r->version.length
3748            + 2                  /* '" ' */
3749            + 3                  /* status */
3750            + 1                  /* space */
3751            + NXT_OFF_T_LEN
3752            + 2                  /* ' "' */
3753            + (r->referer != NULL ? r->referer->value_length : 1)
3754            + 3                  /* '" "' */
3755            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3756            + 2                  /* '"\n' */
3757     ;
3758 
3759     buf = nxt_mp_nget(r->mem_pool, size);
3760     if (nxt_slow_path(buf == NULL)) {
3761         return;
3762     }
3763 
3764     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3765                    r->remote->address_length);
3766 
3767     p = nxt_cpymem(p, " - - [", 6);
3768 
3769     p = nxt_thread_time_string(task->thread, &date_cache, p);
3770 
3771     p = nxt_cpymem(p, "] \"", 3);
3772 
3773     if (r->method->length != 0) {
3774         p = nxt_cpymem(p, r->method->start, r->method->length);
3775 
3776         if (r->target.length != 0) {
3777             *p++ = ' ';
3778             p = nxt_cpymem(p, r->target.start, r->target.length);
3779 
3780             if (r->version.length != 0) {
3781                 *p++ = ' ';
3782                 p = nxt_cpymem(p, r->version.start, r->version.length);
3783             }
3784         }
3785 
3786     } else {
3787         *p++ = '-';
3788     }
3789 
3790     p = nxt_cpymem(p, "\" ", 2);
3791 
3792     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3793 
3794     *p++ = ' ';
3795 
3796     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3797 
3798     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3799 
3800     p = nxt_cpymem(p, " \"", 2);
3801 
3802     if (r->referer != NULL) {
3803         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3804 
3805     } else {
3806         *p++ = '-';
3807     }
3808 
3809     p = nxt_cpymem(p, "\" \"", 3);
3810 
3811     if (r->user_agent != NULL) {
3812         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3813 
3814     } else {
3815         *p++ = '-';
3816     }
3817 
3818     p = nxt_cpymem(p, "\"\n", 2);
3819 
3820     nxt_fd_write(access_log->fd, buf, p - buf);
3821 }
3822 
3823 
3824 static u_char *
nxt_router_access_log_date(u_char * buf,nxt_realtime_t * now,struct tm * tm,size_t size,const char * format)3825 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3826     size_t size, const char *format)
3827 {
3828     u_char  sign;
3829     time_t  gmtoff;
3830 
3831     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3832                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3833 
3834     gmtoff = nxt_timezone(tm) / 60;
3835 
3836     if (gmtoff < 0) {
3837         gmtoff = -gmtoff;
3838         sign = '-';
3839 
3840     } else {
3841         sign = '+';
3842     }
3843 
3844     return nxt_sprintf(buf, buf + size, format,
3845                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3846                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3847                        sign, gmtoff / 60, gmtoff % 60);
3848 }
3849 
3850 
3851 static void
nxt_router_access_log_open(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)3852 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3853 {
3854     uint32_t                 stream;
3855     nxt_int_t                ret;
3856     nxt_buf_t                *b;
3857     nxt_port_t               *main_port, *router_port;
3858     nxt_runtime_t            *rt;
3859     nxt_router_access_log_t  *access_log;
3860 
3861     access_log = tmcf->router_conf->access_log;
3862 
3863     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3864     if (nxt_slow_path(b == NULL)) {
3865         goto fail;
3866     }
3867 
3868     b->completion_handler = nxt_buf_dummy_completion;
3869 
3870     nxt_buf_cpystr(b, &access_log->path);
3871     *b->mem.free++ = '\0';
3872 
3873     rt = task->thread->runtime;
3874     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3875     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3876 
3877     stream = nxt_port_rpc_register_handler(task, router_port,
3878                                            nxt_router_access_log_ready,
3879                                            nxt_router_access_log_error,
3880                                            -1, tmcf);
3881     if (nxt_slow_path(stream == 0)) {
3882         goto fail;
3883     }
3884 
3885     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3886                                 stream, router_port->id, b);
3887 
3888     if (nxt_slow_path(ret != NXT_OK)) {
3889         nxt_port_rpc_cancel(task, router_port, stream);
3890         goto fail;
3891     }
3892 
3893     return;
3894 
3895 fail:
3896 
3897     nxt_router_conf_error(task, tmcf);
3898 }
3899 
3900 
3901 static void
nxt_router_access_log_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3902 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3903     void *data)
3904 {
3905     nxt_router_temp_conf_t   *tmcf;
3906     nxt_router_access_log_t  *access_log;
3907 
3908     tmcf = data;
3909 
3910     access_log = tmcf->router_conf->access_log;
3911 
3912     access_log->fd = msg->fd[0];
3913 
3914     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3915                        nxt_router_conf_apply, task, tmcf, NULL);
3916 }
3917 
3918 
3919 static void
nxt_router_access_log_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3920 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3921     void *data)
3922 {
3923     nxt_router_temp_conf_t  *tmcf;
3924 
3925     tmcf = data;
3926 
3927     nxt_router_conf_error(task, tmcf);
3928 }
3929 
3930 
3931 static void
nxt_router_access_log_use(nxt_thread_spinlock_t * lock,nxt_router_access_log_t * access_log)3932 nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
3933     nxt_router_access_log_t *access_log)
3934 {
3935     if (access_log == NULL) {
3936         return;
3937     }
3938 
3939     nxt_thread_spin_lock(lock);
3940 
3941     access_log->count++;
3942 
3943     nxt_thread_spin_unlock(lock);
3944 }
3945 
3946 
3947 static void
nxt_router_access_log_release(nxt_task_t * task,nxt_thread_spinlock_t * lock,nxt_router_access_log_t * access_log)3948 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3949     nxt_router_access_log_t *access_log)
3950 {
3951     if (access_log == NULL) {
3952         return;
3953     }
3954 
3955     nxt_thread_spin_lock(lock);
3956 
3957     if (--access_log->count != 0) {
3958         access_log = NULL;
3959     }
3960 
3961     nxt_thread_spin_unlock(lock);
3962 
3963     if (access_log != NULL) {
3964 
3965         if (access_log->fd != -1) {
3966             nxt_fd_close(access_log->fd);
3967         }
3968 
3969         nxt_free(access_log);
3970     }
3971 }
3972 
3973 
/*
 * Context of one access log reopen request.  It is allocated from its own
 * memory pool by nxt_router_access_log_reopen_handler() and handed to the
 * reopen RPC handlers as their data pointer.
 */
typedef struct {
    /* Pool the context (and the path buffer) were allocated from. */
    nxt_mp_t                 *mem_pool;
    /* Value of nxt_router->access_log when the request was issued. */
    nxt_router_access_log_t  *access_log;
} nxt_router_access_log_reopen_t;
3978 
3979 
3980 static void
nxt_router_access_log_reopen_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)3981 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3982 {
3983     nxt_mp_t                        *mp;
3984     uint32_t                        stream;
3985     nxt_int_t                       ret;
3986     nxt_buf_t                       *b;
3987     nxt_port_t                      *main_port, *router_port;
3988     nxt_runtime_t                   *rt;
3989     nxt_router_access_log_t         *access_log;
3990     nxt_router_access_log_reopen_t  *reopen;
3991 
3992     access_log = nxt_router->access_log;
3993 
3994     if (access_log == NULL) {
3995         return;
3996     }
3997 
3998     mp = nxt_mp_create(1024, 128, 256, 32);
3999     if (nxt_slow_path(mp == NULL)) {
4000         return;
4001     }
4002 
4003     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
4004     if (nxt_slow_path(reopen == NULL)) {
4005         goto fail;
4006     }
4007 
4008     reopen->mem_pool = mp;
4009     reopen->access_log = access_log;
4010 
4011     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
4012     if (nxt_slow_path(b == NULL)) {
4013         goto fail;
4014     }
4015 
4016     b->completion_handler = nxt_router_access_log_reopen_completion;
4017 
4018     nxt_buf_cpystr(b, &access_log->path);
4019     *b->mem.free++ = '\0';
4020 
4021     rt = task->thread->runtime;
4022     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
4023     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
4024 
4025     stream = nxt_port_rpc_register_handler(task, router_port,
4026                                            nxt_router_access_log_reopen_ready,
4027                                            nxt_router_access_log_reopen_error,
4028                                            -1, reopen);
4029     if (nxt_slow_path(stream == 0)) {
4030         goto fail;
4031     }
4032 
4033     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
4034                                 stream, router_port->id, b);
4035 
4036     if (nxt_slow_path(ret != NXT_OK)) {
4037         nxt_port_rpc_cancel(task, router_port, stream);
4038         goto fail;
4039     }
4040 
4041     nxt_mp_retain(mp);
4042 
4043     return;
4044 
4045 fail:
4046 
4047     nxt_mp_destroy(mp);
4048 }
4049 
4050 
4051 static void
nxt_router_access_log_reopen_completion(nxt_task_t * task,void * obj,void * data)4052 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
4053 {
4054     nxt_mp_t   *mp;
4055     nxt_buf_t  *b;
4056 
4057     b = obj;
4058     mp = b->data;
4059 
4060     nxt_mp_release(mp);
4061 }
4062 
4063 
4064 static void
nxt_router_access_log_reopen_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4065 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4066     void *data)
4067 {
4068     nxt_router_access_log_t         *access_log;
4069     nxt_router_access_log_reopen_t  *reopen;
4070 
4071     reopen = data;
4072 
4073     access_log = reopen->access_log;
4074 
4075     if (access_log == nxt_router->access_log) {
4076 
4077         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
4078             nxt_alert(task, "dup2(%FD, %FD) failed %E",
4079                       msg->fd[0], access_log->fd, nxt_errno);
4080         }
4081     }
4082 
4083     nxt_fd_close(msg->fd[0]);
4084     nxt_mp_release(reopen->mem_pool);
4085 }
4086 
4087 
4088 static void
nxt_router_access_log_reopen_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4089 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4090     void *data)
4091 {
4092     nxt_router_access_log_reopen_t  *reopen;
4093 
4094     reopen = data;
4095 
4096     nxt_mp_release(reopen->mem_pool);
4097 }
4098 
4099 
4100 static void
nxt_router_thread_exit_handler(nxt_task_t * task,void * obj,void * data)4101 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4102 {
4103     nxt_port_t           *port;
4104     nxt_thread_link_t    *link;
4105     nxt_event_engine_t   *engine;
4106     nxt_thread_handle_t  handle;
4107 
4108     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4109     link = data;
4110 
4111     nxt_thread_wait(handle);
4112 
4113     engine = link->engine;
4114 
4115     nxt_queue_remove(&engine->link);
4116 
4117     port = engine->port;
4118 
4119     // TODO notify all apps
4120 
4121     port->engine = task->thread->engine;
4122     nxt_mp_thread_adopt(port->mem_pool);
4123     nxt_port_use(task, port, -1);
4124 
4125     nxt_mp_thread_adopt(engine->mem_pool);
4126     nxt_mp_destroy(engine->mem_pool);
4127 
4128     nxt_event_engine_free(engine);
4129 
4130     nxt_free(link);
4131 }
4132 
4133 
/*
 * RPC handler for messages an application sends in reply to a request.
 *
 * "data" is the nxt_request_rpc_data_t of the request.  The handler:
 *
 *   - ignores the message if the request is already detached
 *     (request == NULL), and unlinks the RPC data if the request is in
 *     the error state;
 *   - forwards _NXT_PORT_MSG_REQ_HEADERS_ACK messages to
 *     nxt_router_req_headers_ack_handler();
 *   - otherwise treats the message as response data: the first data
 *     message is parsed as an nxt_unit_response_t header block, later ones
 *     are appended to the response body.
 *
 * Any failure while processing the header block ends the request with
 * NXT_HTTP_SERVICE_UNAVAILABLE and unlinks the RPC data.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    if (r->error) {
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    /* A zero-sized message contributes no data buffer. */
    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Terminate the chain with the request's "last" buffer. */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* More data is expected: (re)arm the application timeout. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        /* Header already sent: this is body data only. */
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        /*
         * First data message: the buffer must start with an
         * nxt_unit_response_t followed by resp->fields_count field
         * descriptors.
         */
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;
        /* Number of field descriptors that fit after the fixed header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        /*
         * NOTE(review): the name/value locations obtained through
         * nxt_unit_sptr_get() and the name/value lengths are not checked
         * against the buffer bounds here - confirm the application side
         * is trusted or validated elsewhere.
         */
        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /*
             * "skip" was cleared above, so it can only have been set
             * during field processing: drop the element just appended.
             */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        /*
         * Narrow the buffer to the body content carried with the header
         * ("piggyback" content), or make it empty if there is none.
         * NOTE(review): piggyback_content_length is not checked against
         * the buffer size here.
         */
        if (resp->piggyback_content_length != 0) {
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            b->mem.pos = b->mem.free;
        }

        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            /*
             * Nothing left to send from the header buffer: detach it,
             * schedule its completion handler and continue with the rest
             * of the chain.
             */
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            /* Successful websocket upgrade. */
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}
4321 
4322 
/*
 * Handles a _NXT_PORT_MSG_REQ_HEADERS_ACK message: an application process
 * acknowledged the request headers of "req_rpc_data".
 *
 * Under app->mutex the request is removed from the queue it is linked
 * into via app_link (if any), the acknowledging port is looked up in
 * app->port_hash by the pid and reply port of the message, and the idle/
 * spare bookkeeping of its main port is updated.  Afterwards the port
 * previously stored in req_rpc_data->app_port is released and replaced by
 * the acknowledging port, the remaining request body (buffers after the
 * first one and/or the body file descriptor) is sent to that port, and
 * the application timeout timer is armed.
 *
 * If the port cannot be found the request fails with
 * NXT_HTTP_INTERNAL_SERVER_ERROR.
 */
static void
nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
{
    int                 res;
    nxt_app_t           *app;
    nxt_buf_t           *b;
    nxt_bool_t          start_process, unlinked;
    nxt_port_t          *app_port, *main_app_port, *idle_port;
    nxt_queue_link_t    *idle_lnk;
    nxt_http_request_t  *r;

    nxt_debug(task, "stream #%uD: got ack from %PI:%d",
              req_rpc_data->stream,
              msg->port_msg.pid, msg->port_msg.reply_port);

    nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
                             msg->port_msg.pid);

    app = req_rpc_data->app;
    r = req_rpc_data->request;

    start_process = 0;
    unlinked = 0;

    nxt_thread_mutex_lock(&app->mutex);

    /* A non-NULL "next" marks the request as still queued. */
    if (r->app_link.next != NULL) {
        nxt_queue_remove(&r->app_link);
        r->app_link.next = NULL;

        unlinked = 1;
    }

    app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
                                  msg->port_msg.reply_port);
    if (nxt_slow_path(app_port == NULL)) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);

        /*
         * presumably drops the pool reference taken when the request was
         * queued - TODO confirm against the enqueue site
         */
        if (unlinked) {
            nxt_mp_release(r->mem_pool);
        }

        return;
    }

    main_app_port = app_port->main_app_port;

    /* The port is about to become busy: take it out of idle/spare queue. */
    if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
                  &app->name, main_app_port->pid, main_app_port->id,
                  (main_app_port->idle_start ? "idle_ports" : "spare_ports"));

        /* Check port was in 'spare_ports' using idle_start field. */
        if (main_app_port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            /*
             * If there is a vacant space in spare ports,
             * move the last idle to spare_ports.
             */
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }

        /* The actual start is deferred until the mutex is released. */
        if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
            app->pending_processes++;
            start_process = 1;
        }
    }

    main_app_port->active_requests++;

    /* Reference held through req_rpc_data->app_port (assigned below). */
    nxt_port_inc_use(app_port);

    nxt_thread_mutex_unlock(&app->mutex);

    if (unlinked) {
        nxt_mp_release(r->mem_pool);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }

    /* Replace the previously stored port with the acknowledging one. */
    nxt_port_use(task, req_rpc_data->app_port, -1);

    req_rpc_data->app_port = app_port;

    b = req_rpc_data->msg_info.buf;

    if (b != NULL) {
        /* First buffer is already sent.  Start from second. */
        b = b->next;

        req_rpc_data->msg_info.buf->next = NULL;
    }

    if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
                  req_rpc_data->msg_info.body_fd);

        if (req_rpc_data->msg_info.body_fd != -1) {
            /*
             * Rewind the body file before handing the descriptor over.
             * NOTE(review): the lseek() result is not checked.
             */
            lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
        }

        res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
                                    req_rpc_data->msg_info.body_fd,
                                    req_rpc_data->stream,
                                    task->thread->engine->port->id, b);

        if (nxt_slow_path(res != NXT_OK)) {
            nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
        }
    }

    /*
     * NOTE(review): the timer is armed even after the write failure
     * above - confirm this is intended.
     */
    if (app->timeout != 0) {
        r->timer.handler = nxt_router_app_timeout;
        r->timer_data = req_rpc_data;
        nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
    }
}
4460 
4461 
/*
 * Request state installed by nxt_router_response_ready_handler() after the
 * response header has been passed on for sending (non-websocket case).
 * Only the error handler is set.
 */
static const nxt_http_request_state_t  nxt_http_request_send_state
    nxt_aligned(64) =
{
    .error_handler = nxt_http_request_error_handler,
};
4467 
4468 
4469 static void
nxt_http_request_send_body(nxt_task_t * task,void * obj,void * data)4470 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4471 {
4472     nxt_buf_t           *out;
4473     nxt_http_request_t  *r;
4474 
4475     r = obj;
4476 
4477     out = r->out;
4478 
4479     if (out != NULL) {
4480         r->out = NULL;
4481         nxt_http_request_send(task, r, out);
4482     }
4483 }
4484 
4485 
4486 static void
nxt_router_response_error_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4487 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4488     void *data)
4489 {
4490     nxt_request_rpc_data_t  *req_rpc_data;
4491 
4492     req_rpc_data = data;
4493 
4494     req_rpc_data->rpc_cancel = 0;
4495 
4496     /* TODO cancel message and return if cancelled. */
4497     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4498 
4499     if (req_rpc_data->request != NULL) {
4500         nxt_http_request_error(task, req_rpc_data->request,
4501                                NXT_HTTP_SERVICE_UNAVAILABLE);
4502     }
4503 
4504     nxt_request_rpc_data_unlink(task, req_rpc_data);
4505 }
4506 
4507 
/*
 * RPC success handler for a process start request: a new port
 * (msg->u.new_port) of a prototype or application process is ready.
 *
 * "data" is the nxt_app_joint_rpc_t of the start request.  The reference
 * on the application joint is dropped first; if the application has been
 * released meanwhile (joint->app == NULL) the new process is told to
 * quit.  A process started for an older application generation
 * ("restarted") is told to quit as well.
 *
 * For a prototype port the port becomes app->proto_port and the process
 * start requests counted in app->proto_port_requests are issued.  For an
 * application port the port is registered in app->port_hash, the shared
 * port is sent to the new process and the port is released with
 * NXT_APR_NEW_PORT.
 */
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    uint32_t             n;
    nxt_app_t            *app;
    nxt_bool_t           start_process, restarted;
    nxt_port_t           *port;
    nxt_app_joint_t      *app_joint;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;
    port = msg->u.new_port;

    nxt_assert(app_joint != NULL);
    nxt_assert(port != NULL);
    nxt_assert(port->id == 0);

    /* Read the app pointer before the joint reference is dropped. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "new port ready for released app, send QUIT");

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        return;
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* The generation changed since the start request was issued. */
    restarted = (app->generation != app_joint_rpc->generation);

    if (app_joint_rpc->proto) {
        nxt_assert(app->proto_port == NULL);
        nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);

        /* Take over the counter of deferred process start requests. */
        n = app->proto_port_requests;
        app->proto_port_requests = 0;

        if (nxt_slow_path(restarted)) {
            nxt_thread_mutex_unlock(&app->mutex);

            nxt_debug(task, "proto port ready for restarted app, send QUIT");

            nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
                                  NULL);

        } else {
            port->app = app;
            app->proto_port = port;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_port_use(task, port, 1);
        }

        /* From here on "port" is the router port, not the new port. */
        port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];

        while (n > 0) {
            nxt_router_app_use(task, app, 1);

            nxt_router_start_app_process_handler(task, port, app);

            n--;
        }

        return;
    }

    nxt_assert(port->type == NXT_PROCESS_APP);
    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    if (nxt_slow_path(restarted)) {
        nxt_debug(task, "new port ready for restarted app, send QUIT");

        /* Decide under the mutex whether a replacement must be started. */
        start_process = !task->thread->engine->shutdown
                        && nxt_router_app_can_start(app)
                        && nxt_router_app_need_start(app);

        if (start_process) {
            app->pending_processes++;
        }

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        if (start_process) {
            nxt_router_start_app_process(task, app);
        }

        return;
    }

    port->app = app;
    port->main_app_port = port;

    app->processes++;
    nxt_port_hash_add(&app->port_hash, port);
    app->port_hash_count++;

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
              &app->name, port->pid, app->processes, app->pending_processes);

    /*
     * NOTE(review): the result of nxt_router_app_shared_port_send() is
     * ignored.
     */
    nxt_router_app_shared_port_send(task, port);

    nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
}
4625 
4626 
4627 static nxt_int_t
nxt_router_app_shared_port_send(nxt_task_t * task,nxt_port_t * app_port)4628 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4629 {
4630     nxt_buf_t                *b;
4631     nxt_port_t               *port;
4632     nxt_port_msg_new_port_t  *msg;
4633 
4634     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4635                              sizeof(nxt_port_data_t));
4636     if (nxt_slow_path(b == NULL)) {
4637         return NXT_ERROR;
4638     }
4639 
4640     port = app_port->app->shared_port;
4641 
4642     nxt_debug(task, "send port %FD to process %PI",
4643               port->pair[0], app_port->pid);
4644 
4645     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4646     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4647 
4648     msg->id = port->id;
4649     msg->pid = port->pid;
4650     msg->max_size = port->max_size;
4651     msg->max_share = port->max_share;
4652     msg->type = port->type;
4653 
4654     return nxt_port_socket_write2(task, app_port,
4655                                   NXT_PORT_MSG_NEW_PORT,
4656                                   port->pair[0], port->queue_fd,
4657                                   0, 0, b);
4658 }
4659 
4660 
/*
 * RPC error handler for a process start request.
 *
 * "data" is the nxt_app_joint_rpc_t of the start request.  The reference
 * on the application joint is dropped; if the application still exists,
 * its pending process counter is decremented and requests waiting in
 * app->ack_waiting_req are failed by posting their err_work to the
 * request's engine while the application has no processes.
 */
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t            *app;
    nxt_app_joint_t      *app_joint;
    nxt_queue_link_t     *link;
    nxt_http_request_t   *r;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;

    nxt_assert(app_joint != NULL);

    /* Read the app pointer before the joint reference is dropped. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "start error for released app");

        return;
    }

    nxt_debug(task, "app '%V' %p start error", &app->name, app);

    link = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    /*
     * The first waiting request is taken when no process is running,
     * regardless of other pending starts.
     */
    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
        link = nxt_queue_first(&app->ack_waiting_req);

        nxt_queue_remove(link);
        link->next = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    while (link != NULL) {
        r = nxt_container_of(link, nxt_http_request_t, app_link);

        /* The error is delivered on the request's own engine. */
        nxt_event_engine_post(r->engine, &r->err_work);

        link = NULL;

        nxt_thread_mutex_lock(&app->mutex);

        /*
         * Further requests are failed only while there are neither
         * running nor pending processes.
         */
        if (app->processes == 0 && app->pending_processes == 0
            && !nxt_queue_is_empty(&app->ack_waiting_req))
        {
            link = nxt_queue_first(&app->ack_waiting_req);

            nxt_queue_remove(link);
            link->next = NULL;
        }

        nxt_thread_mutex_unlock(&app->mutex);
    }
}
4728 
4729 
4730 nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t * task,nxt_app_t * app)4731 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4732 {
4733     nxt_port_t  *port;
4734 
4735     port = NULL;
4736 
4737     nxt_thread_mutex_lock(&app->mutex);
4738 
4739     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4740 
4741         /* Caller is responsible to decrease port use count. */
4742         nxt_queue_chk_remove(&port->app_link);
4743 
4744         if (nxt_queue_chk_remove(&port->idle_link)) {
4745             app->idle_processes--;
4746 
4747             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4748                       &app->name, port->pid, port->id,
4749                       (port->idle_start ? "idle_ports" : "spare_ports"));
4750         }
4751 
4752         nxt_port_hash_remove(&app->port_hash, port);
4753         app->port_hash_count--;
4754 
4755         port->app = NULL;
4756         app->processes--;
4757 
4758         break;
4759 
4760     } nxt_queue_loop;
4761 
4762     nxt_thread_mutex_unlock(&app->mutex);
4763 
4764     return port;
4765 }
4766 
4767 
4768 static void
nxt_router_app_use(nxt_task_t * task,nxt_app_t * app,int i)4769 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4770 {
4771     int  c;
4772 
4773     c = nxt_atomic_fetch_add(&app->use_count, i);
4774 
4775     if (i < 0 && c == -i) {
4776 
4777         if (task->thread->engine != app->engine) {
4778             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4779 
4780         } else {
4781             nxt_router_free_app(task, app->joint, NULL);
4782         }
4783     }
4784 }
4785 
4786 
4787 static void
nxt_router_app_unlink(nxt_task_t * task,nxt_app_t * app)4788 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4789 {
4790     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4791 
4792     nxt_queue_remove(&app->link);
4793 
4794     nxt_router_app_use(task, app, -1);
4795 }
4796 
4797 
/*
 * Updates the request accounting and the idle/spare state of an
 * application port after the event described by "action", then adjusts
 * the port use count.
 *
 * Effect of "action" (see the switch below):
 *   NXT_APR_NEW_PORT        no counters change;
 *   NXT_APR_REQUEST_FAILED  one active request less, use count - 1;
 *   NXT_APR_GOT_RESPONSE    one active request less, use count - 1;
 *   NXT_APR_UPGRADE         one active request less, use count unchanged;
 *   NXT_APR_CLOSE           use count - 1 only.
 *
 * For the shared port (id NXT_SHARED_PORT_ID) only app->active_requests
 * is updated.  For other ports the main port is (re)linked into
 * app->ports if needed and, when it has neither active requests nor
 * active websockets, it is placed into spare_ports or idle_ports.
 */
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
    nxt_apr_action_t action)
{
    int         inc_use;
    uint32_t    got_response, dec_requests;
    nxt_bool_t  adjust_idle_timer;
    nxt_port_t  *main_app_port;

    nxt_assert(port != NULL);

    inc_use = 0;
    got_response = 0;
    dec_requests = 0;

    switch (action) {
    case NXT_APR_NEW_PORT:
        break;
    case NXT_APR_REQUEST_FAILED:
        dec_requests = 1;
        inc_use = -1;
        break;
    case NXT_APR_GOT_RESPONSE:
        got_response = 1;
        inc_use = -1;
        break;
    case NXT_APR_UPGRADE:
        got_response = 1;
        break;
    case NXT_APR_CLOSE:
        inc_use = -1;
        break;
    }

    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
              port->pid, port->id,
              (int) inc_use, (int) got_response);

    if (port->id == NXT_SHARED_PORT_ID) {
        /* The shared port has no per-port idle/spare state to maintain. */
        nxt_thread_mutex_lock(&app->mutex);

        app->active_requests -= got_response + dec_requests;

        nxt_thread_mutex_unlock(&app->mutex);

        goto adjust_use;
    }

    main_app_port = port->main_app_port;

    nxt_thread_mutex_lock(&app->mutex);

    main_app_port->active_requests -= got_response + dec_requests;
    app->active_requests -= got_response + dec_requests;

    /*
     * Link a still-open main port (pair[1] != -1) into app->ports if it
     * is not there yet; the queue holds its own reference on the port.
     */
    if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
        nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);

        nxt_port_inc_use(main_app_port);
    }

    adjust_idle_timer = 0;

    /* The port became completely idle and is in no idle/spare queue yet. */
    if (main_app_port->pair[1] != -1
        && main_app_port->active_requests == 0
        && main_app_port->active_websockets == 0
        && main_app_port->idle_link.next == NULL)
    {
        /*
         * The spare set is exactly full, so this port goes to idle_ports
         * below: schedule adjust_idle_work unless it is already scheduled
         * (a non-NULL "data" marks it as scheduled).
         */
        if (app->idle_processes == app->spare_processes
            && app->adjust_idle_work.data == NULL)
        {
            adjust_idle_timer = 1;
            app->adjust_idle_work.data = app;
            app->adjust_idle_work.next = NULL;
        }

        if (app->idle_processes < app->spare_processes) {
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);

            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        } else {
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);

            /* A non-zero idle_start also marks the port as "idle". */
            main_app_port->idle_start = task->thread->engine->timers.now;

            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        }

        app->idle_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (adjust_idle_timer) {
        /* Reference for the posted work item. */
        nxt_router_app_use(task, app, 1);
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
    }

    /*
     * NOTE(review): this check only selects which debug message is
     * printed; both paths end at adjust_use.
     */
    if (main_app_port->pair[1] == -1) {
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
                  &app->name, app, main_app_port, main_app_port->pid);

        goto adjust_use;
    }

    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
              &app->name, app);

adjust_use:

    nxt_port_use(task, port, inc_use);
}
4913 
4914 
/*
 * Detaches a closing application port from its app.
 *
 * Under app->mutex:
 *  - the prototype port is simply forgotten (app->proto_port is cleared)
 *    and its reference is dropped;
 *  - any other port is removed from app->port_hash;
 *  - for a port with id 0 the function additionally unlinks it from
 *    app->ports and from the idle/spare list, fixes up the idle/spare
 *    accounting, decrements app->processes and decides whether a
 *    replacement process has to be started.
 *
 * Dropping the port reference and starting a new process are both done
 * after the mutex has been released.
 */
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
    nxt_app_t         *app;
    nxt_bool_t        unchain, start_process;
    nxt_port_t        *idle_port;
    nxt_queue_link_t  *idle_lnk;

    app = port->app;

    nxt_assert(app != NULL);

    nxt_thread_mutex_lock(&app->mutex);

    /* Prototype port: break the app <-> port link and release the port. */
    if (port == app->proto_port) {
        app->proto_port = NULL;
        port->app = NULL;

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
                  port->pid);

        nxt_port_use(task, port, -1);

        return;
    }

    nxt_port_hash_remove(&app->port_hash, port);
    app->port_hash_count--;

    /*
     * Ports with a non-zero id only need to leave the hash; the list and
     * process accounting below is done for the id 0 port only.
     */
    if (port->id != 0) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
                  port->pid, port->id);

        return;
    }

    /* Remember whether the port was linked in app->ports. */
    unchain = nxt_queue_chk_remove(&port->app_link);

    if (nxt_queue_chk_remove(&port->idle_link)) {
        app->idle_processes--;

        /* idle_start == 0 marks a port that was kept in spare_ports. */
        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
                  &app->name, port->pid, port->id,
                  (port->idle_start ? "idle_ports" : "spare_ports"));

        /*
         * A spare port is gone while idle ports still exist: promote the
         * last idle port to spare to refill the spare set.
         */
        if (port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }
    }

    app->processes--;

    /* Never spawn a replacement while the engine is shutting down. */
    start_process = !task->thread->engine->shutdown
                    && nxt_router_app_can_start(app)
                    && nxt_router_app_need_start(app);

    if (start_process) {
        app->pending_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

    /* Drop the port reference only if it was linked in app->ports. */
    if (unchain) {
        nxt_port_use(task, port, -1);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}
5005 
5006 
/*
 * Stops application processes that have been idle for longer than
 * app->idle_timeout and (re)arms the app idle timer for the next
 * candidate.
 *
 * obj is the application.  data equals the application itself when the
 * call originates from app->adjust_idle_work (the poster sets
 * adjust_idle_work.data to the app and takes an app reference); the idle
 * timer handler passes NULL instead.  In the "queued" case the work item
 * is marked free again and the app reference is dropped at the end.
 */
static void
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t           *app;
    nxt_bool_t          queued;
    nxt_port_t          *port;
    nxt_msec_t          timeout, threshold;
    nxt_queue_link_t    *lnk;
    nxt_event_engine_t  *engine;

    app = obj;
    queued = (data == app);

    nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
              &app->name, queued);

    engine = task->thread->engine;

    nxt_assert(app->engine == engine);

    /* Ports whose deadline is not later than this are treated as expired. */
    threshold = engine->timers.now + app->joint->idle_timer.bias;
    timeout = 0;

    nxt_thread_mutex_lock(&app->mutex);

    if (queued) {
        /* Allow the work item to be posted again. */
        app->adjust_idle_work.data = NULL;
    }

    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
              &app->name,
              (int) app->idle_processes, (int) app->spare_processes);

    /* Only processes in excess of the spare set may be stopped. */
    while (app->idle_processes > app->spare_processes) {

        nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

        lnk = nxt_queue_first(&app->idle_ports);
        port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);

        timeout = port->idle_start + app->idle_timeout;

        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
                  &app->name, port->pid,
                  port->idle_start, timeout, threshold);

        /* First idle port has not expired yet: stop and arm the timer. */
        if (timeout > threshold) {
            break;
        }

        nxt_queue_remove(lnk);
        lnk->next = NULL;

        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
                  &app->name, port->pid, port->id);

        nxt_queue_chk_remove(&port->app_link);

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        app->idle_processes--;
        app->processes--;
        port->app = NULL;

        /* The QUIT message and the reference drop are done unlocked. */
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' send QUIT to idle port %PI",
                  &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);

        nxt_thread_mutex_lock(&app->mutex);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    /*
     * timeout > threshold only when the loop stopped on a not yet expired
     * port; otherwise there is nothing left to wait for.
     */
    if (timeout > threshold) {
        nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);

    } else {
        nxt_timer_disable(engine, &app->joint->idle_timer);
    }

    if (queued) {
        /* Release the app reference taken when the work was posted. */
        nxt_router_app_use(task, app, -1);
    }
}
5097 
5098 
5099 static void
nxt_router_app_idle_timeout(nxt_task_t * task,void * obj,void * data)5100 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5101 {
5102     nxt_timer_t      *timer;
5103     nxt_app_joint_t  *app_joint;
5104 
5105     timer = obj;
5106     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5107 
5108     if (nxt_fast_path(app_joint->app != NULL)) {
5109         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5110     }
5111 }
5112 
5113 
5114 static void
nxt_router_app_joint_release_handler(nxt_task_t * task,void * obj,void * data)5115 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5116 {
5117     nxt_timer_t      *timer;
5118     nxt_app_joint_t  *app_joint;
5119 
5120     timer = obj;
5121     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5122 
5123     nxt_router_app_joint_use(task, app_joint, -1);
5124 }
5125 
5126 
/*
 * Tears down the application referenced by the app joint passed in obj.
 *
 * Steps, in order:
 *  1. release every port returned by nxt_router_app_get_port_for_quit();
 *  2. under app->mutex, drain app->port_hash, closing and releasing each
 *     port, and detach the prototype port;
 *  3. outside the mutex, send QUIT to the prototype port, close and
 *     release it;
 *  4. assert that all counters and queues are empty;
 *  5. destroy outgoing shared memory, mutexes, the shared port and the
 *     app memory pool;
 *  6. clear app_joint->app and drop the joint reference, either directly
 *     or via a zero-delay timer.
 */
static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t        *app;
    nxt_port_t       *port, *proto_port;
    nxt_app_joint_t  *app_joint;

    app_joint = obj;
    app = app_joint->app;

    /* Release the ports handed out for quitting, one by one. */
    for ( ;; ) {
        port = nxt_router_app_get_port_for_quit(task, app);
        if (port == NULL) {
            break;
        }

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* Drain whatever is still registered in the port hash. */
    for ( ;; ) {
        port = nxt_port_hash_retrieve(&app->port_hash);
        if (port == NULL) {
            break;
        }

        app->port_hash_count--;

        port->app = NULL;

        nxt_port_close(task, port);

        nxt_port_use(task, port, -1);
    }

    proto_port = app->proto_port;

    if (proto_port != NULL) {
        nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
                  proto_port->pid);

        /* Unlink here; the QUIT itself is sent after unlocking. */
        app->proto_port = NULL;
        proto_port->app = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (proto_port != NULL) {
        nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
                              -1, 0, 0, NULL);

        nxt_port_close(task, proto_port);

        nxt_port_use(task, proto_port, -1);
    }

    /* Nothing may reference the application past this point. */
    nxt_assert(app->proto_port == NULL);
    nxt_assert(app->processes == 0);
    nxt_assert(app->active_requests == 0);
    nxt_assert(app->port_hash_count == 0);
    nxt_assert(app->idle_processes == 0);
    nxt_assert(nxt_queue_is_empty(&app->ports));
    nxt_assert(nxt_queue_is_empty(&app->spare_ports));
    nxt_assert(nxt_queue_is_empty(&app->idle_ports));

    nxt_port_mmaps_destroy(&app->outgoing, 1);

    nxt_thread_mutex_destroy(&app->outgoing.mutex);

    if (app->shared_port != NULL) {
        app->shared_port->app = NULL;
        nxt_port_close(task, app->shared_port);
        nxt_port_use(task, app->shared_port, -1);

        app->shared_port = NULL;
    }

    nxt_thread_mutex_destroy(&app->mutex);
    nxt_mp_destroy(app->mem_pool);

    /* The idle timer handler checks this pointer before using the app. */
    app_joint->app = NULL;

    /*
     * NOTE(review): a non-zero result of nxt_timer_delete() presumably
     * means the timer was still armed; in that case the joint reference is
     * dropped from a zero-delay timer callback instead of right here.
     * Confirm against nxt_timer_delete().
     */
    if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
        app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
        nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);

    } else {
        nxt_router_app_joint_use(task, app_joint, -1);
    }
}
5218 
5219 
5220 static void
nxt_router_app_port_get(nxt_task_t * task,nxt_app_t * app,nxt_request_rpc_data_t * req_rpc_data)5221 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5222     nxt_request_rpc_data_t *req_rpc_data)
5223 {
5224     nxt_bool_t          start_process;
5225     nxt_port_t          *port;
5226     nxt_http_request_t  *r;
5227 
5228     start_process = 0;
5229 
5230     nxt_thread_mutex_lock(&app->mutex);
5231 
5232     port = app->shared_port;
5233     nxt_port_inc_use(port);
5234 
5235     app->active_requests++;
5236 
5237     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5238         app->pending_processes++;
5239         start_process = 1;
5240     }
5241 
5242     r = req_rpc_data->request;
5243 
5244     /*
5245      * Put request into application-wide list to be able to cancel request
5246      * if something goes wrong with application processes.
5247      */
5248     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5249 
5250     nxt_thread_mutex_unlock(&app->mutex);
5251 
5252     /*
5253      * Retain request memory pool while request is linked in ack_waiting_req
5254      * to guarantee request structure memory is accessble.
5255      */
5256     nxt_mp_retain(r->mem_pool);
5257 
5258     req_rpc_data->app_port = port;
5259     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5260 
5261     if (start_process) {
5262         nxt_router_start_app_process(task, app);
5263     }
5264 }
5265 
5266 
5267 void
nxt_router_process_http_request(nxt_task_t * task,nxt_http_request_t * r,nxt_http_action_t * action)5268 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5269     nxt_http_action_t *action)
5270 {
5271     nxt_event_engine_t      *engine;
5272     nxt_http_app_conf_t     *conf;
5273     nxt_request_rpc_data_t  *req_rpc_data;
5274 
5275     conf = action->u.conf;
5276     engine = task->thread->engine;
5277 
5278     r->app_target = conf->target;
5279 
5280     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5281                                           nxt_router_response_ready_handler,
5282                                           nxt_router_response_error_handler,
5283                                           sizeof(nxt_request_rpc_data_t));
5284     if (nxt_slow_path(req_rpc_data == NULL)) {
5285         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5286         return;
5287     }
5288 
5289     /*
5290      * At this point we have request req_rpc_data allocated and registered
5291      * in port handlers.  Need to fixup request memory pool.  Counterpart
5292      * release will be called via following call chain:
5293      *    nxt_request_rpc_data_unlink() ->
5294      *        nxt_router_http_request_release_post() ->
5295      *            nxt_router_http_request_release()
5296      */
5297     nxt_mp_retain(r->mem_pool);
5298 
5299     r->timer.task = &engine->task;
5300     r->timer.work_queue = &engine->fast_work_queue;
5301     r->timer.log = engine->task.log;
5302     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5303 
5304     r->engine = engine;
5305     r->err_work.handler = nxt_router_http_request_error;
5306     r->err_work.task = task;
5307     r->err_work.obj = r;
5308 
5309     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5310     req_rpc_data->app = conf->app;
5311     req_rpc_data->msg_info.body_fd = -1;
5312     req_rpc_data->rpc_cancel = 1;
5313 
5314     nxt_router_app_use(task, conf->app, 1);
5315 
5316     req_rpc_data->request = r;
5317     r->req_rpc_data = req_rpc_data;
5318 
5319     if (r->last != NULL) {
5320         r->last->completion_handler = nxt_router_http_request_done;
5321     }
5322 
5323     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5324     nxt_router_app_prepare_request(task, req_rpc_data);
5325 }
5326 
5327 
5328 static void
nxt_router_http_request_error(nxt_task_t * task,void * obj,void * data)5329 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5330 {
5331     nxt_http_request_t  *r;
5332 
5333     r = obj;
5334 
5335     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5336 
5337     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5338 
5339     if (r->req_rpc_data != NULL) {
5340         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5341     }
5342 
5343     nxt_mp_release(r->mem_pool);
5344 }
5345 
5346 
5347 static void
nxt_router_http_request_done(nxt_task_t * task,void * obj,void * data)5348 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5349 {
5350     nxt_http_request_t  *r;
5351 
5352     r = data;
5353 
5354     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5355 
5356     if (r->req_rpc_data != NULL) {
5357         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5358     }
5359 
5360     nxt_http_request_close_handler(task, r, r->proto.any);
5361 }
5362 
5363 
5364 static void
nxt_router_app_prepare_request(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)5365 nxt_router_app_prepare_request(nxt_task_t *task,
5366     nxt_request_rpc_data_t *req_rpc_data)
5367 {
5368     nxt_app_t         *app;
5369     nxt_buf_t         *buf, *body;
5370     nxt_int_t         res;
5371     nxt_port_t        *port, *reply_port;
5372 
5373     int                   notify;
5374     struct {
5375         nxt_port_msg_t       pm;
5376         nxt_port_mmap_msg_t  mm;
5377     } msg;
5378 
5379 
5380     app = req_rpc_data->app;
5381 
5382     nxt_assert(app != NULL);
5383 
5384     port = req_rpc_data->app_port;
5385 
5386     nxt_assert(port != NULL);
5387     nxt_assert(port->queue != NULL);
5388 
5389     reply_port = task->thread->engine->port;
5390 
5391     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5392                                  nxt_app_msg_prefix[app->type]);
5393     if (nxt_slow_path(buf == NULL)) {
5394         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5395                   req_rpc_data->stream, &app->name);
5396 
5397         nxt_http_request_error(task, req_rpc_data->request,
5398                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5399 
5400         return;
5401     }
5402 
5403     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5404                     nxt_buf_used_size(buf),
5405                     port->socket.fd);
5406 
5407     req_rpc_data->msg_info.buf = buf;
5408 
5409     body = req_rpc_data->request->body;
5410 
5411     if (body != NULL && nxt_buf_is_file(body)) {
5412         req_rpc_data->msg_info.body_fd = body->file->fd;
5413 
5414         body->file->fd = -1;
5415 
5416     } else {
5417         req_rpc_data->msg_info.body_fd = -1;
5418     }
5419 
5420     msg.pm.stream = req_rpc_data->stream;
5421     msg.pm.pid = reply_port->pid;
5422     msg.pm.reply_port = reply_port->id;
5423     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5424     msg.pm.last = 0;
5425     msg.pm.mmap = 1;
5426     msg.pm.nf = 0;
5427     msg.pm.mf = 0;
5428     msg.pm.tracking = 0;
5429 
5430     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5431     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5432 
5433     msg.mm.mmap_id = hdr->id;
5434     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5435     msg.mm.size = nxt_buf_used_size(buf);
5436 
5437     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5438                              req_rpc_data->stream, &notify,
5439                              &req_rpc_data->msg_info.tracking_cookie);
5440     if (nxt_fast_path(res == NXT_OK)) {
5441         if (notify != 0) {
5442             (void) nxt_port_socket_write(task, port,
5443                                          NXT_PORT_MSG_READ_QUEUE,
5444                                          -1, req_rpc_data->stream,
5445                                          reply_port->id, NULL);
5446 
5447         } else {
5448             nxt_debug(task, "queue is not empty");
5449         }
5450 
5451         buf->is_port_mmap_sent = 1;
5452         buf->mem.pos = buf->mem.free;
5453 
5454     } else {
5455         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5456                   req_rpc_data->stream, &app->name);
5457 
5458         nxt_http_request_error(task, req_rpc_data->request,
5459                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5460     }
5461 }
5462 
5463 
5464 struct nxt_fields_iter_s {
5465     nxt_list_part_t   *part;
5466     nxt_http_field_t  *field;
5467 };
5468 
5469 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5470 
5471 
5472 static nxt_http_field_t *
nxt_fields_part_first(nxt_list_part_t * part,nxt_fields_iter_t * i)5473 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5474 {
5475     if (part == NULL) {
5476         return NULL;
5477     }
5478 
5479     while (part->nelts == 0) {
5480         part = part->next;
5481         if (part == NULL) {
5482             return NULL;
5483         }
5484     }
5485 
5486     i->part = part;
5487     i->field = nxt_list_data(i->part);
5488 
5489     return i->field;
5490 }
5491 
5492 
5493 static nxt_http_field_t *
nxt_fields_first(nxt_list_t * fields,nxt_fields_iter_t * i)5494 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5495 {
5496     return nxt_fields_part_first(nxt_list_part(fields), i);
5497 }
5498 
5499 
5500 static nxt_http_field_t *
nxt_fields_next(nxt_fields_iter_t * i)5501 nxt_fields_next(nxt_fields_iter_t *i)
5502 {
5503     nxt_http_field_t  *end = nxt_list_data(i->part);
5504 
5505     end += i->part->nelts;
5506     i->field++;
5507 
5508     if (i->field < end) {
5509         return i->field;
5510     }
5511 
5512     return nxt_fields_part_first(i->part->next, i);
5513 }
5514 
5515 
/*
 * Serializes an HTTP request into shared memory buffers taken from
 * app->outgoing.
 *
 * The first buffer starts with a nxt_unit_request_t followed by the
 * field descriptors and the NUL-terminated strings they refer to (method,
 * version, addresses, server name, target, path, header names and values).
 * The request body is copied right after this header area, spilling into
 * additional chained buffers when needed.
 *
 * prefix is prepended to every header name; when it is not empty, header
 * names are also upper-cased with '-' replaced by '_', and repeated
 * headers are merged into a single ", "-separated value.
 *
 * Returns the head of the buffer chain, or NULL if the header area does
 * not fit into PORT_MMAP_DATA_SIZE or a buffer cannot be allocated.
 */
static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
    nxt_app_t *app, const nxt_str_t *prefix)
{
    void                *target_pos, *query_pos;
    u_char              *pos, *end, *p, c;
    size_t              fields_count, req_size, size, free_size;
    size_t              copy_size;
    nxt_off_t           content_length;
    nxt_buf_t           *b, *buf, *out, **tail;
    nxt_http_field_t    *field, *dup;
    nxt_unit_field_t    *dst_field;
    nxt_fields_iter_t   iter, dup_iter;
    nxt_unit_request_t  *req;

    /*
     * Size of the fixed part plus all strings with their terminators.
     * The path is stored separately only if it does not share its start
     * with the target.
     *
     * NOTE(review): the addresses are sized with ->length here but copied
     * with ->address_length below; presumably length >= address_length so
     * this only over-reserves - confirm against nxt_sockaddr_t.
     */
    req_size = sizeof(nxt_unit_request_t)
               + r->method->length + 1
               + r->version.length + 1
               + r->remote->length + 1
               + r->local->length + 1
               + r->server_name.length + 1
               + r->target.length + 1
               + (r->path->start != r->target.start ? r->path->length + 1 : 0);

    content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
    fields_count = 0;

    /*
     * Every field is accounted for here, without looking at its skip
     * flag, with room for the prefixed name, the value and two
     * terminators.
     */
    nxt_list_each(field, r->fields) {
        fields_count++;

        req_size += field->name_length + prefix->length + 1
                    + field->value_length + 1;
    } nxt_list_loop;

    req_size += fields_count * sizeof(nxt_unit_field_t);

    if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
        nxt_alert(task, "headers to big to fit in shared memory (%d)",
                  (int) req_size);

        return NULL;
    }

    /* Ask for room for the body too, capped at PORT_MMAP_DATA_SIZE. */
    out = nxt_port_mmap_get_buf(task, &app->outgoing,
              nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
    if (nxt_slow_path(out == NULL)) {
        return NULL;
    }

    /* Reserve the whole header area up front; the body goes after it. */
    req = (nxt_unit_request_t *) out->mem.free;
    out->mem.free += req_size;

    req->app_target = r->app_target;

    req->content_length = content_length;

    /* String data starts right after the array of field descriptors. */
    p = (u_char *) (req->fields + fields_count);

    nxt_debug(task, "fields_count=%d", (int) fields_count);

    req->method_length = r->method->length;
    nxt_unit_sptr_set(&req->method, p);
    p = nxt_cpymem(p, r->method->start, r->method->length);
    *p++ = '\0';

    req->version_length = r->version.length;
    nxt_unit_sptr_set(&req->version, p);
    p = nxt_cpymem(p, r->version.start, r->version.length);
    *p++ = '\0';

    req->remote_length = r->remote->address_length;
    nxt_unit_sptr_set(&req->remote, p);
    p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
                   r->remote->address_length);
    *p++ = '\0';

    req->local_length = r->local->address_length;
    nxt_unit_sptr_set(&req->local, p);
    p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
    *p++ = '\0';

    req->tls = (r->tls != NULL);
    req->websocket_handshake = r->websocket_handshake;

    req->server_name_length = r->server_name.length;
    nxt_unit_sptr_set(&req->server_name, p);
    p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
    *p++ = '\0';

    /* Remembered: path and query may point into the copied target. */
    target_pos = p;
    req->target_length = (uint32_t) r->target.length;
    nxt_unit_sptr_set(&req->target, p);
    p = nxt_cpymem(p, r->target.start, r->target.length);
    *p++ = '\0';

    req->path_length = (uint32_t) r->path->length;
    if (r->path->start == r->target.start) {
        /* The path shares its start with the target: reuse that copy. */
        nxt_unit_sptr_set(&req->path, target_pos);

    } else {
        nxt_unit_sptr_set(&req->path, p);
        p = nxt_cpymem(p, r->path->start, r->path->length);
        *p++ = '\0';
    }

    req->query_length = (uint32_t) r->args->length;
    if (r->args->start != NULL) {
        /* Same offset inside the copied target as inside the original. */
        query_pos = nxt_pointer_to(target_pos,
                                   r->args->start - r->target.start);

        nxt_unit_sptr_set(&req->query, query_pos);

    } else {
        req->query.offset = 0;
    }

    req->content_length_field = NXT_UNIT_NONE_FIELD;
    req->content_type_field   = NXT_UNIT_NONE_FIELD;
    req->cookie_field         = NXT_UNIT_NONE_FIELD;
    req->authorization_field  = NXT_UNIT_NONE_FIELD;

    dst_field = req->fields;

    for (field = nxt_fields_first(r->fields, &iter);
         field != NULL;
         field = nxt_fields_next(&iter))
    {
        /* Includes duplicates already merged into an earlier field. */
        if (field->skip) {
            continue;
        }

        dst_field->hash = field->hash;
        dst_field->skip = 0;
        dst_field->name_length = field->name_length + prefix->length;
        dst_field->value_length = field->value_length;

        /* Record the output index of the well-known headers. */
        if (field == r->content_length) {
            req->content_length_field = dst_field - req->fields;

        } else if (field == r->content_type) {
            req->content_type_field = dst_field - req->fields;

        } else if (field == r->cookie) {
            req->cookie_field = dst_field - req->fields;

        } else if (field == r->authorization) {
            req->authorization_field = dst_field - req->fields;
        }

        nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
                  (int) field->hash, (int) field->skip,
                  (int) field->name_length, field->name,
                  (int) field->value_length, field->value);

        if (prefix->length != 0) {
            nxt_unit_sptr_set(&dst_field->name, p);
            p = nxt_cpymem(p, prefix->start, prefix->length);

            /* Prefixed names: a-z become A-Z and '-' becomes '_'. */
            end = field->name + field->name_length;
            for (pos = field->name; pos < end; pos++) {
                c = *pos;

                if (c >= 'a' && c <= 'z') {
                    *p++ = (c & ~0x20);
                    continue;
                }

                if (c == '-') {
                    *p++ = '_';
                    continue;
                }

                *p++ = c;
            }

        } else {
            nxt_unit_sptr_set(&dst_field->name, p);
            p = nxt_cpymem(p, field->name, field->name_length);
        }

        *p++ = '\0';

        nxt_unit_sptr_set(&dst_field->value, p);
        p = nxt_cpymem(p, field->value, field->value_length);

        if (prefix->length != 0) {
            /*
             * Merge later fields with the same name (case-insensitive)
             * into this value.  The scan runs on a copy of the iterator,
             * so the outer iteration position is not disturbed.
             */
            dup_iter = iter;

            for (dup = nxt_fields_next(&dup_iter);
                 dup != NULL;
                 dup = nxt_fields_next(&dup_iter))
            {
                if (dup->name_length != field->name_length
                    || dup->skip
                    || dup->hash != field->hash
                    || nxt_memcasecmp(dup->name, field->name, dup->name_length))
                {
                    continue;
                }

                p = nxt_cpymem(p, ", ", 2);
                p = nxt_cpymem(p, dup->value, dup->value_length);

                dst_field->value_length += 2 + dup->value_length;

                /* Makes the outer loop pass over this field later. */
                dup->skip = 1;
            }
        }

        *p++ = '\0';

        dst_field++;
    }

    /* Number of fields actually written, skipped ones excluded. */
    req->fields_count = (uint32_t) (dst_field - req->fields);

    /* The body copied below starts right after the header area. */
    nxt_unit_sptr_set(&req->preread_content, out->mem.free);

    buf = out;
    tail = &buf->next;

    for (b = r->body; b != NULL; b = b->next) {
        size = nxt_buf_mem_used_size(&b->mem);
        pos = b->mem.pos;

        while (size > 0) {
            if (buf == NULL) {
                /* Current output buffer is used up: chain a new one. */
                free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);

                buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
                if (nxt_slow_path(buf == NULL)) {
                    /* Give back the whole chain built so far. */
                    while (out != NULL) {
                        buf = out->next;
                        out->next = NULL;
                        out->completion_handler(task, out, out->parent);
                        out = buf;
                    }
                    return NULL;
                }

                *tail = buf;
                tail = &buf->next;

            } else {
                /* Try to grow the current buffer before giving up on it. */
                free_size = nxt_buf_mem_free_size(&buf->mem);
                if (free_size < size
                    && nxt_port_mmap_increase_buf(task, buf, size, 1)
                       == NXT_OK)
                {
                    free_size = nxt_buf_mem_free_size(&buf->mem);
                }
            }

            if (free_size > 0) {
                copy_size = nxt_min(free_size, size);

                buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);

                size -= copy_size;
                pos += copy_size;

                /*
                 * Source buffer done: keep the output buffer so the next
                 * body buffer continues in it.
                 */
                if (size == 0) {
                    break;
                }
            }

            /* Data remains but nothing fits: force a new output buffer. */
            buf = NULL;
        }
    }

    return out;
}
5788 
5789 
5790 static void
nxt_router_app_timeout(nxt_task_t * task,void * obj,void * data)5791 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5792 {
5793     nxt_timer_t              *timer;
5794     nxt_http_request_t       *r;
5795     nxt_request_rpc_data_t   *req_rpc_data;
5796 
5797     timer = obj;
5798 
5799     nxt_debug(task, "router app timeout");
5800 
5801     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5802     req_rpc_data = r->timer_data;
5803 
5804     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5805 
5806     nxt_request_rpc_data_unlink(task, req_rpc_data);
5807 }
5808 
5809 
5810 static void
nxt_router_http_request_release_post(nxt_task_t * task,nxt_http_request_t * r)5811 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5812 {
5813     r->timer.handler = nxt_router_http_request_release;
5814     nxt_timer_add(task->thread->engine, &r->timer, 0);
5815 }
5816 
5817 
5818 static void
nxt_router_http_request_release(nxt_task_t * task,void * obj,void * data)5819 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5820 {
5821     nxt_http_request_t  *r;
5822 
5823     nxt_debug(task, "http request pool release");
5824 
5825     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5826 
5827     nxt_mp_release(r->mem_pool);
5828 }
5829 
5830 
5831 static void
nxt_router_oosm_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5832 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5833 {
5834     size_t                   mi;
5835     uint32_t                 i;
5836     nxt_bool_t               ack;
5837     nxt_process_t            *process;
5838     nxt_free_map_t           *m;
5839     nxt_port_mmap_handler_t  *mmap_handler;
5840 
5841     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5842 
5843     process = nxt_runtime_process_find(task->thread->runtime,
5844                                        msg->port_msg.pid);
5845     if (nxt_slow_path(process == NULL)) {
5846         return;
5847     }
5848 
5849     ack = 0;
5850 
5851     /*
5852      * To mitigate possible racing condition (when OOSM message received
5853      * after some of the memory was already freed), need to try to find
5854      * first free segment in shared memory and send ACK if found.
5855      */
5856 
5857     nxt_thread_mutex_lock(&process->incoming.mutex);
5858 
5859     for (i = 0; i < process->incoming.size; i++) {
5860         mmap_handler = process->incoming.elts[i].mmap_handler;
5861 
5862         if (nxt_slow_path(mmap_handler == NULL)) {
5863             continue;
5864         }
5865 
5866         m = mmap_handler->hdr->free_map;
5867 
5868         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5869             if (m[mi] != 0) {
5870                 ack = 1;
5871 
5872                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5873                           i, mi, m[mi]);
5874 
5875                 break;
5876             }
5877         }
5878     }
5879 
5880     nxt_thread_mutex_unlock(&process->incoming.mutex);
5881 
5882     if (ack) {
5883         nxt_process_broadcast_shm_ack(task, process);
5884     }
5885 }
5886 
5887 
5888 static void
nxt_router_get_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5889 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5890 {
5891     nxt_fd_t                 fd;
5892     nxt_port_t               *port;
5893     nxt_runtime_t            *rt;
5894     nxt_port_mmaps_t         *mmaps;
5895     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5896     nxt_port_mmap_handler_t  *mmap_handler;
5897 
5898     rt = task->thread->runtime;
5899 
5900     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5901                                  msg->port_msg.reply_port);
5902     if (nxt_slow_path(port == NULL)) {
5903         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5904                   msg->port_msg.pid, msg->port_msg.reply_port);
5905 
5906         return;
5907     }
5908 
5909     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5910                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5911     {
5912         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5913                   (int) nxt_buf_used_size(msg->buf));
5914 
5915         return;
5916     }
5917 
5918     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5919 
5920     nxt_assert(port->type == NXT_PROCESS_APP);
5921 
5922     if (nxt_slow_path(port->app == NULL)) {
5923         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5924                   port->pid, port->id);
5925 
5926         // FIXME
5927         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5928                               -1, msg->port_msg.stream, 0, NULL);
5929 
5930         return;
5931     }
5932 
5933     mmaps = &port->app->outgoing;
5934     nxt_thread_mutex_lock(&mmaps->mutex);
5935 
5936     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5937         nxt_thread_mutex_unlock(&mmaps->mutex);
5938 
5939         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5940                   (int) get_mmap_msg->id);
5941 
5942         // FIXME
5943         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5944                               -1, msg->port_msg.stream, 0, NULL);
5945         return;
5946     }
5947 
5948     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5949 
5950     fd = mmap_handler->fd;
5951 
5952     nxt_thread_mutex_unlock(&mmaps->mutex);
5953 
5954     nxt_debug(task, "get mmap %PI:%d found",
5955               msg->port_msg.pid, (int) get_mmap_msg->id);
5956 
5957     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5958 }
5959 
5960 
5961 static void
nxt_router_get_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5962 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5963 {
5964     nxt_port_t               *port, *reply_port;
5965     nxt_runtime_t            *rt;
5966     nxt_port_msg_get_port_t  *get_port_msg;
5967 
5968     rt = task->thread->runtime;
5969 
5970     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5971                                        msg->port_msg.reply_port);
5972     if (nxt_slow_path(reply_port == NULL)) {
5973         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5974                   msg->port_msg.pid, msg->port_msg.reply_port);
5975 
5976         return;
5977     }
5978 
5979     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5980                       < (int) sizeof(nxt_port_msg_get_port_t)))
5981     {
5982         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5983                   (int) nxt_buf_used_size(msg->buf));
5984 
5985         return;
5986     }
5987 
5988     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5989 
5990     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5991     if (nxt_slow_path(port == NULL)) {
5992         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5993                   get_port_msg->pid, get_port_msg->id);
5994 
5995         return;
5996     }
5997 
5998     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5999               get_port_msg->id);
6000 
6001     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
6002 }
6003