1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_router.h>
11 #include <nxt_app_queue.h>
12 #include <nxt_port_queue.h>
13
14
/* Forward declarations of file-local helpers that are defined further below. */
static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    nxt_pid_t pid);
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);

/*
 * Counter behind nxt_port_get_next_id().  It starts at 1 and is set back
 * to 1 by nxt_port_reset_next_id().
 */
static nxt_atomic_uint_t nxt_port_last_id = 1;
20
21
22 static void
nxt_port_mp_cleanup(nxt_task_t * task,void * obj,void * data)23 nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
24 {
25 nxt_mp_t *mp;
26 nxt_port_t *port;
27
28 port = obj;
29 mp = data;
30
31 nxt_assert(port->pair[0] == -1);
32 nxt_assert(port->pair[1] == -1);
33
34 nxt_assert(port->use_count == 0);
35 nxt_assert(port->app_link.next == NULL);
36 nxt_assert(port->idle_link.next == NULL);
37
38 nxt_assert(nxt_queue_is_empty(&port->messages));
39 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
40 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
41
42 nxt_thread_mutex_destroy(&port->write_mutex);
43
44 nxt_mp_free(mp, port);
45 }
46
47
48 nxt_port_t *
nxt_port_new(nxt_task_t * task,nxt_port_id_t id,nxt_pid_t pid,nxt_process_type_t type)49 nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
50 nxt_process_type_t type)
51 {
52 nxt_mp_t *mp;
53 nxt_port_t *port;
54
55 mp = nxt_mp_create(1024, 128, 256, 32);
56
57 if (nxt_slow_path(mp == NULL)) {
58 return NULL;
59 }
60
61 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
62
63 if (nxt_fast_path(port != NULL)) {
64 port->id = id;
65 port->pid = pid;
66 port->type = type;
67 port->mem_pool = mp;
68 port->use_count = 1;
69
70 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
71
72 nxt_queue_init(&port->messages);
73 nxt_thread_mutex_create(&port->write_mutex);
74
75 port->queue_fd = -1;
76
77 } else {
78 nxt_mp_destroy(mp);
79 }
80
81 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
82
83 return port;
84 }
85
86
87 void
nxt_port_close(nxt_task_t * task,nxt_port_t * port)88 nxt_port_close(nxt_task_t *task, nxt_port_t *port)
89 {
90 size_t size;
91
92 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
93 port->id, port->type);
94
95 if (port->pair[0] != -1) {
96 nxt_port_rpc_close(task, port);
97
98 nxt_fd_close(port->pair[0]);
99 port->pair[0] = -1;
100 }
101
102 if (port->pair[1] != -1) {
103 nxt_fd_close(port->pair[1]);
104 port->pair[1] = -1;
105
106 if (port->app != NULL) {
107 nxt_router_app_port_close(task, port);
108 }
109 }
110
111 if (port->queue_fd != -1) {
112 nxt_fd_close(port->queue_fd);
113 port->queue_fd = -1;
114 }
115
116 if (port->queue != NULL) {
117 size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
118 : sizeof(nxt_port_queue_t);
119 nxt_mem_munmap(port->queue, size);
120
121 port->queue = NULL;
122 }
123 }
124
125
126 static void
nxt_port_release(nxt_task_t * task,nxt_port_t * port)127 nxt_port_release(nxt_task_t *task, nxt_port_t *port)
128 {
129 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
130 port->id, port->type);
131
132 port->app = NULL;
133
134 if (port->link.next != NULL) {
135 nxt_assert(port->process != NULL);
136
137 nxt_process_port_remove(port);
138
139 nxt_process_use(task, port->process, -1);
140 }
141
142 nxt_mp_release(port->mem_pool);
143 }
144
145
146 nxt_port_id_t
nxt_port_get_next_id()147 nxt_port_get_next_id()
148 {
149 return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
150 }
151
152
153 void
nxt_port_reset_next_id()154 nxt_port_reset_next_id()
155 {
156 nxt_port_last_id = 1;
157 }
158
159
160 void
nxt_port_enable(nxt_task_t * task,nxt_port_t * port,const nxt_port_handlers_t * handlers)161 nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
162 const nxt_port_handlers_t *handlers)
163 {
164 port->pid = nxt_pid;
165 port->handler = nxt_port_handler;
166 port->data = (nxt_port_handler_t *) (handlers);
167
168 nxt_port_read_enable(task, port);
169 }
170
171
172 static void
nxt_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)173 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
174 {
175 nxt_port_handler_t *handlers;
176
177 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
178
179 nxt_debug(task, "port %d: message type:%uD",
180 msg->port->socket.fd, msg->port_msg.type);
181
182 handlers = msg->port->data;
183 handlers[msg->port_msg.type](task, msg);
184
185 return;
186 }
187
188 nxt_alert(task, "port %d: unknown message type:%uD",
189 msg->port->socket.fd, msg->port_msg.type);
190 }
191
192
193 void
nxt_port_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)194 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
195 {
196 nxt_runtime_quit(task, 0);
197 }
198
199
200 /* TODO join with process_ready and move to nxt_main_process.c */
201 nxt_inline void
nxt_port_send_new_port(nxt_task_t * task,nxt_runtime_t * rt,nxt_port_t * new_port,uint32_t stream)202 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
203 nxt_port_t *new_port, uint32_t stream)
204 {
205 nxt_port_t *port;
206 nxt_process_t *process;
207
208 nxt_debug(task, "new port %d for process %PI",
209 new_port->pair[1], new_port->pid);
210
211 nxt_runtime_process_each(rt, process) {
212
213 if (process->pid == new_port->pid || process->pid == nxt_pid) {
214 continue;
215 }
216
217 port = nxt_process_port_first(process);
218
219 if (nxt_proc_send_matrix[port->type][new_port->type]) {
220 (void) nxt_port_send_port(task, port, new_port, stream);
221 }
222
223 } nxt_runtime_process_loop;
224 }
225
226
227 nxt_int_t
nxt_port_send_port(nxt_task_t * task,nxt_port_t * port,nxt_port_t * new_port,uint32_t stream)228 nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
229 uint32_t stream)
230 {
231 nxt_buf_t *b;
232 nxt_port_msg_new_port_t *msg;
233
234 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
235 sizeof(nxt_port_data_t));
236 if (nxt_slow_path(b == NULL)) {
237 return NXT_ERROR;
238 }
239
240 nxt_debug(task, "send port %FD to process %PI",
241 new_port->pair[1], port->pid);
242
243 b->mem.free += sizeof(nxt_port_msg_new_port_t);
244 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
245
246 msg->id = new_port->id;
247 msg->pid = new_port->pid;
248 msg->max_size = port->max_size;
249 msg->max_share = port->max_share;
250 msg->type = new_port->type;
251
252 return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
253 new_port->pair[1], new_port->queue_fd,
254 stream, 0, b);
255 }
256
257
258 void
nxt_port_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)259 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
260 {
261 nxt_port_t *port;
262 nxt_runtime_t *rt;
263 nxt_port_msg_new_port_t *new_port_msg;
264
265 rt = task->thread->runtime;
266
267 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
268
269 /* TODO check b size and make plain */
270
271 nxt_debug(task, "new port %d received for process %PI:%d",
272 msg->fd[0], new_port_msg->pid, new_port_msg->id);
273
274 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
275 if (port != NULL) {
276 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
277 new_port_msg->id);
278
279 msg->u.new_port = port;
280
281 nxt_fd_close(msg->fd[0]);
282 msg->fd[0] = -1;
283 return;
284 }
285
286 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
287 new_port_msg->id,
288 new_port_msg->type);
289 if (nxt_slow_path(port == NULL)) {
290 return;
291 }
292
293 nxt_fd_nonblocking(task, msg->fd[0]);
294
295 port->pair[0] = -1;
296 port->pair[1] = msg->fd[0];
297 port->max_size = new_port_msg->max_size;
298 port->max_share = new_port_msg->max_share;
299
300 port->socket.task = task;
301
302 nxt_port_write_enable(task, port);
303
304 msg->u.new_port = port;
305 }
306
307 /* TODO move to nxt_main_process.c */
308 void
nxt_port_process_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)309 nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
310 {
311 nxt_port_t *port;
312 nxt_process_t *process;
313 nxt_runtime_t *rt;
314
315 rt = task->thread->runtime;
316
317 process = nxt_runtime_process_find(rt, msg->port_msg.pid);
318 if (nxt_slow_path(process == NULL)) {
319 return;
320 }
321
322 nxt_assert(process->state != NXT_PROCESS_STATE_READY);
323
324 process->state = NXT_PROCESS_STATE_READY;
325
326 nxt_assert(!nxt_queue_is_empty(&process->ports));
327
328 port = nxt_process_port_first(process);
329
330 nxt_debug(task, "process %PI ready", msg->port_msg.pid);
331
332 if (msg->fd[0] != -1) {
333 port->queue_fd = msg->fd[0];
334 port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
335 PROT_READ | PROT_WRITE, MAP_SHARED,
336 msg->fd[0], 0);
337 }
338
339 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
340 }
341
342
343 void
nxt_port_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)344 nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
345 {
346 nxt_runtime_t *rt;
347 nxt_process_t *process;
348
349 rt = task->thread->runtime;
350
351 if (nxt_slow_path(msg->fd[0] == -1)) {
352 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
353
354 return;
355 }
356
357 process = nxt_runtime_process_find(rt, msg->port_msg.pid);
358 if (nxt_slow_path(process == NULL)) {
359 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
360 msg->port_msg.pid);
361
362 goto fail_close;
363 }
364
365 nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
366
367 fail_close:
368
369 nxt_fd_close(msg->fd[0]);
370 }
371
372
373 void
nxt_port_change_log_file(nxt_task_t * task,nxt_runtime_t * rt,nxt_uint_t slot,nxt_fd_t fd)374 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
375 nxt_fd_t fd)
376 {
377 nxt_buf_t *b;
378 nxt_port_t *port;
379 nxt_process_t *process;
380
381 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
382
383 nxt_runtime_process_each(rt, process) {
384
385 if (nxt_pid == process->pid) {
386 continue;
387 }
388
389 port = nxt_process_port_first(process);
390
391 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
392 sizeof(nxt_uint_t), 0);
393 if (nxt_slow_path(b == NULL)) {
394 continue;
395 }
396
397 b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
398
399 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
400 fd, 0, 0, b);
401
402 } nxt_runtime_process_loop;
403 }
404
405
406 void
nxt_port_change_log_file_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)407 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
408 {
409 nxt_buf_t *b;
410 nxt_uint_t slot;
411 nxt_file_t *log_file;
412 nxt_runtime_t *rt;
413
414 rt = task->thread->runtime;
415
416 b = msg->buf;
417 slot = *(nxt_uint_t *) b->mem.pos;
418
419 log_file = nxt_list_elt(rt->log_files, slot);
420
421 nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
422
423 /*
424 * The old log file descriptor must be closed at the moment when no
425 * other threads use it. dup2() allows to use the old file descriptor
426 * for new log file. This change is performed atomically in the kernel.
427 */
428 if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
429 if (slot == 0) {
430 (void) nxt_file_stderr(log_file);
431 }
432 }
433 }
434
435
436 void
nxt_port_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)437 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
438 {
439 size_t dump_size;
440 nxt_buf_t *b;
441
442 b = msg->buf;
443 dump_size = b->mem.free - b->mem.pos;
444
445 if (dump_size > 300) {
446 dump_size = 300;
447 }
448
449 nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
450 }
451
452
453 void
nxt_port_remove_notify_others(nxt_task_t * task,nxt_process_t * process)454 nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
455 {
456 nxt_pid_t pid;
457 nxt_buf_t *buf;
458 nxt_port_t *port;
459 nxt_runtime_t *rt;
460 nxt_process_t *p;
461 nxt_process_type_t ptype;
462
463 pid = process->pid;
464
465 ptype = nxt_process_type(process);
466
467 rt = task->thread->runtime;
468
469 nxt_runtime_process_each(rt, p) {
470
471 if (p->pid == nxt_pid
472 || p->pid == pid
473 || nxt_queue_is_empty(&p->ports))
474 {
475 continue;
476 }
477
478 port = nxt_process_port_first(p);
479
480 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
481 continue;
482 }
483
484 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
485 sizeof(pid));
486
487 if (nxt_slow_path(buf == NULL)) {
488 continue;
489 }
490
491 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
492
493 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
494 process->stream, 0, buf);
495
496 } nxt_runtime_process_loop;
497 }
498
499
500 void
nxt_port_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)501 nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
502 {
503 nxt_pid_t pid;
504 nxt_buf_t *buf;
505
506 buf = msg->buf;
507
508 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
509
510 nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
511
512 nxt_port_remove_pid(task, msg, pid);
513 }
514
515
516 static void
nxt_port_remove_pid(nxt_task_t * task,nxt_port_recv_msg_t * msg,nxt_pid_t pid)517 nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
518 nxt_pid_t pid)
519 {
520 nxt_runtime_t *rt;
521 nxt_process_t *process;
522
523 msg->u.removed_pid = pid;
524
525 nxt_debug(task, "port remove pid %PI handler", pid);
526
527 rt = task->thread->runtime;
528
529 nxt_port_rpc_remove_peer(task, msg->port, pid);
530
531 process = nxt_runtime_process_find(rt, pid);
532
533 if (process) {
534 nxt_process_close_ports(task, process);
535 }
536 }
537
538
539 void
nxt_port_empty_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)540 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
541 {
542 nxt_debug(task, "port empty handler");
543 }
544
545
/*
 * Work item used by nxt_port_post() to run a handler on the engine that
 * owns a port.  It is allocated in nxt_port_post() and freed in
 * nxt_port_post_handler().
 */
