1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16
17 #include "nxt_websocket.h"
18
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22
23 #define NXT_UNIT_MAX_PLAIN_SIZE 1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE \
25 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26
27 enum {
28 NXT_QUIT_NORMAL = 0,
29 NXT_QUIT_GRACEFUL = 1,
30 };
31
32 typedef struct nxt_unit_impl_s nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
43
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46 nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52 nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54 nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58 int *log_fd, uint32_t *stream, uint32_t *shm_limit,
59 uint32_t *request_limit);
60 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
61 int queue_fd);
62 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
63 nxt_unit_request_info_t **preq);
64 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
65 nxt_unit_recv_msg_t *recv_msg);
66 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
67 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
68 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
69 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
70 nxt_unit_recv_msg_t *recv_msg);
71 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
72 nxt_unit_port_id_t *port_id);
73 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
74 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
75 nxt_unit_recv_msg_t *recv_msg);
76 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
77 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
78 nxt_unit_ctx_t *ctx);
79 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
80 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
81 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
82 nxt_unit_ctx_t *ctx);
83 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
84 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
85 nxt_unit_websocket_frame_impl_t *ws);
86 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
87 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
88 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
89 nxt_unit_mmap_buf_t *mmap_buf, int last);
90 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
91 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
92 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
94 nxt_unit_ctx_impl_t *ctx_impl);
95 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
96 nxt_unit_read_buf_t *rbuf);
97 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
98 nxt_unit_request_info_t *req, size_t size);
99 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
100 size_t size);
101 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
102 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
103 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
104 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
105 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
106 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
107 nxt_unit_port_t *port, int n);
108 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
109 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
110 int fd);
111 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
112 nxt_unit_port_t *port, uint32_t size,
113 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
114 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
115
116 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
117 nxt_unit_ctx_impl_t *ctx_impl);
118 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
119 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
120 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
121 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
122 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
123 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
124 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
125 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
126 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
127 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
128 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
129 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
130 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
131
132 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
133 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
134 pid_t pid, int remove);
135 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
136 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
137 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
139 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
140 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
141 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
142 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
145 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
146 nxt_unit_port_t *port);
147 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
148 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
149
150 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
151 nxt_unit_port_t *port, int queue_fd);
152
153 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
154 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
155 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
156 nxt_unit_port_t *port, void *queue);
157 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
158 nxt_queue_t *awaiting_req);
159 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
160 nxt_unit_port_id_t *port_id);
161 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
162 nxt_unit_port_id_t *port_id);
163 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
164 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
165 nxt_unit_process_t *process);
166 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
167 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
168 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
169 nxt_unit_port_t *port, const void *buf, size_t buf_size,
170 const nxt_send_oob_t *oob);
171 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
172 const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
173 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174 nxt_unit_read_buf_t *rbuf);
175 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
176 nxt_unit_read_buf_t *src);
177 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
178 nxt_unit_read_buf_t *rbuf);
179 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
180 nxt_unit_read_buf_t *rbuf);
181 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
182 nxt_unit_read_buf_t *rbuf);
183 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
184 nxt_unit_read_buf_t *rbuf);
185 nxt_inline int nxt_unit_close(int fd);
186 static int nxt_unit_fd_blocking(int fd);
187
188 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
189 nxt_unit_port_t *port);
190 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
191 nxt_unit_port_id_t *port_id, int remove);
192
193 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
194 nxt_unit_request_info_t *req);
195 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
196 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
197
198 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
199 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
200 static void nxt_unit_lvlhsh_free(void *data, void *p);
201 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
202
203
204 struct nxt_unit_mmap_buf_s {
205 nxt_unit_buf_t buf;
206
207 nxt_unit_mmap_buf_t *next;
208 nxt_unit_mmap_buf_t **prev;
209
210 nxt_port_mmap_header_t *hdr;
211 nxt_unit_request_info_t *req;
212 nxt_unit_ctx_impl_t *ctx_impl;
213 char *free_ptr;
214 char *plain_ptr;
215 };
216
217
218 struct nxt_unit_recv_msg_s {
219 uint32_t stream;
220 nxt_pid_t pid;
221 nxt_port_id_t reply_port;
222
223 uint8_t last; /* 1 bit */
224 uint8_t mmap; /* 1 bit */
225
226 void *start;
227 uint32_t size;
228
229 int fd[2];
230
231 nxt_unit_mmap_buf_t *incoming_buf;
232 };
233
234
235 typedef enum {
236 NXT_UNIT_RS_START = 0,
237 NXT_UNIT_RS_RESPONSE_INIT,
238 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
239 NXT_UNIT_RS_RESPONSE_SENT,
240 NXT_UNIT_RS_RELEASED,
241 } nxt_unit_req_state_t;
242
243
244 struct nxt_unit_request_info_impl_s {
245 nxt_unit_request_info_t req;
246
247 uint32_t stream;
248
249 nxt_unit_mmap_buf_t *outgoing_buf;
250 nxt_unit_mmap_buf_t *incoming_buf;
251
252 nxt_unit_req_state_t state;
253 uint8_t websocket;
254 uint8_t in_hash;
255
256 /* for nxt_unit_ctx_impl_t.free_req or active_req */
257 nxt_queue_link_t link;
258 /* for nxt_unit_port_impl_t.awaiting_req */
259 nxt_queue_link_t port_wait_link;
260
261 char extra_data[];
262 };
263
264
265 struct nxt_unit_websocket_frame_impl_s {
266 nxt_unit_websocket_frame_t ws;
267
268 nxt_unit_mmap_buf_t *buf;
269
270 nxt_queue_link_t link;
271
272 nxt_unit_ctx_impl_t *ctx_impl;
273 };
274
275
276 struct nxt_unit_read_buf_s {
277 nxt_queue_link_t link;
278 nxt_unit_ctx_impl_t *ctx_impl;
279 ssize_t size;
280 nxt_recv_oob_t oob;
281 char buf[16384];
282 };
283
284
285 struct nxt_unit_ctx_impl_s {
286 nxt_unit_ctx_t ctx;
287
288 nxt_atomic_t use_count;
289 nxt_atomic_t wait_items;
290
291 pthread_mutex_t mutex;
292
293 nxt_unit_port_t *read_port;
294
295 nxt_queue_link_t link;
296
297 nxt_unit_mmap_buf_t *free_buf;
298
299 /* of nxt_unit_request_info_impl_t */
300 nxt_queue_t free_req;
301
302 /* of nxt_unit_websocket_frame_impl_t */
303 nxt_queue_t free_ws;
304
305 /* of nxt_unit_request_info_impl_t */
306 nxt_queue_t active_req;
307
308 /* of nxt_unit_request_info_impl_t */
309 nxt_lvlhsh_t requests;
310
311 /* of nxt_unit_request_info_impl_t */
312 nxt_queue_t ready_req;
313
314 /* of nxt_unit_read_buf_t */
315 nxt_queue_t pending_rbuf;
316
317 /* of nxt_unit_read_buf_t */
318 nxt_queue_t free_rbuf;
319
320 uint8_t online; /* 1 bit */
321 uint8_t ready; /* 1 bit */
322 uint8_t quit_param;
323
324 nxt_unit_mmap_buf_t ctx_buf[2];
325 nxt_unit_read_buf_t ctx_read_buf;
326
327 nxt_unit_request_info_impl_t req;
328 };
329
330
331 struct nxt_unit_mmap_s {
332 nxt_port_mmap_header_t *hdr;
333 pthread_t src_thread;
334
335 /* of nxt_unit_read_buf_t */
336 nxt_queue_t awaiting_rbuf;
337 };
338
339
340 struct nxt_unit_mmaps_s {
341 pthread_mutex_t mutex;
342 uint32_t size;
343 uint32_t cap;
344 nxt_atomic_t allocated_chunks;
345 nxt_unit_mmap_t *elts;
346 };
347
348
349 struct nxt_unit_impl_s {
350 nxt_unit_t unit;
351 nxt_unit_callbacks_t callbacks;
352
353 nxt_atomic_t use_count;
354 nxt_atomic_t request_count;
355
356 uint32_t request_data_size;
357 uint32_t shm_mmap_limit;
358 uint32_t request_limit;
359
360 pthread_mutex_t mutex;
361
362 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
363 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
364
365 nxt_unit_port_t *router_port;
366 nxt_unit_port_t *shared_port;
367
368 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
369
370 nxt_unit_mmaps_t incoming;
371 nxt_unit_mmaps_t outgoing;
372
373 pid_t pid;
374 int log_fd;
375
376 nxt_unit_ctx_impl_t main_ctx;
377 };
378
379
380 struct nxt_unit_port_impl_s {
381 nxt_unit_port_t port;
382
383 nxt_atomic_t use_count;
384
385 /* for nxt_unit_process_t.ports */
386 nxt_queue_link_t link;
387 nxt_unit_process_t *process;
388
389 /* of nxt_unit_request_info_impl_t */
390 nxt_queue_t awaiting_req;
391
392 int ready;
393
394 void *queue;
395
396 int from_socket;
397 nxt_unit_read_buf_t *socket_rbuf;
398 };
399
400
401 struct nxt_unit_process_s {
402 pid_t pid;
403
404 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
405
406 nxt_unit_impl_t *lib;
407
408 nxt_atomic_t use_count;
409
410 uint32_t next_port_id;
411 };
412
413
414 /* Explicitly using 32 bit types to avoid possible alignment. */
415 typedef struct {
416 int32_t pid;
417 uint32_t id;
418 } nxt_unit_port_hash_id_t;
419
420
421 static pid_t nxt_unit_pid;
422
423
424 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)425 nxt_unit_init(nxt_unit_init_t *init)
426 {
427 int rc, queue_fd;
428 void *mem;
429 uint32_t ready_stream, shm_limit, request_limit;
430 nxt_unit_ctx_t *ctx;
431 nxt_unit_impl_t *lib;
432 nxt_unit_port_t ready_port, router_port, read_port;
433
434 nxt_unit_pid = getpid();
435
436 lib = nxt_unit_create(init);
437 if (nxt_slow_path(lib == NULL)) {
438 return NULL;
439 }
440
441 queue_fd = -1;
442 mem = MAP_FAILED;
443
444 if (init->ready_port.id.pid != 0
445 && init->ready_stream != 0
446 && init->read_port.id.pid != 0)
447 {
448 ready_port = init->ready_port;
449 ready_stream = init->ready_stream;
450 router_port = init->router_port;
451 read_port = init->read_port;
452 lib->log_fd = init->log_fd;
453
454 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
455 ready_port.id.id);
456 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
457 router_port.id.id);
458 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
459 read_port.id.id);
460
461 } else {
462 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
463 &lib->log_fd, &ready_stream, &shm_limit,
464 &request_limit);
465 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
466 goto fail;
467 }
468
469 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
470 / PORT_MMAP_DATA_SIZE;
471 lib->request_limit = request_limit;
472 }
473
474 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
475 lib->shm_mmap_limit = 1;
476 }
477
478 lib->pid = read_port.id.pid;
479 nxt_unit_pid = lib->pid;
480
481 ctx = &lib->main_ctx.ctx;
482
483 rc = nxt_unit_fd_blocking(router_port.out_fd);
484 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
485 goto fail;
486 }
487
488 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
489 if (nxt_slow_path(lib->router_port == NULL)) {
490 nxt_unit_alert(NULL, "failed to add router_port");
491
492 goto fail;
493 }
494
495 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
496 if (nxt_slow_path(queue_fd == -1)) {
497 goto fail;
498 }
499
500 mem = mmap(NULL, sizeof(nxt_port_queue_t),
501 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
502 if (nxt_slow_path(mem == MAP_FAILED)) {
503 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
504 strerror(errno), errno);
505
506 goto fail;
507 }
508
509 nxt_port_queue_init(mem);
510
511 rc = nxt_unit_fd_blocking(read_port.in_fd);
512 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
513 goto fail;
514 }
515
516 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
517 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
518 nxt_unit_alert(NULL, "failed to add read_port");
519
520 goto fail;
521 }
522
523 rc = nxt_unit_fd_blocking(ready_port.out_fd);
524 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
525 goto fail;
526 }
527
528 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
529 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
530 nxt_unit_alert(NULL, "failed to send READY message");
531
532 goto fail;
533 }
534
535 nxt_unit_close(ready_port.out_fd);
536 nxt_unit_close(queue_fd);
537
538 return ctx;
539
540 fail:
541
542 if (mem != MAP_FAILED) {
543 munmap(mem, sizeof(nxt_port_queue_t));
544 }
545
546 if (queue_fd != -1) {
547 nxt_unit_close(queue_fd);
548 }
549
550 nxt_unit_ctx_release(&lib->main_ctx.ctx);
551
552 return NULL;
553 }
554
555
556 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)557 nxt_unit_create(nxt_unit_init_t *init)
558 {
559 int rc;
560 nxt_unit_impl_t *lib;
561 nxt_unit_callbacks_t *cb;
562
563 lib = nxt_unit_malloc(NULL,
564 sizeof(nxt_unit_impl_t) + init->request_data_size);
565 if (nxt_slow_path(lib == NULL)) {
566 nxt_unit_alert(NULL, "failed to allocate unit struct");
567
568 return NULL;
569 }
570
571 rc = pthread_mutex_init(&lib->mutex, NULL);
572 if (nxt_slow_path(rc != 0)) {
573 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
574
575 goto fail;
576 }
577
578 lib->unit.data = init->data;
579 lib->callbacks = init->callbacks;
580
581 lib->request_data_size = init->request_data_size;
582 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
583 / PORT_MMAP_DATA_SIZE;
584 lib->request_limit = init->request_limit;
585
586 lib->processes.slot = NULL;
587 lib->ports.slot = NULL;
588
589 lib->log_fd = STDERR_FILENO;
590
591 nxt_queue_init(&lib->contexts);
592
593 lib->use_count = 0;
594 lib->request_count = 0;
595 lib->router_port = NULL;
596 lib->shared_port = NULL;
597
598 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
599 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
600 pthread_mutex_destroy(&lib->mutex);
601 goto fail;
602 }
603
604 cb = &lib->callbacks;
605
606 if (cb->request_handler == NULL) {
607 nxt_unit_alert(NULL, "request_handler is NULL");
608
609 pthread_mutex_destroy(&lib->mutex);
610 goto fail;
611 }
612
613 nxt_unit_mmaps_init(&lib->incoming);
614 nxt_unit_mmaps_init(&lib->outgoing);
615
616 return lib;
617
618 fail:
619
620 nxt_unit_free(NULL, lib);
621
622 return NULL;
623 }
624
625
626 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)627 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
628 void *data)
629 {
630 int rc;
631
632 ctx_impl->ctx.data = data;
633 ctx_impl->ctx.unit = &lib->unit;
634
635 rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
636 if (nxt_slow_path(rc != 0)) {
637 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
638
639 return NXT_UNIT_ERROR;
640 }
641
642 nxt_unit_lib_use(lib);
643
644 pthread_mutex_lock(&lib->mutex);
645
646 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
647
648 pthread_mutex_unlock(&lib->mutex);
649
650 ctx_impl->use_count = 1;
651 ctx_impl->wait_items = 0;
652 ctx_impl->online = 1;
653 ctx_impl->ready = 0;
654 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
655
656 nxt_queue_init(&ctx_impl->free_req);
657 nxt_queue_init(&ctx_impl->free_ws);
658 nxt_queue_init(&ctx_impl->active_req);
659 nxt_queue_init(&ctx_impl->ready_req);
660 nxt_queue_init(&ctx_impl->pending_rbuf);
661 nxt_queue_init(&ctx_impl->free_rbuf);
662
663 ctx_impl->free_buf = NULL;
664 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
665 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
666
667 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
668 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
669
670 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
671
672 ctx_impl->req.req.ctx = &ctx_impl->ctx;
673 ctx_impl->req.req.unit = &lib->unit;
674
675 ctx_impl->read_port = NULL;
676 ctx_impl->requests.slot = 0;
677
678 return NXT_UNIT_OK;
679 }
680
681
682 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)683 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
684 {
685 nxt_unit_ctx_impl_t *ctx_impl;
686
687 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
688
689 nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
690 }
691
692
693 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)694 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
695 {
696 long c;
697 nxt_unit_ctx_impl_t *ctx_impl;
698
699 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
700
701 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
702
703 if (c == 1) {
704 nxt_unit_ctx_free(ctx_impl);
705 }
706 }
707
708
709 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)710 nxt_unit_lib_use(nxt_unit_impl_t *lib)
711 {
712 nxt_atomic_fetch_add(&lib->use_count, 1);
713 }
714
715
716 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)717 nxt_unit_lib_release(nxt_unit_impl_t *lib)
718 {
719 long c;
720 nxt_unit_process_t *process;
721
722 c = nxt_atomic_fetch_add(&lib->use_count, -1);
723
724 if (c == 1) {
725 for ( ;; ) {
726 pthread_mutex_lock(&lib->mutex);
727
728 process = nxt_unit_process_pop_first(lib);
729 if (process == NULL) {
730 pthread_mutex_unlock(&lib->mutex);
731
732 break;
733 }
734
735 nxt_unit_remove_process(lib, process);
736 }
737
738 pthread_mutex_destroy(&lib->mutex);
739
740 if (nxt_fast_path(lib->router_port != NULL)) {
741 nxt_unit_port_release(lib->router_port);
742 }
743
744 if (nxt_fast_path(lib->shared_port != NULL)) {
745 nxt_unit_port_release(lib->shared_port);
746 }
747
748 nxt_unit_mmaps_destroy(&lib->incoming);
749 nxt_unit_mmaps_destroy(&lib->outgoing);
750
751 nxt_unit_free(NULL, lib);
752 }
753 }
754
755
756 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)757 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
758 nxt_unit_mmap_buf_t *mmap_buf)
759 {
760 mmap_buf->next = *head;
761
762 if (mmap_buf->next != NULL) {
763 mmap_buf->next->prev = &mmap_buf->next;
764 }
765
766 *head = mmap_buf;
767 mmap_buf->prev = head;
768 }
769
770
771 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)772 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
773 nxt_unit_mmap_buf_t *mmap_buf)
774 {
775 while (*prev != NULL) {
776 prev = &(*prev)->next;
777 }
778
779 nxt_unit_mmap_buf_insert(prev, mmap_buf);
780 }
781
782
783 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)784 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
785 {
786 nxt_unit_mmap_buf_t **prev;
787
788 prev = mmap_buf->prev;
789
790 if (mmap_buf->next != NULL) {
791 mmap_buf->next->prev = prev;
792 }
793
794 if (prev != NULL) {
795 *prev = mmap_buf->next;
796 }
797 }
798
799
800 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)801 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
802 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
803 uint32_t *shm_limit, uint32_t *request_limit)
804 {
805 int rc;
806 int ready_fd, router_fd, read_in_fd, read_out_fd;
807 char *unit_init, *version_end, *vars;
808 size_t version_length;
809 int64_t ready_pid, router_pid, read_pid;
810 uint32_t ready_stream, router_id, ready_id, read_id;
811
812 unit_init = getenv(NXT_UNIT_INIT_ENV);
813 if (nxt_slow_path(unit_init == NULL)) {
814 nxt_unit_alert(NULL, "%s is not in the current environment",
815 NXT_UNIT_INIT_ENV);
816
817 return NXT_UNIT_ERROR;
818 }
819
820 version_end = strchr(unit_init, ';');
821 if (nxt_slow_path(version_end == NULL)) {
822 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
823 NXT_UNIT_INIT_ENV, unit_init);
824
825 return NXT_UNIT_ERROR;
826 }
827
828 version_length = version_end - unit_init;
829
830 rc = version_length != nxt_length(NXT_VERSION)
831 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
832
833 if (nxt_slow_path(rc != 0)) {
834 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
835 "%.*s, while the app was compiled with libunit %s",
836 (int) version_length, unit_init, NXT_VERSION);
837
838 return NXT_UNIT_ERROR;
839 }
840
841 vars = version_end + 1;
842
843 rc = sscanf(vars,
844 "%"PRIu32";"
845 "%"PRId64",%"PRIu32",%d;"
846 "%"PRId64",%"PRIu32",%d;"
847 "%"PRId64",%"PRIu32",%d,%d;"
848 "%d,%"PRIu32",%"PRIu32,
849 &ready_stream,
850 &ready_pid, &ready_id, &ready_fd,
851 &router_pid, &router_id, &router_fd,
852 &read_pid, &read_id, &read_in_fd, &read_out_fd,
853 log_fd, shm_limit, request_limit);
854
855 if (nxt_slow_path(rc == EOF)) {
856 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
857 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
858
859 return NXT_UNIT_ERROR;
860 }
861
862 if (nxt_slow_path(rc != 14)) {
863 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
864 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
865
866 return NXT_UNIT_ERROR;
867 }
868
869 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
870
871 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
872
873 ready_port->in_fd = -1;
874 ready_port->out_fd = ready_fd;
875 ready_port->data = NULL;
876
877 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
878
879 router_port->in_fd = -1;
880 router_port->out_fd = router_fd;
881 router_port->data = NULL;
882
883 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
884
885 read_port->in_fd = read_in_fd;
886 read_port->out_fd = read_out_fd;
887 read_port->data = NULL;
888
889 *stream = ready_stream;
890
891 return NXT_UNIT_OK;
892 }
893
894
895 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)896 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
897 {
898 ssize_t res;
899 nxt_send_oob_t oob;
900 nxt_port_msg_t msg;
901 nxt_unit_impl_t *lib;
902 int fds[2] = {queue_fd, -1};
903
904 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
905
906 msg.stream = stream;
907 msg.pid = lib->pid;
908 msg.reply_port = 0;
909 msg.type = _NXT_PORT_MSG_PROCESS_READY;
910 msg.last = 1;
911 msg.mmap = 0;
912 msg.nf = 0;
913 msg.mf = 0;
914 msg.tracking = 0;
915
916 nxt_socket_msg_oob_init(&oob, fds);
917
918 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
919 if (res != sizeof(msg)) {
920 return NXT_UNIT_ERROR;
921 }
922
923 return NXT_UNIT_OK;
924 }
925
926
927 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)928 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
929 nxt_unit_request_info_t **preq)
930 {
931 int rc;
932 pid_t pid;
933 uint8_t quit_param;
934 nxt_port_msg_t *port_msg;
935 nxt_unit_impl_t *lib;
936 nxt_unit_recv_msg_t recv_msg;
937
938 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
939
940 recv_msg.incoming_buf = NULL;
941 recv_msg.fd[0] = -1;
942 recv_msg.fd[1] = -1;
943
944 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
945 if (nxt_slow_path(rc != NXT_OK)) {
946 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
947 rc = NXT_UNIT_ERROR;
948 goto done;
949 }
950
951 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
952 if (nxt_slow_path(rbuf->size == 0)) {
953 nxt_unit_debug(ctx, "read port closed");
954
955 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
956 rc = NXT_UNIT_OK;
957 goto done;
958 }
959
960 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
961
962 rc = NXT_UNIT_ERROR;
963 goto done;
964 }
965
966 port_msg = (nxt_port_msg_t *) rbuf->buf;
967
968 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
969 port_msg->stream, (int) port_msg->type,
970 recv_msg.fd[0], recv_msg.fd[1]);
971
972 recv_msg.stream = port_msg->stream;
973 recv_msg.pid = port_msg->pid;
974 recv_msg.reply_port = port_msg->reply_port;
975 recv_msg.last = port_msg->last;
976 recv_msg.mmap = port_msg->mmap;
977
978 recv_msg.start = port_msg + 1;
979 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
980
981 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
982 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
983 port_msg->stream, (int) port_msg->type);
984 rc = NXT_UNIT_ERROR;
985 goto done;
986 }
987
988 /* Fragmentation is unsupported. */
989 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
990 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
991 port_msg->stream, (int) port_msg->type);
992 rc = NXT_UNIT_ERROR;
993 goto done;
994 }
995
996 if (port_msg->mmap) {
997 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
998
999 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1000 if (rc == NXT_UNIT_AGAIN) {
1001 recv_msg.fd[0] = -1;
1002 recv_msg.fd[1] = -1;
1003 }
1004
1005 goto done;
1006 }
1007 }
1008
1009 switch (port_msg->type) {
1010
1011 case _NXT_PORT_MSG_RPC_READY:
1012 rc = NXT_UNIT_OK;
1013 break;
1014
1015 case _NXT_PORT_MSG_QUIT:
1016 if (recv_msg.size == sizeof(quit_param)) {
1017 memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1018
1019 } else {
1020 quit_param = NXT_QUIT_NORMAL;
1021 }
1022
1023 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1024 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1025
1026 nxt_unit_quit(ctx, quit_param);
1027
1028 rc = NXT_UNIT_OK;
1029 break;
1030
1031 case _NXT_PORT_MSG_NEW_PORT:
1032 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1033 break;
1034
1035 case _NXT_PORT_MSG_PORT_ACK:
1036 rc = nxt_unit_ctx_ready(ctx);
1037 break;
1038
1039 case _NXT_PORT_MSG_CHANGE_FILE:
1040 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1041 port_msg->stream, recv_msg.fd[0]);
1042
1043 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1044 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1045 port_msg->stream, recv_msg.fd[0], lib->log_fd,
1046 strerror(errno), errno);
1047
1048 rc = NXT_UNIT_ERROR;
1049 goto done;
1050 }
1051
1052 rc = NXT_UNIT_OK;
1053 break;
1054
1055 case _NXT_PORT_MSG_MMAP:
1056 if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1057 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1058 port_msg->stream, recv_msg.fd[0]);
1059
1060 rc = NXT_UNIT_ERROR;
1061 goto done;
1062 }
1063
1064 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1065 break;
1066
1067 case _NXT_PORT_MSG_REQ_HEADERS:
1068 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1069 break;
1070
1071 case _NXT_PORT_MSG_REQ_BODY:
1072 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1073 break;
1074
1075 case _NXT_PORT_MSG_WEBSOCKET:
1076 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1077 break;
1078
1079 case _NXT_PORT_MSG_REMOVE_PID:
1080 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1081 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1082 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1083 (int) sizeof(pid));
1084
1085 rc = NXT_UNIT_ERROR;
1086 goto done;
1087 }
1088
1089 memcpy(&pid, recv_msg.start, sizeof(pid));
1090
1091 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1092 port_msg->stream, (int) pid);
1093
1094 nxt_unit_remove_pid(lib, pid);
1095
1096 rc = NXT_UNIT_OK;
1097 break;
1098
1099 case _NXT_PORT_MSG_SHM_ACK:
1100 rc = nxt_unit_process_shm_ack(ctx);
1101 break;
1102
1103 default:
1104 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1105 port_msg->stream, (int) port_msg->type);
1106
1107 rc = NXT_UNIT_ERROR;
1108 goto done;
1109 }
1110
1111 done:
1112
1113 if (recv_msg.fd[0] != -1) {
1114 nxt_unit_close(recv_msg.fd[0]);
1115 }
1116
1117 if (recv_msg.fd[1] != -1) {
1118 nxt_unit_close(recv_msg.fd[1]);
1119 }
1120
1121 while (recv_msg.incoming_buf != NULL) {
1122 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1123 }
1124
1125 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1126 #if (NXT_DEBUG)
1127 memset(rbuf->buf, 0xAC, rbuf->size);
1128 #endif
1129 nxt_unit_read_buf_release(ctx, rbuf);
1130 }
1131
1132 return rc;
1133 }
1134
1135
1136 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1137 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1138 {
1139 void *mem;
1140 nxt_unit_impl_t *lib;
1141 nxt_unit_port_t new_port, *port;
1142 nxt_port_msg_new_port_t *new_port_msg;
1143
1144 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1145 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1146 "invalid message size (%d)",
1147 recv_msg->stream, (int) recv_msg->size);
1148
1149 return NXT_UNIT_ERROR;
1150 }
1151
1152 if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1153 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1154 recv_msg->stream, recv_msg->fd[0]);
1155
1156 return NXT_UNIT_ERROR;
1157 }
1158
1159 new_port_msg = recv_msg->start;
1160
1161 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1162 recv_msg->stream, (int) new_port_msg->pid,
1163 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1164
1165 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1166
1167 if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1168 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1169
1170 new_port.in_fd = recv_msg->fd[0];
1171 new_port.out_fd = -1;
1172
1173 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1174 MAP_SHARED, recv_msg->fd[1], 0);
1175
1176 } else {
1177 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1178 != NXT_UNIT_OK))
1179 {
1180 return NXT_UNIT_ERROR;
1181 }
1182
1183 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1184 new_port_msg->id);
1185
1186 new_port.in_fd = -1;
1187 new_port.out_fd = recv_msg->fd[0];
1188
1189 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1190 MAP_SHARED, recv_msg->fd[1], 0);
1191 }
1192
1193 if (nxt_slow_path(mem == MAP_FAILED)) {
1194 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1195 strerror(errno), errno);
1196
1197 return NXT_UNIT_ERROR;
1198 }
1199
1200 new_port.data = NULL;
1201
1202 recv_msg->fd[0] = -1;
1203
1204 port = nxt_unit_add_port(ctx, &new_port, mem);
1205 if (nxt_slow_path(port == NULL)) {
1206 return NXT_UNIT_ERROR;
1207 }
1208
1209 if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1210 lib->shared_port = port;
1211
1212 return nxt_unit_ctx_ready(ctx);
1213 }
1214
1215 nxt_unit_port_release(port);
1216
1217 return NXT_UNIT_OK;
1218 }
1219
1220
1221 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1222 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1223 {
1224 nxt_unit_impl_t *lib;
1225 nxt_unit_ctx_impl_t *ctx_impl;
1226
1227 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1228
1229 if (nxt_slow_path(ctx_impl->ready)) {
1230 return NXT_UNIT_OK;
1231 }
1232
1233 ctx_impl->ready = 1;
1234
1235 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1236
1237 /* Call ready_handler() only for main context. */
1238 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1239 return lib->callbacks.ready_handler(ctx);
1240 }
1241
1242 if (&lib->main_ctx != ctx_impl) {
1243 /* Check if the main context is already stopped or quit. */
1244 if (nxt_slow_path(!lib->main_ctx.ready)) {
1245 ctx_impl->ready = 0;
1246
1247 nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1248
1249 return NXT_UNIT_OK;
1250 }
1251
1252 if (lib->callbacks.add_port != NULL) {
1253 lib->callbacks.add_port(ctx, lib->shared_port);
1254 }
1255 }
1256
1257 return NXT_UNIT_OK;
1258 }
1259
1260
1261 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1262 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1263 nxt_unit_request_info_t **preq)
1264 {
1265 int res;
1266 nxt_unit_impl_t *lib;
1267 nxt_unit_port_id_t port_id;
1268 nxt_unit_request_t *r;
1269 nxt_unit_mmap_buf_t *b;
1270 nxt_unit_request_info_t *req;
1271 nxt_unit_request_info_impl_t *req_impl;
1272
1273 if (nxt_slow_path(recv_msg->mmap == 0)) {
1274 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1275 recv_msg->stream);
1276
1277 return NXT_UNIT_ERROR;
1278 }
1279
1280 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1281 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1282 "%d expected", recv_msg->stream, (int) recv_msg->size,
1283 (int) sizeof(nxt_unit_request_t));
1284
1285 return NXT_UNIT_ERROR;
1286 }
1287
1288 req_impl = nxt_unit_request_info_get(ctx);
1289 if (nxt_slow_path(req_impl == NULL)) {
1290 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1291 recv_msg->stream);
1292
1293 return NXT_UNIT_ERROR;
1294 }
1295
1296 req = &req_impl->req;
1297
1298 req->request = recv_msg->start;
1299
1300 b = recv_msg->incoming_buf;
1301
1302 req->request_buf = &b->buf;
1303 req->response = NULL;
1304 req->response_buf = NULL;
1305
1306 r = req->request;
1307
1308 req->content_length = r->content_length;
1309
1310 req->content_buf = req->request_buf;
1311 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1312
1313 req_impl->stream = recv_msg->stream;
1314
1315 req_impl->outgoing_buf = NULL;
1316
1317 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1318 b->req = req;
1319 }
1320
1321 /* "Move" incoming buffer list to req_impl. */
1322 req_impl->incoming_buf = recv_msg->incoming_buf;
1323 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1324 recv_msg->incoming_buf = NULL;
1325
1326 req->content_fd = recv_msg->fd[0];
1327 recv_msg->fd[0] = -1;
1328
1329 req->response_max_fields = 0;
1330 req_impl->state = NXT_UNIT_RS_START;
1331 req_impl->websocket = 0;
1332 req_impl->in_hash = 0;
1333
1334 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1335 (int) r->method_length,
1336 (char *) nxt_unit_sptr_get(&r->method),
1337 (int) r->target_length,
1338 (char *) nxt_unit_sptr_get(&r->target),
1339 (int) r->content_length);
1340
1341 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1342
1343 res = nxt_unit_request_check_response_port(req, &port_id);
1344 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1345 return NXT_UNIT_ERROR;
1346 }
1347
1348 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1349 res = nxt_unit_send_req_headers_ack(req);
1350 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1351 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1352
1353 return NXT_UNIT_ERROR;
1354 }
1355
1356 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1357
1358 if (req->content_length
1359 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1360 {
1361 res = nxt_unit_request_hash_add(ctx, req);
1362 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1363 nxt_unit_req_warn(req, "failed to add request to hash");
1364
1365 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1366
1367 return NXT_UNIT_ERROR;
1368 }
1369
1370 /*
1371 * If application have separate data handler, we may start
1372 * request processing and process data when it is arrived.
1373 */
1374 if (lib->callbacks.data_handler == NULL) {
1375 return NXT_UNIT_OK;
1376 }
1377 }
1378
1379 if (preq == NULL) {
1380 lib->callbacks.request_handler(req);
1381
1382 } else {
1383 *preq = req;
1384 }
1385 }
1386
1387 return NXT_UNIT_OK;
1388 }
1389
1390
1391 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1392 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1393 {
1394 uint64_t l;
1395 nxt_unit_impl_t *lib;
1396 nxt_unit_mmap_buf_t *b;
1397 nxt_unit_request_info_t *req;
1398
1399 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1400 if (req == NULL) {
1401 return NXT_UNIT_OK;
1402 }
1403
1404 l = req->content_buf->end - req->content_buf->free;
1405
1406 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1407 b->req = req;
1408 l += b->buf.end - b->buf.free;
1409 }
1410
1411 if (recv_msg->incoming_buf != NULL) {
1412 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1413
1414 while (b->next != NULL) {
1415 b = b->next;
1416 }
1417
1418 /* "Move" incoming buffer list to req_impl. */
1419 b->next = recv_msg->incoming_buf;
1420 b->next->prev = &b->next;
1421
1422 recv_msg->incoming_buf = NULL;
1423 }
1424
1425 req->content_fd = recv_msg->fd[0];
1426 recv_msg->fd[0] = -1;
1427
1428 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1429
1430 if (lib->callbacks.data_handler != NULL) {
1431 lib->callbacks.data_handler(req);
1432
1433 return NXT_UNIT_OK;
1434 }
1435
1436 if (req->content_fd != -1 || l == req->content_length) {
1437 lib->callbacks.request_handler(req);
1438 }
1439
1440 return NXT_UNIT_OK;
1441 }
1442
1443
1444 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1445 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1446 nxt_unit_port_id_t *port_id)
1447 {
1448 int res;
1449 nxt_unit_ctx_t *ctx;
1450 nxt_unit_impl_t *lib;
1451 nxt_unit_port_t *port;
1452 nxt_unit_process_t *process;
1453 nxt_unit_ctx_impl_t *ctx_impl;
1454 nxt_unit_port_impl_t *port_impl;
1455 nxt_unit_request_info_impl_t *req_impl;
1456
1457 ctx = req->ctx;
1458 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1459 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1460
1461 pthread_mutex_lock(&lib->mutex);
1462
1463 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1464 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1465
1466 if (nxt_fast_path(port != NULL)) {
1467 req->response_port = port;
1468
1469 if (nxt_fast_path(port_impl->ready)) {
1470 pthread_mutex_unlock(&lib->mutex);
1471
1472 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1473 (int) port->id.pid, (int) port->id.id);
1474
1475 return NXT_UNIT_OK;
1476 }
1477
1478 nxt_unit_debug(ctx, "check_response_port: "
1479 "port{%d,%d} already requested",
1480 (int) port->id.pid, (int) port->id.id);
1481
1482 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1483
1484 nxt_queue_insert_tail(&port_impl->awaiting_req,
1485 &req_impl->port_wait_link);
1486
1487 pthread_mutex_unlock(&lib->mutex);
1488
1489 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1490
1491 return NXT_UNIT_AGAIN;
1492 }
1493
1494 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1495 if (nxt_slow_path(port_impl == NULL)) {
1496 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1497 (int) sizeof(nxt_unit_port_impl_t));
1498
1499 pthread_mutex_unlock(&lib->mutex);
1500
1501 return NXT_UNIT_ERROR;
1502 }
1503
1504 port = &port_impl->port;
1505
1506 port->id = *port_id;
1507 port->in_fd = -1;
1508 port->out_fd = -1;
1509 port->data = NULL;
1510
1511 res = nxt_unit_port_hash_add(&lib->ports, port);
1512 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1513 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1514 port->id.pid, port->id.id);
1515
1516 pthread_mutex_unlock(&lib->mutex);
1517
1518 nxt_unit_free(ctx, port);
1519
1520 return NXT_UNIT_ERROR;
1521 }
1522
1523 process = nxt_unit_process_find(lib, port_id->pid, 0);
1524 if (nxt_slow_path(process == NULL)) {
1525 nxt_unit_alert(ctx, "check_response_port: process %d not found",
1526 port->id.pid);
1527
1528 nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1529
1530 pthread_mutex_unlock(&lib->mutex);
1531
1532 nxt_unit_free(ctx, port);
1533
1534 return NXT_UNIT_ERROR;
1535 }
1536
1537 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1538
1539 port_impl->process = process;
1540 port_impl->queue = NULL;
1541 port_impl->from_socket = 0;
1542 port_impl->socket_rbuf = NULL;
1543
1544 nxt_queue_init(&port_impl->awaiting_req);
1545
1546 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1547
1548 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1549
1550 port_impl->use_count = 2;
1551 port_impl->ready = 0;
1552
1553 req->response_port = port;
1554
1555 pthread_mutex_unlock(&lib->mutex);
1556
1557 res = nxt_unit_get_port(ctx, port_id);
1558 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1559 return NXT_UNIT_ERROR;
1560 }
1561
1562 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1563
1564 return NXT_UNIT_AGAIN;
1565 }
1566
1567
1568 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1569 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1570 {
1571 ssize_t res;
1572 nxt_port_msg_t msg;
1573 nxt_unit_impl_t *lib;
1574 nxt_unit_ctx_impl_t *ctx_impl;
1575 nxt_unit_request_info_impl_t *req_impl;
1576
1577 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1578 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1579 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1580
1581 memset(&msg, 0, sizeof(nxt_port_msg_t));
1582
1583 msg.stream = req_impl->stream;
1584 msg.pid = lib->pid;
1585 msg.reply_port = ctx_impl->read_port->id.id;
1586 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1587
1588 res = nxt_unit_port_send(req->ctx, req->response_port,
1589 &msg, sizeof(msg), NULL);
1590 if (nxt_slow_path(res != sizeof(msg))) {
1591 return NXT_UNIT_ERROR;
1592 }
1593
1594 return NXT_UNIT_OK;
1595 }
1596
1597
1598 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1599 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1600 {
1601 size_t hsize;
1602 nxt_unit_impl_t *lib;
1603 nxt_unit_mmap_buf_t *b;
1604 nxt_unit_callbacks_t *cb;
1605 nxt_unit_request_info_t *req;
1606 nxt_unit_request_info_impl_t *req_impl;
1607 nxt_unit_websocket_frame_impl_t *ws_impl;
1608
1609 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1610 if (nxt_slow_path(req == NULL)) {
1611 return NXT_UNIT_OK;
1612 }
1613
1614 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1615
1616 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1617 cb = &lib->callbacks;
1618
1619 if (cb->websocket_handler && recv_msg->size >= 2) {
1620 ws_impl = nxt_unit_websocket_frame_get(ctx);
1621 if (nxt_slow_path(ws_impl == NULL)) {
1622 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1623 req_impl->stream);
1624
1625 return NXT_UNIT_ERROR;
1626 }
1627
1628 ws_impl->ws.req = req;
1629
1630 ws_impl->buf = NULL;
1631
1632 if (recv_msg->mmap) {
1633 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1634 b->req = req;
1635 }
1636
1637 /* "Move" incoming buffer list to ws_impl. */
1638 ws_impl->buf = recv_msg->incoming_buf;
1639 ws_impl->buf->prev = &ws_impl->buf;
1640 recv_msg->incoming_buf = NULL;
1641
1642 b = ws_impl->buf;
1643
1644 } else {
1645 b = nxt_unit_mmap_buf_get(ctx);
1646 if (nxt_slow_path(b == NULL)) {
1647 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1648 req_impl->stream);
1649
1650 nxt_unit_websocket_frame_release(&ws_impl->ws);
1651
1652 return NXT_UNIT_ERROR;
1653 }
1654
1655 b->req = req;
1656 b->buf.start = recv_msg->start;
1657 b->buf.free = b->buf.start;
1658 b->buf.end = b->buf.start + recv_msg->size;
1659
1660 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1661 }
1662
1663 ws_impl->ws.header = (void *) b->buf.start;
1664 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1665 ws_impl->ws.header);
1666
1667 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1668
1669 if (ws_impl->ws.header->mask) {
1670 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1671
1672 } else {
1673 ws_impl->ws.mask = NULL;
1674 }
1675
1676 b->buf.free += hsize;
1677
1678 ws_impl->ws.content_buf = &b->buf;
1679 ws_impl->ws.content_length = ws_impl->ws.payload_len;
1680
1681 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1682 "payload_len=%"PRIu64,
1683 ws_impl->ws.header->opcode,
1684 ws_impl->ws.payload_len);
1685
1686 cb->websocket_handler(&ws_impl->ws);
1687 }
1688
1689 if (recv_msg->last) {
1690 if (cb->close_handler) {
1691 nxt_unit_req_debug(req, "close_handler");
1692
1693 cb->close_handler(req);
1694
1695 } else {
1696 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1697 }
1698 }
1699
1700 return NXT_UNIT_OK;
1701 }
1702
1703
1704 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1705 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1706 {
1707 nxt_unit_impl_t *lib;
1708 nxt_unit_callbacks_t *cb;
1709
1710 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1711 cb = &lib->callbacks;
1712
1713 if (cb->shm_ack_handler != NULL) {
1714 cb->shm_ack_handler(ctx);
1715 }
1716
1717 return NXT_UNIT_OK;
1718 }
1719
1720
1721 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1722 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1723 {
1724 nxt_unit_impl_t *lib;
1725 nxt_queue_link_t *lnk;
1726 nxt_unit_ctx_impl_t *ctx_impl;
1727 nxt_unit_request_info_impl_t *req_impl;
1728
1729 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1730
1731 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1732
1733 pthread_mutex_lock(&ctx_impl->mutex);
1734
1735 if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1736 pthread_mutex_unlock(&ctx_impl->mutex);
1737
1738 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1739 + lib->request_data_size);
1740 if (nxt_slow_path(req_impl == NULL)) {
1741 return NULL;
1742 }
1743
1744 req_impl->req.unit = ctx->unit;
1745 req_impl->req.ctx = ctx;
1746
1747 pthread_mutex_lock(&ctx_impl->mutex);
1748
1749 } else {
1750 lnk = nxt_queue_first(&ctx_impl->free_req);
1751 nxt_queue_remove(lnk);
1752
1753 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1754 }
1755
1756 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1757
1758 pthread_mutex_unlock(&ctx_impl->mutex);
1759
1760 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1761
1762 return req_impl;
1763 }
1764
1765
1766 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1767 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1768 {
1769 nxt_unit_ctx_t *ctx;
1770 nxt_unit_ctx_impl_t *ctx_impl;
1771 nxt_unit_request_info_impl_t *req_impl;
1772
1773 ctx = req->ctx;
1774 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1775 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1776
1777 req->response = NULL;
1778 req->response_buf = NULL;
1779
1780 if (req_impl->in_hash) {
1781 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1782 }
1783
1784 while (req_impl->outgoing_buf != NULL) {
1785 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1786 }
1787
1788 while (req_impl->incoming_buf != NULL) {
1789 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1790 }
1791
1792 if (req->content_fd != -1) {
1793 nxt_unit_close(req->content_fd);
1794
1795 req->content_fd = -1;
1796 }
1797
1798 if (req->response_port != NULL) {
1799 nxt_unit_port_release(req->response_port);
1800
1801 req->response_port = NULL;
1802 }
1803
1804 req_impl->state = NXT_UNIT_RS_RELEASED;
1805
1806 pthread_mutex_lock(&ctx_impl->mutex);
1807
1808 nxt_queue_remove(&req_impl->link);
1809
1810 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1811
1812 pthread_mutex_unlock(&ctx_impl->mutex);
1813
1814 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1815 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1816 }
1817 }
1818
1819
1820 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1821 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1822 {
1823 nxt_unit_ctx_impl_t *ctx_impl;
1824
1825 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1826
1827 nxt_queue_remove(&req_impl->link);
1828
1829 if (req_impl != &ctx_impl->req) {
1830 nxt_unit_free(&ctx_impl->ctx, req_impl);
1831 }
1832 }
1833
1834
1835 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1836 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1837 {
1838 nxt_queue_link_t *lnk;
1839 nxt_unit_ctx_impl_t *ctx_impl;
1840 nxt_unit_websocket_frame_impl_t *ws_impl;
1841
1842 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1843
1844 pthread_mutex_lock(&ctx_impl->mutex);
1845
1846 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1847 pthread_mutex_unlock(&ctx_impl->mutex);
1848
1849 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1850 if (nxt_slow_path(ws_impl == NULL)) {
1851 return NULL;
1852 }
1853
1854 } else {
1855 lnk = nxt_queue_first(&ctx_impl->free_ws);
1856 nxt_queue_remove(lnk);
1857
1858 pthread_mutex_unlock(&ctx_impl->mutex);
1859
1860 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1861 }
1862
1863 ws_impl->ctx_impl = ctx_impl;
1864
1865 return ws_impl;
1866 }
1867
1868
1869 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1870 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1871 {
1872 nxt_unit_websocket_frame_impl_t *ws_impl;
1873
1874 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1875
1876 while (ws_impl->buf != NULL) {
1877 nxt_unit_mmap_buf_free(ws_impl->buf);
1878 }
1879
1880 ws->req = NULL;
1881
1882 pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1883
1884 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1885
1886 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1887 }
1888
1889
1890 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1891 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1892 nxt_unit_websocket_frame_impl_t *ws_impl)
1893 {
1894 nxt_queue_remove(&ws_impl->link);
1895
1896 nxt_unit_free(ctx, ws_impl);
1897 }
1898
1899
1900 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1901 nxt_unit_field_hash(const char *name, size_t name_length)
1902 {
1903 u_char ch;
1904 uint32_t hash;
1905 const char *p, *end;
1906
1907 hash = 159406; /* Magic value copied from nxt_http_parse.c */
1908 end = name + name_length;
1909
1910 for (p = name; p < end; p++) {
1911 ch = *p;
1912 hash = (hash << 4) + hash + nxt_lowcase(ch);
1913 }
1914
1915 hash = (hash >> 16) ^ hash;
1916
1917 return hash;
1918 }
1919
1920
1921 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1922 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1923 {
1924 char *name;
1925 uint32_t i, j;
1926 nxt_unit_field_t *fields, f;
1927 nxt_unit_request_t *r;
1928
1929 static nxt_str_t content_length = nxt_string("content-length");
1930 static nxt_str_t content_type = nxt_string("content-type");
1931 static nxt_str_t cookie = nxt_string("cookie");
1932
1933 nxt_unit_req_debug(req, "group_dup_fields");
1934
1935 r = req->request;
1936 fields = r->fields;
1937
1938 for (i = 0; i < r->fields_count; i++) {
1939 name = nxt_unit_sptr_get(&fields[i].name);
1940
1941 switch (fields[i].hash) {
1942 case NXT_UNIT_HASH_CONTENT_LENGTH:
1943 if (fields[i].name_length == content_length.length
1944 && nxt_unit_memcasecmp(name, content_length.start,
1945 content_length.length) == 0)
1946 {
1947 r->content_length_field = i;
1948 }
1949
1950 break;
1951
1952 case NXT_UNIT_HASH_CONTENT_TYPE:
1953 if (fields[i].name_length == content_type.length
1954 && nxt_unit_memcasecmp(name, content_type.start,
1955 content_type.length) == 0)
1956 {
1957 r->content_type_field = i;
1958 }
1959
1960 break;
1961
1962 case NXT_UNIT_HASH_COOKIE:
1963 if (fields[i].name_length == cookie.length
1964 && nxt_unit_memcasecmp(name, cookie.start,
1965 cookie.length) == 0)
1966 {
1967 r->cookie_field = i;
1968 }
1969
1970 break;
1971 }
1972
1973 for (j = i + 1; j < r->fields_count; j++) {
1974 if (fields[i].hash != fields[j].hash
1975 || fields[i].name_length != fields[j].name_length
1976 || nxt_unit_memcasecmp(name,
1977 nxt_unit_sptr_get(&fields[j].name),
1978 fields[j].name_length) != 0)
1979 {
1980 continue;
1981 }
1982
1983 f = fields[j];
1984 f.value.offset += (j - (i + 1)) * sizeof(f);
1985
1986 while (j > i + 1) {
1987 fields[j] = fields[j - 1];
1988 fields[j].name.offset -= sizeof(f);
1989 fields[j].value.offset -= sizeof(f);
1990 j--;
1991 }
1992
1993 fields[j] = f;
1994
1995 /* Assign the same name pointer for further grouping simplicity. */
1996 nxt_unit_sptr_set(&fields[j].name, name);
1997
1998 i++;
1999 }
2000 }
2001 }
2002
2003
2004 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2005 nxt_unit_response_init(nxt_unit_request_info_t *req,
2006 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2007 {
2008 uint32_t buf_size;
2009 nxt_unit_buf_t *buf;
2010 nxt_unit_request_info_impl_t *req_impl;
2011
2012 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2013
2014 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2015 nxt_unit_req_warn(req, "init: response already sent");
2016
2017 return NXT_UNIT_ERROR;
2018 }
2019
2020 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2021 (int) max_fields_count, (int) max_fields_size);
2022
2023 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2024 nxt_unit_req_debug(req, "duplicate response init");
2025 }
2026
2027 /*
2028 * Each field name and value 0-terminated by libunit,
2029 * this is the reason of '+ 2' below.
2030 */
2031 buf_size = sizeof(nxt_unit_response_t)
2032 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2033 + max_fields_size;
2034
2035 if (nxt_slow_path(req->response_buf != NULL)) {
2036 buf = req->response_buf;
2037
2038 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2039 goto init_response;
2040 }
2041
2042 nxt_unit_buf_free(buf);
2043
2044 req->response_buf = NULL;
2045 req->response = NULL;
2046 req->response_max_fields = 0;
2047
2048 req_impl->state = NXT_UNIT_RS_START;
2049 }
2050
2051 buf = nxt_unit_response_buf_alloc(req, buf_size);
2052 if (nxt_slow_path(buf == NULL)) {
2053 return NXT_UNIT_ERROR;
2054 }
2055
2056 init_response:
2057
2058 memset(buf->start, 0, sizeof(nxt_unit_response_t));
2059
2060 req->response_buf = buf;
2061
2062 req->response = (nxt_unit_response_t *) buf->start;
2063 req->response->status = status;
2064
2065 buf->free = buf->start + sizeof(nxt_unit_response_t)
2066 + max_fields_count * sizeof(nxt_unit_field_t);
2067
2068 req->response_max_fields = max_fields_count;
2069 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2070
2071 return NXT_UNIT_OK;
2072 }
2073
2074
2075 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2076 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2077 uint32_t max_fields_count, uint32_t max_fields_size)
2078 {
2079 char *p;
2080 uint32_t i, buf_size;
2081 nxt_unit_buf_t *buf;
2082 nxt_unit_field_t *f, *src;
2083 nxt_unit_response_t *resp;
2084 nxt_unit_request_info_impl_t *req_impl;
2085
2086 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2087
2088 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2089 nxt_unit_req_warn(req, "realloc: response not init");
2090
2091 return NXT_UNIT_ERROR;
2092 }
2093
2094 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2095 nxt_unit_req_warn(req, "realloc: response already sent");
2096
2097 return NXT_UNIT_ERROR;
2098 }
2099
2100 if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2101 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2102
2103 return NXT_UNIT_ERROR;
2104 }
2105
2106 /*
2107 * Each field name and value 0-terminated by libunit,
2108 * this is the reason of '+ 2' below.
2109 */
2110 buf_size = sizeof(nxt_unit_response_t)
2111 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2112 + max_fields_size;
2113
2114 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2115
2116 buf = nxt_unit_response_buf_alloc(req, buf_size);
2117 if (nxt_slow_path(buf == NULL)) {
2118 nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2119 return NXT_UNIT_ERROR;
2120 }
2121
2122 resp = (nxt_unit_response_t *) buf->start;
2123
2124 memset(resp, 0, sizeof(nxt_unit_response_t));
2125
2126 resp->status = req->response->status;
2127 resp->content_length = req->response->content_length;
2128
2129 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2130 f = resp->fields;
2131
2132 for (i = 0; i < req->response->fields_count; i++) {
2133 src = req->response->fields + i;
2134
2135 if (nxt_slow_path(src->skip != 0)) {
2136 continue;
2137 }
2138
2139 if (nxt_slow_path(src->name_length + src->value_length + 2
2140 > (uint32_t) (buf->end - p)))
2141 {
2142 nxt_unit_req_warn(req, "realloc: not enough space for field"
2143 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2144 i, src, src->name_length, src->value_length);
2145
2146 goto fail;
2147 }
2148
2149 nxt_unit_sptr_set(&f->name, p);
2150 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2151 *p++ = '\0';
2152
2153 nxt_unit_sptr_set(&f->value, p);
2154 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2155 *p++ = '\0';
2156
2157 f->hash = src->hash;
2158 f->skip = 0;
2159 f->name_length = src->name_length;
2160 f->value_length = src->value_length;
2161
2162 resp->fields_count++;
2163 f++;
2164 }
2165
2166 if (req->response->piggyback_content_length > 0) {
2167 if (nxt_slow_path(req->response->piggyback_content_length
2168 > (uint32_t) (buf->end - p)))
2169 {
2170 nxt_unit_req_warn(req, "realloc: not enought space for content"
2171 " #%"PRIu32", %"PRIu32" required",
2172 i, req->response->piggyback_content_length);
2173
2174 goto fail;
2175 }
2176
2177 resp->piggyback_content_length =
2178 req->response->piggyback_content_length;
2179
2180 nxt_unit_sptr_set(&resp->piggyback_content, p);
2181 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2182 req->response->piggyback_content_length);
2183 }
2184
2185 buf->free = p;
2186
2187 nxt_unit_buf_free(req->response_buf);
2188
2189 req->response = resp;
2190 req->response_buf = buf;
2191 req->response_max_fields = max_fields_count;
2192
2193 return NXT_UNIT_OK;
2194
2195 fail:
2196
2197 nxt_unit_buf_free(buf);
2198
2199 return NXT_UNIT_ERROR;
2200 }
2201
2202
2203 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2204 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2205 {
2206 nxt_unit_request_info_impl_t *req_impl;
2207
2208 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2209
2210 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2211 }
2212
2213
2214 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2215 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2216 const char *name, uint8_t name_length,
2217 const char *value, uint32_t value_length)
2218 {
2219 nxt_unit_buf_t *buf;
2220 nxt_unit_field_t *f;
2221 nxt_unit_response_t *resp;
2222 nxt_unit_request_info_impl_t *req_impl;
2223
2224 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2225
2226 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2227 nxt_unit_req_warn(req, "add_field: response not initialized or "
2228 "already sent");
2229
2230 return NXT_UNIT_ERROR;
2231 }
2232
2233 resp = req->response;
2234
2235 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2236 nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2237 (int) resp->fields_count);
2238
2239 return NXT_UNIT_ERROR;
2240 }
2241
2242 buf = req->response_buf;
2243
2244 if (nxt_slow_path(name_length + value_length + 2
2245 > (uint32_t) (buf->end - buf->free)))
2246 {
2247 nxt_unit_req_warn(req, "add_field: response buffer overflow");
2248
2249 return NXT_UNIT_ERROR;
2250 }
2251
2252 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2253 resp->fields_count,
2254 (int) name_length, name,
2255 (int) value_length, value);
2256
2257 f = resp->fields + resp->fields_count;
2258
2259 nxt_unit_sptr_set(&f->name, buf->free);
2260 buf->free = nxt_cpymem(buf->free, name, name_length);
2261 *buf->free++ = '\0';
2262
2263 nxt_unit_sptr_set(&f->value, buf->free);
2264 buf->free = nxt_cpymem(buf->free, value, value_length);
2265 *buf->free++ = '\0';
2266
2267 f->hash = nxt_unit_field_hash(name, name_length);
2268 f->skip = 0;
2269 f->name_length = name_length;
2270 f->value_length = value_length;
2271
2272 resp->fields_count++;
2273
2274 return NXT_UNIT_OK;
2275 }
2276
2277
2278 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2279 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2280 const void* src, uint32_t size)
2281 {
2282 nxt_unit_buf_t *buf;
2283 nxt_unit_response_t *resp;
2284 nxt_unit_request_info_impl_t *req_impl;
2285
2286 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2287
2288 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2289 nxt_unit_req_warn(req, "add_content: response not initialized yet");
2290
2291 return NXT_UNIT_ERROR;
2292 }
2293
2294 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2295 nxt_unit_req_warn(req, "add_content: response already sent");
2296
2297 return NXT_UNIT_ERROR;
2298 }
2299
2300 buf = req->response_buf;
2301
2302 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2303 nxt_unit_req_warn(req, "add_content: buffer overflow");
2304
2305 return NXT_UNIT_ERROR;
2306 }
2307
2308 resp = req->response;
2309
2310 if (resp->piggyback_content_length == 0) {
2311 nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2312 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2313 }
2314
2315 resp->piggyback_content_length += size;
2316
2317 buf->free = nxt_cpymem(buf->free, src, size);
2318
2319 return NXT_UNIT_OK;
2320 }
2321
2322
2323 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2324 nxt_unit_response_send(nxt_unit_request_info_t *req)
2325 {
2326 int rc;
2327 nxt_unit_mmap_buf_t *mmap_buf;
2328 nxt_unit_request_info_impl_t *req_impl;
2329
2330 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2331
2332 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2333 nxt_unit_req_warn(req, "send: response is not initialized yet");
2334
2335 return NXT_UNIT_ERROR;
2336 }
2337
2338 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2339 nxt_unit_req_warn(req, "send: response already sent");
2340
2341 return NXT_UNIT_ERROR;
2342 }
2343
2344 if (req->request->websocket_handshake && req->response->status == 101) {
2345 nxt_unit_response_upgrade(req);
2346 }
2347
2348 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2349 req->response->fields_count,
2350 (int) (req->response_buf->free
2351 - req->response_buf->start));
2352
2353 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2354
2355 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2356 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2357 req->response = NULL;
2358 req->response_buf = NULL;
2359 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2360
2361 nxt_unit_mmap_buf_free(mmap_buf);
2362 }
2363
2364 return rc;
2365 }
2366
2367
2368 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2369 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2370 {
2371 nxt_unit_request_info_impl_t *req_impl;
2372
2373 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2374
2375 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2376 }
2377
2378
2379 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2380 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2381 {
2382 int rc;
2383 nxt_unit_mmap_buf_t *mmap_buf;
2384 nxt_unit_request_info_impl_t *req_impl;
2385
2386 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2387 nxt_unit_req_warn(req, "response_buf_alloc: "
2388 "requested buffer (%"PRIu32") too big", size);
2389
2390 return NULL;
2391 }
2392
2393 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2394
2395 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2396
2397 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2398 if (nxt_slow_path(mmap_buf == NULL)) {
2399 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2400
2401 return NULL;
2402 }
2403
2404 mmap_buf->req = req;
2405
2406 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2407
2408 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2409 size, size, mmap_buf,
2410 NULL);
2411 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2412 nxt_unit_mmap_buf_release(mmap_buf);
2413
2414 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2415
2416 return NULL;
2417 }
2418
2419 return &mmap_buf->buf;
2420 }
2421
2422
2423 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2424 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2425 {
2426 nxt_unit_mmap_buf_t *mmap_buf;
2427 nxt_unit_ctx_impl_t *ctx_impl;
2428
2429 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2430
2431 pthread_mutex_lock(&ctx_impl->mutex);
2432
2433 if (ctx_impl->free_buf == NULL) {
2434 pthread_mutex_unlock(&ctx_impl->mutex);
2435
2436 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2437 if (nxt_slow_path(mmap_buf == NULL)) {
2438 return NULL;
2439 }
2440
2441 } else {
2442 mmap_buf = ctx_impl->free_buf;
2443
2444 nxt_unit_mmap_buf_unlink(mmap_buf);
2445
2446 pthread_mutex_unlock(&ctx_impl->mutex);
2447 }
2448
2449 mmap_buf->ctx_impl = ctx_impl;
2450
2451 mmap_buf->hdr = NULL;
2452 mmap_buf->free_ptr = NULL;
2453
2454 return mmap_buf;
2455 }
2456
2457
2458 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2459 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2460 {
2461 nxt_unit_mmap_buf_unlink(mmap_buf);
2462
2463 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2464
2465 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2466
2467 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2468 }
2469
2470
2471 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2472 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2473 {
2474 return req->request->websocket_handshake;
2475 }
2476
2477
2478 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2479 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2480 {
2481 int rc;
2482 nxt_unit_request_info_impl_t *req_impl;
2483
2484 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2485
2486 if (nxt_slow_path(req_impl->websocket != 0)) {
2487 nxt_unit_req_debug(req, "upgrade: already upgraded");
2488
2489 return NXT_UNIT_OK;
2490 }
2491
2492 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2493 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2494
2495 return NXT_UNIT_ERROR;
2496 }
2497
2498 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2499 nxt_unit_req_warn(req, "upgrade: response already sent");
2500
2501 return NXT_UNIT_ERROR;
2502 }
2503
2504 rc = nxt_unit_request_hash_add(req->ctx, req);
2505 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2506 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2507
2508 return NXT_UNIT_ERROR;
2509 }
2510
2511 req_impl->websocket = 1;
2512
2513 req->response->status = 101;
2514
2515 return NXT_UNIT_OK;
2516 }
2517
2518
2519 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2520 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2521 {
2522 nxt_unit_request_info_impl_t *req_impl;
2523
2524 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2525
2526 return req_impl->websocket;
2527 }
2528
2529
2530 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2531 nxt_unit_get_request_info_from_data(void *data)
2532 {
2533 nxt_unit_request_info_impl_t *req_impl;
2534
2535 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2536
2537 return &req_impl->req;
2538 }
2539
2540
2541 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2542 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2543 {
2544 int rc;
2545 nxt_unit_mmap_buf_t *mmap_buf;
2546 nxt_unit_request_info_t *req;
2547 nxt_unit_request_info_impl_t *req_impl;
2548
2549 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2550
2551 req = mmap_buf->req;
2552 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2553
2554 nxt_unit_req_debug(req, "buf_send: %d bytes",
2555 (int) (buf->free - buf->start));
2556
2557 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2558 nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2559
2560 return NXT_UNIT_ERROR;
2561 }
2562
2563 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2564 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2565
2566 return NXT_UNIT_ERROR;
2567 }
2568
2569 if (nxt_fast_path(buf->free > buf->start)) {
2570 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2571 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2572 return rc;
2573 }
2574 }
2575
2576 nxt_unit_mmap_buf_free(mmap_buf);
2577
2578 return NXT_UNIT_OK;
2579 }
2580
2581
2582 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2583 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2584 {
2585 int rc;
2586 nxt_unit_mmap_buf_t *mmap_buf;
2587 nxt_unit_request_info_t *req;
2588
2589 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2590
2591 req = mmap_buf->req;
2592
2593 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2594 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2595 nxt_unit_mmap_buf_free(mmap_buf);
2596
2597 nxt_unit_request_info_release(req);
2598
2599 } else {
2600 nxt_unit_request_done(req, rc);
2601 }
2602 }
2603
2604
2605 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2606 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2607 nxt_unit_mmap_buf_t *mmap_buf, int last)
2608 {
2609 struct {
2610 nxt_port_msg_t msg;
2611 nxt_port_mmap_msg_t mmap_msg;
2612 } m;
2613
2614 int rc;
2615 u_char *last_used, *first_free;
2616 ssize_t res;
2617 nxt_chunk_id_t first_free_chunk;
2618 nxt_unit_buf_t *buf;
2619 nxt_unit_impl_t *lib;
2620 nxt_port_mmap_header_t *hdr;
2621 nxt_unit_request_info_impl_t *req_impl;
2622
2623 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2624 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2625
2626 buf = &mmap_buf->buf;
2627 hdr = mmap_buf->hdr;
2628
2629 m.mmap_msg.size = buf->free - buf->start;
2630
2631 m.msg.stream = req_impl->stream;
2632 m.msg.pid = lib->pid;
2633 m.msg.reply_port = 0;
2634 m.msg.type = _NXT_PORT_MSG_DATA;
2635 m.msg.last = last != 0;
2636 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2637 m.msg.nf = 0;
2638 m.msg.mf = 0;
2639 m.msg.tracking = 0;
2640
2641 rc = NXT_UNIT_ERROR;
2642
2643 if (m.msg.mmap) {
2644 m.mmap_msg.mmap_id = hdr->id;
2645 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2646 (u_char *) buf->start);
2647
2648 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2649 req_impl->stream,
2650 (int) m.mmap_msg.mmap_id,
2651 (int) m.mmap_msg.chunk_id,
2652 (int) m.mmap_msg.size);
2653
2654 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2655 NULL);
2656 if (nxt_slow_path(res != sizeof(m))) {
2657 goto free_buf;
2658 }
2659
2660 last_used = (u_char *) buf->free - 1;
2661 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2662
2663 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2664 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2665
2666 buf->start = (char *) first_free;
2667 buf->free = buf->start;
2668
2669 if (buf->end < buf->start) {
2670 buf->end = buf->start;
2671 }
2672
2673 } else {
2674 buf->start = NULL;
2675 buf->free = NULL;
2676 buf->end = NULL;
2677
2678 mmap_buf->hdr = NULL;
2679 }
2680
2681 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2682 (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2683
2684 nxt_unit_debug(req->ctx, "allocated_chunks %d",
2685 (int) lib->outgoing.allocated_chunks);
2686
2687 } else {
2688 if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2689 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2690 {
2691 nxt_unit_alert(req->ctx,
2692 "#%"PRIu32": failed to send plain memory buffer"
2693 ": no space reserved for message header",
2694 req_impl->stream);
2695
2696 goto free_buf;
2697 }
2698
2699 memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2700
2701 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2702 req_impl->stream,
2703 (int) (sizeof(m.msg) + m.mmap_msg.size));
2704
2705 res = nxt_unit_port_send(req->ctx, req->response_port,
2706 buf->start - sizeof(m.msg),
2707 m.mmap_msg.size + sizeof(m.msg), NULL);
2708
2709 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2710 goto free_buf;
2711 }
2712 }
2713
2714 rc = NXT_UNIT_OK;
2715
2716 free_buf:
2717
2718 nxt_unit_free_outgoing_buf(mmap_buf);
2719
2720 return rc;
2721 }
2722
2723
2724 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2725 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2726 {
2727 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2728 }
2729
2730
2731 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2732 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2733 {
2734 nxt_unit_free_outgoing_buf(mmap_buf);
2735
2736 nxt_unit_mmap_buf_release(mmap_buf);
2737 }
2738
2739
2740 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2741 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2742 {
2743 if (mmap_buf->hdr != NULL) {
2744 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2745 mmap_buf->hdr, mmap_buf->buf.start,
2746 mmap_buf->buf.end - mmap_buf->buf.start);
2747
2748 mmap_buf->hdr = NULL;
2749
2750 return;
2751 }
2752
2753 if (mmap_buf->free_ptr != NULL) {
2754 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2755
2756 mmap_buf->free_ptr = NULL;
2757 }
2758 }
2759
2760
2761 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2762 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2763 {
2764 nxt_unit_ctx_impl_t *ctx_impl;
2765 nxt_unit_read_buf_t *rbuf;
2766
2767 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2768
2769 pthread_mutex_lock(&ctx_impl->mutex);
2770
2771 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2772
2773 pthread_mutex_unlock(&ctx_impl->mutex);
2774
2775 rbuf->oob.size = 0;
2776
2777 return rbuf;
2778 }
2779
2780
2781 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2782 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2783 {
2784 nxt_queue_link_t *link;
2785 nxt_unit_read_buf_t *rbuf;
2786
2787 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2788 link = nxt_queue_first(&ctx_impl->free_rbuf);
2789 nxt_queue_remove(link);
2790
2791 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2792
2793 return rbuf;
2794 }
2795
2796 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2797
2798 if (nxt_fast_path(rbuf != NULL)) {
2799 rbuf->ctx_impl = ctx_impl;
2800 }
2801
2802 return rbuf;
2803 }
2804
2805
2806 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2807 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2808 nxt_unit_read_buf_t *rbuf)
2809 {
2810 nxt_unit_ctx_impl_t *ctx_impl;
2811
2812 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2813
2814 pthread_mutex_lock(&ctx_impl->mutex);
2815
2816 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2817
2818 pthread_mutex_unlock(&ctx_impl->mutex);
2819 }
2820
2821
2822 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2823 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2824 {
2825 nxt_unit_mmap_buf_t *mmap_buf;
2826
2827 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2828
2829 if (mmap_buf->next == NULL) {
2830 return NULL;
2831 }
2832
2833 return &mmap_buf->next->buf;
2834 }
2835
2836
2837 uint32_t
nxt_unit_buf_max(void)2838 nxt_unit_buf_max(void)
2839 {
2840 return PORT_MMAP_DATA_SIZE;
2841 }
2842
2843
2844 uint32_t
nxt_unit_buf_min(void)2845 nxt_unit_buf_min(void)
2846 {
2847 return PORT_MMAP_CHUNK_SIZE;
2848 }
2849
2850
2851 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2852 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2853 size_t size)
2854 {
2855 ssize_t res;
2856
2857 res = nxt_unit_response_write_nb(req, start, size, size);
2858
2859 return res < 0 ? -res : NXT_UNIT_OK;
2860 }
2861
2862
2863 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2864 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2865 size_t size, size_t min_size)
2866 {
2867 int rc;
2868 ssize_t sent;
2869 uint32_t part_size, min_part_size, buf_size;
2870 const char *part_start;
2871 nxt_unit_mmap_buf_t mmap_buf;
2872 nxt_unit_request_info_impl_t *req_impl;
2873 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2874
2875 nxt_unit_req_debug(req, "write: %d", (int) size);
2876
2877 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2878
2879 part_start = start;
2880 sent = 0;
2881
2882 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2883 nxt_unit_req_alert(req, "write: response not initialized yet");
2884
2885 return -NXT_UNIT_ERROR;
2886 }
2887
2888 /* Check if response is not send yet. */
2889 if (nxt_slow_path(req->response_buf != NULL)) {
2890 part_size = req->response_buf->end - req->response_buf->free;
2891 part_size = nxt_min(size, part_size);
2892
2893 rc = nxt_unit_response_add_content(req, part_start, part_size);
2894 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2895 return -rc;
2896 }
2897
2898 rc = nxt_unit_response_send(req);
2899 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2900 return -rc;
2901 }
2902
2903 size -= part_size;
2904 part_start += part_size;
2905 sent += part_size;
2906
2907 min_size -= nxt_min(min_size, part_size);
2908 }
2909
2910 while (size > 0) {
2911 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2912 min_part_size = nxt_min(min_size, part_size);
2913 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2914
2915 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2916 min_part_size, &mmap_buf, local_buf);
2917 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2918 return -rc;
2919 }
2920
2921 buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2922 if (nxt_slow_path(buf_size == 0)) {
2923 return sent;
2924 }
2925 part_size = nxt_min(buf_size, part_size);
2926
2927 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2928 part_start, part_size);
2929
2930 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2931 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2932 return -rc;
2933 }
2934
2935 size -= part_size;
2936 part_start += part_size;
2937 sent += part_size;
2938
2939 min_size -= nxt_min(min_size, part_size);
2940 }
2941
2942 return sent;
2943 }
2944
2945
2946 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2947 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2948 nxt_unit_read_info_t *read_info)
2949 {
2950 int rc;
2951 ssize_t n;
2952 uint32_t buf_size;
2953 nxt_unit_buf_t *buf;
2954 nxt_unit_mmap_buf_t mmap_buf;
2955 nxt_unit_request_info_impl_t *req_impl;
2956 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2957
2958 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2959
2960 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2961 nxt_unit_req_alert(req, "write: response not initialized yet");
2962
2963 return NXT_UNIT_ERROR;
2964 }
2965
2966 /* Check if response is not send yet. */
2967 if (nxt_slow_path(req->response_buf != NULL)) {
2968
2969 /* Enable content in headers buf. */
2970 rc = nxt_unit_response_add_content(req, "", 0);
2971 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2972 nxt_unit_req_error(req, "Failed to add piggyback content");
2973
2974 return rc;
2975 }
2976
2977 buf = req->response_buf;
2978
2979 while (buf->end - buf->free > 0) {
2980 n = read_info->read(read_info, buf->free, buf->end - buf->free);
2981 if (nxt_slow_path(n < 0)) {
2982 nxt_unit_req_error(req, "Read error");
2983
2984 return NXT_UNIT_ERROR;
2985 }
2986
2987 /* Manually increase sizes. */
2988 buf->free += n;
2989 req->response->piggyback_content_length += n;
2990
2991 if (read_info->eof) {
2992 break;
2993 }
2994 }
2995
2996 rc = nxt_unit_response_send(req);
2997 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2998 nxt_unit_req_error(req, "Failed to send headers with content");
2999
3000 return rc;
3001 }
3002
3003 if (read_info->eof) {
3004 return NXT_UNIT_OK;
3005 }
3006 }
3007
3008 while (!read_info->eof) {
3009 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3010 read_info->buf_size);
3011
3012 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3013
3014 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3015 buf_size, buf_size,
3016 &mmap_buf, local_buf);
3017 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3018 return rc;
3019 }
3020
3021 buf = &mmap_buf.buf;
3022
3023 while (!read_info->eof && buf->end > buf->free) {
3024 n = read_info->read(read_info, buf->free, buf->end - buf->free);
3025 if (nxt_slow_path(n < 0)) {
3026 nxt_unit_req_error(req, "Read error");
3027
3028 nxt_unit_free_outgoing_buf(&mmap_buf);
3029
3030 return NXT_UNIT_ERROR;
3031 }
3032
3033 buf->free += n;
3034 }
3035
3036 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3037 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3038 nxt_unit_req_error(req, "Failed to send content");
3039
3040 return rc;
3041 }
3042 }
3043
3044 return NXT_UNIT_OK;
3045 }
3046
3047
3048 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3049 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3050 {
3051 ssize_t buf_res, res;
3052
3053 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3054 dst, size);
3055
3056 if (buf_res < (ssize_t) size && req->content_fd != -1) {
3057 res = read(req->content_fd, dst, size);
3058 if (nxt_slow_path(res < 0)) {
3059 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3060 strerror(errno), errno);
3061
3062 return res;
3063 }
3064
3065 if (res < (ssize_t) size) {
3066 nxt_unit_close(req->content_fd);
3067
3068 req->content_fd = -1;
3069 }
3070
3071 req->content_length -= res;
3072 size -= res;
3073
3074 dst = nxt_pointer_to(dst, res);
3075
3076 } else {
3077 res = 0;
3078 }
3079
3080 return buf_res + res;
3081 }
3082
3083
3084 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3085 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3086 {
3087 char *p;
3088 size_t l_size, b_size;
3089 nxt_unit_buf_t *b;
3090 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf;
3091
3092 if (req->content_length == 0) {
3093 return 0;
3094 }
3095
3096 l_size = 0;
3097
3098 b = req->content_buf;
3099
3100 while (b != NULL) {
3101 b_size = b->end - b->free;
3102 p = memchr(b->free, '\n', b_size);
3103
3104 if (p != NULL) {
3105 p++;
3106 l_size += p - b->free;
3107 break;
3108 }
3109
3110 l_size += b_size;
3111
3112 if (max_size <= l_size) {
3113 break;
3114 }
3115
3116 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3117 if (mmap_buf->next == NULL
3118 && req->content_fd != -1
3119 && l_size < req->content_length)
3120 {
3121 preread_buf = nxt_unit_request_preread(req, 16384);
3122 if (nxt_slow_path(preread_buf == NULL)) {
3123 return -1;
3124 }
3125
3126 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3127 }
3128
3129 b = nxt_unit_buf_next(b);
3130 }
3131
3132 return nxt_min(max_size, l_size);
3133 }
3134
3135
3136 static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t * req,size_t size)3137 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3138 {
3139 ssize_t res;
3140 nxt_unit_mmap_buf_t *mmap_buf;
3141
3142 if (req->content_fd == -1) {
3143 nxt_unit_req_alert(req, "preread: content_fd == -1");
3144 return NULL;
3145 }
3146
3147 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3148 if (nxt_slow_path(mmap_buf == NULL)) {
3149 nxt_unit_req_alert(req, "preread: failed to allocate buf");
3150 return NULL;
3151 }
3152
3153 mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3154 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3155 nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3156 nxt_unit_mmap_buf_release(mmap_buf);
3157 return NULL;
3158 }
3159
3160 mmap_buf->plain_ptr = mmap_buf->free_ptr;
3161
3162 mmap_buf->hdr = NULL;
3163 mmap_buf->buf.start = mmap_buf->free_ptr;
3164 mmap_buf->buf.free = mmap_buf->buf.start;
3165 mmap_buf->buf.end = mmap_buf->buf.start + size;
3166
3167 res = read(req->content_fd, mmap_buf->free_ptr, size);
3168 if (res < 0) {
3169 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3170 strerror(errno), errno);
3171
3172 nxt_unit_mmap_buf_free(mmap_buf);
3173
3174 return NULL;
3175 }
3176
3177 if (res < (ssize_t) size) {
3178 nxt_unit_close(req->content_fd);
3179
3180 req->content_fd = -1;
3181 }
3182
3183 nxt_unit_req_debug(req, "preread: read %d", (int) res);
3184
3185 mmap_buf->buf.end = mmap_buf->buf.free + res;
3186
3187 return mmap_buf;
3188 }
3189
3190
3191 static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t ** b,uint64_t * len,void * dst,size_t size)3192 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3193 {
3194 u_char *p;
3195 size_t rest, copy, read;
3196 nxt_unit_buf_t *buf, *last_buf;
3197
3198 p = dst;
3199 rest = size;
3200
3201 buf = *b;
3202 last_buf = buf;
3203
3204 while (buf != NULL) {
3205 last_buf = buf;
3206
3207 copy = buf->end - buf->free;
3208 copy = nxt_min(rest, copy);
3209
3210 p = nxt_cpymem(p, buf->free, copy);
3211
3212 buf->free += copy;
3213 rest -= copy;
3214
3215 if (rest == 0) {
3216 if (buf->end == buf->free) {
3217 buf = nxt_unit_buf_next(buf);
3218 }
3219
3220 break;
3221 }
3222
3223 buf = nxt_unit_buf_next(buf);
3224 }
3225
3226 *b = last_buf;
3227
3228 read = size - rest;
3229
3230 *len -= read;
3231
3232 return read;
3233 }
3234
3235
3236 void
nxt_unit_request_done(nxt_unit_request_info_t * req,int rc)3237 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3238 {
3239 uint32_t size;
3240 nxt_port_msg_t msg;
3241 nxt_unit_impl_t *lib;
3242 nxt_unit_request_info_impl_t *req_impl;
3243
3244 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3245
3246 nxt_unit_req_debug(req, "done: %d", rc);
3247
3248 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3249 goto skip_response_send;
3250 }
3251
3252 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3253
3254 size = nxt_length("Content-Type") + nxt_length("text/plain");
3255
3256 rc = nxt_unit_response_init(req, 200, 1, size);
3257 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3258 goto skip_response_send;
3259 }
3260
3261 rc = nxt_unit_response_add_field(req, "Content-Type",
3262 nxt_length("Content-Type"),
3263 "text/plain", nxt_length("text/plain"));
3264 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3265 goto skip_response_send;
3266 }
3267 }
3268
3269 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3270
3271 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3272
3273 nxt_unit_buf_send_done(req->response_buf);
3274
3275 return;
3276 }
3277
3278 skip_response_send:
3279
3280 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3281
3282 msg.stream = req_impl->stream;
3283 msg.pid = lib->pid;
3284 msg.reply_port = 0;
3285 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3286 : _NXT_PORT_MSG_RPC_ERROR;
3287 msg.last = 1;
3288 msg.mmap = 0;
3289 msg.nf = 0;
3290 msg.mf = 0;
3291 msg.tracking = 0;
3292
3293 (void) nxt_unit_port_send(req->ctx, req->response_port,
3294 &msg, sizeof(msg), NULL);
3295
3296 nxt_unit_request_info_release(req);
3297 }
3298
3299
3300 int
nxt_unit_websocket_send(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const void * start,size_t size)3301 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3302 uint8_t last, const void *start, size_t size)
3303 {
3304 const struct iovec iov = { (void *) start, size };
3305
3306 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3307 }
3308
3309
3310 int
nxt_unit_websocket_sendv(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const struct iovec * iov,int iovcnt)3311 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3312 uint8_t last, const struct iovec *iov, int iovcnt)
3313 {
3314 int i, rc;
3315 size_t l, copy;
3316 uint32_t payload_len, buf_size, alloc_size;
3317 const uint8_t *b;
3318 nxt_unit_buf_t *buf;
3319 nxt_unit_mmap_buf_t mmap_buf;
3320 nxt_websocket_header_t *wh;
3321 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3322
3323 payload_len = 0;
3324
3325 for (i = 0; i < iovcnt; i++) {
3326 payload_len += iov[i].iov_len;
3327 }
3328
3329 buf_size = 10 + payload_len;
3330 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3331
3332 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3333 alloc_size, alloc_size,
3334 &mmap_buf, local_buf);
3335 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3336 return rc;
3337 }
3338
3339 buf = &mmap_buf.buf;
3340
3341 buf->start[0] = 0;
3342 buf->start[1] = 0;
3343
3344 buf_size -= buf->end - buf->start;
3345
3346 wh = (void *) buf->free;
3347
3348 buf->free = nxt_websocket_frame_init(wh, payload_len);
3349 wh->fin = last;
3350 wh->opcode = opcode;
3351
3352 for (i = 0; i < iovcnt; i++) {
3353 b = iov[i].iov_base;
3354 l = iov[i].iov_len;
3355
3356 while (l > 0) {
3357 copy = buf->end - buf->free;
3358 copy = nxt_min(l, copy);
3359
3360 buf->free = nxt_cpymem(buf->free, b, copy);
3361 b += copy;
3362 l -= copy;
3363
3364 if (l > 0) {
3365 if (nxt_fast_path(buf->free > buf->start)) {
3366 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3367
3368 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3369 return rc;
3370 }
3371 }
3372
3373 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3374
3375 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3376 alloc_size, alloc_size,
3377 &mmap_buf, local_buf);
3378 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3379 return rc;
3380 }
3381
3382 buf_size -= buf->end - buf->start;
3383 }
3384 }
3385 }
3386
3387 if (buf->free > buf->start) {
3388 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3389 }
3390
3391 return rc;
3392 }
3393
3394
3395 ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t * ws,void * dst,size_t size)3396 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3397 size_t size)
3398 {
3399 ssize_t res;
3400 uint8_t *b;
3401 uint64_t i, d;
3402
3403 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3404 dst, size);
3405
3406 if (ws->mask == NULL) {
3407 return res;
3408 }
3409
3410 b = dst;
3411 d = (ws->payload_len - ws->content_length - res) % 4;
3412
3413 for (i = 0; i < (uint64_t) res; i++) {
3414 b[i] ^= ws->mask[ (i + d) % 4 ];
3415 }
3416
3417 return res;
3418 }
3419
3420
3421 int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t * ws)3422 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3423 {
3424 char *b;
3425 size_t size, hsize;
3426 nxt_unit_websocket_frame_impl_t *ws_impl;
3427
3428 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3429
3430 if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3431 return NXT_UNIT_OK;
3432 }
3433
3434 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3435
3436 b = nxt_unit_malloc(ws->req->ctx, size);
3437 if (nxt_slow_path(b == NULL)) {
3438 return NXT_UNIT_ERROR;
3439 }
3440
3441 memcpy(b, ws_impl->buf->buf.start, size);
3442
3443 hsize = nxt_websocket_frame_header_size(b);
3444
3445 ws_impl->buf->buf.start = b;
3446 ws_impl->buf->buf.free = b + hsize;
3447 ws_impl->buf->buf.end = b + size;
3448
3449 ws_impl->buf->free_ptr = b;
3450
3451 ws_impl->ws.header = (nxt_websocket_header_t *) b;
3452
3453 if (ws_impl->ws.header->mask) {
3454 ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3455
3456 } else {
3457 ws_impl->ws.mask = NULL;
3458 }
3459
3460 return NXT_UNIT_OK;
3461 }
3462
3463
3464 void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t * ws)3465 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3466 {
3467 nxt_unit_websocket_frame_release(ws);
3468 }
3469
3470
3471 static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_chunk_id_t * c,int * n,int min_n)3472 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3473 nxt_chunk_id_t *c, int *n, int min_n)
3474 {
3475 int res, nchunks, i;
3476 uint32_t outgoing_size;
3477 nxt_unit_mmap_t *mm, *mm_end;
3478 nxt_unit_impl_t *lib;
3479 nxt_port_mmap_header_t *hdr;
3480
3481 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3482
3483 pthread_mutex_lock(&lib->outgoing.mutex);
3484
3485 retry:
3486
3487 outgoing_size = lib->outgoing.size;
3488
3489 mm_end = lib->outgoing.elts + outgoing_size;
3490
3491 for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3492 hdr = mm->hdr;
3493
3494 if (hdr->sent_over != 0xFFFFu
3495 && (hdr->sent_over != port->id.id
3496 || mm->src_thread != pthread_self()))
3497 {
3498 continue;
3499 }
3500
3501 *c = 0;
3502
3503 while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3504 nchunks = 1;
3505
3506 while (nchunks < *n) {
3507 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3508 *c + nchunks);
3509
3510 if (res == 0) {
3511 if (nchunks >= min_n) {
3512 *n = nchunks;
3513
3514 goto unlock;
3515 }
3516
3517 for (i = 0; i < nchunks; i++) {
3518 nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3519 }
3520
3521 *c += nchunks + 1;
3522 nchunks = 0;
3523 break;
3524 }
3525
3526 nchunks++;
3527 }
3528
3529 if (nchunks >= min_n) {
3530 *n = nchunks;
3531
3532 goto unlock;
3533 }
3534 }
3535
3536 hdr->oosm = 1;
3537 }
3538
3539 if (outgoing_size >= lib->shm_mmap_limit) {
3540 /* Cannot allocate more shared memory. */
3541 pthread_mutex_unlock(&lib->outgoing.mutex);
3542
3543 if (min_n == 0) {
3544 *n = 0;
3545 }
3546
3547 if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3548 >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3549 {
3550 /* Memory allocated by application, but not send to router. */
3551 return NULL;
3552 }
3553
3554 /* Notify router about OOSM condition. */
3555
3556 res = nxt_unit_send_oosm(ctx, port);
3557 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3558 return NULL;
3559 }
3560
3561 /* Return if caller can handle OOSM condition. Non-blocking mode. */
3562
3563 if (min_n == 0) {
3564 return NULL;
3565 }
3566
3567 nxt_unit_debug(ctx, "oosm: waiting for ACK");
3568
3569 res = nxt_unit_wait_shm_ack(ctx);
3570 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3571 return NULL;
3572 }
3573
3574 nxt_unit_debug(ctx, "oosm: retry");
3575
3576 pthread_mutex_lock(&lib->outgoing.mutex);
3577
3578 goto retry;
3579 }
3580
3581 *c = 0;
3582 hdr = nxt_unit_new_mmap(ctx, port, *n);
3583
3584 unlock:
3585
3586 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3587
3588 nxt_unit_debug(ctx, "allocated_chunks %d",
3589 (int) lib->outgoing.allocated_chunks);
3590
3591 pthread_mutex_unlock(&lib->outgoing.mutex);
3592
3593 return hdr;
3594 }
3595
3596
3597 static int
nxt_unit_send_oosm(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)3598 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3599 {
3600 ssize_t res;
3601 nxt_port_msg_t msg;
3602 nxt_unit_impl_t *lib;
3603
3604 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3605
3606 msg.stream = 0;
3607 msg.pid = lib->pid;
3608 msg.reply_port = 0;
3609 msg.type = _NXT_PORT_MSG_OOSM;
3610 msg.last = 0;
3611 msg.mmap = 0;
3612 msg.nf = 0;
3613 msg.mf = 0;
3614 msg.tracking = 0;
3615
3616 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
3617 if (nxt_slow_path(res != sizeof(msg))) {
3618 return NXT_UNIT_ERROR;
3619 }
3620
3621 return NXT_UNIT_OK;
3622 }
3623
3624
3625 static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t * ctx)3626 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3627 {
3628 int res;
3629 nxt_unit_ctx_impl_t *ctx_impl;
3630 nxt_unit_read_buf_t *rbuf;
3631
3632 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3633
3634 while (1) {
3635 rbuf = nxt_unit_read_buf_get(ctx);
3636 if (nxt_slow_path(rbuf == NULL)) {
3637 return NXT_UNIT_ERROR;
3638 }
3639
3640 do {
3641 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3642 } while (res == NXT_UNIT_AGAIN);
3643
3644 if (res == NXT_UNIT_ERROR) {
3645 nxt_unit_read_buf_release(ctx, rbuf);
3646
3647 return NXT_UNIT_ERROR;
3648 }
3649
3650 if (nxt_unit_is_shm_ack(rbuf)) {
3651 nxt_unit_read_buf_release(ctx, rbuf);
3652 break;
3653 }
3654
3655 pthread_mutex_lock(&ctx_impl->mutex);
3656
3657 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3658
3659 pthread_mutex_unlock(&ctx_impl->mutex);
3660
3661 if (nxt_unit_is_quit(rbuf)) {
3662 nxt_unit_debug(ctx, "oosm: quit received");
3663
3664 return NXT_UNIT_ERROR;
3665 }
3666 }
3667
3668 return NXT_UNIT_OK;
3669 }
3670
3671
3672 static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t * mmaps,uint32_t i)3673 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3674 {
3675 uint32_t cap, n;
3676 nxt_unit_mmap_t *e;
3677
3678 if (nxt_fast_path(mmaps->size > i)) {
3679 return mmaps->elts + i;
3680 }
3681
3682 cap = mmaps->cap;
3683
3684 if (cap == 0) {
3685 cap = i + 1;
3686 }
3687
3688 while (i + 1 > cap) {
3689
3690 if (cap < 16) {
3691 cap = cap * 2;
3692
3693 } else {
3694 cap = cap + cap / 2;
3695 }
3696 }
3697
3698 if (cap != mmaps->cap) {
3699
3700 e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3701 if (nxt_slow_path(e == NULL)) {
3702 return NULL;
3703 }
3704
3705 mmaps->elts = e;
3706
3707 for (n = mmaps->cap; n < cap; n++) {
3708 e = mmaps->elts + n;
3709
3710 e->hdr = NULL;
3711 nxt_queue_init(&e->awaiting_rbuf);
3712 }
3713
3714 mmaps->cap = cap;
3715 }
3716
3717 if (i + 1 > mmaps->size) {
3718 mmaps->size = i + 1;
3719 }
3720
3721 return mmaps->elts + i;
3722 }
3723
3724
3725 static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int n)3726 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3727 {
3728 int i, fd, rc;
3729 void *mem;
3730 nxt_unit_mmap_t *mm;
3731 nxt_unit_impl_t *lib;
3732 nxt_port_mmap_header_t *hdr;
3733
3734 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3735
3736 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3737 if (nxt_slow_path(mm == NULL)) {
3738 nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3739
3740 return NULL;
3741 }
3742
3743 fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3744 if (nxt_slow_path(fd == -1)) {
3745 goto remove_fail;
3746 }
3747
3748 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3749 if (nxt_slow_path(mem == MAP_FAILED)) {
3750 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3751 strerror(errno), errno);
3752
3753 nxt_unit_close(fd);
3754
3755 goto remove_fail;
3756 }
3757
3758 mm->hdr = mem;
3759 hdr = mem;
3760
3761 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3762 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3763
3764 hdr->id = lib->outgoing.size - 1;
3765 hdr->src_pid = lib->pid;
3766 hdr->dst_pid = port->id.pid;
3767 hdr->sent_over = port->id.id;
3768 mm->src_thread = pthread_self();
3769
3770 /* Mark first n chunk(s) as busy */
3771 for (i = 0; i < n; i++) {
3772 nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3773 }
3774
3775 /* Mark as busy chunk followed the last available chunk. */
3776 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3777 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3778
3779 pthread_mutex_unlock(&lib->outgoing.mutex);
3780
3781 rc = nxt_unit_send_mmap(ctx, port, fd);
3782 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3783 munmap(mem, PORT_MMAP_SIZE);
3784 hdr = NULL;
3785
3786 } else {
3787 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3788 hdr->id, (int) lib->pid, (int) port->id.pid);
3789 }
3790
3791 nxt_unit_close(fd);
3792
3793 pthread_mutex_lock(&lib->outgoing.mutex);
3794
3795 if (nxt_fast_path(hdr != NULL)) {
3796 return hdr;
3797 }
3798
3799 remove_fail:
3800
3801 lib->outgoing.size--;
3802
3803 return NULL;
3804 }
3805
3806
3807 static int
nxt_unit_shm_open(nxt_unit_ctx_t * ctx,size_t size)3808 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3809 {
3810 int fd;
3811 nxt_unit_impl_t *lib;
3812
3813 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3814
3815 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3816 char name[64];
3817
3818 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3819 lib->pid, (void *) (uintptr_t) pthread_self());
3820 #endif
3821
3822 #if (NXT_HAVE_MEMFD_CREATE)
3823
3824 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3825 if (nxt_slow_path(fd == -1)) {
3826 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3827 strerror(errno), errno);
3828
3829 return -1;
3830 }
3831
3832 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3833
3834 #elif (NXT_HAVE_SHM_OPEN_ANON)
3835
3836 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3837 if (nxt_slow_path(fd == -1)) {
3838 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3839 strerror(errno), errno);
3840
3841 return -1;
3842 }
3843
3844 #elif (NXT_HAVE_SHM_OPEN)
3845
3846 /* Just in case. */
3847 shm_unlink(name);
3848
3849 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3850 if (nxt_slow_path(fd == -1)) {
3851 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3852 strerror(errno), errno);
3853
3854 return -1;
3855 }
3856
3857 if (nxt_slow_path(shm_unlink(name) == -1)) {
3858 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3859 strerror(errno), errno);
3860 }
3861
3862 #else
3863
3864 #error No working shared memory implementation.
3865
3866 #endif
3867
3868 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3869 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3870 strerror(errno), errno);
3871
3872 nxt_unit_close(fd);
3873
3874 return -1;
3875 }
3876
3877 return fd;
3878 }
3879
3880
3881 static int
nxt_unit_send_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int fd)3882 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3883 {
3884 ssize_t res;
3885 nxt_send_oob_t oob;
3886 nxt_port_msg_t msg;
3887 nxt_unit_impl_t *lib;
3888 int fds[2] = {fd, -1};
3889
3890 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3891
3892 msg.stream = 0;
3893 msg.pid = lib->pid;
3894 msg.reply_port = 0;
3895 msg.type = _NXT_PORT_MSG_MMAP;
3896 msg.last = 0;
3897 msg.mmap = 0;
3898 msg.nf = 0;
3899 msg.mf = 0;
3900 msg.tracking = 0;
3901
3902 nxt_socket_msg_oob_init(&oob, fds);
3903
3904 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob);
3905 if (nxt_slow_path(res != sizeof(msg))) {
3906 return NXT_UNIT_ERROR;
3907 }
3908
3909 return NXT_UNIT_OK;
3910 }
3911
3912
3913 static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,uint32_t size,uint32_t min_size,nxt_unit_mmap_buf_t * mmap_buf,char * local_buf)3914 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3915 uint32_t size, uint32_t min_size,
3916 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3917 {
3918 int nchunks, min_nchunks;
3919 nxt_chunk_id_t c;
3920 nxt_port_mmap_header_t *hdr;
3921
3922 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3923 if (local_buf != NULL) {
3924 mmap_buf->free_ptr = NULL;
3925 mmap_buf->plain_ptr = local_buf;
3926
3927 } else {
3928 mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3929 size + sizeof(nxt_port_msg_t));
3930 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3931 return NXT_UNIT_ERROR;
3932 }
3933
3934 mmap_buf->plain_ptr = mmap_buf->free_ptr;
3935 }
3936
3937 mmap_buf->hdr = NULL;
3938 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3939 mmap_buf->buf.free = mmap_buf->buf.start;
3940 mmap_buf->buf.end = mmap_buf->buf.start + size;
3941
3942 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3943 mmap_buf->buf.start, (int) size);
3944
3945 return NXT_UNIT_OK;
3946 }
3947
3948 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3949 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3950
3951 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3952 if (nxt_slow_path(hdr == NULL)) {
3953 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3954 mmap_buf->hdr = NULL;
3955 mmap_buf->buf.start = NULL;
3956 mmap_buf->buf.free = NULL;
3957 mmap_buf->buf.end = NULL;
3958 mmap_buf->free_ptr = NULL;
3959
3960 return NXT_UNIT_OK;
3961 }
3962
3963 return NXT_UNIT_ERROR;
3964 }
3965
3966 mmap_buf->hdr = hdr;
3967 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3968 mmap_buf->buf.free = mmap_buf->buf.start;
3969 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3970 mmap_buf->free_ptr = NULL;
3971 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3972
3973 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3974 (int) hdr->id, (int) c,
3975 (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3976
3977 return NXT_UNIT_OK;
3978 }
3979
3980
3981 static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t * ctx,pid_t pid,int fd)3982 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3983 {
3984 int rc;
3985 void *mem;
3986 nxt_queue_t awaiting_rbuf;
3987 struct stat mmap_stat;
3988 nxt_unit_mmap_t *mm;
3989 nxt_unit_impl_t *lib;
3990 nxt_unit_ctx_impl_t *ctx_impl;
3991 nxt_unit_read_buf_t *rbuf;
3992 nxt_port_mmap_header_t *hdr;
3993
3994 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3995
3996 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3997
3998 if (fstat(fd, &mmap_stat) == -1) {
3999 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
4000 strerror(errno), errno);
4001
4002 return NXT_UNIT_ERROR;
4003 }
4004
4005 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
4006 MAP_SHARED, fd, 0);
4007 if (nxt_slow_path(mem == MAP_FAILED)) {
4008 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
4009 strerror(errno), errno);
4010
4011 return NXT_UNIT_ERROR;
4012 }
4013
4014 hdr = mem;
4015
4016 if (nxt_slow_path(hdr->src_pid != pid)) {
4017
4018 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
4019 "detected: %d != %d or %d != %d", (int) hdr->src_pid,
4020 (int) pid, (int) hdr->dst_pid, (int) lib->pid);
4021
4022 munmap(mem, PORT_MMAP_SIZE);
4023
4024 return NXT_UNIT_ERROR;
4025 }
4026
4027 nxt_queue_init(&awaiting_rbuf);
4028
4029 pthread_mutex_lock(&lib->incoming.mutex);
4030
4031 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
4032 if (nxt_slow_path(mm == NULL)) {
4033 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
4034
4035 munmap(mem, PORT_MMAP_SIZE);
4036
4037 rc = NXT_UNIT_ERROR;
4038
4039 } else {
4040 mm->hdr = hdr;
4041
4042 hdr->sent_over = 0xFFFFu;
4043
4044 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4045 nxt_queue_init(&mm->awaiting_rbuf);
4046
4047 rc = NXT_UNIT_OK;
4048 }
4049
4050 pthread_mutex_unlock(&lib->incoming.mutex);
4051
4052 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4053
4054 ctx_impl = rbuf->ctx_impl;
4055
4056 pthread_mutex_lock(&ctx_impl->mutex);
4057
4058 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4059
4060 pthread_mutex_unlock(&ctx_impl->mutex);
4061
4062 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4063
4064 nxt_unit_awake_ctx(ctx, ctx_impl);
4065
4066 } nxt_queue_loop;
4067
4068 return rc;
4069 }
4070
4071
4072 static void
nxt_unit_awake_ctx(nxt_unit_ctx_t * ctx,nxt_unit_ctx_impl_t * ctx_impl)4073 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4074 {
4075 nxt_port_msg_t msg;
4076
4077 if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4078 return;
4079 }
4080
4081 if (nxt_slow_path(ctx_impl->read_port == NULL
4082 || ctx_impl->read_port->out_fd == -1))
4083 {
4084 nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4085
4086 return;
4087 }
4088
4089 memset(&msg, 0, sizeof(nxt_port_msg_t));
4090
4091 msg.type = _NXT_PORT_MSG_RPC_READY;
4092
4093 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4094 &msg, sizeof(msg), NULL);
4095 }
4096
4097
4098 static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t * mmaps)4099 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4100 {
4101 pthread_mutex_init(&mmaps->mutex, NULL);
4102
4103 mmaps->size = 0;
4104 mmaps->cap = 0;
4105 mmaps->elts = NULL;
4106 mmaps->allocated_chunks = 0;
4107 }
4108
4109
4110 nxt_inline void
nxt_unit_process_use(nxt_unit_process_t * process)4111 nxt_unit_process_use(nxt_unit_process_t *process)
4112 {
4113 nxt_atomic_fetch_add(&process->use_count, 1);
4114 }
4115
4116
4117 nxt_inline void
nxt_unit_process_release(nxt_unit_process_t * process)4118 nxt_unit_process_release(nxt_unit_process_t *process)
4119 {
4120 long c;
4121
4122 c = nxt_atomic_fetch_add(&process->use_count, -1);
4123
4124 if (c == 1) {
4125 nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4126
4127 nxt_unit_free(NULL, process);
4128 }
4129 }
4130
4131
4132 static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t * mmaps)4133 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4134 {
4135 nxt_unit_mmap_t *mm, *end;
4136
4137 if (mmaps->elts != NULL) {
4138 end = mmaps->elts + mmaps->size;
4139
4140 for (mm = mmaps->elts; mm < end; mm++) {
4141 munmap(mm->hdr, PORT_MMAP_SIZE);
4142 }
4143
4144 nxt_unit_free(NULL, mmaps->elts);
4145 }
4146
4147 pthread_mutex_destroy(&mmaps->mutex);
4148 }
4149
4150
4151 static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t * ctx,nxt_unit_mmaps_t * mmaps,pid_t pid,uint32_t id,nxt_port_mmap_header_t ** hdr,nxt_unit_read_buf_t * rbuf)4152 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4153 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4154 nxt_unit_read_buf_t *rbuf)
4155 {
4156 int res, need_rbuf;
4157 nxt_unit_mmap_t *mm;
4158 nxt_unit_ctx_impl_t *ctx_impl;
4159
4160 mm = nxt_unit_mmap_at(mmaps, id);
4161 if (nxt_slow_path(mm == NULL)) {
4162 nxt_unit_alert(ctx, "failed to allocate mmap");
4163
4164 pthread_mutex_unlock(&mmaps->mutex);
4165
4166 *hdr = NULL;
4167
4168 return NXT_UNIT_ERROR;
4169 }
4170
4171 *hdr = mm->hdr;
4172
4173 if (nxt_fast_path(*hdr != NULL)) {
4174 return NXT_UNIT_OK;
4175 }
4176
4177 need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4178
4179 nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4180
4181 pthread_mutex_unlock(&mmaps->mutex);
4182
4183 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4184
4185 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4186
4187 if (need_rbuf) {
4188 res = nxt_unit_get_mmap(ctx, pid, id);
4189 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4190 return NXT_UNIT_ERROR;
4191 }
4192 }
4193
4194 return NXT_UNIT_AGAIN;
4195 }
4196
4197
4198 static int
nxt_unit_mmap_read(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_read_buf_t * rbuf)4199 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4200 nxt_unit_read_buf_t *rbuf)
4201 {
4202 int res;
4203 void *start;
4204 uint32_t size;
4205 nxt_unit_impl_t *lib;
4206 nxt_unit_mmaps_t *mmaps;
4207 nxt_unit_mmap_buf_t *b, **incoming_tail;
4208 nxt_port_mmap_msg_t *mmap_msg, *end;
4209 nxt_port_mmap_header_t *hdr;
4210
4211 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4212 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4213 recv_msg->stream, (int) recv_msg->size);
4214
4215 return NXT_UNIT_ERROR;
4216 }
4217
4218 mmap_msg = recv_msg->start;
4219 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4220
4221 incoming_tail = &recv_msg->incoming_buf;
4222
4223 /* Allocating buffer structures. */
4224 for (; mmap_msg < end; mmap_msg++) {
4225 b = nxt_unit_mmap_buf_get(ctx);
4226 if (nxt_slow_path(b == NULL)) {
4227 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4228 recv_msg->stream);
4229
4230 while (recv_msg->incoming_buf != NULL) {
4231 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4232 }
4233
4234 return NXT_UNIT_ERROR;
4235 }
4236
4237 nxt_unit_mmap_buf_insert(incoming_tail, b);
4238 incoming_tail = &b->next;
4239 }
4240
4241 b = recv_msg->incoming_buf;
4242 mmap_msg = recv_msg->start;
4243
4244 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4245
4246 mmaps = &lib->incoming;
4247
4248 pthread_mutex_lock(&mmaps->mutex);
4249
4250 for (; mmap_msg < end; mmap_msg++) {
4251 res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4252 recv_msg->pid, mmap_msg->mmap_id,
4253 &hdr, rbuf);
4254
4255 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4256 while (recv_msg->incoming_buf != NULL) {
4257 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4258 }
4259
4260 return res;
4261 }
4262
4263 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4264 size = mmap_msg->size;
4265
4266 if (recv_msg->start == mmap_msg) {
4267 recv_msg->start = start;
4268 recv_msg->size = size;
4269 }
4270
4271 b->buf.start = start;
4272 b->buf.free = start;
4273 b->buf.end = b->buf.start + size;
4274 b->hdr = hdr;
4275
4276 b = b->next;
4277
4278 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4279 recv_msg->stream,
4280 start, (int) size,
4281 (int) hdr->src_pid, (int) hdr->dst_pid,
4282 (int) hdr->id, (int) mmap_msg->chunk_id,
4283 (int) mmap_msg->size);
4284 }
4285
4286 pthread_mutex_unlock(&mmaps->mutex);
4287
4288 return NXT_UNIT_OK;
4289 }
4290
4291
4292 static int
nxt_unit_get_mmap(nxt_unit_ctx_t * ctx,pid_t pid,uint32_t id)4293 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4294 {
4295 ssize_t res;
4296 nxt_unit_impl_t *lib;
4297 nxt_unit_ctx_impl_t *ctx_impl;
4298
4299 struct {
4300 nxt_port_msg_t msg;
4301 nxt_port_msg_get_mmap_t get_mmap;
4302 } m;
4303
4304 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4305 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4306
4307 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4308
4309 m.msg.pid = lib->pid;
4310 m.msg.reply_port = ctx_impl->read_port->id.id;
4311 m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4312
4313 m.get_mmap.id = id;
4314
4315 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4316
4317 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
4318 if (nxt_slow_path(res != sizeof(m))) {
4319 return NXT_UNIT_ERROR;
4320 }
4321
4322 return NXT_UNIT_OK;
4323 }
4324
4325
4326 static void
nxt_unit_mmap_release(nxt_unit_ctx_t * ctx,nxt_port_mmap_header_t * hdr,void * start,uint32_t size)4327 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4328 void *start, uint32_t size)
4329 {
4330 int freed_chunks;
4331 u_char *p, *end;
4332 nxt_chunk_id_t c;
4333 nxt_unit_impl_t *lib;
4334
4335 memset(start, 0xA5, size);
4336
4337 p = start;
4338 end = p + size;
4339 c = nxt_port_mmap_chunk_id(hdr, p);
4340 freed_chunks = 0;
4341
4342 while (p < end) {
4343 nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4344
4345 p += PORT_MMAP_CHUNK_SIZE;
4346 c++;
4347 freed_chunks++;
4348 }
4349
4350 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4351
4352 if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4353 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4354
4355 nxt_unit_debug(ctx, "allocated_chunks %d",
4356 (int) lib->outgoing.allocated_chunks);
4357 }
4358
4359 if (hdr->dst_pid == lib->pid
4360 && freed_chunks != 0
4361 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4362 {
4363 nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4364 }
4365 }
4366
4367
4368 static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t * ctx,pid_t pid)4369 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4370 {
4371 ssize_t res;
4372 nxt_port_msg_t msg;
4373 nxt_unit_impl_t *lib;
4374
4375 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4376
4377 msg.stream = 0;
4378 msg.pid = lib->pid;
4379 msg.reply_port = 0;
4380 msg.type = _NXT_PORT_MSG_SHM_ACK;
4381 msg.last = 0;
4382 msg.mmap = 0;
4383 msg.nf = 0;
4384 msg.mf = 0;
4385 msg.tracking = 0;
4386
4387 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
4388 if (nxt_slow_path(res != sizeof(msg))) {
4389 return NXT_UNIT_ERROR;
4390 }
4391
4392 return NXT_UNIT_OK;
4393 }
4394
4395
4396 static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t * lhq,void * data)4397 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4398 {
4399 nxt_process_t *process;
4400
4401 process = data;
4402
4403 if (lhq->key.length == sizeof(pid_t)
4404 && *(pid_t *) lhq->key.start == process->pid)
4405 {
4406 return NXT_OK;
4407 }
4408
4409 return NXT_DECLINED;
4410 }
4411
4412
4413 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
4414 NXT_LVLHSH_DEFAULT,
4415 nxt_unit_lvlhsh_pid_test,
4416 nxt_unit_lvlhsh_alloc,
4417 nxt_unit_lvlhsh_free,
4418 };
4419
4420
4421 static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t * lhq,pid_t * pid)4422 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4423 {
4424 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4425 lhq->key.length = sizeof(*pid);
4426 lhq->key.start = (u_char *) pid;
4427 lhq->proto = &lvlhsh_processes_proto;
4428 }
4429
4430
4431 static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t * ctx,pid_t pid)4432 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4433 {
4434 nxt_unit_impl_t *lib;
4435 nxt_unit_process_t *process;
4436 nxt_lvlhsh_query_t lhq;
4437
4438 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4439
4440 nxt_unit_process_lhq_pid(&lhq, &pid);
4441
4442 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4443 process = lhq.value;
4444 nxt_unit_process_use(process);
4445
4446 return process;
4447 }
4448
4449 process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4450 if (nxt_slow_path(process == NULL)) {
4451 nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4452
4453 return NULL;
4454 }
4455
4456 process->pid = pid;
4457 process->use_count = 2;
4458 process->next_port_id = 0;
4459 process->lib = lib;
4460
4461 nxt_queue_init(&process->ports);
4462
4463 lhq.replace = 0;
4464 lhq.value = process;
4465
4466 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4467
4468 case NXT_OK:
4469 break;
4470
4471 default:
4472 nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4473
4474 nxt_unit_free(ctx, process);
4475 process = NULL;
4476 break;
4477 }
4478
4479 return process;
4480 }
4481
4482
4483 static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t * lib,pid_t pid,int remove)4484 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4485 {
4486 int rc;
4487 nxt_lvlhsh_query_t lhq;
4488
4489 nxt_unit_process_lhq_pid(&lhq, &pid);
4490
4491 if (remove) {
4492 rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4493
4494 } else {
4495 rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4496 }
4497
4498 if (rc == NXT_OK) {
4499 if (!remove) {
4500 nxt_unit_process_use(lhq.value);
4501 }
4502
4503 return lhq.value;
4504 }
4505
4506 return NULL;
4507 }
4508
4509
4510 static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t * lib)4511 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4512 {
4513 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4514 }
4515
4516
4517 int
nxt_unit_run(nxt_unit_ctx_t * ctx)4518 nxt_unit_run(nxt_unit_ctx_t *ctx)
4519 {
4520 int rc;
4521 nxt_unit_ctx_impl_t *ctx_impl;
4522
4523 nxt_unit_ctx_use(ctx);
4524
4525 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4526
4527 rc = NXT_UNIT_OK;
4528
4529 while (nxt_fast_path(ctx_impl->online)) {
4530 rc = nxt_unit_run_once_impl(ctx);
4531
4532 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4533 nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
4534 break;
4535 }
4536 }
4537
4538 nxt_unit_ctx_release(ctx);
4539
4540 return rc;
4541 }
4542
4543
4544 int
nxt_unit_run_once(nxt_unit_ctx_t * ctx)4545 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4546 {
4547 int rc;
4548
4549 nxt_unit_ctx_use(ctx);
4550
4551 rc = nxt_unit_run_once_impl(ctx);
4552
4553 nxt_unit_ctx_release(ctx);
4554
4555 return rc;
4556 }
4557
4558
4559 static int
nxt_unit_run_once_impl(nxt_unit_ctx_t * ctx)4560 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4561 {
4562 int rc;
4563 nxt_unit_read_buf_t *rbuf;
4564
4565 rbuf = nxt_unit_read_buf_get(ctx);
4566 if (nxt_slow_path(rbuf == NULL)) {
4567 return NXT_UNIT_ERROR;
4568 }
4569
4570 rc = nxt_unit_read_buf(ctx, rbuf);
4571 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4572 nxt_unit_read_buf_release(ctx, rbuf);
4573
4574 return rc;
4575 }
4576
4577 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4578 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4579 return NXT_UNIT_ERROR;
4580 }
4581
4582 rc = nxt_unit_process_pending_rbuf(ctx);
4583 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4584 return NXT_UNIT_ERROR;
4585 }
4586
4587 nxt_unit_process_ready_req(ctx);
4588
4589 return rc;
4590 }
4591
4592
4593 static int
nxt_unit_read_buf(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)4594 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4595 {
4596 int nevents, res, err;
4597 nxt_uint_t nfds;
4598 nxt_unit_impl_t *lib;
4599 nxt_unit_ctx_impl_t *ctx_impl;
4600 nxt_unit_port_impl_t *port_impl;
4601 struct pollfd fds[2];
4602
4603 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4604
4605 if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
4606 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4607 }
4608
4609 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4610 port);
4611
4612 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4613
4614 retry:
4615
4616 if (port_impl->from_socket == 0) {
4617 res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4618 if (res == NXT_UNIT_OK) {
4619 if (nxt_unit_is_read_socket(rbuf)) {
4620 port_impl->from_socket++;
4621
4622 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4623 (int) ctx_impl->read_port->id.pid,
4624 (int) ctx_impl->read_port->id.id,
4625 port_impl->from_socket);
4626
4627 } else {
4628 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4629 (int) ctx_impl->read_port->id.pid,
4630 (int) ctx_impl->read_port->id.id,
4631 (int) rbuf->size);
4632
4633 return NXT_UNIT_OK;
4634 }
4635 }
4636 }
4637
4638 if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4639 res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
4640 if (res == NXT_UNIT_OK) {
4641 return NXT_UNIT_OK;
4642 }
4643
4644 fds[1].fd = lib->shared_port->in_fd;
4645 fds[1].events = POLLIN;
4646
4647 nfds = 2;
4648
4649 } else {
4650 nfds = 1;
4651 }
4652
4653 fds[0].fd = ctx_impl->read_port->in_fd;
4654 fds[0].events = POLLIN;
4655 fds[0].revents = 0;
4656
4657 fds[1].revents = 0;
4658
4659 nevents = poll(fds, nfds, -1);
4660 if (nxt_slow_path(nevents == -1)) {
4661 err = errno;
4662
4663 if (err == EINTR) {
4664 goto retry;
4665 }
4666
4667 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4668 fds[0].fd, fds[1].fd, strerror(err), err);
4669
4670 rbuf->size = -1;
4671
4672 return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4673 }
4674
4675 nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]",
4676 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4677 fds[1].revents);
4678
4679 if ((fds[0].revents & POLLIN) != 0) {
4680 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4681 if (res == NXT_UNIT_AGAIN) {
4682 goto retry;
4683 }
4684
4685 return res;
4686 }
4687
4688 if ((fds[1].revents & POLLIN) != 0) {
4689 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4690 if (res == NXT_UNIT_AGAIN) {
4691 goto retry;
4692 }
4693
4694 return res;
4695 }
4696
4697 nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4698 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4699 fds[1].revents);
4700
4701 return NXT_UNIT_ERROR;
4702 }
4703
4704
4705 static int
nxt_unit_chk_ready(nxt_unit_ctx_t * ctx)4706 nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
4707 {
4708 nxt_unit_impl_t *lib;
4709 nxt_unit_ctx_impl_t *ctx_impl;
4710
4711 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4712 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4713
4714 return (ctx_impl->ready
4715 && (lib->request_limit == 0
4716 || lib->request_count < lib->request_limit));
4717 }
4718
4719
4720 static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t * ctx)4721 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4722 {
4723 int rc;
4724 nxt_queue_t pending_rbuf;
4725 nxt_unit_ctx_impl_t *ctx_impl;
4726 nxt_unit_read_buf_t *rbuf;
4727
4728 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4729
4730 pthread_mutex_lock(&ctx_impl->mutex);
4731
4732 if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4733 pthread_mutex_unlock(&ctx_impl->mutex);
4734
4735 return NXT_UNIT_OK;
4736 }
4737
4738 nxt_queue_init(&pending_rbuf);
4739
4740 nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4741 nxt_queue_init(&ctx_impl->pending_rbuf);
4742
4743 pthread_mutex_unlock(&ctx_impl->mutex);
4744
4745 rc = NXT_UNIT_OK;
4746
4747 nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4748
4749 if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4750 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4751
4752 } else {
4753 nxt_unit_read_buf_release(ctx, rbuf);
4754 }
4755
4756 } nxt_queue_loop;
4757
4758 if (!ctx_impl->ready) {
4759 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
4760 }
4761
4762 return rc;
4763 }
4764
4765
4766 static void
nxt_unit_process_ready_req(nxt_unit_ctx_t * ctx)4767 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4768 {
4769 int res;
4770 nxt_queue_t ready_req;
4771 nxt_unit_impl_t *lib;
4772 nxt_unit_ctx_impl_t *ctx_impl;
4773 nxt_unit_request_info_t *req;
4774 nxt_unit_request_info_impl_t *req_impl;
4775
4776 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4777
4778 pthread_mutex_lock(&ctx_impl->mutex);
4779
4780 if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4781 pthread_mutex_unlock(&ctx_impl->mutex);
4782
4783 return;
4784 }
4785
4786 nxt_queue_init(&ready_req);
4787
4788 nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4789 nxt_queue_init(&ctx_impl->ready_req);
4790
4791 pthread_mutex_unlock(&ctx_impl->mutex);
4792
4793 nxt_queue_each(req_impl, &ready_req,
4794 nxt_unit_request_info_impl_t, port_wait_link)
4795 {
4796 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4797
4798 req = &req_impl->req;
4799
4800 res = nxt_unit_send_req_headers_ack(req);
4801 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4802 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4803
4804 continue;
4805 }
4806
4807 if (req->content_length
4808 > (uint64_t) (req->content_buf->end - req->content_buf->free))
4809 {
4810 res = nxt_unit_request_hash_add(ctx, req);
4811 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4812 nxt_unit_req_warn(req, "failed to add request to hash");
4813
4814 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4815
4816 continue;
4817 }
4818
4819 /*
4820 * If application have separate data handler, we may start
4821 * request processing and process data when it is arrived.
4822 */
4823 if (lib->callbacks.data_handler == NULL) {
4824 continue;
4825 }
4826 }
4827
4828 lib->callbacks.request_handler(&req_impl->req);
4829
4830 } nxt_queue_loop;
4831 }
4832
4833
4834 int
nxt_unit_run_ctx(nxt_unit_ctx_t * ctx)4835 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4836 {
4837 int rc;
4838 nxt_unit_read_buf_t *rbuf;
4839 nxt_unit_ctx_impl_t *ctx_impl;
4840
4841 nxt_unit_ctx_use(ctx);
4842
4843 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4844
4845 rc = NXT_UNIT_OK;
4846
4847 while (nxt_fast_path(ctx_impl->online)) {
4848 rbuf = nxt_unit_read_buf_get(ctx);
4849 if (nxt_slow_path(rbuf == NULL)) {
4850 rc = NXT_UNIT_ERROR;
4851 break;
4852 }
4853
4854 retry:
4855
4856 rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4857 if (rc == NXT_UNIT_AGAIN) {
4858 goto retry;
4859 }
4860
4861 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4862 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4863 break;
4864 }
4865
4866 rc = nxt_unit_process_pending_rbuf(ctx);
4867 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4868 break;
4869 }
4870
4871 nxt_unit_process_ready_req(ctx);
4872 }
4873
4874 nxt_unit_ctx_release(ctx);
4875
4876 return rc;
4877 }
4878
4879
4880 nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t * rbuf)4881 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4882 {
4883 nxt_port_msg_t *port_msg;
4884
4885 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4886 port_msg = (nxt_port_msg_t *) rbuf->buf;
4887
4888 return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4889 }
4890
4891 return 0;
4892 }
4893
4894
4895 nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t * rbuf)4896 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4897 {
4898 if (nxt_fast_path(rbuf->size == 1)) {
4899 return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4900 }
4901
4902 return 0;
4903 }
4904
4905
4906 nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t * rbuf)4907 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4908 {
4909 nxt_port_msg_t *port_msg;
4910
4911 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4912 port_msg = (nxt_port_msg_t *) rbuf->buf;
4913
4914 return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4915 }
4916
4917 return 0;
4918 }
4919
4920
4921 nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t * rbuf)4922 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4923 {
4924 nxt_port_msg_t *port_msg;
4925
4926 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4927 port_msg = (nxt_port_msg_t *) rbuf->buf;
4928
4929 return port_msg->type == _NXT_PORT_MSG_QUIT;
4930 }
4931
4932 return 0;
4933 }
4934
4935
4936 int
nxt_unit_run_shared(nxt_unit_ctx_t * ctx)4937 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4938 {
4939 int rc;
4940 nxt_unit_impl_t *lib;
4941 nxt_unit_read_buf_t *rbuf;
4942
4943 nxt_unit_ctx_use(ctx);
4944
4945 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4946
4947 rc = NXT_UNIT_OK;
4948
4949 while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4950 rbuf = nxt_unit_read_buf_get(ctx);
4951 if (nxt_slow_path(rbuf == NULL)) {
4952 rc = NXT_UNIT_ERROR;
4953 break;
4954 }
4955
4956 retry:
4957
4958 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4959 if (rc == NXT_UNIT_AGAIN) {
4960 goto retry;
4961 }
4962
4963 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4964 nxt_unit_read_buf_release(ctx, rbuf);
4965 break;
4966 }
4967
4968 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4969 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4970 break;
4971 }
4972 }
4973
4974 nxt_unit_ctx_release(ctx);
4975
4976 return rc;
4977 }
4978
4979
4980 nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t * ctx)4981 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4982 {
4983 int rc;
4984 nxt_unit_impl_t *lib;
4985 nxt_unit_read_buf_t *rbuf;
4986 nxt_unit_request_info_t *req;
4987
4988 nxt_unit_ctx_use(ctx);
4989
4990 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4991
4992 req = NULL;
4993
4994 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
4995 goto done;
4996 }
4997
4998 rbuf = nxt_unit_read_buf_get(ctx);
4999 if (nxt_slow_path(rbuf == NULL)) {
5000 goto done;
5001 }
5002
5003 rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
5004 if (rc != NXT_UNIT_OK) {
5005 nxt_unit_read_buf_release(ctx, rbuf);
5006 goto done;
5007 }
5008
5009 (void) nxt_unit_process_msg(ctx, rbuf, &req);
5010
5011 done:
5012
5013 nxt_unit_ctx_release(ctx);
5014
5015 return req;
5016 }
5017
5018
5019 int
nxt_unit_process_port_msg(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5020 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5021 {
5022 int rc;
5023
5024 nxt_unit_ctx_use(ctx);
5025
5026 rc = nxt_unit_process_port_msg_impl(ctx, port);
5027
5028 nxt_unit_ctx_release(ctx);
5029
5030 return rc;
5031 }
5032
5033
5034 static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5035 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5036 {
5037 int rc;
5038 nxt_unit_impl_t *lib;
5039 nxt_unit_read_buf_t *rbuf;
5040
5041 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5042
5043 if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
5044 return NXT_UNIT_AGAIN;
5045 }
5046
5047 rbuf = nxt_unit_read_buf_get(ctx);
5048 if (nxt_slow_path(rbuf == NULL)) {
5049 return NXT_UNIT_ERROR;
5050 }
5051
5052 if (port == lib->shared_port) {
5053 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5054
5055 } else {
5056 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5057 }
5058
5059 if (rc != NXT_UNIT_OK) {
5060 nxt_unit_read_buf_release(ctx, rbuf);
5061 return rc;
5062 }
5063
5064 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
5065 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5066 return NXT_UNIT_ERROR;
5067 }
5068
5069 rc = nxt_unit_process_pending_rbuf(ctx);
5070 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5071 return NXT_UNIT_ERROR;
5072 }
5073
5074 nxt_unit_process_ready_req(ctx);
5075
5076 return rc;
5077 }
5078
5079
5080 void
nxt_unit_done(nxt_unit_ctx_t * ctx)5081 nxt_unit_done(nxt_unit_ctx_t *ctx)
5082 {
5083 nxt_unit_ctx_release(ctx);
5084 }
5085
5086
5087 nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t * ctx,void * data)5088 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5089 {
5090 int rc, queue_fd;
5091 void *mem;
5092 nxt_unit_impl_t *lib;
5093 nxt_unit_port_t *port;
5094 nxt_unit_ctx_impl_t *new_ctx;
5095 nxt_unit_port_impl_t *port_impl;
5096
5097 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5098
5099 new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5100 + lib->request_data_size);
5101 if (nxt_slow_path(new_ctx == NULL)) {
5102 nxt_unit_alert(ctx, "failed to allocate context");
5103
5104 return NULL;
5105 }
5106
5107 rc = nxt_unit_ctx_init(lib, new_ctx, data);
5108 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5109 nxt_unit_free(ctx, new_ctx);
5110
5111 return NULL;
5112 }
5113
5114 queue_fd = -1;
5115
5116 port = nxt_unit_create_port(&new_ctx->ctx);
5117 if (nxt_slow_path(port == NULL)) {
5118 goto fail;
5119 }
5120
5121 new_ctx->read_port = port;
5122
5123 queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5124 if (nxt_slow_path(queue_fd == -1)) {
5125 goto fail;
5126 }
5127
5128 mem = mmap(NULL, sizeof(nxt_port_queue_t),
5129 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5130 if (nxt_slow_path(mem == MAP_FAILED)) {
5131 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5132 strerror(errno), errno);
5133
5134 goto fail;
5135 }
5136
5137 nxt_port_queue_init(mem);
5138
5139 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5140 port_impl->queue = mem;
5141
5142 rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5143 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5144 goto fail;
5145 }
5146
5147 nxt_unit_close(queue_fd);
5148
5149 return &new_ctx->ctx;
5150
5151 fail:
5152
5153 if (queue_fd != -1) {
5154 nxt_unit_close(queue_fd);
5155 }
5156
5157 nxt_unit_ctx_release(&new_ctx->ctx);
5158
5159 return NULL;
5160 }
5161
5162
5163 static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t * ctx_impl)5164 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5165 {
5166 nxt_unit_impl_t *lib;
5167 nxt_unit_mmap_buf_t *mmap_buf;
5168 nxt_unit_read_buf_t *rbuf;
5169 nxt_unit_request_info_impl_t *req_impl;
5170 nxt_unit_websocket_frame_impl_t *ws_impl;
5171
5172 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5173
5174 nxt_queue_each(req_impl, &ctx_impl->active_req,
5175 nxt_unit_request_info_impl_t, link)
5176 {
5177 nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5178
5179 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5180
5181 } nxt_queue_loop;
5182
5183 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5184 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5185
5186 while (ctx_impl->free_buf != NULL) {
5187 mmap_buf = ctx_impl->free_buf;
5188 nxt_unit_mmap_buf_unlink(mmap_buf);
5189 nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5190 }
5191
5192 nxt_queue_each(req_impl, &ctx_impl->free_req,
5193 nxt_unit_request_info_impl_t, link)
5194 {
5195 nxt_unit_request_info_free(req_impl);
5196
5197 } nxt_queue_loop;
5198
5199 nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5200 nxt_unit_websocket_frame_impl_t, link)
5201 {
5202 nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5203
5204 } nxt_queue_loop;
5205
5206 nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5207 {
5208 if (rbuf != &ctx_impl->ctx_read_buf) {
5209 nxt_unit_free(&ctx_impl->ctx, rbuf);
5210 }
5211 } nxt_queue_loop;
5212
5213 pthread_mutex_destroy(&ctx_impl->mutex);
5214
5215 pthread_mutex_lock(&lib->mutex);
5216
5217 nxt_queue_remove(&ctx_impl->link);
5218
5219 pthread_mutex_unlock(&lib->mutex);
5220
5221 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5222 nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
5223 nxt_unit_port_release(ctx_impl->read_port);
5224 }
5225
5226 if (ctx_impl != &lib->main_ctx) {
5227 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5228 }
5229
5230 nxt_unit_lib_release(lib);
5231 }
5232
5233
5234 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5235 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5236 #define NXT_UNIX_SOCKET SOCK_SEQPACKET
5237 #else
5238 #define NXT_UNIX_SOCKET SOCK_DGRAM
5239 #endif
5240
5241
5242 void
nxt_unit_port_id_init(nxt_unit_port_id_t * port_id,pid_t pid,uint16_t id)5243 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5244 {
5245 nxt_unit_port_hash_id_t port_hash_id;
5246
5247 port_hash_id.pid = pid;
5248 port_hash_id.id = id;
5249
5250 port_id->pid = pid;
5251 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5252 port_id->id = id;
5253 }
5254
5255
5256 static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t * ctx)5257 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5258 {
5259 int rc, port_sockets[2];
5260 nxt_unit_impl_t *lib;
5261 nxt_unit_port_t new_port, *port;
5262 nxt_unit_process_t *process;
5263
5264 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5265
5266 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5267 if (nxt_slow_path(rc != 0)) {
5268 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5269 strerror(errno), errno);
5270
5271 return NULL;
5272 }
5273
5274 #if (NXT_HAVE_SOCKOPT_SO_PASSCRED)
5275 int enable_creds = 1;
5276
5277 if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED,
5278 &enable_creds, sizeof(enable_creds)) == -1))
5279 {
5280 nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5281 return NULL;
5282 }
5283
5284 if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED,
5285 &enable_creds, sizeof(enable_creds)) == -1))
5286 {
5287 nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5288 return NULL;
5289 }
5290 #endif
5291
5292 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5293 port_sockets[0], port_sockets[1]);
5294
5295 pthread_mutex_lock(&lib->mutex);
5296
5297 process = nxt_unit_process_get(ctx, lib->pid);
5298 if (nxt_slow_path(process == NULL)) {
5299 pthread_mutex_unlock(&lib->mutex);
5300
5301 nxt_unit_close(port_sockets[0]);
5302 nxt_unit_close(port_sockets[1]);
5303
5304 return NULL;
5305 }
5306
5307 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5308
5309 new_port.in_fd = port_sockets[0];
5310 new_port.out_fd = port_sockets[1];
5311 new_port.data = NULL;
5312
5313 pthread_mutex_unlock(&lib->mutex);
5314
5315 nxt_unit_process_release(process);
5316
5317 port = nxt_unit_add_port(ctx, &new_port, NULL);
5318 if (nxt_slow_path(port == NULL)) {
5319 nxt_unit_close(port_sockets[0]);
5320 nxt_unit_close(port_sockets[1]);
5321 }
5322
5323 return port;
5324 }
5325
5326
5327 static int
nxt_unit_send_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * dst,nxt_unit_port_t * port,int queue_fd)5328 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5329 nxt_unit_port_t *port, int queue_fd)
5330 {
5331 ssize_t res;
5332 nxt_send_oob_t oob;
5333 nxt_unit_impl_t *lib;
5334 int fds[2] = { port->out_fd, queue_fd };
5335
5336 struct {
5337 nxt_port_msg_t msg;
5338 nxt_port_msg_new_port_t new_port;
5339 } m;
5340
5341 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5342
5343 m.msg.stream = 0;
5344 m.msg.pid = lib->pid;
5345 m.msg.reply_port = 0;
5346 m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5347 m.msg.last = 0;
5348 m.msg.mmap = 0;
5349 m.msg.nf = 0;
5350 m.msg.mf = 0;
5351 m.msg.tracking = 0;
5352
5353 m.new_port.id = port->id.id;
5354 m.new_port.pid = port->id.pid;
5355 m.new_port.type = NXT_PROCESS_APP;
5356 m.new_port.max_size = 16 * 1024;
5357 m.new_port.max_share = 64 * 1024;
5358
5359 nxt_socket_msg_oob_init(&oob, fds);
5360
5361 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob);
5362
5363 return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5364 }
5365
5366
nxt_unit_port_use(nxt_unit_port_t * port)5367 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5368 {
5369 nxt_unit_port_impl_t *port_impl;
5370
5371 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5372
5373 nxt_atomic_fetch_add(&port_impl->use_count, 1);
5374 }
5375
5376
nxt_unit_port_release(nxt_unit_port_t * port)5377 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5378 {
5379 long c;
5380 nxt_unit_port_impl_t *port_impl;
5381
5382 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5383
5384 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5385
5386 if (c == 1) {
5387 nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5388 (int) port->id.pid, (int) port->id.id,
5389 port->in_fd, port->out_fd);
5390
5391 nxt_unit_process_release(port_impl->process);
5392
5393 if (port->in_fd != -1) {
5394 nxt_unit_close(port->in_fd);
5395
5396 port->in_fd = -1;
5397 }
5398
5399 if (port->out_fd != -1) {
5400 nxt_unit_close(port->out_fd);
5401
5402 port->out_fd = -1;
5403 }
5404
5405 if (port_impl->queue != NULL) {
5406 munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5407 ? sizeof(nxt_app_queue_t)
5408 : sizeof(nxt_port_queue_t));
5409 }
5410
5411 nxt_unit_free(NULL, port_impl);
5412 }
5413 }
5414
5415
5416 static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,void * queue)5417 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5418 {
5419 int rc, ready;
5420 nxt_queue_t awaiting_req;
5421 nxt_unit_impl_t *lib;
5422 nxt_unit_port_t *old_port;
5423 nxt_unit_process_t *process;
5424 nxt_unit_port_impl_t *new_port, *old_port_impl;
5425
5426 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5427
5428 pthread_mutex_lock(&lib->mutex);
5429
5430 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5431
5432 if (nxt_slow_path(old_port != NULL)) {
5433 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5434 "in_fd %d out_fd %d queue %p",
5435 port->id.pid, port->id.id,
5436 port->in_fd, port->out_fd, queue);
5437
5438 if (old_port->data == NULL) {
5439 old_port->data = port->data;
5440 port->data = NULL;
5441 }
5442
5443 if (old_port->in_fd == -1) {
5444 old_port->in_fd = port->in_fd;
5445 port->in_fd = -1;
5446 }
5447
5448 if (port->in_fd != -1) {
5449 nxt_unit_close(port->in_fd);
5450 port->in_fd = -1;
5451 }
5452
5453 if (old_port->out_fd == -1) {
5454 old_port->out_fd = port->out_fd;
5455 port->out_fd = -1;
5456 }
5457
5458 if (port->out_fd != -1) {
5459 nxt_unit_close(port->out_fd);
5460 port->out_fd = -1;
5461 }
5462
5463 *port = *old_port;
5464
5465 nxt_queue_init(&awaiting_req);
5466
5467 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5468
5469 if (old_port_impl->queue == NULL) {
5470 old_port_impl->queue = queue;
5471 }
5472
5473 ready = (port->in_fd != -1 || port->out_fd != -1);
5474
5475 /*
5476 * Port can be market as 'ready' only after callbacks.add_port() call.
5477 * Otherwise, request may try to use the port before callback.
5478 */
5479 if (lib->callbacks.add_port == NULL && ready) {
5480 old_port_impl->ready = ready;
5481
5482 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5483 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5484 nxt_queue_init(&old_port_impl->awaiting_req);
5485 }
5486 }
5487
5488 pthread_mutex_unlock(&lib->mutex);
5489
5490 if (lib->callbacks.add_port != NULL && ready) {
5491 lib->callbacks.add_port(ctx, old_port);
5492
5493 pthread_mutex_lock(&lib->mutex);
5494
5495 old_port_impl->ready = ready;
5496
5497 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5498 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5499 nxt_queue_init(&old_port_impl->awaiting_req);
5500 }
5501
5502 pthread_mutex_unlock(&lib->mutex);
5503 }
5504
5505 nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5506
5507 return old_port;
5508 }
5509
5510 new_port = NULL;
5511 ready = 0;
5512
5513 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5514 port->id.pid, port->id.id,
5515 port->in_fd, port->out_fd, queue);
5516
5517 process = nxt_unit_process_get(ctx, port->id.pid);
5518 if (nxt_slow_path(process == NULL)) {
5519 goto unlock;
5520 }
5521
5522 if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5523 && port->id.id >= process->next_port_id)
5524 {
5525 process->next_port_id = port->id.id + 1;
5526 }
5527
5528 new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5529 if (nxt_slow_path(new_port == NULL)) {
5530 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5531 port->id.pid, port->id.id);
5532
5533 goto unlock;
5534 }
5535
5536 new_port->port = *port;
5537
5538 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5539 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5540 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5541 port->id.pid, port->id.id);
5542
5543 nxt_unit_free(ctx, new_port);
5544
5545 new_port = NULL;
5546
5547 goto unlock;
5548 }
5549
5550 nxt_queue_insert_tail(&process->ports, &new_port->link);
5551
5552 new_port->use_count = 2;
5553 new_port->process = process;
5554 new_port->queue = queue;
5555 new_port->from_socket = 0;
5556 new_port->socket_rbuf = NULL;
5557
5558 nxt_queue_init(&new_port->awaiting_req);
5559
5560 ready = (port->in_fd != -1 || port->out_fd != -1);
5561
5562 if (lib->callbacks.add_port == NULL) {
5563 new_port->ready = ready;
5564
5565 } else {
5566 new_port->ready = 0;
5567 }
5568
5569 process = NULL;
5570
5571 unlock:
5572
5573 pthread_mutex_unlock(&lib->mutex);
5574
5575 if (nxt_slow_path(process != NULL)) {
5576 nxt_unit_process_release(process);
5577 }
5578
5579 if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5580 lib->callbacks.add_port(ctx, &new_port->port);
5581
5582 nxt_queue_init(&awaiting_req);
5583
5584 pthread_mutex_lock(&lib->mutex);
5585
5586 new_port->ready = 1;
5587
5588 if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5589 nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5590 nxt_queue_init(&new_port->awaiting_req);
5591 }
5592
5593 pthread_mutex_unlock(&lib->mutex);
5594
5595 nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5596 }
5597
5598 return (new_port == NULL) ? NULL : &new_port->port;
5599 }
5600
5601
5602 static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t * ctx,nxt_queue_t * awaiting_req)5603 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5604 {
5605 nxt_unit_ctx_impl_t *ctx_impl;
5606 nxt_unit_request_info_impl_t *req_impl;
5607
5608 nxt_queue_each(req_impl, awaiting_req,
5609 nxt_unit_request_info_impl_t, port_wait_link)
5610 {
5611 nxt_queue_remove(&req_impl->port_wait_link);
5612
5613 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5614 ctx);
5615
5616 pthread_mutex_lock(&ctx_impl->mutex);
5617
5618 nxt_queue_insert_tail(&ctx_impl->ready_req,
5619 &req_impl->port_wait_link);
5620
5621 pthread_mutex_unlock(&ctx_impl->mutex);
5622
5623 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5624
5625 nxt_unit_awake_ctx(ctx, ctx_impl);
5626
5627 } nxt_queue_loop;
5628 }
5629
5630
5631 static void
nxt_unit_remove_port(nxt_unit_impl_t * lib,nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5632 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
5633 nxt_unit_port_id_t *port_id)
5634 {
5635 nxt_unit_port_t *port;
5636 nxt_unit_port_impl_t *port_impl;
5637
5638 pthread_mutex_lock(&lib->mutex);
5639
5640 port = nxt_unit_remove_port_unsafe(lib, port_id);
5641
5642 if (nxt_fast_path(port != NULL)) {
5643 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5644
5645 nxt_queue_remove(&port_impl->link);
5646 }
5647
5648 pthread_mutex_unlock(&lib->mutex);
5649
5650 if (lib->callbacks.remove_port != NULL && port != NULL) {
5651 lib->callbacks.remove_port(&lib->unit, ctx, port);
5652 }
5653
5654 if (nxt_fast_path(port != NULL)) {
5655 nxt_unit_port_release(port);
5656 }
5657 }
5658
5659
5660 static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t * lib,nxt_unit_port_id_t * port_id)5661 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5662 {
5663 nxt_unit_port_t *port;
5664
5665 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5666 if (nxt_slow_path(port == NULL)) {
5667 nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5668 (int) port_id->pid, (int) port_id->id);
5669
5670 return NULL;
5671 }
5672
5673 nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5674 (int) port_id->pid, (int) port_id->id,
5675 port->in_fd, port->out_fd, port->data);
5676
5677 return port;
5678 }
5679
5680
5681 static void
nxt_unit_remove_pid(nxt_unit_impl_t * lib,pid_t pid)5682 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5683 {
5684 nxt_unit_process_t *process;
5685
5686 pthread_mutex_lock(&lib->mutex);
5687
5688 process = nxt_unit_process_find(lib, pid, 1);
5689 if (nxt_slow_path(process == NULL)) {
5690 nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5691
5692 pthread_mutex_unlock(&lib->mutex);
5693
5694 return;
5695 }
5696
5697 nxt_unit_remove_process(lib, process);
5698
5699 if (lib->callbacks.remove_pid != NULL) {
5700 lib->callbacks.remove_pid(&lib->unit, pid);
5701 }
5702 }
5703
5704
5705 static void
nxt_unit_remove_process(nxt_unit_impl_t * lib,nxt_unit_process_t * process)5706 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5707 {
5708 nxt_queue_t ports;
5709 nxt_unit_port_impl_t *port;
5710
5711 nxt_queue_init(&ports);
5712
5713 nxt_queue_add(&ports, &process->ports);
5714
5715 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5716
5717 nxt_unit_remove_port_unsafe(lib, &port->port.id);
5718
5719 } nxt_queue_loop;
5720
5721 pthread_mutex_unlock(&lib->mutex);
5722
5723 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5724
5725 nxt_queue_remove(&port->link);
5726
5727 if (lib->callbacks.remove_port != NULL) {
5728 lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
5729 }
5730
5731 nxt_unit_port_release(&port->port);
5732
5733 } nxt_queue_loop;
5734
5735 nxt_unit_process_release(process);
5736 }
5737
5738
5739 static void
nxt_unit_quit(nxt_unit_ctx_t * ctx,uint8_t quit_param)5740 nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
5741 {
5742 nxt_bool_t skip_graceful_broadcast, quit;
5743 nxt_unit_impl_t *lib;
5744 nxt_unit_ctx_impl_t *ctx_impl;
5745 nxt_unit_callbacks_t *cb;
5746 nxt_unit_request_info_t *req;
5747 nxt_unit_request_info_impl_t *req_impl;
5748
5749 struct {
5750 nxt_port_msg_t msg;
5751 uint8_t quit_param;
5752 } nxt_packed m;
5753
5754 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5755 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5756
5757 nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
5758 ctx_impl->online);
5759
5760 if (nxt_slow_path(!ctx_impl->online)) {
5761 return;
5762 }
5763
5764 skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
5765 && !ctx_impl->ready;
5766
5767 cb = &lib->callbacks;
5768
5769 if (nxt_fast_path(ctx_impl->ready)) {
5770 ctx_impl->ready = 0;
5771
5772 if (cb->remove_port != NULL) {
5773 cb->remove_port(&lib->unit, ctx, lib->shared_port);
5774 }
5775 }
5776
5777 if (quit_param == NXT_QUIT_GRACEFUL) {
5778 pthread_mutex_lock(&ctx_impl->mutex);
5779
5780 quit = nxt_queue_is_empty(&ctx_impl->active_req)
5781 && nxt_queue_is_empty(&ctx_impl->pending_rbuf)
5782 && ctx_impl->wait_items == 0;
5783
5784 pthread_mutex_unlock(&ctx_impl->mutex);
5785
5786 } else {
5787 quit = 1;
5788 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
5789 }
5790
5791 if (quit) {
5792 ctx_impl->online = 0;
5793
5794 if (cb->quit != NULL) {
5795 cb->quit(ctx);
5796 }
5797
5798 nxt_queue_each(req_impl, &ctx_impl->active_req,
5799 nxt_unit_request_info_impl_t, link)
5800 {
5801 req = &req_impl->req;
5802
5803 nxt_unit_req_warn(req, "active request on ctx quit");
5804
5805 if (cb->close_handler) {
5806 nxt_unit_req_debug(req, "close_handler");
5807
5808 cb->close_handler(req);
5809
5810 } else {
5811 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5812 }
5813
5814 } nxt_queue_loop;
5815
5816 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5817 nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
5818 }
5819 }
5820
5821 if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
5822 return;
5823 }
5824
5825 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5826
5827 m.msg.pid = lib->pid;
5828 m.msg.type = _NXT_PORT_MSG_QUIT;
5829 m.quit_param = quit_param;
5830
5831 pthread_mutex_lock(&lib->mutex);
5832
5833 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5834
5835 if (ctx == &ctx_impl->ctx
5836 || ctx_impl->read_port == NULL
5837 || ctx_impl->read_port->out_fd == -1)
5838 {
5839 continue;
5840 }
5841
5842 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5843 &m, sizeof(m), NULL);
5844
5845 } nxt_queue_loop;
5846
5847 pthread_mutex_unlock(&lib->mutex);
5848 }
5849
5850
5851 static int
nxt_unit_get_port(nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5852 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5853 {
5854 ssize_t res;
5855 nxt_unit_impl_t *lib;
5856 nxt_unit_ctx_impl_t *ctx_impl;
5857
5858 struct {
5859 nxt_port_msg_t msg;
5860 nxt_port_msg_get_port_t get_port;
5861 } m;
5862
5863 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5864 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5865
5866 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5867
5868 m.msg.pid = lib->pid;
5869 m.msg.reply_port = ctx_impl->read_port->id.id;
5870 m.msg.type = _NXT_PORT_MSG_GET_PORT;
5871
5872 m.get_port.id = port_id->id;
5873 m.get_port.pid = port_id->pid;
5874
5875 nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5876 (int) port_id->id);
5877
5878 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
5879 if (nxt_slow_path(res != sizeof(m))) {
5880 return NXT_UNIT_ERROR;
5881 }
5882
5883 return NXT_UNIT_OK;
5884 }
5885
5886
5887 static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5888 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5889 const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5890 {
5891 int notify;
5892 ssize_t ret;
5893 nxt_int_t rc;
5894 nxt_port_msg_t msg;
5895 nxt_unit_impl_t *lib;
5896 nxt_unit_port_impl_t *port_impl;
5897
5898 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5899
5900 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5901 if (port_impl->queue != NULL && (oob == NULL || oob->size == 0)
5902 && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5903 {
5904 rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify);
5905 if (nxt_slow_path(rc != NXT_OK)) {
5906 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5907 (int) port->id.pid, (int) port->id.id);
5908
5909 return -1;
5910 }
5911
5912 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5913 (int) port->id.pid, (int) port->id.id,
5914 (int) buf_size, notify);
5915
5916 if (notify) {
5917 memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5918
5919 msg.type = _NXT_PORT_MSG_READ_QUEUE;
5920
5921 if (lib->callbacks.port_send == NULL) {
5922 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5923 sizeof(nxt_port_msg_t), NULL);
5924
5925 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5926 (int) port->id.pid, (int) port->id.id,
5927 (int) ret);
5928
5929 } else {
5930 ret = lib->callbacks.port_send(ctx, port, &msg,
5931 sizeof(nxt_port_msg_t), NULL, 0);
5932
5933 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5934 (int) port->id.pid, (int) port->id.id,
5935 (int) ret);
5936 }
5937
5938 }
5939
5940 return buf_size;
5941 }
5942
5943 if (port_impl->queue != NULL) {
5944 msg.type = _NXT_PORT_MSG_READ_SOCKET;
5945
5946 rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify);
5947 if (nxt_slow_path(rc != NXT_OK)) {
5948 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5949 (int) port->id.pid, (int) port->id.id);
5950
5951 return -1;
5952 }
5953
5954 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5955 (int) port->id.pid, (int) port->id.id, notify);
5956 }
5957
5958 if (lib->callbacks.port_send != NULL) {
5959 ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5960 oob != NULL ? oob->buf : NULL,
5961 oob != NULL ? oob->size : 0);
5962
5963 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5964 (int) port->id.pid, (int) port->id.id,
5965 (int) ret);
5966
5967 } else {
5968 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob);
5969
5970 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5971 (int) port->id.pid, (int) port->id.id,
5972 (int) ret);
5973 }
5974
5975 return ret;
5976 }
5977
5978
5979 static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t * ctx,int fd,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5980 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5981 const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5982 {
5983 int err;
5984 ssize_t n;
5985 struct iovec iov[1];
5986
5987 iov[0].iov_base = (void *) buf;
5988 iov[0].iov_len = buf_size;
5989
5990 retry:
5991
5992 n = nxt_sendmsg(fd, iov, 1, oob);
5993
5994 if (nxt_slow_path(n == -1)) {
5995 err = errno;
5996
5997 if (err == EINTR) {
5998 goto retry;
5999 }
6000
6001 /*
6002 * FIXME: This should be "alert" after router graceful shutdown
6003 * implementation.
6004 */
6005 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
6006 fd, (int) buf_size, strerror(err), err);
6007
6008 } else {
6009 nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size,
6010 (oob != NULL ? (int) oob->size : 0), (int) n);
6011 }
6012
6013 return n;
6014 }
6015
6016
6017 static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6018 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6019 nxt_unit_read_buf_t *rbuf)
6020 {
6021 int res, read;
6022 nxt_unit_port_impl_t *port_impl;
6023
6024 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6025
6026 read = 0;
6027
6028 retry:
6029
6030 if (port_impl->from_socket > 0) {
6031 if (port_impl->socket_rbuf != NULL
6032 && port_impl->socket_rbuf->size > 0)
6033 {
6034 port_impl->from_socket--;
6035
6036 nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
6037 port_impl->socket_rbuf->size = 0;
6038
6039 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
6040 (int) port->id.pid, (int) port->id.id,
6041 (int) rbuf->size);
6042
6043 return NXT_UNIT_OK;
6044 }
6045
6046 } else {
6047 res = nxt_unit_port_queue_recv(port, rbuf);
6048
6049 if (res == NXT_UNIT_OK) {
6050 if (nxt_unit_is_read_socket(rbuf)) {
6051 port_impl->from_socket++;
6052
6053 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
6054 (int) port->id.pid, (int) port->id.id,
6055 port_impl->from_socket);
6056
6057 goto retry;
6058 }
6059
6060 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
6061 (int) port->id.pid, (int) port->id.id,
6062 (int) rbuf->size);
6063
6064 return NXT_UNIT_OK;
6065 }
6066 }
6067
6068 if (read) {
6069 return NXT_UNIT_AGAIN;
6070 }
6071
6072 res = nxt_unit_port_recv(ctx, port, rbuf);
6073 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6074 return NXT_UNIT_ERROR;
6075 }
6076
6077 read = 1;
6078
6079 if (nxt_unit_is_read_queue(rbuf)) {
6080 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6081 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6082
6083 goto retry;
6084 }
6085
6086 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
6087 (int) port->id.pid, (int) port->id.id,
6088 (int) rbuf->size);
6089
6090 if (res == NXT_UNIT_AGAIN) {
6091 return NXT_UNIT_AGAIN;
6092 }
6093
6094 if (port_impl->from_socket > 0) {
6095 port_impl->from_socket--;
6096
6097 return NXT_UNIT_OK;
6098 }
6099
6100 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
6101 (int) port->id.pid, (int) port->id.id,
6102 (int) rbuf->size);
6103
6104 if (port_impl->socket_rbuf == NULL) {
6105 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
6106
6107 if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
6108 return NXT_UNIT_ERROR;
6109 }
6110
6111 port_impl->socket_rbuf->size = 0;
6112 }
6113
6114 if (port_impl->socket_rbuf->size > 0) {
6115 nxt_unit_alert(ctx, "too many port socket messages");
6116
6117 return NXT_UNIT_ERROR;
6118 }
6119
6120 nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
6121
6122 rbuf->oob.size = 0;
6123
6124 goto retry;
6125 }
6126
6127
6128 nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t * dst,nxt_unit_read_buf_t * src)6129 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6130 {
6131 memcpy(dst->buf, src->buf, src->size);
6132 dst->size = src->size;
6133 dst->oob.size = src->oob.size;
6134 memcpy(dst->oob.buf, src->oob.buf, src->oob.size);
6135 }
6136
6137
6138 static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6139 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6140 nxt_unit_read_buf_t *rbuf)
6141 {
6142 int res;
6143 nxt_unit_port_impl_t *port_impl;
6144
6145 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6146
6147 retry:
6148
6149 res = nxt_unit_app_queue_recv(ctx, port, rbuf);
6150
6151 if (res == NXT_UNIT_OK) {
6152 return NXT_UNIT_OK;
6153 }
6154
6155 if (res == NXT_UNIT_AGAIN) {
6156 res = nxt_unit_port_recv(ctx, port, rbuf);
6157 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6158 return NXT_UNIT_ERROR;
6159 }
6160
6161 if (nxt_unit_is_read_queue(rbuf)) {
6162 nxt_app_queue_notification_received(port_impl->queue);
6163
6164 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6165 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6166
6167 goto retry;
6168 }
6169 }
6170
6171 return res;
6172 }
6173
6174
6175 static int
nxt_unit_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6176 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6177 nxt_unit_read_buf_t *rbuf)
6178 {
6179 int fd, err;
6180 size_t oob_size;
6181 struct iovec iov[1];
6182 nxt_unit_impl_t *lib;
6183
6184 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6185
6186 if (lib->callbacks.port_recv != NULL) {
6187 oob_size = sizeof(rbuf->oob.buf);
6188
6189 rbuf->size = lib->callbacks.port_recv(ctx, port,
6190 rbuf->buf, sizeof(rbuf->buf),
6191 rbuf->oob.buf, &oob_size);
6192
6193 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6194 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6195
6196 if (nxt_slow_path(rbuf->size < 0)) {
6197 return NXT_UNIT_ERROR;
6198 }
6199
6200 rbuf->oob.size = oob_size;
6201 return NXT_UNIT_OK;
6202 }
6203
6204 iov[0].iov_base = rbuf->buf;
6205 iov[0].iov_len = sizeof(rbuf->buf);
6206
6207 fd = port->in_fd;
6208
6209 retry:
6210
6211 rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob);
6212
6213 if (nxt_slow_path(rbuf->size == -1)) {
6214 err = errno;
6215
6216 if (err == EINTR) {
6217 goto retry;
6218 }
6219
6220 if (err == EAGAIN) {
6221 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6222 fd, strerror(err), err);
6223
6224 return NXT_UNIT_AGAIN;
6225 }
6226
6227 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6228 fd, strerror(err), err);
6229
6230 return NXT_UNIT_ERROR;
6231 }
6232
6233 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6234
6235 return NXT_UNIT_OK;
6236 }
6237
6238
6239 static int
nxt_unit_port_queue_recv(nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6240 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6241 {
6242 nxt_unit_port_impl_t *port_impl;
6243
6244 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6245
6246 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6247
6248 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6249 }
6250
6251
6252 static int
nxt_unit_app_queue_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6253 nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6254 nxt_unit_read_buf_t *rbuf)
6255 {
6256 uint32_t cookie;
6257 nxt_port_msg_t *port_msg;
6258 nxt_app_queue_t *queue;
6259 nxt_unit_impl_t *lib;
6260 nxt_unit_port_impl_t *port_impl;
6261
6262 struct {
6263 nxt_port_msg_t msg;
6264 uint8_t quit_param;
6265 } nxt_packed m;
6266
6267 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6268 queue = port_impl->queue;
6269
6270 retry:
6271
6272 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6273
6274 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6275
6276 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6277 port_msg = (nxt_port_msg_t *) rbuf->buf;
6278
6279 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6280 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6281
6282 if (lib->request_limit != 0) {
6283 nxt_atomic_fetch_add(&lib->request_count, 1);
6284
6285 if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
6286 nxt_unit_debug(ctx, "request limit reached");
6287
6288 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
6289
6290 m.msg.pid = lib->pid;
6291 m.msg.type = _NXT_PORT_MSG_QUIT;
6292 m.quit_param = NXT_QUIT_GRACEFUL;
6293
6294 (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
6295 &m, sizeof(m), NULL);
6296 }
6297 }
6298
6299 return NXT_UNIT_OK;
6300 }
6301
6302 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6303
6304 goto retry;
6305 }
6306
6307 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6308 }
6309
6310
6311 nxt_inline int
nxt_unit_close(int fd)6312 nxt_unit_close(int fd)
6313 {
6314 int res;
6315
6316 res = close(fd);
6317
6318 if (nxt_slow_path(res == -1)) {
6319 nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6320 fd, strerror(errno), errno);
6321
6322 } else {
6323 nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6324 }
6325
6326 return res;
6327 }
6328
6329
6330 static int
nxt_unit_fd_blocking(int fd)6331 nxt_unit_fd_blocking(int fd)
6332 {
6333 int nb;
6334
6335 nb = 0;
6336
6337 if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6338 nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6339 fd, strerror(errno), errno);
6340
6341 return NXT_UNIT_ERROR;
6342 }
6343
6344 return NXT_UNIT_OK;
6345 }
6346
6347
6348 static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6349 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6350 {
6351 nxt_unit_port_t *port;
6352 nxt_unit_port_hash_id_t *port_id;
6353
6354 port = data;
6355 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6356
6357 if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6358 && port_id->pid == port->id.pid
6359 && port_id->id == port->id.id)
6360 {
6361 return NXT_OK;
6362 }
6363
6364 return NXT_DECLINED;
6365 }
6366
6367
6368 static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
6369 NXT_LVLHSH_DEFAULT,
6370 nxt_unit_port_hash_test,
6371 nxt_unit_lvlhsh_alloc,
6372 nxt_unit_lvlhsh_free,
6373 };
6374
6375
6376 static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t * lhq,nxt_unit_port_hash_id_t * port_hash_id,nxt_unit_port_id_t * port_id)6377 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6378 nxt_unit_port_hash_id_t *port_hash_id,
6379 nxt_unit_port_id_t *port_id)
6380 {
6381 port_hash_id->pid = port_id->pid;
6382 port_hash_id->id = port_id->id;
6383
6384 if (nxt_fast_path(port_id->hash != 0)) {
6385 lhq->key_hash = port_id->hash;
6386
6387 } else {
6388 lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6389
6390 port_id->hash = lhq->key_hash;
6391
6392 nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6393 (int) port_id->pid, (int) port_id->id,
6394 (int) port_id->hash);
6395 }
6396
6397 lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6398 lhq->key.start = (u_char *) port_hash_id;
6399 lhq->proto = &lvlhsh_ports_proto;
6400 lhq->pool = NULL;
6401 }
6402
6403
6404 static int
nxt_unit_port_hash_add(nxt_lvlhsh_t * port_hash,nxt_unit_port_t * port)6405 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6406 {
6407 nxt_int_t res;
6408 nxt_lvlhsh_query_t lhq;
6409 nxt_unit_port_hash_id_t port_hash_id;
6410
6411 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6412 lhq.replace = 0;
6413 lhq.value = port;
6414
6415 res = nxt_lvlhsh_insert(port_hash, &lhq);
6416
6417 switch (res) {
6418
6419 case NXT_OK:
6420 return NXT_UNIT_OK;
6421
6422 default:
6423 return NXT_UNIT_ERROR;
6424 }
6425 }
6426
6427
6428 static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t * port_hash,nxt_unit_port_id_t * port_id,int remove)6429 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6430 int remove)
6431 {
6432 nxt_int_t res;
6433 nxt_lvlhsh_query_t lhq;
6434 nxt_unit_port_hash_id_t port_hash_id;
6435
6436 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6437
6438 if (remove) {
6439 res = nxt_lvlhsh_delete(port_hash, &lhq);
6440
6441 } else {
6442 res = nxt_lvlhsh_find(port_hash, &lhq);
6443 }
6444
6445 switch (res) {
6446
6447 case NXT_OK:
6448 if (!remove) {
6449 nxt_unit_port_use(lhq.value);
6450 }
6451
6452 return lhq.value;
6453
6454 default:
6455 return NULL;
6456 }
6457 }
6458
6459
6460 static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6461 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6462 {
6463 return NXT_OK;
6464 }
6465
6466
6467 static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
6468 NXT_LVLHSH_DEFAULT,
6469 nxt_unit_request_hash_test,
6470 nxt_unit_lvlhsh_alloc,
6471 nxt_unit_lvlhsh_free,
6472 };
6473
6474
6475 static int
nxt_unit_request_hash_add(nxt_unit_ctx_t * ctx,nxt_unit_request_info_t * req)6476 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6477 nxt_unit_request_info_t *req)
6478 {
6479 uint32_t *stream;
6480 nxt_int_t res;
6481 nxt_lvlhsh_query_t lhq;
6482 nxt_unit_ctx_impl_t *ctx_impl;
6483 nxt_unit_request_info_impl_t *req_impl;
6484
6485 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6486 if (req_impl->in_hash) {
6487 return NXT_UNIT_OK;
6488 }
6489
6490 stream = &req_impl->stream;
6491
6492 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6493 lhq.key.length = sizeof(*stream);
6494 lhq.key.start = (u_char *) stream;
6495 lhq.proto = &lvlhsh_requests_proto;
6496 lhq.pool = NULL;
6497 lhq.replace = 0;
6498 lhq.value = req_impl;
6499
6500 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6501
6502 pthread_mutex_lock(&ctx_impl->mutex);
6503
6504 res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6505
6506 pthread_mutex_unlock(&ctx_impl->mutex);
6507
6508 switch (res) {
6509
6510 case NXT_OK:
6511 req_impl->in_hash = 1;
6512 return NXT_UNIT_OK;
6513
6514 default:
6515 return NXT_UNIT_ERROR;
6516 }
6517 }
6518
6519
6520 static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t * ctx,uint32_t stream,int remove)6521 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6522 {
6523 nxt_int_t res;
6524 nxt_lvlhsh_query_t lhq;
6525 nxt_unit_ctx_impl_t *ctx_impl;
6526 nxt_unit_request_info_impl_t *req_impl;
6527
6528 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6529 lhq.key.length = sizeof(stream);
6530 lhq.key.start = (u_char *) &stream;
6531 lhq.proto = &lvlhsh_requests_proto;
6532 lhq.pool = NULL;
6533
6534 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6535
6536 pthread_mutex_lock(&ctx_impl->mutex);
6537
6538 if (remove) {
6539 res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6540
6541 } else {
6542 res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6543 }
6544
6545 pthread_mutex_unlock(&ctx_impl->mutex);
6546
6547 switch (res) {
6548
6549 case NXT_OK:
6550 req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6551 req);
6552 if (remove) {
6553 req_impl->in_hash = 0;
6554 }
6555
6556 return lhq.value;
6557
6558 default:
6559 return NULL;
6560 }
6561 }
6562
6563
6564 void
nxt_unit_log(nxt_unit_ctx_t * ctx,int level,const char * fmt,...)6565 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6566 {
6567 int log_fd, n;
6568 char msg[NXT_MAX_ERROR_STR], *p, *end;
6569 pid_t pid;
6570 va_list ap;
6571 nxt_unit_impl_t *lib;
6572
6573 if (nxt_fast_path(ctx != NULL)) {
6574 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6575
6576 pid = lib->pid;
6577 log_fd = lib->log_fd;
6578
6579 } else {
6580 pid = nxt_unit_pid;
6581 log_fd = STDERR_FILENO;
6582 }
6583
6584 p = msg;
6585 end = p + sizeof(msg) - 1;
6586
6587 p = nxt_unit_snprint_prefix(p, end, pid, level);
6588
6589 va_start(ap, fmt);
6590 p += vsnprintf(p, end - p, fmt, ap);
6591 va_end(ap);
6592
6593 if (nxt_slow_path(p > end)) {
6594 memcpy(end - 5, "[...]", 5);
6595 p = end;
6596 }
6597
6598 *p++ = '\n';
6599
6600 n = write(log_fd, msg, p - msg);
6601 if (nxt_slow_path(n < 0)) {
6602 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6603 }
6604 }
6605
6606
6607 void
nxt_unit_req_log(nxt_unit_request_info_t * req,int level,const char * fmt,...)6608 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6609 {
6610 int log_fd, n;
6611 char msg[NXT_MAX_ERROR_STR], *p, *end;
6612 pid_t pid;
6613 va_list ap;
6614 nxt_unit_impl_t *lib;
6615 nxt_unit_request_info_impl_t *req_impl;
6616
6617 if (nxt_fast_path(req != NULL)) {
6618 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6619
6620 pid = lib->pid;
6621 log_fd = lib->log_fd;
6622
6623 } else {
6624 pid = nxt_unit_pid;
6625 log_fd = STDERR_FILENO;
6626 }
6627
6628 p = msg;
6629 end = p + sizeof(msg) - 1;
6630
6631 p = nxt_unit_snprint_prefix(p, end, pid, level);
6632
6633 if (nxt_fast_path(req != NULL)) {
6634 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6635
6636 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6637 }
6638
6639 va_start(ap, fmt);
6640 p += vsnprintf(p, end - p, fmt, ap);
6641 va_end(ap);
6642
6643 if (nxt_slow_path(p > end)) {
6644 memcpy(end - 5, "[...]", 5);
6645 p = end;
6646 }
6647
6648 *p++ = '\n';
6649
6650 n = write(log_fd, msg, p - msg);
6651 if (nxt_slow_path(n < 0)) {
6652 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6653 }
6654 }
6655
6656
6657 static const char * nxt_unit_log_levels[] = {
6658 "alert",
6659 "error",
6660 "warn",
6661 "notice",
6662 "info",
6663 "debug",
6664 };
6665
6666
6667 static char *
nxt_unit_snprint_prefix(char * p,char * end,pid_t pid,int level)6668 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6669 {
6670 struct tm tm;
6671 struct timespec ts;
6672
6673 (void) clock_gettime(CLOCK_REALTIME, &ts);
6674
6675 #if (NXT_HAVE_LOCALTIME_R)
6676 (void) localtime_r(&ts.tv_sec, &tm);
6677 #else
6678 tm = *localtime(&ts.tv_sec);
6679 #endif
6680
6681 #if (NXT_DEBUG)
6682 p += snprintf(p, end - p,
6683 "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6684 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6685 tm.tm_hour, tm.tm_min, tm.tm_sec,
6686 (int) ts.tv_nsec / 1000000);
6687 #else
6688 p += snprintf(p, end - p,
6689 "%4d/%02d/%02d %02d:%02d:%02d ",
6690 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6691 tm.tm_hour, tm.tm_min, tm.tm_sec);
6692 #endif
6693
6694 p += snprintf(p, end - p,
6695 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6696 (int) pid,
6697 (uint64_t) (uintptr_t) nxt_thread_get_tid());
6698
6699 return p;
6700 }
6701
6702
6703 static void *
nxt_unit_lvlhsh_alloc(void * data,size_t size)6704 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6705 {
6706 int err;
6707 void *p;
6708
6709 err = posix_memalign(&p, size, size);
6710
6711 if (nxt_fast_path(err == 0)) {
6712 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6713 (int) size, (int) size, p);
6714 return p;
6715 }
6716
6717 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6718 (int) size, (int) size, strerror(err), err);
6719 return NULL;
6720 }
6721
6722
6723 static void
nxt_unit_lvlhsh_free(void * data,void * p)6724 nxt_unit_lvlhsh_free(void *data, void *p)
6725 {
6726 nxt_unit_free(NULL, p);
6727 }
6728
6729
6730 void *
nxt_unit_malloc(nxt_unit_ctx_t * ctx,size_t size)6731 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6732 {
6733 void *p;
6734
6735 p = malloc(size);
6736
6737 if (nxt_fast_path(p != NULL)) {
6738 #if (NXT_DEBUG_ALLOC)
6739 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6740 #endif
6741
6742 } else {
6743 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6744 (int) size, strerror(errno), errno);
6745 }
6746
6747 return p;
6748 }
6749
6750
6751 void
nxt_unit_free(nxt_unit_ctx_t * ctx,void * p)6752 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6753 {
6754 #if (NXT_DEBUG_ALLOC)
6755 nxt_unit_debug(ctx, "free(%p)", p);
6756 #endif
6757
6758 free(p);
6759 }
6760
6761
6762 static int
nxt_unit_memcasecmp(const void * p1,const void * p2,size_t length)6763 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6764 {
6765 u_char c1, c2;
6766 nxt_int_t n;
6767 const u_char *s1, *s2;
6768
6769 s1 = p1;
6770 s2 = p2;
6771
6772 while (length-- != 0) {
6773 c1 = *s1++;
6774 c2 = *s2++;
6775
6776 c1 = nxt_lowcase(c1);
6777 c2 = nxt_lowcase(c2);
6778
6779 n = c1 - c2;
6780
6781 if (n != 0) {
6782 return n;
6783 }
6784 }
6785
6786 return 0;
6787 }
6788