typedef struct {
    /* Work descriptor posted to the port's engine. */
    nxt_work_t work;
    /* Port the handler is run for; a use count is held while posted. */
    nxt_port_t *port;
    /* Handler to call once the work item runs. */
    nxt_port_post_handler_t handler;
} nxt_port_work_t;
551
552
553 static void
nxt_port_post_handler(nxt_task_t * task,void * obj,void * data)554 nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
555 {
556 nxt_port_t *port;
557 nxt_port_work_t *pw;
558 nxt_port_post_handler_t handler;
559
560 pw = obj;
561 port = pw->port;
562 handler = pw->handler;
563
564 nxt_free(pw);
565
566 handler(task, port, data);
567
568 nxt_port_use(task, port, -1);
569 }
570
571
572 nxt_int_t
nxt_port_post(nxt_task_t * task,nxt_port_t * port,nxt_port_post_handler_t handler,void * data)573 nxt_port_post(nxt_task_t *task, nxt_port_t *port,
574 nxt_port_post_handler_t handler, void *data)
575 {
576 nxt_port_work_t *pw;
577
578 if (task->thread->engine == port->engine) {
579 handler(task, port, data);
580
581 return NXT_OK;
582 }
583
584 pw = nxt_zalloc(sizeof(nxt_port_work_t));
585
586 if (nxt_slow_path(pw == NULL)) {
587 return NXT_ERROR;
588 }
589
590 nxt_atomic_fetch_add(&port->use_count, 1);
591
592 pw->work.handler = nxt_port_post_handler;
593 pw->work.task = &port->engine->task;
594 pw->work.obj = pw;
595 pw->work.data = data;
596
597 pw->port = port;
598 pw->handler = handler;
599
600 nxt_event_engine_post(port->engine, &pw->work);
601
602 return NXT_OK;
603 }
604
605
606 static void
nxt_port_release_handler(nxt_task_t * task,nxt_port_t * port,void * data)607 nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
608 {
609 /* no op */
610 }
611
612
613 void
nxt_port_use(nxt_task_t * task,nxt_port_t * port,int i)614 nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
615 {
616 int c;
617
618 c = nxt_atomic_fetch_add(&port->use_count, i);
619
620 if (i < 0 && c == -i) {
621
622 if (port->engine == NULL || task->thread->engine == port->engine) {
623 nxt_port_release(task, port);
624
625 return;
626 }
627
628 nxt_port_post(task, port, nxt_port_release_handler, NULL);
629 }
630 }
631