1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
23 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE  \
25     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
27 enum {
28     NXT_QUIT_NORMAL   = 0,
29     NXT_QUIT_GRACEFUL = 1,
30 };
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
59     uint32_t *request_limit);
60 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
61     int queue_fd);
62 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
63     nxt_unit_request_info_t **preq);
64 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
65     nxt_unit_recv_msg_t *recv_msg);
66 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
67 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
68     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
69 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
70     nxt_unit_recv_msg_t *recv_msg);
71 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
72     nxt_unit_port_id_t *port_id);
73 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
74 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
75     nxt_unit_recv_msg_t *recv_msg);
76 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
77 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
78     nxt_unit_ctx_t *ctx);
79 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
80 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
81 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
82     nxt_unit_ctx_t *ctx);
83 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
84 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
85     nxt_unit_websocket_frame_impl_t *ws);
86 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
87 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
88 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
89     nxt_unit_mmap_buf_t *mmap_buf, int last);
90 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
91 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
92 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
94     nxt_unit_ctx_impl_t *ctx_impl);
95 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
96     nxt_unit_read_buf_t *rbuf);
97 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
98     nxt_unit_request_info_t *req, size_t size);
99 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
100     size_t size);
101 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
102     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
103 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
104 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
105 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
106 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
107     nxt_unit_port_t *port, int n);
108 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
109 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
110     int fd);
111 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
112     nxt_unit_port_t *port, uint32_t size,
113     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
114 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
115 
116 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
117     nxt_unit_ctx_impl_t *ctx_impl);
118 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
119 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
120 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
121 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
122 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
123     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
124     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
125 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
126     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
127 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
128 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
129     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
130 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
131 
132 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
133 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
134     pid_t pid, int remove);
135 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
136 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
137 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
139 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
140 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
141 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
142 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
145 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
146     nxt_unit_port_t *port);
147 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
148 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
149 
150 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
151     nxt_unit_port_t *port, int queue_fd);
152 
153 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
154 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
155 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
156     nxt_unit_port_t *port, void *queue);
157 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
158     nxt_queue_t *awaiting_req);
159 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
160     nxt_unit_port_id_t *port_id);
161 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
162     nxt_unit_port_id_t *port_id);
163 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
164 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
165     nxt_unit_process_t *process);
166 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
167 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
168 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
169     nxt_unit_port_t *port, const void *buf, size_t buf_size,
170     const nxt_send_oob_t *oob);
171 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
172     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
173 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174     nxt_unit_read_buf_t *rbuf);
175 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
176     nxt_unit_read_buf_t *src);
177 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
178     nxt_unit_read_buf_t *rbuf);
179 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
180     nxt_unit_read_buf_t *rbuf);
181 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
182     nxt_unit_read_buf_t *rbuf);
183 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
184     nxt_unit_read_buf_t *rbuf);
185 nxt_inline int nxt_unit_close(int fd);
186 static int nxt_unit_fd_blocking(int fd);
187 
188 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
189     nxt_unit_port_t *port);
190 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
191     nxt_unit_port_id_t *port_id, int remove);
192 
193 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
194     nxt_unit_request_info_t *req);
195 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
196     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
197 
198 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
199 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
200 static void nxt_unit_lvlhsh_free(void *data, void *p);
201 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
202 
203 
204 struct nxt_unit_mmap_buf_s {
205     nxt_unit_buf_t           buf;
206 
207     nxt_unit_mmap_buf_t      *next;
208     nxt_unit_mmap_buf_t      **prev;
209 
210     nxt_port_mmap_header_t   *hdr;
211     nxt_unit_request_info_t  *req;
212     nxt_unit_ctx_impl_t      *ctx_impl;
213     char                     *free_ptr;
214     char                     *plain_ptr;
215 };
216 
217 
218 struct nxt_unit_recv_msg_s {
219     uint32_t                 stream;
220     nxt_pid_t                pid;
221     nxt_port_id_t            reply_port;
222 
223     uint8_t                  last;      /* 1 bit */
224     uint8_t                  mmap;      /* 1 bit */
225 
226     void                     *start;
227     uint32_t                 size;
228 
229     int                      fd[2];
230 
231     nxt_unit_mmap_buf_t      *incoming_buf;
232 };
233 
234 
235 typedef enum {
236     NXT_UNIT_RS_START           = 0,
237     NXT_UNIT_RS_RESPONSE_INIT,
238     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
239     NXT_UNIT_RS_RESPONSE_SENT,
240     NXT_UNIT_RS_RELEASED,
241 } nxt_unit_req_state_t;
242 
243 
244 struct nxt_unit_request_info_impl_s {
245     nxt_unit_request_info_t  req;
246 
247     uint32_t                 stream;
248 
249     nxt_unit_mmap_buf_t      *outgoing_buf;
250     nxt_unit_mmap_buf_t      *incoming_buf;
251 
252     nxt_unit_req_state_t     state;
253     uint8_t                  websocket;
254     uint8_t                  in_hash;
255 
256     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
257     nxt_queue_link_t         link;
258     /*  for nxt_unit_port_impl_t.awaiting_req */
259     nxt_queue_link_t         port_wait_link;
260 
261     char                     extra_data[];
262 };
263 
264 
265 struct nxt_unit_websocket_frame_impl_s {
266     nxt_unit_websocket_frame_t  ws;
267 
268     nxt_unit_mmap_buf_t         *buf;
269 
270     nxt_queue_link_t            link;
271 
272     nxt_unit_ctx_impl_t         *ctx_impl;
273 };
274 
275 
276 struct nxt_unit_read_buf_s {
277     nxt_queue_link_t              link;
278     nxt_unit_ctx_impl_t           *ctx_impl;
279     ssize_t                       size;
280     nxt_recv_oob_t                oob;
281     char                          buf[16384];
282 };
283 
284 
285 struct nxt_unit_ctx_impl_s {
286     nxt_unit_ctx_t                ctx;
287 
288     nxt_atomic_t                  use_count;
289     nxt_atomic_t                  wait_items;
290 
291     pthread_mutex_t               mutex;
292 
293     nxt_unit_port_t               *read_port;
294 
295     nxt_queue_link_t              link;
296 
297     nxt_unit_mmap_buf_t           *free_buf;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_queue_t                   free_req;
301 
302     /*  of nxt_unit_websocket_frame_impl_t */
303     nxt_queue_t                   free_ws;
304 
305     /*  of nxt_unit_request_info_impl_t */
306     nxt_queue_t                   active_req;
307 
308     /*  of nxt_unit_request_info_impl_t */
309     nxt_lvlhsh_t                  requests;
310 
311     /*  of nxt_unit_request_info_impl_t */
312     nxt_queue_t                   ready_req;
313 
314     /*  of nxt_unit_read_buf_t */
315     nxt_queue_t                   pending_rbuf;
316 
317     /*  of nxt_unit_read_buf_t */
318     nxt_queue_t                   free_rbuf;
319 
320     uint8_t                       online;       /* 1 bit */
321     uint8_t                       ready;        /* 1 bit */
322     uint8_t                       quit_param;
323 
324     nxt_unit_mmap_buf_t           ctx_buf[2];
325     nxt_unit_read_buf_t           ctx_read_buf;
326 
327     nxt_unit_request_info_impl_t  req;
328 };
329 
330 
331 struct nxt_unit_mmap_s {
332     nxt_port_mmap_header_t   *hdr;
333     pthread_t                src_thread;
334 
335     /*  of nxt_unit_read_buf_t */
336     nxt_queue_t              awaiting_rbuf;
337 };
338 
339 
340 struct nxt_unit_mmaps_s {
341     pthread_mutex_t          mutex;
342     uint32_t                 size;
343     uint32_t                 cap;
344     nxt_atomic_t             allocated_chunks;
345     nxt_unit_mmap_t          *elts;
346 };
347 
348 
349 struct nxt_unit_impl_s {
350     nxt_unit_t               unit;
351     nxt_unit_callbacks_t     callbacks;
352 
353     nxt_atomic_t             use_count;
354     nxt_atomic_t             request_count;
355 
356     uint32_t                 request_data_size;
357     uint32_t                 shm_mmap_limit;
358     uint32_t                 request_limit;
359 
360     pthread_mutex_t          mutex;
361 
362     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
363     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
364 
365     nxt_unit_port_t          *router_port;
366     nxt_unit_port_t          *shared_port;
367 
368     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
369 
370     nxt_unit_mmaps_t         incoming;
371     nxt_unit_mmaps_t         outgoing;
372 
373     pid_t                    pid;
374     int                      log_fd;
375 
376     nxt_unit_ctx_impl_t      main_ctx;
377 };
378 
379 
380 struct nxt_unit_port_impl_s {
381     nxt_unit_port_t          port;
382 
383     nxt_atomic_t             use_count;
384 
385     /*  for nxt_unit_process_t.ports */
386     nxt_queue_link_t         link;
387     nxt_unit_process_t       *process;
388 
389     /*  of nxt_unit_request_info_impl_t */
390     nxt_queue_t              awaiting_req;
391 
392     int                      ready;
393 
394     void                     *queue;
395 
396     int                      from_socket;
397     nxt_unit_read_buf_t      *socket_rbuf;
398 };
399 
400 
401 struct nxt_unit_process_s {
402     pid_t                    pid;
403 
404     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
405 
406     nxt_unit_impl_t          *lib;
407 
408     nxt_atomic_t             use_count;
409 
410     uint32_t                 next_port_id;
411 };
412 
413 
414 /* Explicitly using 32 bit types to avoid possible alignment. */
415 typedef struct {
416     int32_t   pid;
417     uint32_t  id;
418 } nxt_unit_port_hash_id_t;
419 
420 
421 static pid_t  nxt_unit_pid;
422 
423 
424 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)425 nxt_unit_init(nxt_unit_init_t *init)
426 {
427     int              rc, queue_fd;
428     void             *mem;
429     uint32_t         ready_stream, shm_limit, request_limit;
430     nxt_unit_ctx_t   *ctx;
431     nxt_unit_impl_t  *lib;
432     nxt_unit_port_t  ready_port, router_port, read_port;
433 
434     nxt_unit_pid = getpid();
435 
436     lib = nxt_unit_create(init);
437     if (nxt_slow_path(lib == NULL)) {
438         return NULL;
439     }
440 
441     queue_fd = -1;
442     mem = MAP_FAILED;
443 
444     if (init->ready_port.id.pid != 0
445         && init->ready_stream != 0
446         && init->read_port.id.pid != 0)
447     {
448         ready_port = init->ready_port;
449         ready_stream = init->ready_stream;
450         router_port = init->router_port;
451         read_port = init->read_port;
452         lib->log_fd = init->log_fd;
453 
454         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
455                               ready_port.id.id);
456         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
457                               router_port.id.id);
458         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
459                               read_port.id.id);
460 
461     } else {
462         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
463                                &lib->log_fd, &ready_stream, &shm_limit,
464                                &request_limit);
465         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
466             goto fail;
467         }
468 
469         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
470                                 / PORT_MMAP_DATA_SIZE;
471         lib->request_limit = request_limit;
472     }
473 
474     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
475         lib->shm_mmap_limit = 1;
476     }
477 
478     lib->pid = read_port.id.pid;
479     nxt_unit_pid = lib->pid;
480 
481     ctx = &lib->main_ctx.ctx;
482 
483     rc = nxt_unit_fd_blocking(router_port.out_fd);
484     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
485         goto fail;
486     }
487 
488     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
489     if (nxt_slow_path(lib->router_port == NULL)) {
490         nxt_unit_alert(NULL, "failed to add router_port");
491 
492         goto fail;
493     }
494 
495     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
496     if (nxt_slow_path(queue_fd == -1)) {
497         goto fail;
498     }
499 
500     mem = mmap(NULL, sizeof(nxt_port_queue_t),
501                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
502     if (nxt_slow_path(mem == MAP_FAILED)) {
503         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
504                        strerror(errno), errno);
505 
506         goto fail;
507     }
508 
509     nxt_port_queue_init(mem);
510 
511     rc = nxt_unit_fd_blocking(read_port.in_fd);
512     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
513         goto fail;
514     }
515 
516     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
517     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
518         nxt_unit_alert(NULL, "failed to add read_port");
519 
520         goto fail;
521     }
522 
523     rc = nxt_unit_fd_blocking(ready_port.out_fd);
524     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
525         goto fail;
526     }
527 
528     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
529     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
530         nxt_unit_alert(NULL, "failed to send READY message");
531 
532         goto fail;
533     }
534 
535     nxt_unit_close(ready_port.out_fd);
536     nxt_unit_close(queue_fd);
537 
538     return ctx;
539 
540 fail:
541 
542     if (mem != MAP_FAILED) {
543         munmap(mem, sizeof(nxt_port_queue_t));
544     }
545 
546     if (queue_fd != -1) {
547         nxt_unit_close(queue_fd);
548     }
549 
550     nxt_unit_ctx_release(&lib->main_ctx.ctx);
551 
552     return NULL;
553 }
554 
555 
556 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)557 nxt_unit_create(nxt_unit_init_t *init)
558 {
559     int                   rc;
560     nxt_unit_impl_t       *lib;
561     nxt_unit_callbacks_t  *cb;
562 
563     lib = nxt_unit_malloc(NULL,
564                           sizeof(nxt_unit_impl_t) + init->request_data_size);
565     if (nxt_slow_path(lib == NULL)) {
566         nxt_unit_alert(NULL, "failed to allocate unit struct");
567 
568         return NULL;
569     }
570 
571     rc = pthread_mutex_init(&lib->mutex, NULL);
572     if (nxt_slow_path(rc != 0)) {
573         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
574 
575         goto fail;
576     }
577 
578     lib->unit.data = init->data;
579     lib->callbacks = init->callbacks;
580 
581     lib->request_data_size = init->request_data_size;
582     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
583                             / PORT_MMAP_DATA_SIZE;
584     lib->request_limit = init->request_limit;
585 
586     lib->processes.slot = NULL;
587     lib->ports.slot = NULL;
588 
589     lib->log_fd = STDERR_FILENO;
590 
591     nxt_queue_init(&lib->contexts);
592 
593     lib->use_count = 0;
594     lib->request_count = 0;
595     lib->router_port = NULL;
596     lib->shared_port = NULL;
597 
598     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
599     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
600         pthread_mutex_destroy(&lib->mutex);
601         goto fail;
602     }
603 
604     cb = &lib->callbacks;
605 
606     if (cb->request_handler == NULL) {
607         nxt_unit_alert(NULL, "request_handler is NULL");
608 
609         pthread_mutex_destroy(&lib->mutex);
610         goto fail;
611     }
612 
613     nxt_unit_mmaps_init(&lib->incoming);
614     nxt_unit_mmaps_init(&lib->outgoing);
615 
616     return lib;
617 
618 fail:
619 
620     nxt_unit_free(NULL, lib);
621 
622     return NULL;
623 }
624 
625 
626 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)627 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
628     void *data)
629 {
630     int  rc;
631 
632     ctx_impl->ctx.data = data;
633     ctx_impl->ctx.unit = &lib->unit;
634 
635     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
636     if (nxt_slow_path(rc != 0)) {
637         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
638 
639         return NXT_UNIT_ERROR;
640     }
641 
642     nxt_unit_lib_use(lib);
643 
644     pthread_mutex_lock(&lib->mutex);
645 
646     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
647 
648     pthread_mutex_unlock(&lib->mutex);
649 
650     ctx_impl->use_count = 1;
651     ctx_impl->wait_items = 0;
652     ctx_impl->online = 1;
653     ctx_impl->ready = 0;
654     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
655 
656     nxt_queue_init(&ctx_impl->free_req);
657     nxt_queue_init(&ctx_impl->free_ws);
658     nxt_queue_init(&ctx_impl->active_req);
659     nxt_queue_init(&ctx_impl->ready_req);
660     nxt_queue_init(&ctx_impl->pending_rbuf);
661     nxt_queue_init(&ctx_impl->free_rbuf);
662 
663     ctx_impl->free_buf = NULL;
664     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
665     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
666 
667     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
668     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
669 
670     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
671 
672     ctx_impl->req.req.ctx = &ctx_impl->ctx;
673     ctx_impl->req.req.unit = &lib->unit;
674 
675     ctx_impl->read_port = NULL;
676     ctx_impl->requests.slot = 0;
677 
678     return NXT_UNIT_OK;
679 }
680 
681 
682 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)683 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
684 {
685     nxt_unit_ctx_impl_t  *ctx_impl;
686 
687     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
688 
689     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
690 }
691 
692 
693 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)694 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
695 {
696     long                 c;
697     nxt_unit_ctx_impl_t  *ctx_impl;
698 
699     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
700 
701     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
702 
703     if (c == 1) {
704         nxt_unit_ctx_free(ctx_impl);
705     }
706 }
707 
708 
709 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)710 nxt_unit_lib_use(nxt_unit_impl_t *lib)
711 {
712     nxt_atomic_fetch_add(&lib->use_count, 1);
713 }
714 
715 
716 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)717 nxt_unit_lib_release(nxt_unit_impl_t *lib)
718 {
719     long                c;
720     nxt_unit_process_t  *process;
721 
722     c = nxt_atomic_fetch_add(&lib->use_count, -1);
723 
724     if (c == 1) {
725         for ( ;; ) {
726             pthread_mutex_lock(&lib->mutex);
727 
728             process = nxt_unit_process_pop_first(lib);
729             if (process == NULL) {
730                 pthread_mutex_unlock(&lib->mutex);
731 
732                 break;
733             }
734 
735             nxt_unit_remove_process(lib, process);
736         }
737 
738         pthread_mutex_destroy(&lib->mutex);
739 
740         if (nxt_fast_path(lib->router_port != NULL)) {
741             nxt_unit_port_release(lib->router_port);
742         }
743 
744         if (nxt_fast_path(lib->shared_port != NULL)) {
745             nxt_unit_port_release(lib->shared_port);
746         }
747 
748         nxt_unit_mmaps_destroy(&lib->incoming);
749         nxt_unit_mmaps_destroy(&lib->outgoing);
750 
751         nxt_unit_free(NULL, lib);
752     }
753 }
754 
755 
756 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)757 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
758     nxt_unit_mmap_buf_t *mmap_buf)
759 {
760     mmap_buf->next = *head;
761 
762     if (mmap_buf->next != NULL) {
763         mmap_buf->next->prev = &mmap_buf->next;
764     }
765 
766     *head = mmap_buf;
767     mmap_buf->prev = head;
768 }
769 
770 
771 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)772 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
773     nxt_unit_mmap_buf_t *mmap_buf)
774 {
775     while (*prev != NULL) {
776         prev = &(*prev)->next;
777     }
778 
779     nxt_unit_mmap_buf_insert(prev, mmap_buf);
780 }
781 
782 
783 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)784 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
785 {
786     nxt_unit_mmap_buf_t  **prev;
787 
788     prev = mmap_buf->prev;
789 
790     if (mmap_buf->next != NULL) {
791         mmap_buf->next->prev = prev;
792     }
793 
794     if (prev != NULL) {
795         *prev = mmap_buf->next;
796     }
797 }
798 
799 
800 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)801 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
802     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
803     uint32_t *shm_limit, uint32_t *request_limit)
804 {
805     int       rc;
806     int       ready_fd, router_fd, read_in_fd, read_out_fd;
807     char      *unit_init, *version_end, *vars;
808     size_t    version_length;
809     int64_t   ready_pid, router_pid, read_pid;
810     uint32_t  ready_stream, router_id, ready_id, read_id;
811 
812     unit_init = getenv(NXT_UNIT_INIT_ENV);
813     if (nxt_slow_path(unit_init == NULL)) {
814         nxt_unit_alert(NULL, "%s is not in the current environment",
815                        NXT_UNIT_INIT_ENV);
816 
817         return NXT_UNIT_ERROR;
818     }
819 
820     version_end = strchr(unit_init, ';');
821     if (nxt_slow_path(version_end == NULL)) {
822         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
823                        NXT_UNIT_INIT_ENV, unit_init);
824 
825         return NXT_UNIT_ERROR;
826     }
827 
828     version_length = version_end - unit_init;
829 
830     rc = version_length != nxt_length(NXT_VERSION)
831          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
832 
833     if (nxt_slow_path(rc != 0)) {
834         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
835                        "%.*s, while the app was compiled with libunit %s",
836                        (int) version_length, unit_init, NXT_VERSION);
837 
838         return NXT_UNIT_ERROR;
839     }
840 
841     vars = version_end + 1;
842 
843     rc = sscanf(vars,
844                 "%"PRIu32";"
845                 "%"PRId64",%"PRIu32",%d;"
846                 "%"PRId64",%"PRIu32",%d;"
847                 "%"PRId64",%"PRIu32",%d,%d;"
848                 "%d,%"PRIu32",%"PRIu32,
849                 &ready_stream,
850                 &ready_pid, &ready_id, &ready_fd,
851                 &router_pid, &router_id, &router_fd,
852                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
853                 log_fd, shm_limit, request_limit);
854 
855     if (nxt_slow_path(rc == EOF)) {
856         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
857                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
858 
859         return NXT_UNIT_ERROR;
860     }
861 
862     if (nxt_slow_path(rc != 14)) {
863         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
864                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
865 
866         return NXT_UNIT_ERROR;
867     }
868 
869     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
870 
871     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
872 
873     ready_port->in_fd = -1;
874     ready_port->out_fd = ready_fd;
875     ready_port->data = NULL;
876 
877     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
878 
879     router_port->in_fd = -1;
880     router_port->out_fd = router_fd;
881     router_port->data = NULL;
882 
883     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
884 
885     read_port->in_fd = read_in_fd;
886     read_port->out_fd = read_out_fd;
887     read_port->data = NULL;
888 
889     *stream = ready_stream;
890 
891     return NXT_UNIT_OK;
892 }
893 
894 
895 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)896 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
897 {
898     ssize_t          res;
899     nxt_send_oob_t   oob;
900     nxt_port_msg_t   msg;
901     nxt_unit_impl_t  *lib;
902     int              fds[2] = {queue_fd, -1};
903 
904     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
905 
906     msg.stream = stream;
907     msg.pid = lib->pid;
908     msg.reply_port = 0;
909     msg.type = _NXT_PORT_MSG_PROCESS_READY;
910     msg.last = 1;
911     msg.mmap = 0;
912     msg.nf = 0;
913     msg.mf = 0;
914     msg.tracking = 0;
915 
916     nxt_socket_msg_oob_init(&oob, fds);
917 
918     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
919     if (res != sizeof(msg)) {
920         return NXT_UNIT_ERROR;
921     }
922 
923     return NXT_UNIT_OK;
924 }
925 
926 
927 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)928 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
929     nxt_unit_request_info_t **preq)
930 {
931     int                  rc;
932     pid_t                pid;
933     uint8_t              quit_param;
934     nxt_port_msg_t       *port_msg;
935     nxt_unit_impl_t      *lib;
936     nxt_unit_recv_msg_t  recv_msg;
937 
938     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
939 
940     recv_msg.incoming_buf = NULL;
941     recv_msg.fd[0] = -1;
942     recv_msg.fd[1] = -1;
943 
944     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
945     if (nxt_slow_path(rc != NXT_OK)) {
946         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
947         rc = NXT_UNIT_ERROR;
948         goto done;
949     }
950 
951     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
952         if (nxt_slow_path(rbuf->size == 0)) {
953             nxt_unit_debug(ctx, "read port closed");
954 
955             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
956             rc = NXT_UNIT_OK;
957             goto done;
958         }
959 
960         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
961 
962         rc = NXT_UNIT_ERROR;
963         goto done;
964     }
965 
966     port_msg = (nxt_port_msg_t *) rbuf->buf;
967 
968     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
969                    port_msg->stream, (int) port_msg->type,
970                    recv_msg.fd[0], recv_msg.fd[1]);
971 
972     recv_msg.stream = port_msg->stream;
973     recv_msg.pid = port_msg->pid;
974     recv_msg.reply_port = port_msg->reply_port;
975     recv_msg.last = port_msg->last;
976     recv_msg.mmap = port_msg->mmap;
977 
978     recv_msg.start = port_msg + 1;
979     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
980 
981     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
982         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
983                        port_msg->stream, (int) port_msg->type);
984         rc = NXT_UNIT_ERROR;
985         goto done;
986     }
987 
988     /* Fragmentation is unsupported. */
989     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
990         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
991                        port_msg->stream, (int) port_msg->type);
992         rc = NXT_UNIT_ERROR;
993         goto done;
994     }
995 
996     if (port_msg->mmap) {
997         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
998 
999         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1000             if (rc == NXT_UNIT_AGAIN) {
1001                 recv_msg.fd[0] = -1;
1002                 recv_msg.fd[1] = -1;
1003             }
1004 
1005             goto done;
1006         }
1007     }
1008 
1009     switch (port_msg->type) {
1010 
1011     case _NXT_PORT_MSG_RPC_READY:
1012         rc = NXT_UNIT_OK;
1013         break;
1014 
1015     case _NXT_PORT_MSG_QUIT:
1016         if (recv_msg.size == sizeof(quit_param)) {
1017             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1018 
1019         } else {
1020             quit_param = NXT_QUIT_NORMAL;
1021         }
1022 
1023         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1024                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1025 
1026         nxt_unit_quit(ctx, quit_param);
1027 
1028         rc = NXT_UNIT_OK;
1029         break;
1030 
1031     case _NXT_PORT_MSG_NEW_PORT:
1032         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1033         break;
1034 
1035     case _NXT_PORT_MSG_PORT_ACK:
1036         rc = nxt_unit_ctx_ready(ctx);
1037         break;
1038 
1039     case _NXT_PORT_MSG_CHANGE_FILE:
1040         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1041                        port_msg->stream, recv_msg.fd[0]);
1042 
1043         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1044             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1045                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1046                            strerror(errno), errno);
1047 
1048             rc = NXT_UNIT_ERROR;
1049             goto done;
1050         }
1051 
1052         rc = NXT_UNIT_OK;
1053         break;
1054 
1055     case _NXT_PORT_MSG_MMAP:
1056         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1057             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1058                            port_msg->stream, recv_msg.fd[0]);
1059 
1060             rc = NXT_UNIT_ERROR;
1061             goto done;
1062         }
1063 
1064         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1065         break;
1066 
1067     case _NXT_PORT_MSG_REQ_HEADERS:
1068         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1069         break;
1070 
1071     case _NXT_PORT_MSG_REQ_BODY:
1072         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1073         break;
1074 
1075     case _NXT_PORT_MSG_WEBSOCKET:
1076         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1077         break;
1078 
1079     case _NXT_PORT_MSG_REMOVE_PID:
1080         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1081             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1082                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1083                            (int) sizeof(pid));
1084 
1085             rc = NXT_UNIT_ERROR;
1086             goto done;
1087         }
1088 
1089         memcpy(&pid, recv_msg.start, sizeof(pid));
1090 
1091         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1092                        port_msg->stream, (int) pid);
1093 
1094         nxt_unit_remove_pid(lib, pid);
1095 
1096         rc = NXT_UNIT_OK;
1097         break;
1098 
1099     case _NXT_PORT_MSG_SHM_ACK:
1100         rc = nxt_unit_process_shm_ack(ctx);
1101         break;
1102 
1103     default:
1104         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1105                        port_msg->stream, (int) port_msg->type);
1106 
1107         rc = NXT_UNIT_ERROR;
1108         goto done;
1109     }
1110 
1111 done:
1112 
1113     if (recv_msg.fd[0] != -1) {
1114         nxt_unit_close(recv_msg.fd[0]);
1115     }
1116 
1117     if (recv_msg.fd[1] != -1) {
1118         nxt_unit_close(recv_msg.fd[1]);
1119     }
1120 
1121     while (recv_msg.incoming_buf != NULL) {
1122         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1123     }
1124 
1125     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1126 #if (NXT_DEBUG)
1127         memset(rbuf->buf, 0xAC, rbuf->size);
1128 #endif
1129         nxt_unit_read_buf_release(ctx, rbuf);
1130     }
1131 
1132     return rc;
1133 }
1134 
1135 
1136 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1137 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1138 {
1139     void                     *mem;
1140     nxt_unit_impl_t          *lib;
1141     nxt_unit_port_t          new_port, *port;
1142     nxt_port_msg_new_port_t  *new_port_msg;
1143 
1144     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1145         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1146                       "invalid message size (%d)",
1147                       recv_msg->stream, (int) recv_msg->size);
1148 
1149         return NXT_UNIT_ERROR;
1150     }
1151 
1152     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1153         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1154                        recv_msg->stream, recv_msg->fd[0]);
1155 
1156         return NXT_UNIT_ERROR;
1157     }
1158 
1159     new_port_msg = recv_msg->start;
1160 
1161     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1162                    recv_msg->stream, (int) new_port_msg->pid,
1163                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1164 
1165     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1166 
1167     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1168         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1169 
1170         new_port.in_fd = recv_msg->fd[0];
1171         new_port.out_fd = -1;
1172 
1173         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1174                    MAP_SHARED, recv_msg->fd[1], 0);
1175 
1176     } else {
1177         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1178                           != NXT_UNIT_OK))
1179         {
1180             return NXT_UNIT_ERROR;
1181         }
1182 
1183         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1184                               new_port_msg->id);
1185 
1186         new_port.in_fd = -1;
1187         new_port.out_fd = recv_msg->fd[0];
1188 
1189         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1190                    MAP_SHARED, recv_msg->fd[1], 0);
1191     }
1192 
1193     if (nxt_slow_path(mem == MAP_FAILED)) {
1194         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1195                        strerror(errno), errno);
1196 
1197         return NXT_UNIT_ERROR;
1198     }
1199 
1200     new_port.data = NULL;
1201 
1202     recv_msg->fd[0] = -1;
1203 
1204     port = nxt_unit_add_port(ctx, &new_port, mem);
1205     if (nxt_slow_path(port == NULL)) {
1206         return NXT_UNIT_ERROR;
1207     }
1208 
1209     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1210         lib->shared_port = port;
1211 
1212         return nxt_unit_ctx_ready(ctx);
1213     }
1214 
1215     nxt_unit_port_release(port);
1216 
1217     return NXT_UNIT_OK;
1218 }
1219 
1220 
1221 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1222 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1223 {
1224     nxt_unit_impl_t      *lib;
1225     nxt_unit_ctx_impl_t  *ctx_impl;
1226 
1227     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1228 
1229     if (nxt_slow_path(ctx_impl->ready)) {
1230         return NXT_UNIT_OK;
1231     }
1232 
1233     ctx_impl->ready = 1;
1234 
1235     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1236 
1237     /* Call ready_handler() only for main context. */
1238     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1239         return lib->callbacks.ready_handler(ctx);
1240     }
1241 
1242     if (&lib->main_ctx != ctx_impl) {
1243         /* Check if the main context is already stopped or quit. */
1244         if (nxt_slow_path(!lib->main_ctx.ready)) {
1245             ctx_impl->ready = 0;
1246 
1247             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1248 
1249             return NXT_UNIT_OK;
1250         }
1251 
1252         if (lib->callbacks.add_port != NULL) {
1253             lib->callbacks.add_port(ctx, lib->shared_port);
1254         }
1255     }
1256 
1257     return NXT_UNIT_OK;
1258 }
1259 
1260 
1261 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1262 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1263     nxt_unit_request_info_t **preq)
1264 {
1265     int                           res;
1266     nxt_unit_impl_t               *lib;
1267     nxt_unit_port_id_t            port_id;
1268     nxt_unit_request_t            *r;
1269     nxt_unit_mmap_buf_t           *b;
1270     nxt_unit_request_info_t       *req;
1271     nxt_unit_request_info_impl_t  *req_impl;
1272 
1273     if (nxt_slow_path(recv_msg->mmap == 0)) {
1274         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1275                       recv_msg->stream);
1276 
1277         return NXT_UNIT_ERROR;
1278     }
1279 
1280     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1281         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1282                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1283                       (int) sizeof(nxt_unit_request_t));
1284 
1285         return NXT_UNIT_ERROR;
1286     }
1287 
1288     req_impl = nxt_unit_request_info_get(ctx);
1289     if (nxt_slow_path(req_impl == NULL)) {
1290         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1291                       recv_msg->stream);
1292 
1293         return NXT_UNIT_ERROR;
1294     }
1295 
1296     req = &req_impl->req;
1297 
1298     req->request = recv_msg->start;
1299 
1300     b = recv_msg->incoming_buf;
1301 
1302     req->request_buf = &b->buf;
1303     req->response = NULL;
1304     req->response_buf = NULL;
1305 
1306     r = req->request;
1307 
1308     req->content_length = r->content_length;
1309 
1310     req->content_buf = req->request_buf;
1311     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1312 
1313     req_impl->stream = recv_msg->stream;
1314 
1315     req_impl->outgoing_buf = NULL;
1316 
1317     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1318         b->req = req;
1319     }
1320 
1321     /* "Move" incoming buffer list to req_impl. */
1322     req_impl->incoming_buf = recv_msg->incoming_buf;
1323     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1324     recv_msg->incoming_buf = NULL;
1325 
1326     req->content_fd = recv_msg->fd[0];
1327     recv_msg->fd[0] = -1;
1328 
1329     req->response_max_fields = 0;
1330     req_impl->state = NXT_UNIT_RS_START;
1331     req_impl->websocket = 0;
1332     req_impl->in_hash = 0;
1333 
1334     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1335                    (int) r->method_length,
1336                    (char *) nxt_unit_sptr_get(&r->method),
1337                    (int) r->target_length,
1338                    (char *) nxt_unit_sptr_get(&r->target),
1339                    (int) r->content_length);
1340 
1341     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1342 
1343     res = nxt_unit_request_check_response_port(req, &port_id);
1344     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1345         return NXT_UNIT_ERROR;
1346     }
1347 
1348     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1349         res = nxt_unit_send_req_headers_ack(req);
1350         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1351             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1352 
1353             return NXT_UNIT_ERROR;
1354         }
1355 
1356         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1357 
1358         if (req->content_length
1359             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1360         {
1361             res = nxt_unit_request_hash_add(ctx, req);
1362             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1363                 nxt_unit_req_warn(req, "failed to add request to hash");
1364 
1365                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1366 
1367                 return NXT_UNIT_ERROR;
1368             }
1369 
1370             /*
1371              * If application have separate data handler, we may start
1372              * request processing and process data when it is arrived.
1373              */
1374             if (lib->callbacks.data_handler == NULL) {
1375                 return NXT_UNIT_OK;
1376             }
1377         }
1378 
1379         if (preq == NULL) {
1380             lib->callbacks.request_handler(req);
1381 
1382         } else {
1383             *preq = req;
1384         }
1385     }
1386 
1387     return NXT_UNIT_OK;
1388 }
1389 
1390 
1391 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1392 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1393 {
1394     uint64_t                 l;
1395     nxt_unit_impl_t          *lib;
1396     nxt_unit_mmap_buf_t      *b;
1397     nxt_unit_request_info_t  *req;
1398 
1399     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1400     if (req == NULL) {
1401         return NXT_UNIT_OK;
1402     }
1403 
1404     l = req->content_buf->end - req->content_buf->free;
1405 
1406     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1407         b->req = req;
1408         l += b->buf.end - b->buf.free;
1409     }
1410 
1411     if (recv_msg->incoming_buf != NULL) {
1412         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1413 
1414         while (b->next != NULL) {
1415             b = b->next;
1416         }
1417 
1418         /* "Move" incoming buffer list to req_impl. */
1419         b->next = recv_msg->incoming_buf;
1420         b->next->prev = &b->next;
1421 
1422         recv_msg->incoming_buf = NULL;
1423     }
1424 
1425     req->content_fd = recv_msg->fd[0];
1426     recv_msg->fd[0] = -1;
1427 
1428     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1429 
1430     if (lib->callbacks.data_handler != NULL) {
1431         lib->callbacks.data_handler(req);
1432 
1433         return NXT_UNIT_OK;
1434     }
1435 
1436     if (req->content_fd != -1 || l == req->content_length) {
1437         lib->callbacks.request_handler(req);
1438     }
1439 
1440     return NXT_UNIT_OK;
1441 }
1442 
1443 
1444 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1445 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1446     nxt_unit_port_id_t *port_id)
1447 {
1448     int                           res;
1449     nxt_unit_ctx_t                *ctx;
1450     nxt_unit_impl_t               *lib;
1451     nxt_unit_port_t               *port;
1452     nxt_unit_process_t            *process;
1453     nxt_unit_ctx_impl_t           *ctx_impl;
1454     nxt_unit_port_impl_t          *port_impl;
1455     nxt_unit_request_info_impl_t  *req_impl;
1456 
1457     ctx = req->ctx;
1458     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1459     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1460 
1461     pthread_mutex_lock(&lib->mutex);
1462 
1463     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1464     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1465 
1466     if (nxt_fast_path(port != NULL)) {
1467         req->response_port = port;
1468 
1469         if (nxt_fast_path(port_impl->ready)) {
1470             pthread_mutex_unlock(&lib->mutex);
1471 
1472             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1473                            (int) port->id.pid, (int) port->id.id);
1474 
1475             return NXT_UNIT_OK;
1476         }
1477 
1478         nxt_unit_debug(ctx, "check_response_port: "
1479                        "port{%d,%d} already requested",
1480                        (int) port->id.pid, (int) port->id.id);
1481 
1482         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1483 
1484         nxt_queue_insert_tail(&port_impl->awaiting_req,
1485                               &req_impl->port_wait_link);
1486 
1487         pthread_mutex_unlock(&lib->mutex);
1488 
1489         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1490 
1491         return NXT_UNIT_AGAIN;
1492     }
1493 
1494     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1495     if (nxt_slow_path(port_impl == NULL)) {
1496         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1497                        (int) sizeof(nxt_unit_port_impl_t));
1498 
1499         pthread_mutex_unlock(&lib->mutex);
1500 
1501         return NXT_UNIT_ERROR;
1502     }
1503 
1504     port = &port_impl->port;
1505 
1506     port->id = *port_id;
1507     port->in_fd = -1;
1508     port->out_fd = -1;
1509     port->data = NULL;
1510 
1511     res = nxt_unit_port_hash_add(&lib->ports, port);
1512     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1513         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1514                        port->id.pid, port->id.id);
1515 
1516         pthread_mutex_unlock(&lib->mutex);
1517 
1518         nxt_unit_free(ctx, port);
1519 
1520         return NXT_UNIT_ERROR;
1521     }
1522 
1523     process = nxt_unit_process_find(lib, port_id->pid, 0);
1524     if (nxt_slow_path(process == NULL)) {
1525         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1526                        port->id.pid);
1527 
1528         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1529 
1530         pthread_mutex_unlock(&lib->mutex);
1531 
1532         nxt_unit_free(ctx, port);
1533 
1534         return NXT_UNIT_ERROR;
1535     }
1536 
1537     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1538 
1539     port_impl->process = process;
1540     port_impl->queue = NULL;
1541     port_impl->from_socket = 0;
1542     port_impl->socket_rbuf = NULL;
1543 
1544     nxt_queue_init(&port_impl->awaiting_req);
1545 
1546     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1547 
1548     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1549 
1550     port_impl->use_count = 2;
1551     port_impl->ready = 0;
1552 
1553     req->response_port = port;
1554 
1555     pthread_mutex_unlock(&lib->mutex);
1556 
1557     res = nxt_unit_get_port(ctx, port_id);
1558     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1559         return NXT_UNIT_ERROR;
1560     }
1561 
1562     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1563 
1564     return NXT_UNIT_AGAIN;
1565 }
1566 
1567 
1568 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1569 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1570 {
1571     ssize_t                       res;
1572     nxt_port_msg_t                msg;
1573     nxt_unit_impl_t               *lib;
1574     nxt_unit_ctx_impl_t           *ctx_impl;
1575     nxt_unit_request_info_impl_t  *req_impl;
1576 
1577     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1578     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1579     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1580 
1581     memset(&msg, 0, sizeof(nxt_port_msg_t));
1582 
1583     msg.stream = req_impl->stream;
1584     msg.pid = lib->pid;
1585     msg.reply_port = ctx_impl->read_port->id.id;
1586     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1587 
1588     res = nxt_unit_port_send(req->ctx, req->response_port,
1589                              &msg, sizeof(msg), NULL);
1590     if (nxt_slow_path(res != sizeof(msg))) {
1591         return NXT_UNIT_ERROR;
1592     }
1593 
1594     return NXT_UNIT_OK;
1595 }
1596 
1597 
1598 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1599 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1600 {
1601     size_t                           hsize;
1602     nxt_unit_impl_t                  *lib;
1603     nxt_unit_mmap_buf_t              *b;
1604     nxt_unit_callbacks_t             *cb;
1605     nxt_unit_request_info_t          *req;
1606     nxt_unit_request_info_impl_t     *req_impl;
1607     nxt_unit_websocket_frame_impl_t  *ws_impl;
1608 
1609     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1610     if (nxt_slow_path(req == NULL)) {
1611         return NXT_UNIT_OK;
1612     }
1613 
1614     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1615 
1616     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1617     cb = &lib->callbacks;
1618 
1619     if (cb->websocket_handler && recv_msg->size >= 2) {
1620         ws_impl = nxt_unit_websocket_frame_get(ctx);
1621         if (nxt_slow_path(ws_impl == NULL)) {
1622             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1623                           req_impl->stream);
1624 
1625             return NXT_UNIT_ERROR;
1626         }
1627 
1628         ws_impl->ws.req = req;
1629 
1630         ws_impl->buf = NULL;
1631 
1632         if (recv_msg->mmap) {
1633             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1634                 b->req = req;
1635             }
1636 
1637             /* "Move" incoming buffer list to ws_impl. */
1638             ws_impl->buf = recv_msg->incoming_buf;
1639             ws_impl->buf->prev = &ws_impl->buf;
1640             recv_msg->incoming_buf = NULL;
1641 
1642             b = ws_impl->buf;
1643 
1644         } else {
1645             b = nxt_unit_mmap_buf_get(ctx);
1646             if (nxt_slow_path(b == NULL)) {
1647                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1648                                req_impl->stream);
1649 
1650                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1651 
1652                 return NXT_UNIT_ERROR;
1653             }
1654 
1655             b->req = req;
1656             b->buf.start = recv_msg->start;
1657             b->buf.free = b->buf.start;
1658             b->buf.end = b->buf.start + recv_msg->size;
1659 
1660             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1661         }
1662 
1663         ws_impl->ws.header = (void *) b->buf.start;
1664         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1665             ws_impl->ws.header);
1666 
1667         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1668 
1669         if (ws_impl->ws.header->mask) {
1670             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1671 
1672         } else {
1673             ws_impl->ws.mask = NULL;
1674         }
1675 
1676         b->buf.free += hsize;
1677 
1678         ws_impl->ws.content_buf = &b->buf;
1679         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1680 
1681         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1682                            "payload_len=%"PRIu64,
1683                             ws_impl->ws.header->opcode,
1684                             ws_impl->ws.payload_len);
1685 
1686         cb->websocket_handler(&ws_impl->ws);
1687     }
1688 
1689     if (recv_msg->last) {
1690         if (cb->close_handler) {
1691             nxt_unit_req_debug(req, "close_handler");
1692 
1693             cb->close_handler(req);
1694 
1695         } else {
1696             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1697         }
1698     }
1699 
1700     return NXT_UNIT_OK;
1701 }
1702 
1703 
1704 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1705 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1706 {
1707     nxt_unit_impl_t       *lib;
1708     nxt_unit_callbacks_t  *cb;
1709 
1710     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1711     cb = &lib->callbacks;
1712 
1713     if (cb->shm_ack_handler != NULL) {
1714         cb->shm_ack_handler(ctx);
1715     }
1716 
1717     return NXT_UNIT_OK;
1718 }
1719 
1720 
1721 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1722 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1723 {
1724     nxt_unit_impl_t               *lib;
1725     nxt_queue_link_t              *lnk;
1726     nxt_unit_ctx_impl_t           *ctx_impl;
1727     nxt_unit_request_info_impl_t  *req_impl;
1728 
1729     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1730 
1731     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1732 
1733     pthread_mutex_lock(&ctx_impl->mutex);
1734 
1735     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1736         pthread_mutex_unlock(&ctx_impl->mutex);
1737 
1738         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1739                                         + lib->request_data_size);
1740         if (nxt_slow_path(req_impl == NULL)) {
1741             return NULL;
1742         }
1743 
1744         req_impl->req.unit = ctx->unit;
1745         req_impl->req.ctx = ctx;
1746 
1747         pthread_mutex_lock(&ctx_impl->mutex);
1748 
1749     } else {
1750         lnk = nxt_queue_first(&ctx_impl->free_req);
1751         nxt_queue_remove(lnk);
1752 
1753         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1754     }
1755 
1756     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1757 
1758     pthread_mutex_unlock(&ctx_impl->mutex);
1759 
1760     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1761 
1762     return req_impl;
1763 }
1764 
1765 
1766 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1767 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1768 {
1769     nxt_unit_ctx_t                *ctx;
1770     nxt_unit_ctx_impl_t           *ctx_impl;
1771     nxt_unit_request_info_impl_t  *req_impl;
1772 
1773     ctx = req->ctx;
1774     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1775     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1776 
1777     req->response = NULL;
1778     req->response_buf = NULL;
1779 
1780     if (req_impl->in_hash) {
1781         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1782     }
1783 
1784     while (req_impl->outgoing_buf != NULL) {
1785         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1786     }
1787 
1788     while (req_impl->incoming_buf != NULL) {
1789         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1790     }
1791 
1792     if (req->content_fd != -1) {
1793         nxt_unit_close(req->content_fd);
1794 
1795         req->content_fd = -1;
1796     }
1797 
1798     if (req->response_port != NULL) {
1799         nxt_unit_port_release(req->response_port);
1800 
1801         req->response_port = NULL;
1802     }
1803 
1804     req_impl->state = NXT_UNIT_RS_RELEASED;
1805 
1806     pthread_mutex_lock(&ctx_impl->mutex);
1807 
1808     nxt_queue_remove(&req_impl->link);
1809 
1810     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1811 
1812     pthread_mutex_unlock(&ctx_impl->mutex);
1813 
1814     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1815         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1816     }
1817 }
1818 
1819 
1820 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1821 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1822 {
1823     nxt_unit_ctx_impl_t  *ctx_impl;
1824 
1825     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1826 
1827     nxt_queue_remove(&req_impl->link);
1828 
1829     if (req_impl != &ctx_impl->req) {
1830         nxt_unit_free(&ctx_impl->ctx, req_impl);
1831     }
1832 }
1833 
1834 
1835 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1836 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1837 {
1838     nxt_queue_link_t                 *lnk;
1839     nxt_unit_ctx_impl_t              *ctx_impl;
1840     nxt_unit_websocket_frame_impl_t  *ws_impl;
1841 
1842     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1843 
1844     pthread_mutex_lock(&ctx_impl->mutex);
1845 
1846     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1847         pthread_mutex_unlock(&ctx_impl->mutex);
1848 
1849         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1850         if (nxt_slow_path(ws_impl == NULL)) {
1851             return NULL;
1852         }
1853 
1854     } else {
1855         lnk = nxt_queue_first(&ctx_impl->free_ws);
1856         nxt_queue_remove(lnk);
1857 
1858         pthread_mutex_unlock(&ctx_impl->mutex);
1859 
1860         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1861     }
1862 
1863     ws_impl->ctx_impl = ctx_impl;
1864 
1865     return ws_impl;
1866 }
1867 
1868 
1869 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1870 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1871 {
1872     nxt_unit_websocket_frame_impl_t  *ws_impl;
1873 
1874     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1875 
1876     while (ws_impl->buf != NULL) {
1877         nxt_unit_mmap_buf_free(ws_impl->buf);
1878     }
1879 
1880     ws->req = NULL;
1881 
1882     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1883 
1884     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1885 
1886     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1887 }
1888 
1889 
1890 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1891 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1892     nxt_unit_websocket_frame_impl_t *ws_impl)
1893 {
1894     nxt_queue_remove(&ws_impl->link);
1895 
1896     nxt_unit_free(ctx, ws_impl);
1897 }
1898 
1899 
1900 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1901 nxt_unit_field_hash(const char *name, size_t name_length)
1902 {
1903     u_char      ch;
1904     uint32_t    hash;
1905     const char  *p, *end;
1906 
1907     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1908     end = name + name_length;
1909 
1910     for (p = name; p < end; p++) {
1911         ch = *p;
1912         hash = (hash << 4) + hash + nxt_lowcase(ch);
1913     }
1914 
1915     hash = (hash >> 16) ^ hash;
1916 
1917     return hash;
1918 }
1919 
1920 
1921 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1922 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1923 {
1924     char                *name;
1925     uint32_t            i, j;
1926     nxt_unit_field_t    *fields, f;
1927     nxt_unit_request_t  *r;
1928 
1929     static nxt_str_t  content_length = nxt_string("content-length");
1930     static nxt_str_t  content_type = nxt_string("content-type");
1931     static nxt_str_t  cookie = nxt_string("cookie");
1932 
1933     nxt_unit_req_debug(req, "group_dup_fields");
1934 
1935     r = req->request;
1936     fields = r->fields;
1937 
1938     for (i = 0; i < r->fields_count; i++) {
1939         name = nxt_unit_sptr_get(&fields[i].name);
1940 
1941         switch (fields[i].hash) {
1942         case NXT_UNIT_HASH_CONTENT_LENGTH:
1943             if (fields[i].name_length == content_length.length
1944                 && nxt_unit_memcasecmp(name, content_length.start,
1945                                        content_length.length) == 0)
1946             {
1947                 r->content_length_field = i;
1948             }
1949 
1950             break;
1951 
1952         case NXT_UNIT_HASH_CONTENT_TYPE:
1953             if (fields[i].name_length == content_type.length
1954                 && nxt_unit_memcasecmp(name, content_type.start,
1955                                        content_type.length) == 0)
1956             {
1957                 r->content_type_field = i;
1958             }
1959 
1960             break;
1961 
1962         case NXT_UNIT_HASH_COOKIE:
1963             if (fields[i].name_length == cookie.length
1964                 && nxt_unit_memcasecmp(name, cookie.start,
1965                                        cookie.length) == 0)
1966             {
1967                 r->cookie_field = i;
1968             }
1969 
1970             break;
1971         }
1972 
1973         for (j = i + 1; j < r->fields_count; j++) {
1974             if (fields[i].hash != fields[j].hash
1975                 || fields[i].name_length != fields[j].name_length
1976                 || nxt_unit_memcasecmp(name,
1977                                        nxt_unit_sptr_get(&fields[j].name),
1978                                        fields[j].name_length) != 0)
1979             {
1980                 continue;
1981             }
1982 
1983             f = fields[j];
1984             f.value.offset += (j - (i + 1)) * sizeof(f);
1985 
1986             while (j > i + 1) {
1987                 fields[j] = fields[j - 1];
1988                 fields[j].name.offset -= sizeof(f);
1989                 fields[j].value.offset -= sizeof(f);
1990                 j--;
1991             }
1992 
1993             fields[j] = f;
1994 
1995             /* Assign the same name pointer for further grouping simplicity. */
1996             nxt_unit_sptr_set(&fields[j].name, name);
1997 
1998             i++;
1999         }
2000     }
2001 }
2002 
2003 
2004 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2005 nxt_unit_response_init(nxt_unit_request_info_t *req,
2006     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2007 {
2008     uint32_t                      buf_size;
2009     nxt_unit_buf_t                *buf;
2010     nxt_unit_request_info_impl_t  *req_impl;
2011 
2012     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2013 
2014     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2015         nxt_unit_req_warn(req, "init: response already sent");
2016 
2017         return NXT_UNIT_ERROR;
2018     }
2019 
2020     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2021                        (int) max_fields_count, (int) max_fields_size);
2022 
2023     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2024         nxt_unit_req_debug(req, "duplicate response init");
2025     }
2026 
2027     /*
2028      * Each field name and value 0-terminated by libunit,
2029      * this is the reason of '+ 2' below.
2030      */
2031     buf_size = sizeof(nxt_unit_response_t)
2032                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2033                + max_fields_size;
2034 
2035     if (nxt_slow_path(req->response_buf != NULL)) {
2036         buf = req->response_buf;
2037 
2038         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2039             goto init_response;
2040         }
2041 
2042         nxt_unit_buf_free(buf);
2043 
2044         req->response_buf = NULL;
2045         req->response = NULL;
2046         req->response_max_fields = 0;
2047 
2048         req_impl->state = NXT_UNIT_RS_START;
2049     }
2050 
2051     buf = nxt_unit_response_buf_alloc(req, buf_size);
2052     if (nxt_slow_path(buf == NULL)) {
2053         return NXT_UNIT_ERROR;
2054     }
2055 
2056 init_response:
2057 
2058     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2059 
2060     req->response_buf = buf;
2061 
2062     req->response = (nxt_unit_response_t *) buf->start;
2063     req->response->status = status;
2064 
2065     buf->free = buf->start + sizeof(nxt_unit_response_t)
2066                 + max_fields_count * sizeof(nxt_unit_field_t);
2067 
2068     req->response_max_fields = max_fields_count;
2069     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2070 
2071     return NXT_UNIT_OK;
2072 }
2073 
2074 
2075 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2076 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2077     uint32_t max_fields_count, uint32_t max_fields_size)
2078 {
2079     char                          *p;
2080     uint32_t                      i, buf_size;
2081     nxt_unit_buf_t                *buf;
2082     nxt_unit_field_t              *f, *src;
2083     nxt_unit_response_t           *resp;
2084     nxt_unit_request_info_impl_t  *req_impl;
2085 
2086     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2087 
2088     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2089         nxt_unit_req_warn(req, "realloc: response not init");
2090 
2091         return NXT_UNIT_ERROR;
2092     }
2093 
2094     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2095         nxt_unit_req_warn(req, "realloc: response already sent");
2096 
2097         return NXT_UNIT_ERROR;
2098     }
2099 
2100     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2101         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2102 
2103         return NXT_UNIT_ERROR;
2104     }
2105 
2106     /*
2107      * Each field name and value 0-terminated by libunit,
2108      * this is the reason of '+ 2' below.
2109      */
2110     buf_size = sizeof(nxt_unit_response_t)
2111                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2112                + max_fields_size;
2113 
2114     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2115 
2116     buf = nxt_unit_response_buf_alloc(req, buf_size);
2117     if (nxt_slow_path(buf == NULL)) {
2118         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2119         return NXT_UNIT_ERROR;
2120     }
2121 
2122     resp = (nxt_unit_response_t *) buf->start;
2123 
2124     memset(resp, 0, sizeof(nxt_unit_response_t));
2125 
2126     resp->status = req->response->status;
2127     resp->content_length = req->response->content_length;
2128 
2129     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2130     f = resp->fields;
2131 
2132     for (i = 0; i < req->response->fields_count; i++) {
2133         src = req->response->fields + i;
2134 
2135         if (nxt_slow_path(src->skip != 0)) {
2136             continue;
2137         }
2138 
2139         if (nxt_slow_path(src->name_length + src->value_length + 2
2140                           > (uint32_t) (buf->end - p)))
2141         {
2142             nxt_unit_req_warn(req, "realloc: not enough space for field"
2143                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2144                   i, src, src->name_length, src->value_length);
2145 
2146             goto fail;
2147         }
2148 
2149         nxt_unit_sptr_set(&f->name, p);
2150         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2151         *p++ = '\0';
2152 
2153         nxt_unit_sptr_set(&f->value, p);
2154         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2155         *p++ = '\0';
2156 
2157         f->hash = src->hash;
2158         f->skip = 0;
2159         f->name_length = src->name_length;
2160         f->value_length = src->value_length;
2161 
2162         resp->fields_count++;
2163         f++;
2164     }
2165 
2166     if (req->response->piggyback_content_length > 0) {
2167         if (nxt_slow_path(req->response->piggyback_content_length
2168                           > (uint32_t) (buf->end - p)))
2169         {
2170             nxt_unit_req_warn(req, "realloc: not enought space for content"
2171                   " #%"PRIu32", %"PRIu32" required",
2172                   i, req->response->piggyback_content_length);
2173 
2174             goto fail;
2175         }
2176 
2177         resp->piggyback_content_length =
2178                                        req->response->piggyback_content_length;
2179 
2180         nxt_unit_sptr_set(&resp->piggyback_content, p);
2181         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2182                        req->response->piggyback_content_length);
2183     }
2184 
2185     buf->free = p;
2186 
2187     nxt_unit_buf_free(req->response_buf);
2188 
2189     req->response = resp;
2190     req->response_buf = buf;
2191     req->response_max_fields = max_fields_count;
2192 
2193     return NXT_UNIT_OK;
2194 
2195 fail:
2196 
2197     nxt_unit_buf_free(buf);
2198 
2199     return NXT_UNIT_ERROR;
2200 }
2201 
2202 
2203 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2204 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2205 {
2206     nxt_unit_request_info_impl_t  *req_impl;
2207 
2208     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2209 
2210     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2211 }
2212 
2213 
2214 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2215 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2216     const char *name, uint8_t name_length,
2217     const char *value, uint32_t value_length)
2218 {
2219     nxt_unit_buf_t                *buf;
2220     nxt_unit_field_t              *f;
2221     nxt_unit_response_t           *resp;
2222     nxt_unit_request_info_impl_t  *req_impl;
2223 
2224     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2225 
2226     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2227         nxt_unit_req_warn(req, "add_field: response not initialized or "
2228                           "already sent");
2229 
2230         return NXT_UNIT_ERROR;
2231     }
2232 
2233     resp = req->response;
2234 
2235     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2236         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2237                           (int) resp->fields_count);
2238 
2239         return NXT_UNIT_ERROR;
2240     }
2241 
2242     buf = req->response_buf;
2243 
2244     if (nxt_slow_path(name_length + value_length + 2
2245                       > (uint32_t) (buf->end - buf->free)))
2246     {
2247         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2248 
2249         return NXT_UNIT_ERROR;
2250     }
2251 
2252     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2253                        resp->fields_count,
2254                        (int) name_length, name,
2255                        (int) value_length, value);
2256 
2257     f = resp->fields + resp->fields_count;
2258 
2259     nxt_unit_sptr_set(&f->name, buf->free);
2260     buf->free = nxt_cpymem(buf->free, name, name_length);
2261     *buf->free++ = '\0';
2262 
2263     nxt_unit_sptr_set(&f->value, buf->free);
2264     buf->free = nxt_cpymem(buf->free, value, value_length);
2265     *buf->free++ = '\0';
2266 
2267     f->hash = nxt_unit_field_hash(name, name_length);
2268     f->skip = 0;
2269     f->name_length = name_length;
2270     f->value_length = value_length;
2271 
2272     resp->fields_count++;
2273 
2274     return NXT_UNIT_OK;
2275 }
2276 
2277 
2278 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2279 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2280     const void* src, uint32_t size)
2281 {
2282     nxt_unit_buf_t                *buf;
2283     nxt_unit_response_t           *resp;
2284     nxt_unit_request_info_impl_t  *req_impl;
2285 
2286     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2287 
2288     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2289         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2290 
2291         return NXT_UNIT_ERROR;
2292     }
2293 
2294     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2295         nxt_unit_req_warn(req, "add_content: response already sent");
2296 
2297         return NXT_UNIT_ERROR;
2298     }
2299 
2300     buf = req->response_buf;
2301 
2302     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2303         nxt_unit_req_warn(req, "add_content: buffer overflow");
2304 
2305         return NXT_UNIT_ERROR;
2306     }
2307 
2308     resp = req->response;
2309 
2310     if (resp->piggyback_content_length == 0) {
2311         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2312         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2313     }
2314 
2315     resp->piggyback_content_length += size;
2316 
2317     buf->free = nxt_cpymem(buf->free, src, size);
2318 
2319     return NXT_UNIT_OK;
2320 }
2321 
2322 
2323 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2324 nxt_unit_response_send(nxt_unit_request_info_t *req)
2325 {
2326     int                           rc;
2327     nxt_unit_mmap_buf_t           *mmap_buf;
2328     nxt_unit_request_info_impl_t  *req_impl;
2329 
2330     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2331 
2332     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2333         nxt_unit_req_warn(req, "send: response is not initialized yet");
2334 
2335         return NXT_UNIT_ERROR;
2336     }
2337 
2338     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2339         nxt_unit_req_warn(req, "send: response already sent");
2340 
2341         return NXT_UNIT_ERROR;
2342     }
2343 
2344     if (req->request->websocket_handshake && req->response->status == 101) {
2345         nxt_unit_response_upgrade(req);
2346     }
2347 
2348     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2349                        req->response->fields_count,
2350                        (int) (req->response_buf->free
2351                               - req->response_buf->start));
2352 
2353     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2354 
2355     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2356     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2357         req->response = NULL;
2358         req->response_buf = NULL;
2359         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2360 
2361         nxt_unit_mmap_buf_free(mmap_buf);
2362     }
2363 
2364     return rc;
2365 }
2366 
2367 
2368 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2369 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2370 {
2371     nxt_unit_request_info_impl_t  *req_impl;
2372 
2373     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2374 
2375     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2376 }
2377 
2378 
2379 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2380 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2381 {
2382     int                           rc;
2383     nxt_unit_mmap_buf_t           *mmap_buf;
2384     nxt_unit_request_info_impl_t  *req_impl;
2385 
2386     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2387         nxt_unit_req_warn(req, "response_buf_alloc: "
2388                           "requested buffer (%"PRIu32") too big", size);
2389 
2390         return NULL;
2391     }
2392 
2393     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2394 
2395     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2396 
2397     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2398     if (nxt_slow_path(mmap_buf == NULL)) {
2399         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2400 
2401         return NULL;
2402     }
2403 
2404     mmap_buf->req = req;
2405 
2406     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2407 
2408     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2409                                    size, size, mmap_buf,
2410                                    NULL);
2411     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2412         nxt_unit_mmap_buf_release(mmap_buf);
2413 
2414         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2415 
2416         return NULL;
2417     }
2418 
2419     return &mmap_buf->buf;
2420 }
2421 
2422 
2423 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2424 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2425 {
2426     nxt_unit_mmap_buf_t  *mmap_buf;
2427     nxt_unit_ctx_impl_t  *ctx_impl;
2428 
2429     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2430 
2431     pthread_mutex_lock(&ctx_impl->mutex);
2432 
2433     if (ctx_impl->free_buf == NULL) {
2434         pthread_mutex_unlock(&ctx_impl->mutex);
2435 
2436         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2437         if (nxt_slow_path(mmap_buf == NULL)) {
2438             return NULL;
2439         }
2440 
2441     } else {
2442         mmap_buf = ctx_impl->free_buf;
2443 
2444         nxt_unit_mmap_buf_unlink(mmap_buf);
2445 
2446         pthread_mutex_unlock(&ctx_impl->mutex);
2447     }
2448 
2449     mmap_buf->ctx_impl = ctx_impl;
2450 
2451     mmap_buf->hdr = NULL;
2452     mmap_buf->free_ptr = NULL;
2453 
2454     return mmap_buf;
2455 }
2456 
2457 
2458 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2459 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2460 {
2461     nxt_unit_mmap_buf_unlink(mmap_buf);
2462 
2463     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2464 
2465     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2466 
2467     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2468 }
2469 
2470 
2471 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2472 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2473 {
2474     return req->request->websocket_handshake;
2475 }
2476 
2477 
2478 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2479 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2480 {
2481     int                           rc;
2482     nxt_unit_request_info_impl_t  *req_impl;
2483 
2484     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2485 
2486     if (nxt_slow_path(req_impl->websocket != 0)) {
2487         nxt_unit_req_debug(req, "upgrade: already upgraded");
2488 
2489         return NXT_UNIT_OK;
2490     }
2491 
2492     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2493         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2494 
2495         return NXT_UNIT_ERROR;
2496     }
2497 
2498     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2499         nxt_unit_req_warn(req, "upgrade: response already sent");
2500 
2501         return NXT_UNIT_ERROR;
2502     }
2503 
2504     rc = nxt_unit_request_hash_add(req->ctx, req);
2505     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2506         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2507 
2508         return NXT_UNIT_ERROR;
2509     }
2510 
2511     req_impl->websocket = 1;
2512 
2513     req->response->status = 101;
2514 
2515     return NXT_UNIT_OK;
2516 }
2517 
2518 
2519 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2520 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2521 {
2522     nxt_unit_request_info_impl_t  *req_impl;
2523 
2524     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2525 
2526     return req_impl->websocket;
2527 }
2528 
2529 
2530 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2531 nxt_unit_get_request_info_from_data(void *data)
2532 {
2533     nxt_unit_request_info_impl_t  *req_impl;
2534 
2535     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2536 
2537     return &req_impl->req;
2538 }
2539 
2540 
2541 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2542 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2543 {
2544     int                           rc;
2545     nxt_unit_mmap_buf_t           *mmap_buf;
2546     nxt_unit_request_info_t       *req;
2547     nxt_unit_request_info_impl_t  *req_impl;
2548 
2549     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2550 
2551     req = mmap_buf->req;
2552     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2553 
2554     nxt_unit_req_debug(req, "buf_send: %d bytes",
2555                        (int) (buf->free - buf->start));
2556 
2557     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2558         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2559 
2560         return NXT_UNIT_ERROR;
2561     }
2562 
2563     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2564         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2565 
2566         return NXT_UNIT_ERROR;
2567     }
2568 
2569     if (nxt_fast_path(buf->free > buf->start)) {
2570         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2571         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2572             return rc;
2573         }
2574     }
2575 
2576     nxt_unit_mmap_buf_free(mmap_buf);
2577 
2578     return NXT_UNIT_OK;
2579 }
2580 
2581 
2582 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2583 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2584 {
2585     int                      rc;
2586     nxt_unit_mmap_buf_t      *mmap_buf;
2587     nxt_unit_request_info_t  *req;
2588 
2589     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2590 
2591     req = mmap_buf->req;
2592 
2593     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2594     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2595         nxt_unit_mmap_buf_free(mmap_buf);
2596 
2597         nxt_unit_request_info_release(req);
2598 
2599     } else {
2600         nxt_unit_request_done(req, rc);
2601     }
2602 }
2603 
2604 
2605 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2606 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2607     nxt_unit_mmap_buf_t *mmap_buf, int last)
2608 {
2609     struct {
2610         nxt_port_msg_t       msg;
2611         nxt_port_mmap_msg_t  mmap_msg;
2612     } m;
2613 
2614     int                           rc;
2615     u_char                        *last_used, *first_free;
2616     ssize_t                       res;
2617     nxt_chunk_id_t                first_free_chunk;
2618     nxt_unit_buf_t                *buf;
2619     nxt_unit_impl_t               *lib;
2620     nxt_port_mmap_header_t        *hdr;
2621     nxt_unit_request_info_impl_t  *req_impl;
2622 
2623     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2624     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2625 
2626     buf = &mmap_buf->buf;
2627     hdr = mmap_buf->hdr;
2628 
2629     m.mmap_msg.size = buf->free - buf->start;
2630 
2631     m.msg.stream = req_impl->stream;
2632     m.msg.pid = lib->pid;
2633     m.msg.reply_port = 0;
2634     m.msg.type = _NXT_PORT_MSG_DATA;
2635     m.msg.last = last != 0;
2636     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2637     m.msg.nf = 0;
2638     m.msg.mf = 0;
2639     m.msg.tracking = 0;
2640 
2641     rc = NXT_UNIT_ERROR;
2642 
2643     if (m.msg.mmap) {
2644         m.mmap_msg.mmap_id = hdr->id;
2645         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2646                                                      (u_char *) buf->start);
2647 
2648         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2649                        req_impl->stream,
2650                        (int) m.mmap_msg.mmap_id,
2651                        (int) m.mmap_msg.chunk_id,
2652                        (int) m.mmap_msg.size);
2653 
2654         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2655                                  NULL);
2656         if (nxt_slow_path(res != sizeof(m))) {
2657             goto free_buf;
2658         }
2659 
2660         last_used = (u_char *) buf->free - 1;
2661         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2662 
2663         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2664             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2665 
2666             buf->start = (char *) first_free;
2667             buf->free = buf->start;
2668 
2669             if (buf->end < buf->start) {
2670                 buf->end = buf->start;
2671             }
2672 
2673         } else {
2674             buf->start = NULL;
2675             buf->free = NULL;
2676             buf->end = NULL;
2677 
2678             mmap_buf->hdr = NULL;
2679         }
2680 
2681         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2682                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2683 
2684         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2685                        (int) lib->outgoing.allocated_chunks);
2686 
2687     } else {
2688         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2689                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2690         {
2691             nxt_unit_alert(req->ctx,
2692                            "#%"PRIu32": failed to send plain memory buffer"
2693                            ": no space reserved for message header",
2694                            req_impl->stream);
2695 
2696             goto free_buf;
2697         }
2698 
2699         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2700 
2701         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2702                        req_impl->stream,
2703                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2704 
2705         res = nxt_unit_port_send(req->ctx, req->response_port,
2706                                  buf->start - sizeof(m.msg),
2707                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2708 
2709         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2710             goto free_buf;
2711         }
2712     }
2713 
2714     rc = NXT_UNIT_OK;
2715 
2716 free_buf:
2717 
2718     nxt_unit_free_outgoing_buf(mmap_buf);
2719 
2720     return rc;
2721 }
2722 
2723 
2724 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2725 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2726 {
2727     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2728 }
2729 
2730 
2731 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2732 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2733 {
2734     nxt_unit_free_outgoing_buf(mmap_buf);
2735 
2736     nxt_unit_mmap_buf_release(mmap_buf);
2737 }
2738 
2739 
2740 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2741 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2742 {
2743     if (mmap_buf->hdr != NULL) {
2744         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2745                               mmap_buf->hdr, mmap_buf->buf.start,
2746                               mmap_buf->buf.end - mmap_buf->buf.start);
2747 
2748         mmap_buf->hdr = NULL;
2749 
2750         return;
2751     }
2752 
2753     if (mmap_buf->free_ptr != NULL) {
2754         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2755 
2756         mmap_buf->free_ptr = NULL;
2757     }
2758 }
2759 
2760 
2761 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2762 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2763 {
2764     nxt_unit_ctx_impl_t  *ctx_impl;
2765     nxt_unit_read_buf_t  *rbuf;
2766 
2767     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2768 
2769     pthread_mutex_lock(&ctx_impl->mutex);
2770 
2771     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2772 
2773     pthread_mutex_unlock(&ctx_impl->mutex);
2774 
2775     rbuf->oob.size = 0;
2776 
2777     return rbuf;
2778 }
2779 
2780 
2781 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2782 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2783 {
2784     nxt_queue_link_t     *link;
2785     nxt_unit_read_buf_t  *rbuf;
2786 
2787     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2788         link = nxt_queue_first(&ctx_impl->free_rbuf);
2789         nxt_queue_remove(link);
2790 
2791         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2792 
2793         return rbuf;
2794     }
2795 
2796     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2797 
2798     if (nxt_fast_path(rbuf != NULL)) {
2799         rbuf->ctx_impl = ctx_impl;
2800     }
2801 
2802     return rbuf;
2803 }
2804 
2805 
2806 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2807 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2808     nxt_unit_read_buf_t *rbuf)
2809 {
2810     nxt_unit_ctx_impl_t  *ctx_impl;
2811 
2812     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2813 
2814     pthread_mutex_lock(&ctx_impl->mutex);
2815 
2816     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2817 
2818     pthread_mutex_unlock(&ctx_impl->mutex);
2819 }
2820 
2821 
2822 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2823 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2824 {
2825     nxt_unit_mmap_buf_t  *mmap_buf;
2826 
2827     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2828 
2829     if (mmap_buf->next == NULL) {
2830         return NULL;
2831     }
2832 
2833     return &mmap_buf->next->buf;
2834 }
2835 
2836 
2837 uint32_t
nxt_unit_buf_max(void)2838 nxt_unit_buf_max(void)
2839 {
2840     return PORT_MMAP_DATA_SIZE;
2841 }
2842 
2843 
2844 uint32_t
nxt_unit_buf_min(void)2845 nxt_unit_buf_min(void)
2846 {
2847     return PORT_MMAP_CHUNK_SIZE;
2848 }
2849 
2850 
2851 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2852 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2853     size_t size)
2854 {
2855     ssize_t  res;
2856 
2857     res = nxt_unit_response_write_nb(req, start, size, size);
2858 
2859     return res < 0 ? -res : NXT_UNIT_OK;
2860 }
2861 
2862 
2863 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2864 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2865     size_t size, size_t min_size)
2866 {
2867     int                           rc;
2868     ssize_t                       sent;
2869     uint32_t                      part_size, min_part_size, buf_size;
2870     const char                    *part_start;
2871     nxt_unit_mmap_buf_t           mmap_buf;
2872     nxt_unit_request_info_impl_t  *req_impl;
2873     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2874 
2875     nxt_unit_req_debug(req, "write: %d", (int) size);
2876 
2877     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2878 
2879     part_start = start;
2880     sent = 0;
2881 
2882     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2883         nxt_unit_req_alert(req, "write: response not initialized yet");
2884 
2885         return -NXT_UNIT_ERROR;
2886     }
2887 
2888     /* Check if response is not send yet. */
2889     if (nxt_slow_path(req->response_buf != NULL)) {
2890         part_size = req->response_buf->end - req->response_buf->free;
2891         part_size = nxt_min(size, part_size);
2892 
2893         rc = nxt_unit_response_add_content(req, part_start, part_size);
2894         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2895             return -rc;
2896         }
2897 
2898         rc = nxt_unit_response_send(req);
2899         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2900             return -rc;
2901         }
2902 
2903         size -= part_size;
2904         part_start += part_size;
2905         sent += part_size;
2906 
2907         min_size -= nxt_min(min_size, part_size);
2908     }
2909 
2910     while (size > 0) {
2911         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2912         min_part_size = nxt_min(min_size, part_size);
2913         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2914 
2915         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2916                                        min_part_size, &mmap_buf, local_buf);
2917         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2918             return -rc;
2919         }
2920 
2921         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2922         if (nxt_slow_path(buf_size == 0)) {
2923             return sent;
2924         }
2925         part_size = nxt_min(buf_size, part_size);
2926 
2927         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2928                                        part_start, part_size);
2929 
2930         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2931         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2932             return -rc;
2933         }
2934 
2935         size -= part_size;
2936         part_start += part_size;
2937         sent += part_size;
2938 
2939         min_size -= nxt_min(min_size, part_size);
2940     }
2941 
2942     return sent;
2943 }
2944 
2945 
2946 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2947 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2948     nxt_unit_read_info_t *read_info)
2949 {
2950     int                           rc;
2951     ssize_t                       n;
2952     uint32_t                      buf_size;
2953     nxt_unit_buf_t                *buf;
2954     nxt_unit_mmap_buf_t           mmap_buf;
2955     nxt_unit_request_info_impl_t  *req_impl;
2956     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2957 
2958     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2959 
2960     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2961         nxt_unit_req_alert(req, "write: response not initialized yet");
2962 
2963         return NXT_UNIT_ERROR;
2964     }
2965 
2966     /* Check if response is not send yet. */
2967     if (nxt_slow_path(req->response_buf != NULL)) {
2968 
2969         /* Enable content in headers buf. */
2970         rc = nxt_unit_response_add_content(req, "", 0);
2971         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2972             nxt_unit_req_error(req, "Failed to add piggyback content");
2973 
2974             return rc;
2975         }
2976 
2977         buf = req->response_buf;
2978 
2979         while (buf->end - buf->free > 0) {
2980             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2981             if (nxt_slow_path(n < 0)) {
2982                 nxt_unit_req_error(req, "Read error");
2983 
2984                 return NXT_UNIT_ERROR;
2985             }
2986 
2987             /* Manually increase sizes. */
2988             buf->free += n;
2989             req->response->piggyback_content_length += n;
2990 
2991             if (read_info->eof) {
2992                 break;
2993             }
2994         }
2995 
2996         rc = nxt_unit_response_send(req);
2997         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2998             nxt_unit_req_error(req, "Failed to send headers with content");
2999 
3000             return rc;
3001         }
3002 
3003         if (read_info->eof) {
3004             return NXT_UNIT_OK;
3005         }
3006     }
3007 
3008     while (!read_info->eof) {
3009         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3010                            read_info->buf_size);
3011 
3012         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3013 
3014         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3015                                        buf_size, buf_size,
3016                                        &mmap_buf, local_buf);
3017         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3018             return rc;
3019         }
3020 
3021         buf = &mmap_buf.buf;
3022 
3023         while (!read_info->eof && buf->end > buf->free) {
3024             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3025             if (nxt_slow_path(n < 0)) {
3026                 nxt_unit_req_error(req, "Read error");
3027 
3028                 nxt_unit_free_outgoing_buf(&mmap_buf);
3029 
3030                 return NXT_UNIT_ERROR;
3031             }
3032 
3033             buf->free += n;
3034         }
3035 
3036         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3037         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3038             nxt_unit_req_error(req, "Failed to send content");
3039 
3040             return rc;
3041         }
3042     }
3043 
3044     return NXT_UNIT_OK;
3045 }
3046 
3047 
3048 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3049 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3050 {
3051     ssize_t  buf_res, res;
3052 
3053     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3054                                 dst, size);
3055 
3056     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3057         res = read(req->content_fd, dst, size);
3058         if (nxt_slow_path(res < 0)) {
3059             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3060                                strerror(errno), errno);
3061 
3062             return res;
3063         }
3064 
3065         if (res < (ssize_t) size) {
3066             nxt_unit_close(req->content_fd);
3067 
3068             req->content_fd = -1;
3069         }
3070 
3071         req->content_length -= res;
3072         size -= res;
3073 
3074         dst = nxt_pointer_to(dst, res);
3075 
3076     } else {
3077         res = 0;
3078     }
3079 
3080     return buf_res + res;
3081 }
3082 
3083 
3084 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3085 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3086 {
3087     char                 *p;
3088     size_t               l_size, b_size;
3089     nxt_unit_buf_t       *b;
3090     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3091 
3092     if (req->content_length == 0) {
3093         return 0;
3094     }
3095 
3096     l_size = 0;
3097 
3098     b = req->content_buf;
3099 
3100     while (b != NULL) {
3101         b_size = b->end - b->free;
3102         p = memchr(b->free, '\n', b_size);
3103 
3104         if (p != NULL) {
3105             p++;
3106             l_size += p - b->free;
3107             break;
3108         }
3109 
3110         l_size += b_size;
3111 
3112         if (max_size <= l_size) {
3113             break;
3114         }
3115 
3116         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3117         if (mmap_buf->next == NULL
3118             && req->content_fd != -1
3119             && l_size < req->content_length)
3120         {
3121             preread_buf = nxt_unit_request_preread(req, 16384);
3122             if (nxt_slow_path(preread_buf == NULL)) {
3123                 return -1;
3124             }
3125 
3126             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3127         }
3128 
3129         b = nxt_unit_buf_next(b);
3130     }
3131 
3132     return nxt_min(max_size, l_size);
3133 }
3134 
3135 
3136 static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t * req,size_t size)3137 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3138 {
3139     ssize_t              res;
3140     nxt_unit_mmap_buf_t  *mmap_buf;
3141 
3142     if (req->content_fd == -1) {
3143         nxt_unit_req_alert(req, "preread: content_fd == -1");
3144         return NULL;
3145     }
3146 
3147     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3148     if (nxt_slow_path(mmap_buf == NULL)) {
3149         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3150         return NULL;
3151     }
3152 
3153     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3154     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3155         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3156         nxt_unit_mmap_buf_release(mmap_buf);
3157         return NULL;
3158     }
3159 
3160     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3161 
3162     mmap_buf->hdr = NULL;
3163     mmap_buf->buf.start = mmap_buf->free_ptr;
3164     mmap_buf->buf.free = mmap_buf->buf.start;
3165     mmap_buf->buf.end = mmap_buf->buf.start + size;
3166 
3167     res = read(req->content_fd, mmap_buf->free_ptr, size);
3168     if (res < 0) {
3169         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3170                            strerror(errno), errno);
3171 
3172         nxt_unit_mmap_buf_free(mmap_buf);
3173 
3174         return NULL;
3175     }
3176 
3177     if (res < (ssize_t) size) {
3178         nxt_unit_close(req->content_fd);
3179 
3180         req->content_fd = -1;
3181     }
3182 
3183     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3184 
3185     mmap_buf->buf.end = mmap_buf->buf.free + res;
3186 
3187     return mmap_buf;
3188 }
3189 
3190 
3191 static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t ** b,uint64_t * len,void * dst,size_t size)3192 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3193 {
3194     u_char          *p;
3195     size_t          rest, copy, read;
3196     nxt_unit_buf_t  *buf, *last_buf;
3197 
3198     p = dst;
3199     rest = size;
3200 
3201     buf = *b;
3202     last_buf = buf;
3203 
3204     while (buf != NULL) {
3205         last_buf = buf;
3206 
3207         copy = buf->end - buf->free;
3208         copy = nxt_min(rest, copy);
3209 
3210         p = nxt_cpymem(p, buf->free, copy);
3211 
3212         buf->free += copy;
3213         rest -= copy;
3214 
3215         if (rest == 0) {
3216             if (buf->end == buf->free) {
3217                 buf = nxt_unit_buf_next(buf);
3218             }
3219 
3220             break;
3221         }
3222 
3223         buf = nxt_unit_buf_next(buf);
3224     }
3225 
3226     *b = last_buf;
3227 
3228     read = size - rest;
3229 
3230     *len -= read;
3231 
3232     return read;
3233 }
3234 
3235 
3236 void
nxt_unit_request_done(nxt_unit_request_info_t * req,int rc)3237 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3238 {
3239     uint32_t                      size;
3240     nxt_port_msg_t                msg;
3241     nxt_unit_impl_t               *lib;
3242     nxt_unit_request_info_impl_t  *req_impl;
3243 
3244     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3245 
3246     nxt_unit_req_debug(req, "done: %d", rc);
3247 
3248     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3249         goto skip_response_send;
3250     }
3251 
3252     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3253 
3254         size = nxt_length("Content-Type") + nxt_length("text/plain");
3255 
3256         rc = nxt_unit_response_init(req, 200, 1, size);
3257         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3258             goto skip_response_send;
3259         }
3260 
3261         rc = nxt_unit_response_add_field(req, "Content-Type",
3262                                    nxt_length("Content-Type"),
3263                                    "text/plain", nxt_length("text/plain"));
3264         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3265             goto skip_response_send;
3266         }
3267     }
3268 
3269     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3270 
3271         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3272 
3273         nxt_unit_buf_send_done(req->response_buf);
3274 
3275         return;
3276     }
3277 
3278 skip_response_send:
3279 
3280     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3281 
3282     msg.stream = req_impl->stream;
3283     msg.pid = lib->pid;
3284     msg.reply_port = 0;
3285     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3286                                    : _NXT_PORT_MSG_RPC_ERROR;
3287     msg.last = 1;
3288     msg.mmap = 0;
3289     msg.nf = 0;
3290     msg.mf = 0;
3291     msg.tracking = 0;
3292 
3293     (void) nxt_unit_port_send(req->ctx, req->response_port,
3294                               &msg, sizeof(msg), NULL);
3295 
3296     nxt_unit_request_info_release(req);
3297 }
3298 
3299 
3300 int
nxt_unit_websocket_send(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const void * start,size_t size)3301 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3302     uint8_t last, const void *start, size_t size)
3303 {
3304     const struct iovec  iov = { (void *) start, size };
3305 
3306     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3307 }
3308 
3309 
3310 int
nxt_unit_websocket_sendv(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const struct iovec * iov,int iovcnt)3311 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3312     uint8_t last, const struct iovec *iov, int iovcnt)
3313 {
3314     int                     i, rc;
3315     size_t                  l, copy;
3316     uint32_t                payload_len, buf_size, alloc_size;
3317     const uint8_t           *b;
3318     nxt_unit_buf_t          *buf;
3319     nxt_unit_mmap_buf_t     mmap_buf;
3320     nxt_websocket_header_t  *wh;
3321     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3322 
3323     payload_len = 0;
3324 
3325     for (i = 0; i < iovcnt; i++) {
3326         payload_len += iov[i].iov_len;
3327     }
3328 
3329     buf_size = 10 + payload_len;
3330     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3331 
3332     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3333                                    alloc_size, alloc_size,
3334                                    &mmap_buf, local_buf);
3335     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3336         return rc;
3337     }
3338 
3339     buf = &mmap_buf.buf;
3340 
3341     buf->start[0] = 0;
3342     buf->start[1] = 0;
3343 
3344     buf_size -= buf->end - buf->start;
3345 
3346     wh = (void *) buf->free;
3347 
3348     buf->free = nxt_websocket_frame_init(wh, payload_len);
3349     wh->fin = last;
3350     wh->opcode = opcode;
3351 
3352     for (i = 0; i < iovcnt; i++) {
3353         b = iov[i].iov_base;
3354         l = iov[i].iov_len;
3355 
3356         while (l > 0) {
3357             copy = buf->end - buf->free;
3358             copy = nxt_min(l, copy);
3359 
3360             buf->free = nxt_cpymem(buf->free, b, copy);
3361             b += copy;
3362             l -= copy;
3363 
3364             if (l > 0) {
3365                 if (nxt_fast_path(buf->free > buf->start)) {
3366                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3367 
3368                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3369                         return rc;
3370                     }
3371                 }
3372 
3373                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3374 
3375                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3376                                                alloc_size, alloc_size,
3377                                                &mmap_buf, local_buf);
3378                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3379                     return rc;
3380                 }
3381 
3382                 buf_size -= buf->end - buf->start;
3383             }
3384         }
3385     }
3386 
3387     if (buf->free > buf->start) {
3388         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3389     }
3390 
3391     return rc;
3392 }
3393 
3394 
3395 ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t * ws,void * dst,size_t size)3396 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3397     size_t size)
3398 {
3399     ssize_t   res;
3400     uint8_t   *b;
3401     uint64_t  i, d;
3402 
3403     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3404                             dst, size);
3405 
3406     if (ws->mask == NULL) {
3407         return res;
3408     }
3409 
3410     b = dst;
3411     d = (ws->payload_len - ws->content_length - res) % 4;
3412 
3413     for (i = 0; i < (uint64_t) res; i++) {
3414         b[i] ^= ws->mask[ (i + d) % 4 ];
3415     }
3416 
3417     return res;
3418 }
3419 
3420 
3421 int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t * ws)3422 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3423 {
3424     char                             *b;
3425     size_t                           size, hsize;
3426     nxt_unit_websocket_frame_impl_t  *ws_impl;
3427 
3428     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3429 
3430     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3431         return NXT_UNIT_OK;
3432     }
3433 
3434     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3435 
3436     b = nxt_unit_malloc(ws->req->ctx, size);
3437     if (nxt_slow_path(b == NULL)) {
3438         return NXT_UNIT_ERROR;
3439     }
3440 
3441     memcpy(b, ws_impl->buf->buf.start, size);
3442 
3443     hsize = nxt_websocket_frame_header_size(b);
3444 
3445     ws_impl->buf->buf.start = b;
3446     ws_impl->buf->buf.free = b + hsize;
3447     ws_impl->buf->buf.end = b + size;
3448 
3449     ws_impl->buf->free_ptr = b;
3450 
3451     ws_impl->ws.header = (nxt_websocket_header_t *) b;
3452 
3453     if (ws_impl->ws.header->mask) {
3454         ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3455 
3456     } else {
3457         ws_impl->ws.mask = NULL;
3458     }
3459 
3460     return NXT_UNIT_OK;
3461 }
3462 
3463 
3464 void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t * ws)3465 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3466 {
3467     nxt_unit_websocket_frame_release(ws);
3468 }
3469 
3470 
3471 static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_chunk_id_t * c,int * n,int min_n)3472 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3473     nxt_chunk_id_t *c, int *n, int min_n)
3474 {
3475     int                     res, nchunks, i;
3476     uint32_t                outgoing_size;
3477     nxt_unit_mmap_t         *mm, *mm_end;
3478     nxt_unit_impl_t         *lib;
3479     nxt_port_mmap_header_t  *hdr;
3480 
3481     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3482 
3483     pthread_mutex_lock(&lib->outgoing.mutex);
3484 
3485 retry:
3486 
3487     outgoing_size = lib->outgoing.size;
3488 
3489     mm_end = lib->outgoing.elts + outgoing_size;
3490 
3491     for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3492         hdr = mm->hdr;
3493 
3494         if (hdr->sent_over != 0xFFFFu
3495             && (hdr->sent_over != port->id.id
3496                 || mm->src_thread != pthread_self()))
3497         {
3498             continue;
3499         }
3500 
3501         *c = 0;
3502 
3503         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3504             nchunks = 1;
3505 
3506             while (nchunks < *n) {
3507                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3508                                                        *c + nchunks);
3509 
3510                 if (res == 0) {
3511                     if (nchunks >= min_n) {
3512                         *n = nchunks;
3513 
3514                         goto unlock;
3515                     }
3516 
3517                     for (i = 0; i < nchunks; i++) {
3518                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3519                     }
3520 
3521                     *c += nchunks + 1;
3522                     nchunks = 0;
3523                     break;
3524                 }
3525 
3526                 nchunks++;
3527             }
3528 
3529             if (nchunks >= min_n) {
3530                 *n = nchunks;
3531 
3532                 goto unlock;
3533             }
3534         }
3535 
3536         hdr->oosm = 1;
3537     }
3538 
3539     if (outgoing_size >= lib->shm_mmap_limit) {
3540         /* Cannot allocate more shared memory. */
3541         pthread_mutex_unlock(&lib->outgoing.mutex);
3542 
3543         if (min_n == 0) {
3544             *n = 0;
3545         }
3546 
3547         if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3548                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3549         {
3550             /* Memory allocated by application, but not send to router. */
3551             return NULL;
3552         }
3553 
3554         /* Notify router about OOSM condition. */
3555 
3556         res = nxt_unit_send_oosm(ctx, port);
3557         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3558             return NULL;
3559         }
3560 
3561         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3562 
3563         if (min_n == 0) {
3564             return NULL;
3565         }
3566 
3567         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3568 
3569         res = nxt_unit_wait_shm_ack(ctx);
3570         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3571             return NULL;
3572         }
3573 
3574         nxt_unit_debug(ctx, "oosm: retry");
3575 
3576         pthread_mutex_lock(&lib->outgoing.mutex);
3577 
3578         goto retry;
3579     }
3580 
3581     *c = 0;
3582     hdr = nxt_unit_new_mmap(ctx, port, *n);
3583 
3584 unlock:
3585 
3586     nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3587 
3588     nxt_unit_debug(ctx, "allocated_chunks %d",
3589                    (int) lib->outgoing.allocated_chunks);
3590 
3591     pthread_mutex_unlock(&lib->outgoing.mutex);
3592 
3593     return hdr;
3594 }
3595 
3596 
3597 static int
nxt_unit_send_oosm(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)3598 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3599 {
3600     ssize_t          res;
3601     nxt_port_msg_t   msg;
3602     nxt_unit_impl_t  *lib;
3603 
3604     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3605 
3606     msg.stream = 0;
3607     msg.pid = lib->pid;
3608     msg.reply_port = 0;
3609     msg.type = _NXT_PORT_MSG_OOSM;
3610     msg.last = 0;
3611     msg.mmap = 0;
3612     msg.nf = 0;
3613     msg.mf = 0;
3614     msg.tracking = 0;
3615 
3616     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
3617     if (nxt_slow_path(res != sizeof(msg))) {
3618         return NXT_UNIT_ERROR;
3619     }
3620 
3621     return NXT_UNIT_OK;
3622 }
3623 
3624 
3625 static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t * ctx)3626 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3627 {
3628     int                  res;
3629     nxt_unit_ctx_impl_t  *ctx_impl;
3630     nxt_unit_read_buf_t  *rbuf;
3631 
3632     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3633 
3634     while (1) {
3635         rbuf = nxt_unit_read_buf_get(ctx);
3636         if (nxt_slow_path(rbuf == NULL)) {
3637             return NXT_UNIT_ERROR;
3638         }
3639 
3640         do {
3641             res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3642         } while (res == NXT_UNIT_AGAIN);
3643 
3644         if (res == NXT_UNIT_ERROR) {
3645             nxt_unit_read_buf_release(ctx, rbuf);
3646 
3647             return NXT_UNIT_ERROR;
3648         }
3649 
3650         if (nxt_unit_is_shm_ack(rbuf)) {
3651             nxt_unit_read_buf_release(ctx, rbuf);
3652             break;
3653         }
3654 
3655         pthread_mutex_lock(&ctx_impl->mutex);
3656 
3657         nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3658 
3659         pthread_mutex_unlock(&ctx_impl->mutex);
3660 
3661         if (nxt_unit_is_quit(rbuf)) {
3662             nxt_unit_debug(ctx, "oosm: quit received");
3663 
3664             return NXT_UNIT_ERROR;
3665         }
3666     }
3667 
3668     return NXT_UNIT_OK;
3669 }
3670 
3671 
3672 static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t * mmaps,uint32_t i)3673 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3674 {
3675     uint32_t         cap, n;
3676     nxt_unit_mmap_t  *e;
3677 
3678     if (nxt_fast_path(mmaps->size > i)) {
3679         return mmaps->elts + i;
3680     }
3681 
3682     cap = mmaps->cap;
3683 
3684     if (cap == 0) {
3685         cap = i + 1;
3686     }
3687 
3688     while (i + 1 > cap) {
3689 
3690         if (cap < 16) {
3691             cap = cap * 2;
3692 
3693         } else {
3694             cap = cap + cap / 2;
3695         }
3696     }
3697 
3698     if (cap != mmaps->cap) {
3699 
3700         e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3701         if (nxt_slow_path(e == NULL)) {
3702             return NULL;
3703         }
3704 
3705         mmaps->elts = e;
3706 
3707         for (n = mmaps->cap; n < cap; n++) {
3708             e = mmaps->elts + n;
3709 
3710             e->hdr = NULL;
3711             nxt_queue_init(&e->awaiting_rbuf);
3712         }
3713 
3714         mmaps->cap = cap;
3715     }
3716 
3717     if (i + 1 > mmaps->size) {
3718         mmaps->size = i + 1;
3719     }
3720 
3721     return mmaps->elts + i;
3722 }
3723 
3724 
3725 static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int n)3726 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3727 {
3728     int                     i, fd, rc;
3729     void                    *mem;
3730     nxt_unit_mmap_t         *mm;
3731     nxt_unit_impl_t         *lib;
3732     nxt_port_mmap_header_t  *hdr;
3733 
3734     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3735 
3736     mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3737     if (nxt_slow_path(mm == NULL)) {
3738         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3739 
3740         return NULL;
3741     }
3742 
3743     fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3744     if (nxt_slow_path(fd == -1)) {
3745         goto remove_fail;
3746     }
3747 
3748     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3749     if (nxt_slow_path(mem == MAP_FAILED)) {
3750         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3751                        strerror(errno), errno);
3752 
3753         nxt_unit_close(fd);
3754 
3755         goto remove_fail;
3756     }
3757 
3758     mm->hdr = mem;
3759     hdr = mem;
3760 
3761     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3762     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3763 
3764     hdr->id = lib->outgoing.size - 1;
3765     hdr->src_pid = lib->pid;
3766     hdr->dst_pid = port->id.pid;
3767     hdr->sent_over = port->id.id;
3768     mm->src_thread = pthread_self();
3769 
3770     /* Mark first n chunk(s) as busy */
3771     for (i = 0; i < n; i++) {
3772         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3773     }
3774 
3775     /* Mark as busy chunk followed the last available chunk. */
3776     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3777     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3778 
3779     pthread_mutex_unlock(&lib->outgoing.mutex);
3780 
3781     rc = nxt_unit_send_mmap(ctx, port, fd);
3782     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3783         munmap(mem, PORT_MMAP_SIZE);
3784         hdr = NULL;
3785 
3786     } else {
3787         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3788                        hdr->id, (int) lib->pid, (int) port->id.pid);
3789     }
3790 
3791     nxt_unit_close(fd);
3792 
3793     pthread_mutex_lock(&lib->outgoing.mutex);
3794 
3795     if (nxt_fast_path(hdr != NULL)) {
3796         return hdr;
3797     }
3798 
3799 remove_fail:
3800 
3801     lib->outgoing.size--;
3802 
3803     return NULL;
3804 }
3805 
3806 
3807 static int
nxt_unit_shm_open(nxt_unit_ctx_t * ctx,size_t size)3808 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3809 {
3810     int              fd;
3811     nxt_unit_impl_t  *lib;
3812 
3813     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3814 
3815 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3816     char             name[64];
3817 
3818     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3819              lib->pid, (void *) (uintptr_t) pthread_self());
3820 #endif
3821 
3822 #if (NXT_HAVE_MEMFD_CREATE)
3823 
3824     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3825     if (nxt_slow_path(fd == -1)) {
3826         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3827                        strerror(errno), errno);
3828 
3829         return -1;
3830     }
3831 
3832     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3833 
3834 #elif (NXT_HAVE_SHM_OPEN_ANON)
3835 
3836     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3837     if (nxt_slow_path(fd == -1)) {
3838         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3839                        strerror(errno), errno);
3840 
3841         return -1;
3842     }
3843 
3844 #elif (NXT_HAVE_SHM_OPEN)
3845 
3846     /* Just in case. */
3847     shm_unlink(name);
3848 
3849     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3850     if (nxt_slow_path(fd == -1)) {
3851         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3852                        strerror(errno), errno);
3853 
3854         return -1;
3855     }
3856 
3857     if (nxt_slow_path(shm_unlink(name) == -1)) {
3858         nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3859                        strerror(errno), errno);
3860     }
3861 
3862 #else
3863 
3864 #error No working shared memory implementation.
3865 
3866 #endif
3867 
3868     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3869         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3870                        strerror(errno), errno);
3871 
3872         nxt_unit_close(fd);
3873 
3874         return -1;
3875     }
3876 
3877     return fd;
3878 }
3879 
3880 
3881 static int
nxt_unit_send_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int fd)3882 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3883 {
3884     ssize_t          res;
3885     nxt_send_oob_t   oob;
3886     nxt_port_msg_t   msg;
3887     nxt_unit_impl_t  *lib;
3888     int              fds[2] = {fd, -1};
3889 
3890     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3891 
3892     msg.stream = 0;
3893     msg.pid = lib->pid;
3894     msg.reply_port = 0;
3895     msg.type = _NXT_PORT_MSG_MMAP;
3896     msg.last = 0;
3897     msg.mmap = 0;
3898     msg.nf = 0;
3899     msg.mf = 0;
3900     msg.tracking = 0;
3901 
3902     nxt_socket_msg_oob_init(&oob, fds);
3903 
3904     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob);
3905     if (nxt_slow_path(res != sizeof(msg))) {
3906         return NXT_UNIT_ERROR;
3907     }
3908 
3909     return NXT_UNIT_OK;
3910 }
3911 
3912 
3913 static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,uint32_t size,uint32_t min_size,nxt_unit_mmap_buf_t * mmap_buf,char * local_buf)3914 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3915     uint32_t size, uint32_t min_size,
3916     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3917 {
3918     int                     nchunks, min_nchunks;
3919     nxt_chunk_id_t          c;
3920     nxt_port_mmap_header_t  *hdr;
3921 
3922     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3923         if (local_buf != NULL) {
3924             mmap_buf->free_ptr = NULL;
3925             mmap_buf->plain_ptr = local_buf;
3926 
3927         } else {
3928             mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3929                                                  size + sizeof(nxt_port_msg_t));
3930             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3931                 return NXT_UNIT_ERROR;
3932             }
3933 
3934             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3935         }
3936 
3937         mmap_buf->hdr = NULL;
3938         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3939         mmap_buf->buf.free = mmap_buf->buf.start;
3940         mmap_buf->buf.end = mmap_buf->buf.start + size;
3941 
3942         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3943                        mmap_buf->buf.start, (int) size);
3944 
3945         return NXT_UNIT_OK;
3946     }
3947 
3948     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3949     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3950 
3951     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3952     if (nxt_slow_path(hdr == NULL)) {
3953         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3954             mmap_buf->hdr = NULL;
3955             mmap_buf->buf.start = NULL;
3956             mmap_buf->buf.free = NULL;
3957             mmap_buf->buf.end = NULL;
3958             mmap_buf->free_ptr = NULL;
3959 
3960             return NXT_UNIT_OK;
3961         }
3962 
3963         return NXT_UNIT_ERROR;
3964     }
3965 
3966     mmap_buf->hdr = hdr;
3967     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3968     mmap_buf->buf.free = mmap_buf->buf.start;
3969     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3970     mmap_buf->free_ptr = NULL;
3971     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3972 
3973     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3974                   (int) hdr->id, (int) c,
3975                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3976 
3977     return NXT_UNIT_OK;
3978 }
3979 
3980 
3981 static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t * ctx,pid_t pid,int fd)3982 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3983 {
3984     int                     rc;
3985     void                    *mem;
3986     nxt_queue_t             awaiting_rbuf;
3987     struct stat             mmap_stat;
3988     nxt_unit_mmap_t         *mm;
3989     nxt_unit_impl_t         *lib;
3990     nxt_unit_ctx_impl_t     *ctx_impl;
3991     nxt_unit_read_buf_t     *rbuf;
3992     nxt_port_mmap_header_t  *hdr;
3993 
3994     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3995 
3996     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3997 
3998     if (fstat(fd, &mmap_stat) == -1) {
3999         nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
4000                        strerror(errno), errno);
4001 
4002         return NXT_UNIT_ERROR;
4003     }
4004 
4005     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
4006                MAP_SHARED, fd, 0);
4007     if (nxt_slow_path(mem == MAP_FAILED)) {
4008         nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
4009                        strerror(errno), errno);
4010 
4011         return NXT_UNIT_ERROR;
4012     }
4013 
4014     hdr = mem;
4015 
4016     if (nxt_slow_path(hdr->src_pid != pid)) {
4017 
4018         nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
4019                        "detected: %d != %d or %d != %d", (int) hdr->src_pid,
4020                        (int) pid, (int) hdr->dst_pid, (int) lib->pid);
4021 
4022         munmap(mem, PORT_MMAP_SIZE);
4023 
4024         return NXT_UNIT_ERROR;
4025     }
4026 
4027     nxt_queue_init(&awaiting_rbuf);
4028 
4029     pthread_mutex_lock(&lib->incoming.mutex);
4030 
4031     mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
4032     if (nxt_slow_path(mm == NULL)) {
4033         nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
4034 
4035         munmap(mem, PORT_MMAP_SIZE);
4036 
4037         rc = NXT_UNIT_ERROR;
4038 
4039     } else {
4040         mm->hdr = hdr;
4041 
4042         hdr->sent_over = 0xFFFFu;
4043 
4044         nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4045         nxt_queue_init(&mm->awaiting_rbuf);
4046 
4047         rc = NXT_UNIT_OK;
4048     }
4049 
4050     pthread_mutex_unlock(&lib->incoming.mutex);
4051 
4052     nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4053 
4054         ctx_impl = rbuf->ctx_impl;
4055 
4056         pthread_mutex_lock(&ctx_impl->mutex);
4057 
4058         nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4059 
4060         pthread_mutex_unlock(&ctx_impl->mutex);
4061 
4062         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4063 
4064         nxt_unit_awake_ctx(ctx, ctx_impl);
4065 
4066     } nxt_queue_loop;
4067 
4068     return rc;
4069 }
4070 
4071 
4072 static void
nxt_unit_awake_ctx(nxt_unit_ctx_t * ctx,nxt_unit_ctx_impl_t * ctx_impl)4073 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4074 {
4075     nxt_port_msg_t  msg;
4076 
4077     if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4078         return;
4079     }
4080 
4081     if (nxt_slow_path(ctx_impl->read_port == NULL
4082                       || ctx_impl->read_port->out_fd == -1))
4083     {
4084         nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4085 
4086         return;
4087     }
4088 
4089     memset(&msg, 0, sizeof(nxt_port_msg_t));
4090 
4091     msg.type = _NXT_PORT_MSG_RPC_READY;
4092 
4093     (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4094                               &msg, sizeof(msg), NULL);
4095 }
4096 
4097 
4098 static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t * mmaps)4099 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4100 {
4101     pthread_mutex_init(&mmaps->mutex, NULL);
4102 
4103     mmaps->size = 0;
4104     mmaps->cap = 0;
4105     mmaps->elts = NULL;
4106     mmaps->allocated_chunks = 0;
4107 }
4108 
4109 
4110 nxt_inline void
nxt_unit_process_use(nxt_unit_process_t * process)4111 nxt_unit_process_use(nxt_unit_process_t *process)
4112 {
4113     nxt_atomic_fetch_add(&process->use_count, 1);
4114 }
4115 
4116 
4117 nxt_inline void
nxt_unit_process_release(nxt_unit_process_t * process)4118 nxt_unit_process_release(nxt_unit_process_t *process)
4119 {
4120     long c;
4121 
4122     c = nxt_atomic_fetch_add(&process->use_count, -1);
4123 
4124     if (c == 1) {
4125         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4126 
4127         nxt_unit_free(NULL, process);
4128     }
4129 }
4130 
4131 
4132 static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t * mmaps)4133 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4134 {
4135     nxt_unit_mmap_t  *mm, *end;
4136 
4137     if (mmaps->elts != NULL) {
4138         end = mmaps->elts + mmaps->size;
4139 
4140         for (mm = mmaps->elts; mm < end; mm++) {
4141             munmap(mm->hdr, PORT_MMAP_SIZE);
4142         }
4143 
4144         nxt_unit_free(NULL, mmaps->elts);
4145     }
4146 
4147     pthread_mutex_destroy(&mmaps->mutex);
4148 }
4149 
4150 
4151 static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t * ctx,nxt_unit_mmaps_t * mmaps,pid_t pid,uint32_t id,nxt_port_mmap_header_t ** hdr,nxt_unit_read_buf_t * rbuf)4152 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4153     pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4154     nxt_unit_read_buf_t *rbuf)
4155 {
4156     int                  res, need_rbuf;
4157     nxt_unit_mmap_t      *mm;
4158     nxt_unit_ctx_impl_t  *ctx_impl;
4159 
4160     mm = nxt_unit_mmap_at(mmaps, id);
4161     if (nxt_slow_path(mm == NULL)) {
4162         nxt_unit_alert(ctx, "failed to allocate mmap");
4163 
4164         pthread_mutex_unlock(&mmaps->mutex);
4165 
4166         *hdr = NULL;
4167 
4168         return NXT_UNIT_ERROR;
4169     }
4170 
4171     *hdr = mm->hdr;
4172 
4173     if (nxt_fast_path(*hdr != NULL)) {
4174         return NXT_UNIT_OK;
4175     }
4176 
4177     need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4178 
4179     nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4180 
4181     pthread_mutex_unlock(&mmaps->mutex);
4182 
4183     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4184 
4185     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4186 
4187     if (need_rbuf) {
4188         res = nxt_unit_get_mmap(ctx, pid, id);
4189         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4190             return NXT_UNIT_ERROR;
4191         }
4192     }
4193 
4194     return NXT_UNIT_AGAIN;
4195 }
4196 
4197 
4198 static int
nxt_unit_mmap_read(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_read_buf_t * rbuf)4199 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4200     nxt_unit_read_buf_t *rbuf)
4201 {
4202     int                     res;
4203     void                    *start;
4204     uint32_t                size;
4205     nxt_unit_impl_t         *lib;
4206     nxt_unit_mmaps_t        *mmaps;
4207     nxt_unit_mmap_buf_t     *b, **incoming_tail;
4208     nxt_port_mmap_msg_t     *mmap_msg, *end;
4209     nxt_port_mmap_header_t  *hdr;
4210 
4211     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4212         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4213                       recv_msg->stream, (int) recv_msg->size);
4214 
4215         return NXT_UNIT_ERROR;
4216     }
4217 
4218     mmap_msg = recv_msg->start;
4219     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4220 
4221     incoming_tail = &recv_msg->incoming_buf;
4222 
4223     /* Allocating buffer structures. */
4224     for (; mmap_msg < end; mmap_msg++) {
4225         b = nxt_unit_mmap_buf_get(ctx);
4226         if (nxt_slow_path(b == NULL)) {
4227             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4228                           recv_msg->stream);
4229 
4230             while (recv_msg->incoming_buf != NULL) {
4231                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4232             }
4233 
4234             return NXT_UNIT_ERROR;
4235         }
4236 
4237         nxt_unit_mmap_buf_insert(incoming_tail, b);
4238         incoming_tail = &b->next;
4239     }
4240 
4241     b = recv_msg->incoming_buf;
4242     mmap_msg = recv_msg->start;
4243 
4244     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4245 
4246     mmaps = &lib->incoming;
4247 
4248     pthread_mutex_lock(&mmaps->mutex);
4249 
4250     for (; mmap_msg < end; mmap_msg++) {
4251         res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4252                                        recv_msg->pid, mmap_msg->mmap_id,
4253                                        &hdr, rbuf);
4254 
4255         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4256             while (recv_msg->incoming_buf != NULL) {
4257                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4258             }
4259 
4260             return res;
4261         }
4262 
4263         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4264         size = mmap_msg->size;
4265 
4266         if (recv_msg->start == mmap_msg) {
4267             recv_msg->start = start;
4268             recv_msg->size = size;
4269         }
4270 
4271         b->buf.start = start;
4272         b->buf.free = start;
4273         b->buf.end = b->buf.start + size;
4274         b->hdr = hdr;
4275 
4276         b = b->next;
4277 
4278         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4279                        recv_msg->stream,
4280                        start, (int) size,
4281                        (int) hdr->src_pid, (int) hdr->dst_pid,
4282                        (int) hdr->id, (int) mmap_msg->chunk_id,
4283                        (int) mmap_msg->size);
4284     }
4285 
4286     pthread_mutex_unlock(&mmaps->mutex);
4287 
4288     return NXT_UNIT_OK;
4289 }
4290 
4291 
4292 static int
nxt_unit_get_mmap(nxt_unit_ctx_t * ctx,pid_t pid,uint32_t id)4293 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4294 {
4295     ssize_t              res;
4296     nxt_unit_impl_t      *lib;
4297     nxt_unit_ctx_impl_t  *ctx_impl;
4298 
4299     struct {
4300         nxt_port_msg_t           msg;
4301         nxt_port_msg_get_mmap_t  get_mmap;
4302     } m;
4303 
4304     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4305     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4306 
4307     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4308 
4309     m.msg.pid = lib->pid;
4310     m.msg.reply_port = ctx_impl->read_port->id.id;
4311     m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4312 
4313     m.get_mmap.id = id;
4314 
4315     nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4316 
4317     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
4318     if (nxt_slow_path(res != sizeof(m))) {
4319         return NXT_UNIT_ERROR;
4320     }
4321 
4322     return NXT_UNIT_OK;
4323 }
4324 
4325 
4326 static void
nxt_unit_mmap_release(nxt_unit_ctx_t * ctx,nxt_port_mmap_header_t * hdr,void * start,uint32_t size)4327 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4328     void *start, uint32_t size)
4329 {
4330     int              freed_chunks;
4331     u_char           *p, *end;
4332     nxt_chunk_id_t   c;
4333     nxt_unit_impl_t  *lib;
4334 
4335     memset(start, 0xA5, size);
4336 
4337     p = start;
4338     end = p + size;
4339     c = nxt_port_mmap_chunk_id(hdr, p);
4340     freed_chunks = 0;
4341 
4342     while (p < end) {
4343         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4344 
4345         p += PORT_MMAP_CHUNK_SIZE;
4346         c++;
4347         freed_chunks++;
4348     }
4349 
4350     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4351 
4352     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4353         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4354 
4355         nxt_unit_debug(ctx, "allocated_chunks %d",
4356                        (int) lib->outgoing.allocated_chunks);
4357     }
4358 
4359     if (hdr->dst_pid == lib->pid
4360         && freed_chunks != 0
4361         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4362     {
4363         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4364     }
4365 }
4366 
4367 
4368 static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t * ctx,pid_t pid)4369 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4370 {
4371     ssize_t          res;
4372     nxt_port_msg_t   msg;
4373     nxt_unit_impl_t  *lib;
4374 
4375     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4376 
4377     msg.stream = 0;
4378     msg.pid = lib->pid;
4379     msg.reply_port = 0;
4380     msg.type = _NXT_PORT_MSG_SHM_ACK;
4381     msg.last = 0;
4382     msg.mmap = 0;
4383     msg.nf = 0;
4384     msg.mf = 0;
4385     msg.tracking = 0;
4386 
4387     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
4388     if (nxt_slow_path(res != sizeof(msg))) {
4389         return NXT_UNIT_ERROR;
4390     }
4391 
4392     return NXT_UNIT_OK;
4393 }
4394 
4395 
4396 static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t * lhq,void * data)4397 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4398 {
4399     nxt_process_t  *process;
4400 
4401     process = data;
4402 
4403     if (lhq->key.length == sizeof(pid_t)
4404         && *(pid_t *) lhq->key.start == process->pid)
4405     {
4406         return NXT_OK;
4407     }
4408 
4409     return NXT_DECLINED;
4410 }
4411 
4412 
4413 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
4414     NXT_LVLHSH_DEFAULT,
4415     nxt_unit_lvlhsh_pid_test,
4416     nxt_unit_lvlhsh_alloc,
4417     nxt_unit_lvlhsh_free,
4418 };
4419 
4420 
4421 static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t * lhq,pid_t * pid)4422 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4423 {
4424     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4425     lhq->key.length = sizeof(*pid);
4426     lhq->key.start = (u_char *) pid;
4427     lhq->proto = &lvlhsh_processes_proto;
4428 }
4429 
4430 
4431 static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t * ctx,pid_t pid)4432 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4433 {
4434     nxt_unit_impl_t     *lib;
4435     nxt_unit_process_t  *process;
4436     nxt_lvlhsh_query_t  lhq;
4437 
4438     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4439 
4440     nxt_unit_process_lhq_pid(&lhq, &pid);
4441 
4442     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4443         process = lhq.value;
4444         nxt_unit_process_use(process);
4445 
4446         return process;
4447     }
4448 
4449     process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4450     if (nxt_slow_path(process == NULL)) {
4451         nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4452 
4453         return NULL;
4454     }
4455 
4456     process->pid = pid;
4457     process->use_count = 2;
4458     process->next_port_id = 0;
4459     process->lib = lib;
4460 
4461     nxt_queue_init(&process->ports);
4462 
4463     lhq.replace = 0;
4464     lhq.value = process;
4465 
4466     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4467 
4468     case NXT_OK:
4469         break;
4470 
4471     default:
4472         nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4473 
4474         nxt_unit_free(ctx, process);
4475         process = NULL;
4476         break;
4477     }
4478 
4479     return process;
4480 }
4481 
4482 
4483 static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t * lib,pid_t pid,int remove)4484 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4485 {
4486     int                 rc;
4487     nxt_lvlhsh_query_t  lhq;
4488 
4489     nxt_unit_process_lhq_pid(&lhq, &pid);
4490 
4491     if (remove) {
4492         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4493 
4494     } else {
4495         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4496     }
4497 
4498     if (rc == NXT_OK) {
4499         if (!remove) {
4500             nxt_unit_process_use(lhq.value);
4501         }
4502 
4503         return lhq.value;
4504     }
4505 
4506     return NULL;
4507 }
4508 
4509 
4510 static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t * lib)4511 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4512 {
4513     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4514 }
4515 
4516 
4517 int
nxt_unit_run(nxt_unit_ctx_t * ctx)4518 nxt_unit_run(nxt_unit_ctx_t *ctx)
4519 {
4520     int                  rc;
4521     nxt_unit_ctx_impl_t  *ctx_impl;
4522 
4523     nxt_unit_ctx_use(ctx);
4524 
4525     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4526 
4527     rc = NXT_UNIT_OK;
4528 
4529     while (nxt_fast_path(ctx_impl->online)) {
4530         rc = nxt_unit_run_once_impl(ctx);
4531 
4532         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4533             nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
4534             break;
4535         }
4536     }
4537 
4538     nxt_unit_ctx_release(ctx);
4539 
4540     return rc;
4541 }
4542 
4543 
4544 int
nxt_unit_run_once(nxt_unit_ctx_t * ctx)4545 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4546 {
4547     int  rc;
4548 
4549     nxt_unit_ctx_use(ctx);
4550 
4551     rc = nxt_unit_run_once_impl(ctx);
4552 
4553     nxt_unit_ctx_release(ctx);
4554 
4555     return rc;
4556 }
4557 
4558 
4559 static int
nxt_unit_run_once_impl(nxt_unit_ctx_t * ctx)4560 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4561 {
4562     int                  rc;
4563     nxt_unit_read_buf_t  *rbuf;
4564 
4565     rbuf = nxt_unit_read_buf_get(ctx);
4566     if (nxt_slow_path(rbuf == NULL)) {
4567         return NXT_UNIT_ERROR;
4568     }
4569 
4570     rc = nxt_unit_read_buf(ctx, rbuf);
4571     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4572         nxt_unit_read_buf_release(ctx, rbuf);
4573 
4574         return rc;
4575     }
4576 
4577     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4578     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4579         return NXT_UNIT_ERROR;
4580     }
4581 
4582     rc = nxt_unit_process_pending_rbuf(ctx);
4583     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4584         return NXT_UNIT_ERROR;
4585     }
4586 
4587     nxt_unit_process_ready_req(ctx);
4588 
4589     return rc;
4590 }
4591 
4592 
4593 static int
nxt_unit_read_buf(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)4594 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4595 {
4596     int                   nevents, res, err;
4597     nxt_uint_t            nfds;
4598     nxt_unit_impl_t       *lib;
4599     nxt_unit_ctx_impl_t   *ctx_impl;
4600     nxt_unit_port_impl_t  *port_impl;
4601     struct pollfd         fds[2];
4602 
4603     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4604 
4605     if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
4606         return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4607     }
4608 
4609     port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4610                                  port);
4611 
4612     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4613 
4614 retry:
4615 
4616     if (port_impl->from_socket == 0) {
4617         res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4618         if (res == NXT_UNIT_OK) {
4619             if (nxt_unit_is_read_socket(rbuf)) {
4620                 port_impl->from_socket++;
4621 
4622                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4623                                (int) ctx_impl->read_port->id.pid,
4624                                (int) ctx_impl->read_port->id.id,
4625                                port_impl->from_socket);
4626 
4627             } else {
4628                 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4629                                (int) ctx_impl->read_port->id.pid,
4630                                (int) ctx_impl->read_port->id.id,
4631                                (int) rbuf->size);
4632 
4633                 return NXT_UNIT_OK;
4634             }
4635         }
4636     }
4637 
4638     if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4639         res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
4640         if (res == NXT_UNIT_OK) {
4641             return NXT_UNIT_OK;
4642         }
4643 
4644         fds[1].fd = lib->shared_port->in_fd;
4645         fds[1].events = POLLIN;
4646 
4647         nfds = 2;
4648 
4649     } else {
4650         nfds = 1;
4651     }
4652 
4653     fds[0].fd = ctx_impl->read_port->in_fd;
4654     fds[0].events = POLLIN;
4655     fds[0].revents = 0;
4656 
4657     fds[1].revents = 0;
4658 
4659     nevents = poll(fds, nfds, -1);
4660     if (nxt_slow_path(nevents == -1)) {
4661         err = errno;
4662 
4663         if (err == EINTR) {
4664             goto retry;
4665         }
4666 
4667         nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4668                        fds[0].fd, fds[1].fd, strerror(err), err);
4669 
4670         rbuf->size = -1;
4671 
4672         return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4673     }
4674 
4675     nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]",
4676                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4677                    fds[1].revents);
4678 
4679     if ((fds[0].revents & POLLIN) != 0) {
4680         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4681         if (res == NXT_UNIT_AGAIN) {
4682             goto retry;
4683         }
4684 
4685         return res;
4686     }
4687 
4688     if ((fds[1].revents & POLLIN) != 0) {
4689         res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4690         if (res == NXT_UNIT_AGAIN) {
4691             goto retry;
4692         }
4693 
4694         return res;
4695     }
4696 
4697     nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4698                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4699                    fds[1].revents);
4700 
4701     return NXT_UNIT_ERROR;
4702 }
4703 
4704 
4705 static int
nxt_unit_chk_ready(nxt_unit_ctx_t * ctx)4706 nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
4707 {
4708     nxt_unit_impl_t      *lib;
4709     nxt_unit_ctx_impl_t  *ctx_impl;
4710 
4711     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4712     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4713 
4714     return (ctx_impl->ready
4715             && (lib->request_limit == 0
4716                 || lib->request_count < lib->request_limit));
4717 }
4718 
4719 
4720 static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t * ctx)4721 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4722 {
4723     int                  rc;
4724     nxt_queue_t          pending_rbuf;
4725     nxt_unit_ctx_impl_t  *ctx_impl;
4726     nxt_unit_read_buf_t  *rbuf;
4727 
4728     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4729 
4730     pthread_mutex_lock(&ctx_impl->mutex);
4731 
4732     if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4733         pthread_mutex_unlock(&ctx_impl->mutex);
4734 
4735         return NXT_UNIT_OK;
4736     }
4737 
4738     nxt_queue_init(&pending_rbuf);
4739 
4740     nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4741     nxt_queue_init(&ctx_impl->pending_rbuf);
4742 
4743     pthread_mutex_unlock(&ctx_impl->mutex);
4744 
4745     rc = NXT_UNIT_OK;
4746 
4747     nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4748 
4749         if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4750             rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4751 
4752         } else {
4753             nxt_unit_read_buf_release(ctx, rbuf);
4754         }
4755 
4756     } nxt_queue_loop;
4757 
4758     if (!ctx_impl->ready) {
4759         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
4760     }
4761 
4762     return rc;
4763 }
4764 
4765 
4766 static void
nxt_unit_process_ready_req(nxt_unit_ctx_t * ctx)4767 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4768 {
4769     int                           res;
4770     nxt_queue_t                   ready_req;
4771     nxt_unit_impl_t               *lib;
4772     nxt_unit_ctx_impl_t           *ctx_impl;
4773     nxt_unit_request_info_t       *req;
4774     nxt_unit_request_info_impl_t  *req_impl;
4775 
4776     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4777 
4778     pthread_mutex_lock(&ctx_impl->mutex);
4779 
4780     if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4781         pthread_mutex_unlock(&ctx_impl->mutex);
4782 
4783         return;
4784     }
4785 
4786     nxt_queue_init(&ready_req);
4787 
4788     nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4789     nxt_queue_init(&ctx_impl->ready_req);
4790 
4791     pthread_mutex_unlock(&ctx_impl->mutex);
4792 
4793     nxt_queue_each(req_impl, &ready_req,
4794                    nxt_unit_request_info_impl_t, port_wait_link)
4795     {
4796         lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4797 
4798         req = &req_impl->req;
4799 
4800         res = nxt_unit_send_req_headers_ack(req);
4801         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4802             nxt_unit_request_done(req, NXT_UNIT_ERROR);
4803 
4804             continue;
4805         }
4806 
4807         if (req->content_length
4808             > (uint64_t) (req->content_buf->end - req->content_buf->free))
4809         {
4810             res = nxt_unit_request_hash_add(ctx, req);
4811             if (nxt_slow_path(res != NXT_UNIT_OK)) {
4812                 nxt_unit_req_warn(req, "failed to add request to hash");
4813 
4814                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4815 
4816                 continue;
4817             }
4818 
4819             /*
4820              * If application have separate data handler, we may start
4821              * request processing and process data when it is arrived.
4822              */
4823             if (lib->callbacks.data_handler == NULL) {
4824                 continue;
4825             }
4826         }
4827 
4828         lib->callbacks.request_handler(&req_impl->req);
4829 
4830     } nxt_queue_loop;
4831 }
4832 
4833 
4834 int
nxt_unit_run_ctx(nxt_unit_ctx_t * ctx)4835 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4836 {
4837     int                  rc;
4838     nxt_unit_read_buf_t  *rbuf;
4839     nxt_unit_ctx_impl_t  *ctx_impl;
4840 
4841     nxt_unit_ctx_use(ctx);
4842 
4843     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4844 
4845     rc = NXT_UNIT_OK;
4846 
4847     while (nxt_fast_path(ctx_impl->online)) {
4848         rbuf = nxt_unit_read_buf_get(ctx);
4849         if (nxt_slow_path(rbuf == NULL)) {
4850             rc = NXT_UNIT_ERROR;
4851             break;
4852         }
4853 
4854     retry:
4855 
4856         rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4857         if (rc == NXT_UNIT_AGAIN) {
4858             goto retry;
4859         }
4860 
4861         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4862         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4863             break;
4864         }
4865 
4866         rc = nxt_unit_process_pending_rbuf(ctx);
4867         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4868             break;
4869         }
4870 
4871         nxt_unit_process_ready_req(ctx);
4872     }
4873 
4874     nxt_unit_ctx_release(ctx);
4875 
4876     return rc;
4877 }
4878 
4879 
4880 nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t * rbuf)4881 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4882 {
4883     nxt_port_msg_t  *port_msg;
4884 
4885     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4886         port_msg = (nxt_port_msg_t *) rbuf->buf;
4887 
4888         return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4889     }
4890 
4891     return 0;
4892 }
4893 
4894 
4895 nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t * rbuf)4896 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4897 {
4898     if (nxt_fast_path(rbuf->size == 1)) {
4899         return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4900     }
4901 
4902     return 0;
4903 }
4904 
4905 
4906 nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t * rbuf)4907 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4908 {
4909     nxt_port_msg_t  *port_msg;
4910 
4911     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4912         port_msg = (nxt_port_msg_t *) rbuf->buf;
4913 
4914         return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4915     }
4916 
4917     return 0;
4918 }
4919 
4920 
4921 nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t * rbuf)4922 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4923 {
4924     nxt_port_msg_t  *port_msg;
4925 
4926     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4927         port_msg = (nxt_port_msg_t *) rbuf->buf;
4928 
4929         return port_msg->type == _NXT_PORT_MSG_QUIT;
4930     }
4931 
4932     return 0;
4933 }
4934 
4935 
4936 int
nxt_unit_run_shared(nxt_unit_ctx_t * ctx)4937 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4938 {
4939     int                  rc;
4940     nxt_unit_impl_t      *lib;
4941     nxt_unit_read_buf_t  *rbuf;
4942 
4943     nxt_unit_ctx_use(ctx);
4944 
4945     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4946 
4947     rc = NXT_UNIT_OK;
4948 
4949     while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4950         rbuf = nxt_unit_read_buf_get(ctx);
4951         if (nxt_slow_path(rbuf == NULL)) {
4952             rc = NXT_UNIT_ERROR;
4953             break;
4954         }
4955 
4956     retry:
4957 
4958         rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4959         if (rc == NXT_UNIT_AGAIN) {
4960             goto retry;
4961         }
4962 
4963         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4964             nxt_unit_read_buf_release(ctx, rbuf);
4965             break;
4966         }
4967 
4968         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4969         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4970             break;
4971         }
4972     }
4973 
4974     nxt_unit_ctx_release(ctx);
4975 
4976     return rc;
4977 }
4978 
4979 
4980 nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t * ctx)4981 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4982 {
4983     int                      rc;
4984     nxt_unit_impl_t          *lib;
4985     nxt_unit_read_buf_t      *rbuf;
4986     nxt_unit_request_info_t  *req;
4987 
4988     nxt_unit_ctx_use(ctx);
4989 
4990     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4991 
4992     req = NULL;
4993 
4994     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
4995         goto done;
4996     }
4997 
4998     rbuf = nxt_unit_read_buf_get(ctx);
4999     if (nxt_slow_path(rbuf == NULL)) {
5000         goto done;
5001     }
5002 
5003     rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
5004     if (rc != NXT_UNIT_OK) {
5005         nxt_unit_read_buf_release(ctx, rbuf);
5006         goto done;
5007     }
5008 
5009     (void) nxt_unit_process_msg(ctx, rbuf, &req);
5010 
5011 done:
5012 
5013     nxt_unit_ctx_release(ctx);
5014 
5015     return req;
5016 }
5017 
5018 
5019 int
nxt_unit_process_port_msg(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5020 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5021 {
5022     int  rc;
5023 
5024     nxt_unit_ctx_use(ctx);
5025 
5026     rc = nxt_unit_process_port_msg_impl(ctx, port);
5027 
5028     nxt_unit_ctx_release(ctx);
5029 
5030     return rc;
5031 }
5032 
5033 
5034 static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5035 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5036 {
5037     int                  rc;
5038     nxt_unit_impl_t      *lib;
5039     nxt_unit_read_buf_t  *rbuf;
5040 
5041     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5042 
5043     if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
5044         return NXT_UNIT_AGAIN;
5045     }
5046 
5047     rbuf = nxt_unit_read_buf_get(ctx);
5048     if (nxt_slow_path(rbuf == NULL)) {
5049         return NXT_UNIT_ERROR;
5050     }
5051 
5052     if (port == lib->shared_port) {
5053         rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5054 
5055     } else {
5056         rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5057     }
5058 
5059     if (rc != NXT_UNIT_OK) {
5060         nxt_unit_read_buf_release(ctx, rbuf);
5061         return rc;
5062     }
5063 
5064     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
5065     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5066         return NXT_UNIT_ERROR;
5067     }
5068 
5069     rc = nxt_unit_process_pending_rbuf(ctx);
5070     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5071         return NXT_UNIT_ERROR;
5072     }
5073 
5074     nxt_unit_process_ready_req(ctx);
5075 
5076     return rc;
5077 }
5078 
5079 
5080 void
nxt_unit_done(nxt_unit_ctx_t * ctx)5081 nxt_unit_done(nxt_unit_ctx_t *ctx)
5082 {
5083     nxt_unit_ctx_release(ctx);
5084 }
5085 
5086 
5087 nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t * ctx,void * data)5088 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5089 {
5090     int                   rc, queue_fd;
5091     void                  *mem;
5092     nxt_unit_impl_t       *lib;
5093     nxt_unit_port_t       *port;
5094     nxt_unit_ctx_impl_t   *new_ctx;
5095     nxt_unit_port_impl_t  *port_impl;
5096 
5097     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5098 
5099     new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5100                                    + lib->request_data_size);
5101     if (nxt_slow_path(new_ctx == NULL)) {
5102         nxt_unit_alert(ctx, "failed to allocate context");
5103 
5104         return NULL;
5105     }
5106 
5107     rc = nxt_unit_ctx_init(lib, new_ctx, data);
5108     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5109          nxt_unit_free(ctx, new_ctx);
5110 
5111          return NULL;
5112     }
5113 
5114     queue_fd = -1;
5115 
5116     port = nxt_unit_create_port(&new_ctx->ctx);
5117     if (nxt_slow_path(port == NULL)) {
5118         goto fail;
5119     }
5120 
5121     new_ctx->read_port = port;
5122 
5123     queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5124     if (nxt_slow_path(queue_fd == -1)) {
5125         goto fail;
5126     }
5127 
5128     mem = mmap(NULL, sizeof(nxt_port_queue_t),
5129                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5130     if (nxt_slow_path(mem == MAP_FAILED)) {
5131         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5132                        strerror(errno), errno);
5133 
5134         goto fail;
5135     }
5136 
5137     nxt_port_queue_init(mem);
5138 
5139     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5140     port_impl->queue = mem;
5141 
5142     rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5143     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5144         goto fail;
5145     }
5146 
5147     nxt_unit_close(queue_fd);
5148 
5149     return &new_ctx->ctx;
5150 
5151 fail:
5152 
5153     if (queue_fd != -1) {
5154         nxt_unit_close(queue_fd);
5155     }
5156 
5157     nxt_unit_ctx_release(&new_ctx->ctx);
5158 
5159     return NULL;
5160 }
5161 
5162 
5163 static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t * ctx_impl)5164 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5165 {
5166     nxt_unit_impl_t                  *lib;
5167     nxt_unit_mmap_buf_t              *mmap_buf;
5168     nxt_unit_read_buf_t              *rbuf;
5169     nxt_unit_request_info_impl_t     *req_impl;
5170     nxt_unit_websocket_frame_impl_t  *ws_impl;
5171 
5172     lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5173 
5174     nxt_queue_each(req_impl, &ctx_impl->active_req,
5175                    nxt_unit_request_info_impl_t, link)
5176     {
5177         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5178 
5179         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5180 
5181     } nxt_queue_loop;
5182 
5183     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5184     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5185 
5186     while (ctx_impl->free_buf != NULL) {
5187         mmap_buf = ctx_impl->free_buf;
5188         nxt_unit_mmap_buf_unlink(mmap_buf);
5189         nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5190     }
5191 
5192     nxt_queue_each(req_impl, &ctx_impl->free_req,
5193                    nxt_unit_request_info_impl_t, link)
5194     {
5195         nxt_unit_request_info_free(req_impl);
5196 
5197     } nxt_queue_loop;
5198 
5199     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5200                    nxt_unit_websocket_frame_impl_t, link)
5201     {
5202         nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5203 
5204     } nxt_queue_loop;
5205 
5206     nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5207     {
5208         if (rbuf != &ctx_impl->ctx_read_buf) {
5209             nxt_unit_free(&ctx_impl->ctx, rbuf);
5210         }
5211     } nxt_queue_loop;
5212 
5213     pthread_mutex_destroy(&ctx_impl->mutex);
5214 
5215     pthread_mutex_lock(&lib->mutex);
5216 
5217     nxt_queue_remove(&ctx_impl->link);
5218 
5219     pthread_mutex_unlock(&lib->mutex);
5220 
5221     if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5222         nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
5223         nxt_unit_port_release(ctx_impl->read_port);
5224     }
5225 
5226     if (ctx_impl != &lib->main_ctx) {
5227         nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5228     }
5229 
5230     nxt_unit_lib_release(lib);
5231 }
5232 
5233 
5234 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5235 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5236 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
5237 #else
5238 #define NXT_UNIX_SOCKET  SOCK_DGRAM
5239 #endif
5240 
5241 
5242 void
nxt_unit_port_id_init(nxt_unit_port_id_t * port_id,pid_t pid,uint16_t id)5243 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5244 {
5245     nxt_unit_port_hash_id_t  port_hash_id;
5246 
5247     port_hash_id.pid = pid;
5248     port_hash_id.id = id;
5249 
5250     port_id->pid = pid;
5251     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5252     port_id->id = id;
5253 }
5254 
5255 
5256 static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t * ctx)5257 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5258 {
5259     int                 rc, port_sockets[2];
5260     nxt_unit_impl_t     *lib;
5261     nxt_unit_port_t     new_port, *port;
5262     nxt_unit_process_t  *process;
5263 
5264     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5265 
5266     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5267     if (nxt_slow_path(rc != 0)) {
5268         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5269                       strerror(errno), errno);
5270 
5271         return NULL;
5272     }
5273 
5274 #if (NXT_HAVE_SOCKOPT_SO_PASSCRED)
5275     int  enable_creds = 1;
5276 
5277     if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED,
5278                         &enable_creds, sizeof(enable_creds)) == -1))
5279     {
5280         nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5281         return NULL;
5282     }
5283 
5284     if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED,
5285                         &enable_creds, sizeof(enable_creds)) == -1))
5286     {
5287         nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5288         return NULL;
5289     }
5290 #endif
5291 
5292     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5293                    port_sockets[0], port_sockets[1]);
5294 
5295     pthread_mutex_lock(&lib->mutex);
5296 
5297     process = nxt_unit_process_get(ctx, lib->pid);
5298     if (nxt_slow_path(process == NULL)) {
5299         pthread_mutex_unlock(&lib->mutex);
5300 
5301         nxt_unit_close(port_sockets[0]);
5302         nxt_unit_close(port_sockets[1]);
5303 
5304         return NULL;
5305     }
5306 
5307     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5308 
5309     new_port.in_fd = port_sockets[0];
5310     new_port.out_fd = port_sockets[1];
5311     new_port.data = NULL;
5312 
5313     pthread_mutex_unlock(&lib->mutex);
5314 
5315     nxt_unit_process_release(process);
5316 
5317     port = nxt_unit_add_port(ctx, &new_port, NULL);
5318     if (nxt_slow_path(port == NULL)) {
5319         nxt_unit_close(port_sockets[0]);
5320         nxt_unit_close(port_sockets[1]);
5321     }
5322 
5323     return port;
5324 }
5325 
5326 
5327 static int
nxt_unit_send_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * dst,nxt_unit_port_t * port,int queue_fd)5328 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5329     nxt_unit_port_t *port, int queue_fd)
5330 {
5331     ssize_t          res;
5332     nxt_send_oob_t   oob;
5333     nxt_unit_impl_t  *lib;
5334     int              fds[2] = { port->out_fd, queue_fd };
5335 
5336     struct {
5337         nxt_port_msg_t            msg;
5338         nxt_port_msg_new_port_t   new_port;
5339     } m;
5340 
5341     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5342 
5343     m.msg.stream = 0;
5344     m.msg.pid = lib->pid;
5345     m.msg.reply_port = 0;
5346     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5347     m.msg.last = 0;
5348     m.msg.mmap = 0;
5349     m.msg.nf = 0;
5350     m.msg.mf = 0;
5351     m.msg.tracking = 0;
5352 
5353     m.new_port.id = port->id.id;
5354     m.new_port.pid = port->id.pid;
5355     m.new_port.type = NXT_PROCESS_APP;
5356     m.new_port.max_size = 16 * 1024;
5357     m.new_port.max_share = 64 * 1024;
5358 
5359     nxt_socket_msg_oob_init(&oob, fds);
5360 
5361     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob);
5362 
5363     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5364 }
5365 
5366 
nxt_unit_port_use(nxt_unit_port_t * port)5367 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5368 {
5369     nxt_unit_port_impl_t  *port_impl;
5370 
5371     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5372 
5373     nxt_atomic_fetch_add(&port_impl->use_count, 1);
5374 }
5375 
5376 
nxt_unit_port_release(nxt_unit_port_t * port)5377 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5378 {
5379     long                  c;
5380     nxt_unit_port_impl_t  *port_impl;
5381 
5382     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5383 
5384     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5385 
5386     if (c == 1) {
5387         nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5388                        (int) port->id.pid, (int) port->id.id,
5389                        port->in_fd, port->out_fd);
5390 
5391         nxt_unit_process_release(port_impl->process);
5392 
5393         if (port->in_fd != -1) {
5394             nxt_unit_close(port->in_fd);
5395 
5396             port->in_fd = -1;
5397         }
5398 
5399         if (port->out_fd != -1) {
5400             nxt_unit_close(port->out_fd);
5401 
5402             port->out_fd = -1;
5403         }
5404 
5405         if (port_impl->queue != NULL) {
5406             munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5407                                      ? sizeof(nxt_app_queue_t)
5408                                      : sizeof(nxt_port_queue_t));
5409         }
5410 
5411         nxt_unit_free(NULL, port_impl);
5412     }
5413 }
5414 
5415 
5416 static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,void * queue)5417 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5418 {
5419     int                   rc, ready;
5420     nxt_queue_t           awaiting_req;
5421     nxt_unit_impl_t       *lib;
5422     nxt_unit_port_t       *old_port;
5423     nxt_unit_process_t    *process;
5424     nxt_unit_port_impl_t  *new_port, *old_port_impl;
5425 
5426     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5427 
5428     pthread_mutex_lock(&lib->mutex);
5429 
5430     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5431 
5432     if (nxt_slow_path(old_port != NULL)) {
5433         nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5434                             "in_fd %d out_fd %d queue %p",
5435                             port->id.pid, port->id.id,
5436                             port->in_fd, port->out_fd, queue);
5437 
5438         if (old_port->data == NULL) {
5439             old_port->data = port->data;
5440             port->data = NULL;
5441         }
5442 
5443         if (old_port->in_fd == -1) {
5444             old_port->in_fd = port->in_fd;
5445             port->in_fd = -1;
5446         }
5447 
5448         if (port->in_fd != -1) {
5449             nxt_unit_close(port->in_fd);
5450             port->in_fd = -1;
5451         }
5452 
5453         if (old_port->out_fd == -1) {
5454             old_port->out_fd = port->out_fd;
5455             port->out_fd = -1;
5456         }
5457 
5458         if (port->out_fd != -1) {
5459             nxt_unit_close(port->out_fd);
5460             port->out_fd = -1;
5461         }
5462 
5463         *port = *old_port;
5464 
5465         nxt_queue_init(&awaiting_req);
5466 
5467         old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5468 
5469         if (old_port_impl->queue == NULL) {
5470             old_port_impl->queue = queue;
5471         }
5472 
5473         ready = (port->in_fd != -1 || port->out_fd != -1);
5474 
5475         /*
5476          * Port can be market as 'ready' only after callbacks.add_port() call.
5477          * Otherwise, request may try to use the port before callback.
5478          */
5479         if (lib->callbacks.add_port == NULL && ready) {
5480             old_port_impl->ready = ready;
5481 
5482             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5483                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5484                 nxt_queue_init(&old_port_impl->awaiting_req);
5485             }
5486         }
5487 
5488         pthread_mutex_unlock(&lib->mutex);
5489 
5490         if (lib->callbacks.add_port != NULL && ready) {
5491             lib->callbacks.add_port(ctx, old_port);
5492 
5493             pthread_mutex_lock(&lib->mutex);
5494 
5495             old_port_impl->ready = ready;
5496 
5497             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5498                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5499                 nxt_queue_init(&old_port_impl->awaiting_req);
5500             }
5501 
5502             pthread_mutex_unlock(&lib->mutex);
5503         }
5504 
5505         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5506 
5507         return old_port;
5508     }
5509 
5510     new_port = NULL;
5511     ready = 0;
5512 
5513     nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5514                    port->id.pid, port->id.id,
5515                    port->in_fd, port->out_fd, queue);
5516 
5517     process = nxt_unit_process_get(ctx, port->id.pid);
5518     if (nxt_slow_path(process == NULL)) {
5519         goto unlock;
5520     }
5521 
5522     if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5523         && port->id.id >= process->next_port_id)
5524     {
5525         process->next_port_id = port->id.id + 1;
5526     }
5527 
5528     new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5529     if (nxt_slow_path(new_port == NULL)) {
5530         nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5531                        port->id.pid, port->id.id);
5532 
5533         goto unlock;
5534     }
5535 
5536     new_port->port = *port;
5537 
5538     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5539     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5540         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5541                        port->id.pid, port->id.id);
5542 
5543         nxt_unit_free(ctx, new_port);
5544 
5545         new_port = NULL;
5546 
5547         goto unlock;
5548     }
5549 
5550     nxt_queue_insert_tail(&process->ports, &new_port->link);
5551 
5552     new_port->use_count = 2;
5553     new_port->process = process;
5554     new_port->queue = queue;
5555     new_port->from_socket = 0;
5556     new_port->socket_rbuf = NULL;
5557 
5558     nxt_queue_init(&new_port->awaiting_req);
5559 
5560     ready = (port->in_fd != -1 || port->out_fd != -1);
5561 
5562     if (lib->callbacks.add_port == NULL) {
5563         new_port->ready = ready;
5564 
5565     } else {
5566         new_port->ready = 0;
5567     }
5568 
5569     process = NULL;
5570 
5571 unlock:
5572 
5573     pthread_mutex_unlock(&lib->mutex);
5574 
5575     if (nxt_slow_path(process != NULL)) {
5576         nxt_unit_process_release(process);
5577     }
5578 
5579     if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5580         lib->callbacks.add_port(ctx, &new_port->port);
5581 
5582         nxt_queue_init(&awaiting_req);
5583 
5584         pthread_mutex_lock(&lib->mutex);
5585 
5586         new_port->ready = 1;
5587 
5588         if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5589             nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5590             nxt_queue_init(&new_port->awaiting_req);
5591         }
5592 
5593         pthread_mutex_unlock(&lib->mutex);
5594 
5595         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5596     }
5597 
5598     return (new_port == NULL) ? NULL : &new_port->port;
5599 }
5600 
5601 
5602 static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t * ctx,nxt_queue_t * awaiting_req)5603 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5604 {
5605     nxt_unit_ctx_impl_t           *ctx_impl;
5606     nxt_unit_request_info_impl_t  *req_impl;
5607 
5608     nxt_queue_each(req_impl, awaiting_req,
5609                    nxt_unit_request_info_impl_t, port_wait_link)
5610     {
5611         nxt_queue_remove(&req_impl->port_wait_link);
5612 
5613         ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5614                                     ctx);
5615 
5616         pthread_mutex_lock(&ctx_impl->mutex);
5617 
5618         nxt_queue_insert_tail(&ctx_impl->ready_req,
5619                               &req_impl->port_wait_link);
5620 
5621         pthread_mutex_unlock(&ctx_impl->mutex);
5622 
5623         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5624 
5625         nxt_unit_awake_ctx(ctx, ctx_impl);
5626 
5627     } nxt_queue_loop;
5628 }
5629 
5630 
5631 static void
nxt_unit_remove_port(nxt_unit_impl_t * lib,nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5632 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
5633     nxt_unit_port_id_t *port_id)
5634 {
5635     nxt_unit_port_t       *port;
5636     nxt_unit_port_impl_t  *port_impl;
5637 
5638     pthread_mutex_lock(&lib->mutex);
5639 
5640     port = nxt_unit_remove_port_unsafe(lib, port_id);
5641 
5642     if (nxt_fast_path(port != NULL)) {
5643         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5644 
5645         nxt_queue_remove(&port_impl->link);
5646     }
5647 
5648     pthread_mutex_unlock(&lib->mutex);
5649 
5650     if (lib->callbacks.remove_port != NULL && port != NULL) {
5651         lib->callbacks.remove_port(&lib->unit, ctx, port);
5652     }
5653 
5654     if (nxt_fast_path(port != NULL)) {
5655         nxt_unit_port_release(port);
5656     }
5657 }
5658 
5659 
5660 static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t * lib,nxt_unit_port_id_t * port_id)5661 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5662 {
5663     nxt_unit_port_t  *port;
5664 
5665     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5666     if (nxt_slow_path(port == NULL)) {
5667         nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5668                        (int) port_id->pid, (int) port_id->id);
5669 
5670         return NULL;
5671     }
5672 
5673     nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5674                    (int) port_id->pid, (int) port_id->id,
5675                    port->in_fd, port->out_fd, port->data);
5676 
5677     return port;
5678 }
5679 
5680 
5681 static void
nxt_unit_remove_pid(nxt_unit_impl_t * lib,pid_t pid)5682 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5683 {
5684     nxt_unit_process_t  *process;
5685 
5686     pthread_mutex_lock(&lib->mutex);
5687 
5688     process = nxt_unit_process_find(lib, pid, 1);
5689     if (nxt_slow_path(process == NULL)) {
5690         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5691 
5692         pthread_mutex_unlock(&lib->mutex);
5693 
5694         return;
5695     }
5696 
5697     nxt_unit_remove_process(lib, process);
5698 
5699     if (lib->callbacks.remove_pid != NULL) {
5700         lib->callbacks.remove_pid(&lib->unit, pid);
5701     }
5702 }
5703 
5704 
5705 static void
nxt_unit_remove_process(nxt_unit_impl_t * lib,nxt_unit_process_t * process)5706 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5707 {
5708     nxt_queue_t           ports;
5709     nxt_unit_port_impl_t  *port;
5710 
5711     nxt_queue_init(&ports);
5712 
5713     nxt_queue_add(&ports, &process->ports);
5714 
5715     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5716 
5717         nxt_unit_remove_port_unsafe(lib, &port->port.id);
5718 
5719     } nxt_queue_loop;
5720 
5721     pthread_mutex_unlock(&lib->mutex);
5722 
5723     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5724 
5725         nxt_queue_remove(&port->link);
5726 
5727         if (lib->callbacks.remove_port != NULL) {
5728             lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
5729         }
5730 
5731         nxt_unit_port_release(&port->port);
5732 
5733     } nxt_queue_loop;
5734 
5735     nxt_unit_process_release(process);
5736 }
5737 
5738 
5739 static void
nxt_unit_quit(nxt_unit_ctx_t * ctx,uint8_t quit_param)5740 nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
5741 {
5742     nxt_bool_t                    skip_graceful_broadcast, quit;
5743     nxt_unit_impl_t               *lib;
5744     nxt_unit_ctx_impl_t           *ctx_impl;
5745     nxt_unit_callbacks_t          *cb;
5746     nxt_unit_request_info_t       *req;
5747     nxt_unit_request_info_impl_t  *req_impl;
5748 
5749     struct {
5750         nxt_port_msg_t            msg;
5751         uint8_t                   quit_param;
5752     } nxt_packed m;
5753 
5754     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5755     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5756 
5757     nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
5758                    ctx_impl->online);
5759 
5760     if (nxt_slow_path(!ctx_impl->online)) {
5761         return;
5762     }
5763 
5764     skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
5765                               && !ctx_impl->ready;
5766 
5767     cb = &lib->callbacks;
5768 
5769     if (nxt_fast_path(ctx_impl->ready)) {
5770         ctx_impl->ready = 0;
5771 
5772         if (cb->remove_port != NULL) {
5773             cb->remove_port(&lib->unit, ctx, lib->shared_port);
5774         }
5775     }
5776 
5777     if (quit_param == NXT_QUIT_GRACEFUL) {
5778         pthread_mutex_lock(&ctx_impl->mutex);
5779 
5780         quit = nxt_queue_is_empty(&ctx_impl->active_req)
5781                && nxt_queue_is_empty(&ctx_impl->pending_rbuf)
5782                && ctx_impl->wait_items == 0;
5783 
5784         pthread_mutex_unlock(&ctx_impl->mutex);
5785 
5786     } else {
5787         quit = 1;
5788         ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
5789     }
5790 
5791     if (quit) {
5792         ctx_impl->online = 0;
5793 
5794         if (cb->quit != NULL) {
5795             cb->quit(ctx);
5796         }
5797 
5798         nxt_queue_each(req_impl, &ctx_impl->active_req,
5799                        nxt_unit_request_info_impl_t, link)
5800         {
5801             req = &req_impl->req;
5802 
5803             nxt_unit_req_warn(req, "active request on ctx quit");
5804 
5805             if (cb->close_handler) {
5806                 nxt_unit_req_debug(req, "close_handler");
5807 
5808                 cb->close_handler(req);
5809 
5810             } else {
5811                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5812             }
5813 
5814         } nxt_queue_loop;
5815 
5816         if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5817             nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
5818         }
5819     }
5820 
5821     if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
5822         return;
5823     }
5824 
5825     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5826 
5827     m.msg.pid = lib->pid;
5828     m.msg.type = _NXT_PORT_MSG_QUIT;
5829     m.quit_param = quit_param;
5830 
5831     pthread_mutex_lock(&lib->mutex);
5832 
5833     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5834 
5835         if (ctx == &ctx_impl->ctx
5836             || ctx_impl->read_port == NULL
5837             || ctx_impl->read_port->out_fd == -1)
5838         {
5839             continue;
5840         }
5841 
5842         (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5843                                   &m, sizeof(m), NULL);
5844 
5845     } nxt_queue_loop;
5846 
5847     pthread_mutex_unlock(&lib->mutex);
5848 }
5849 
5850 
5851 static int
nxt_unit_get_port(nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5852 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5853 {
5854     ssize_t              res;
5855     nxt_unit_impl_t      *lib;
5856     nxt_unit_ctx_impl_t  *ctx_impl;
5857 
5858     struct {
5859         nxt_port_msg_t           msg;
5860         nxt_port_msg_get_port_t  get_port;
5861     } m;
5862 
5863     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5864     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5865 
5866     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5867 
5868     m.msg.pid = lib->pid;
5869     m.msg.reply_port = ctx_impl->read_port->id.id;
5870     m.msg.type = _NXT_PORT_MSG_GET_PORT;
5871 
5872     m.get_port.id = port_id->id;
5873     m.get_port.pid = port_id->pid;
5874 
5875     nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5876                    (int) port_id->id);
5877 
5878     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
5879     if (nxt_slow_path(res != sizeof(m))) {
5880         return NXT_UNIT_ERROR;
5881     }
5882 
5883     return NXT_UNIT_OK;
5884 }
5885 
5886 
5887 static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5888 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5889     const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5890 {
5891     int                   notify;
5892     ssize_t               ret;
5893     nxt_int_t             rc;
5894     nxt_port_msg_t        msg;
5895     nxt_unit_impl_t       *lib;
5896     nxt_unit_port_impl_t  *port_impl;
5897 
5898     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5899 
5900     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5901     if (port_impl->queue != NULL && (oob == NULL || oob->size == 0)
5902         && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5903     {
5904         rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5905         if (nxt_slow_path(rc != NXT_OK)) {
5906             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5907                            (int) port->id.pid, (int) port->id.id);
5908 
5909             return -1;
5910         }
5911 
5912         nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5913                        (int) port->id.pid, (int) port->id.id,
5914                        (int) buf_size, notify);
5915 
5916         if (notify) {
5917             memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5918 
5919             msg.type = _NXT_PORT_MSG_READ_QUEUE;
5920 
5921             if (lib->callbacks.port_send == NULL) {
5922                 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5923                                        sizeof(nxt_port_msg_t), NULL);
5924 
5925                 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5926                                (int) port->id.pid, (int) port->id.id,
5927                                (int) ret);
5928 
5929             } else {
5930                 ret = lib->callbacks.port_send(ctx, port, &msg,
5931                                                sizeof(nxt_port_msg_t), NULL, 0);
5932 
5933                 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5934                                (int) port->id.pid, (int) port->id.id,
5935                                (int) ret);
5936             }
5937 
5938         }
5939 
5940         return buf_size;
5941     }
5942 
5943     if (port_impl->queue != NULL) {
5944         msg.type = _NXT_PORT_MSG_READ_SOCKET;
5945 
5946         rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5947         if (nxt_slow_path(rc != NXT_OK)) {
5948             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5949                            (int) port->id.pid, (int) port->id.id);
5950 
5951             return -1;
5952         }
5953 
5954         nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5955                        (int) port->id.pid, (int) port->id.id, notify);
5956     }
5957 
5958     if (lib->callbacks.port_send != NULL) {
5959         ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5960                                        oob != NULL ? oob->buf : NULL,
5961                                        oob != NULL ? oob->size : 0);
5962 
5963         nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5964                        (int) port->id.pid, (int) port->id.id,
5965                        (int) ret);
5966 
5967     } else {
5968         ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob);
5969 
5970         nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5971                        (int) port->id.pid, (int) port->id.id,
5972                        (int) ret);
5973     }
5974 
5975     return ret;
5976 }
5977 
5978 
5979 static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t * ctx,int fd,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5980 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5981     const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5982 {
5983     int            err;
5984     ssize_t        n;
5985     struct iovec   iov[1];
5986 
5987     iov[0].iov_base = (void *) buf;
5988     iov[0].iov_len = buf_size;
5989 
5990 retry:
5991 
5992     n = nxt_sendmsg(fd, iov, 1, oob);
5993 
5994     if (nxt_slow_path(n == -1)) {
5995         err = errno;
5996 
5997         if (err == EINTR) {
5998             goto retry;
5999         }
6000 
6001         /*
6002          * FIXME: This should be "alert" after router graceful shutdown
6003          * implementation.
6004          */
6005         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
6006                       fd, (int) buf_size, strerror(err), err);
6007 
6008     } else {
6009         nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size,
6010                        (oob != NULL ? (int) oob->size : 0), (int) n);
6011     }
6012 
6013     return n;
6014 }
6015 
6016 
6017 static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6018 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6019     nxt_unit_read_buf_t *rbuf)
6020 {
6021     int                   res, read;
6022     nxt_unit_port_impl_t  *port_impl;
6023 
6024     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6025 
6026     read = 0;
6027 
6028 retry:
6029 
6030     if (port_impl->from_socket > 0) {
6031         if (port_impl->socket_rbuf != NULL
6032             && port_impl->socket_rbuf->size > 0)
6033         {
6034             port_impl->from_socket--;
6035 
6036             nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
6037             port_impl->socket_rbuf->size = 0;
6038 
6039             nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
6040                            (int) port->id.pid, (int) port->id.id,
6041                            (int) rbuf->size);
6042 
6043             return NXT_UNIT_OK;
6044         }
6045 
6046     } else {
6047         res = nxt_unit_port_queue_recv(port, rbuf);
6048 
6049         if (res == NXT_UNIT_OK) {
6050             if (nxt_unit_is_read_socket(rbuf)) {
6051                 port_impl->from_socket++;
6052 
6053                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
6054                                (int) port->id.pid, (int) port->id.id,
6055                                port_impl->from_socket);
6056 
6057                 goto retry;
6058             }
6059 
6060             nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
6061                            (int) port->id.pid, (int) port->id.id,
6062                            (int) rbuf->size);
6063 
6064             return NXT_UNIT_OK;
6065         }
6066     }
6067 
6068     if (read) {
6069         return NXT_UNIT_AGAIN;
6070     }
6071 
6072     res = nxt_unit_port_recv(ctx, port, rbuf);
6073     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6074         return NXT_UNIT_ERROR;
6075     }
6076 
6077     read = 1;
6078 
6079     if (nxt_unit_is_read_queue(rbuf)) {
6080         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6081                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6082 
6083         goto retry;
6084     }
6085 
6086     nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
6087                    (int) port->id.pid, (int) port->id.id,
6088                    (int) rbuf->size);
6089 
6090     if (res == NXT_UNIT_AGAIN) {
6091         return NXT_UNIT_AGAIN;
6092     }
6093 
6094     if (port_impl->from_socket > 0) {
6095         port_impl->from_socket--;
6096 
6097         return NXT_UNIT_OK;
6098     }
6099 
6100     nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
6101                    (int) port->id.pid, (int) port->id.id,
6102                    (int) rbuf->size);
6103 
6104     if (port_impl->socket_rbuf == NULL) {
6105         port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
6106 
6107         if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
6108             return NXT_UNIT_ERROR;
6109         }
6110 
6111         port_impl->socket_rbuf->size = 0;
6112     }
6113 
6114     if (port_impl->socket_rbuf->size > 0) {
6115         nxt_unit_alert(ctx, "too many port socket messages");
6116 
6117         return NXT_UNIT_ERROR;
6118     }
6119 
6120     nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
6121 
6122     rbuf->oob.size = 0;
6123 
6124     goto retry;
6125 }
6126 
6127 
6128 nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t * dst,nxt_unit_read_buf_t * src)6129 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6130 {
6131     memcpy(dst->buf, src->buf, src->size);
6132     dst->size = src->size;
6133     dst->oob.size = src->oob.size;
6134     memcpy(dst->oob.buf, src->oob.buf, src->oob.size);
6135 }
6136 
6137 
6138 static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6139 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6140     nxt_unit_read_buf_t *rbuf)
6141 {
6142     int                   res;
6143     nxt_unit_port_impl_t  *port_impl;
6144 
6145     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6146 
6147 retry:
6148 
6149     res = nxt_unit_app_queue_recv(ctx, port, rbuf);
6150 
6151     if (res == NXT_UNIT_OK) {
6152         return NXT_UNIT_OK;
6153     }
6154 
6155     if (res == NXT_UNIT_AGAIN) {
6156         res = nxt_unit_port_recv(ctx, port, rbuf);
6157         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6158             return NXT_UNIT_ERROR;
6159         }
6160 
6161         if (nxt_unit_is_read_queue(rbuf)) {
6162             nxt_app_queue_notification_received(port_impl->queue);
6163 
6164             nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6165                            (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6166 
6167             goto retry;
6168         }
6169     }
6170 
6171     return res;
6172 }
6173 
6174 
6175 static int
nxt_unit_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6176 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6177     nxt_unit_read_buf_t *rbuf)
6178 {
6179     int              fd, err;
6180     size_t           oob_size;
6181     struct iovec     iov[1];
6182     nxt_unit_impl_t  *lib;
6183 
6184     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6185 
6186     if (lib->callbacks.port_recv != NULL) {
6187         oob_size = sizeof(rbuf->oob.buf);
6188 
6189         rbuf->size = lib->callbacks.port_recv(ctx, port,
6190                                               rbuf->buf, sizeof(rbuf->buf),
6191                                               rbuf->oob.buf, &oob_size);
6192 
6193         nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6194                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6195 
6196         if (nxt_slow_path(rbuf->size < 0)) {
6197             return NXT_UNIT_ERROR;
6198         }
6199 
6200         rbuf->oob.size = oob_size;
6201         return NXT_UNIT_OK;
6202     }
6203 
6204     iov[0].iov_base = rbuf->buf;
6205     iov[0].iov_len = sizeof(rbuf->buf);
6206 
6207     fd = port->in_fd;
6208 
6209 retry:
6210 
6211     rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob);
6212 
6213     if (nxt_slow_path(rbuf->size == -1)) {
6214         err = errno;
6215 
6216         if (err == EINTR) {
6217             goto retry;
6218         }
6219 
6220         if (err == EAGAIN) {
6221             nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6222                            fd, strerror(err), err);
6223 
6224             return NXT_UNIT_AGAIN;
6225         }
6226 
6227         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6228                        fd, strerror(err), err);
6229 
6230         return NXT_UNIT_ERROR;
6231     }
6232 
6233     nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6234 
6235     return NXT_UNIT_OK;
6236 }
6237 
6238 
6239 static int
nxt_unit_port_queue_recv(nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6240 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6241 {
6242     nxt_unit_port_impl_t  *port_impl;
6243 
6244     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6245 
6246     rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6247 
6248     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6249 }
6250 
6251 
6252 static int
nxt_unit_app_queue_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6253 nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6254     nxt_unit_read_buf_t *rbuf)
6255 {
6256     uint32_t              cookie;
6257     nxt_port_msg_t        *port_msg;
6258     nxt_app_queue_t       *queue;
6259     nxt_unit_impl_t       *lib;
6260     nxt_unit_port_impl_t  *port_impl;
6261 
6262     struct {
6263         nxt_port_msg_t    msg;
6264         uint8_t           quit_param;
6265     } nxt_packed m;
6266 
6267     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6268     queue = port_impl->queue;
6269 
6270 retry:
6271 
6272     rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6273 
6274     nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6275 
6276     if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6277         port_msg = (nxt_port_msg_t *) rbuf->buf;
6278 
6279         if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6280             lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6281 
6282             if (lib->request_limit != 0) {
6283                 nxt_atomic_fetch_add(&lib->request_count, 1);
6284 
6285                 if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
6286                     nxt_unit_debug(ctx, "request limit reached");
6287 
6288                     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
6289 
6290                     m.msg.pid = lib->pid;
6291                     m.msg.type = _NXT_PORT_MSG_QUIT;
6292                     m.quit_param = NXT_QUIT_GRACEFUL;
6293 
6294                     (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
6295                                               &m, sizeof(m), NULL);
6296                 }
6297             }
6298 
6299             return NXT_UNIT_OK;
6300         }
6301 
6302         nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6303 
6304         goto retry;
6305     }
6306 
6307     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6308 }
6309 
6310 
6311 nxt_inline int
nxt_unit_close(int fd)6312 nxt_unit_close(int fd)
6313 {
6314     int  res;
6315 
6316     res = close(fd);
6317 
6318     if (nxt_slow_path(res == -1)) {
6319         nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6320                        fd, strerror(errno), errno);
6321 
6322     } else {
6323         nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6324     }
6325 
6326     return res;
6327 }
6328 
6329 
6330 static int
nxt_unit_fd_blocking(int fd)6331 nxt_unit_fd_blocking(int fd)
6332 {
6333     int  nb;
6334 
6335     nb = 0;
6336 
6337     if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6338         nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6339                        fd, strerror(errno), errno);
6340 
6341         return NXT_UNIT_ERROR;
6342     }
6343 
6344     return NXT_UNIT_OK;
6345 }
6346 
6347 
6348 static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6349 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6350 {
6351     nxt_unit_port_t          *port;
6352     nxt_unit_port_hash_id_t  *port_id;
6353 
6354     port = data;
6355     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6356 
6357     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6358         && port_id->pid == port->id.pid
6359         && port_id->id == port->id.id)
6360     {
6361         return NXT_OK;
6362     }
6363 
6364     return NXT_DECLINED;
6365 }
6366 
6367 
6368 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
6369     NXT_LVLHSH_DEFAULT,
6370     nxt_unit_port_hash_test,
6371     nxt_unit_lvlhsh_alloc,
6372     nxt_unit_lvlhsh_free,
6373 };
6374 
6375 
6376 static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t * lhq,nxt_unit_port_hash_id_t * port_hash_id,nxt_unit_port_id_t * port_id)6377 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6378     nxt_unit_port_hash_id_t *port_hash_id,
6379     nxt_unit_port_id_t *port_id)
6380 {
6381     port_hash_id->pid = port_id->pid;
6382     port_hash_id->id = port_id->id;
6383 
6384     if (nxt_fast_path(port_id->hash != 0)) {
6385         lhq->key_hash = port_id->hash;
6386 
6387     } else {
6388         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6389 
6390         port_id->hash = lhq->key_hash;
6391 
6392         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6393                        (int) port_id->pid, (int) port_id->id,
6394                        (int) port_id->hash);
6395     }
6396 
6397     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6398     lhq->key.start = (u_char *) port_hash_id;
6399     lhq->proto = &lvlhsh_ports_proto;
6400     lhq->pool = NULL;
6401 }
6402 
6403 
6404 static int
nxt_unit_port_hash_add(nxt_lvlhsh_t * port_hash,nxt_unit_port_t * port)6405 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6406 {
6407     nxt_int_t                res;
6408     nxt_lvlhsh_query_t       lhq;
6409     nxt_unit_port_hash_id_t  port_hash_id;
6410 
6411     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6412     lhq.replace = 0;
6413     lhq.value = port;
6414 
6415     res = nxt_lvlhsh_insert(port_hash, &lhq);
6416 
6417     switch (res) {
6418 
6419     case NXT_OK:
6420         return NXT_UNIT_OK;
6421 
6422     default:
6423         return NXT_UNIT_ERROR;
6424     }
6425 }
6426 
6427 
6428 static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t * port_hash,nxt_unit_port_id_t * port_id,int remove)6429 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6430     int remove)
6431 {
6432     nxt_int_t                res;
6433     nxt_lvlhsh_query_t       lhq;
6434     nxt_unit_port_hash_id_t  port_hash_id;
6435 
6436     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6437 
6438     if (remove) {
6439         res = nxt_lvlhsh_delete(port_hash, &lhq);
6440 
6441     } else {
6442         res = nxt_lvlhsh_find(port_hash, &lhq);
6443     }
6444 
6445     switch (res) {
6446 
6447     case NXT_OK:
6448         if (!remove) {
6449             nxt_unit_port_use(lhq.value);
6450         }
6451 
6452         return lhq.value;
6453 
6454     default:
6455         return NULL;
6456     }
6457 }
6458 
6459 
6460 static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6461 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6462 {
6463     return NXT_OK;
6464 }
6465 
6466 
6467 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
6468     NXT_LVLHSH_DEFAULT,
6469     nxt_unit_request_hash_test,
6470     nxt_unit_lvlhsh_alloc,
6471     nxt_unit_lvlhsh_free,
6472 };
6473 
6474 
6475 static int
nxt_unit_request_hash_add(nxt_unit_ctx_t * ctx,nxt_unit_request_info_t * req)6476 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6477     nxt_unit_request_info_t *req)
6478 {
6479     uint32_t                      *stream;
6480     nxt_int_t                     res;
6481     nxt_lvlhsh_query_t            lhq;
6482     nxt_unit_ctx_impl_t           *ctx_impl;
6483     nxt_unit_request_info_impl_t  *req_impl;
6484 
6485     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6486     if (req_impl->in_hash) {
6487         return NXT_UNIT_OK;
6488     }
6489 
6490     stream = &req_impl->stream;
6491 
6492     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6493     lhq.key.length = sizeof(*stream);
6494     lhq.key.start = (u_char *) stream;
6495     lhq.proto = &lvlhsh_requests_proto;
6496     lhq.pool = NULL;
6497     lhq.replace = 0;
6498     lhq.value = req_impl;
6499 
6500     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6501 
6502     pthread_mutex_lock(&ctx_impl->mutex);
6503 
6504     res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6505 
6506     pthread_mutex_unlock(&ctx_impl->mutex);
6507 
6508     switch (res) {
6509 
6510     case NXT_OK:
6511         req_impl->in_hash = 1;
6512         return NXT_UNIT_OK;
6513 
6514     default:
6515         return NXT_UNIT_ERROR;
6516     }
6517 }
6518 
6519 
6520 static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t * ctx,uint32_t stream,int remove)6521 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6522 {
6523     nxt_int_t                     res;
6524     nxt_lvlhsh_query_t            lhq;
6525     nxt_unit_ctx_impl_t           *ctx_impl;
6526     nxt_unit_request_info_impl_t  *req_impl;
6527 
6528     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6529     lhq.key.length = sizeof(stream);
6530     lhq.key.start = (u_char *) &stream;
6531     lhq.proto = &lvlhsh_requests_proto;
6532     lhq.pool = NULL;
6533 
6534     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6535 
6536     pthread_mutex_lock(&ctx_impl->mutex);
6537 
6538     if (remove) {
6539         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6540 
6541     } else {
6542         res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6543     }
6544 
6545     pthread_mutex_unlock(&ctx_impl->mutex);
6546 
6547     switch (res) {
6548 
6549     case NXT_OK:
6550         req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6551                                     req);
6552         if (remove) {
6553             req_impl->in_hash = 0;
6554         }
6555 
6556         return lhq.value;
6557 
6558     default:
6559         return NULL;
6560     }
6561 }
6562 
6563 
6564 void
nxt_unit_log(nxt_unit_ctx_t * ctx,int level,const char * fmt,...)6565 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6566 {
6567     int              log_fd, n;
6568     char             msg[NXT_MAX_ERROR_STR], *p, *end;
6569     pid_t            pid;
6570     va_list          ap;
6571     nxt_unit_impl_t  *lib;
6572 
6573     if (nxt_fast_path(ctx != NULL)) {
6574         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6575 
6576         pid = lib->pid;
6577         log_fd = lib->log_fd;
6578 
6579     } else {
6580         pid = nxt_unit_pid;
6581         log_fd = STDERR_FILENO;
6582     }
6583 
6584     p = msg;
6585     end = p + sizeof(msg) - 1;
6586 
6587     p = nxt_unit_snprint_prefix(p, end, pid, level);
6588 
6589     va_start(ap, fmt);
6590     p += vsnprintf(p, end - p, fmt, ap);
6591     va_end(ap);
6592 
6593     if (nxt_slow_path(p > end)) {
6594         memcpy(end - 5, "[...]", 5);
6595         p = end;
6596     }
6597 
6598     *p++ = '\n';
6599 
6600     n = write(log_fd, msg, p - msg);
6601     if (nxt_slow_path(n < 0)) {
6602         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6603     }
6604 }
6605 
6606 
6607 void
nxt_unit_req_log(nxt_unit_request_info_t * req,int level,const char * fmt,...)6608 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6609 {
6610     int                           log_fd, n;
6611     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
6612     pid_t                         pid;
6613     va_list                       ap;
6614     nxt_unit_impl_t               *lib;
6615     nxt_unit_request_info_impl_t  *req_impl;
6616 
6617     if (nxt_fast_path(req != NULL)) {
6618         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6619 
6620         pid = lib->pid;
6621         log_fd = lib->log_fd;
6622 
6623     } else {
6624         pid = nxt_unit_pid;
6625         log_fd = STDERR_FILENO;
6626     }
6627 
6628     p = msg;
6629     end = p + sizeof(msg) - 1;
6630 
6631     p = nxt_unit_snprint_prefix(p, end, pid, level);
6632 
6633     if (nxt_fast_path(req != NULL)) {
6634         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6635 
6636         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6637     }
6638 
6639     va_start(ap, fmt);
6640     p += vsnprintf(p, end - p, fmt, ap);
6641     va_end(ap);
6642 
6643     if (nxt_slow_path(p > end)) {
6644         memcpy(end - 5, "[...]", 5);
6645         p = end;
6646     }
6647 
6648     *p++ = '\n';
6649 
6650     n = write(log_fd, msg, p - msg);
6651     if (nxt_slow_path(n < 0)) {
6652         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6653     }
6654 }
6655 
6656 
6657 static const char * nxt_unit_log_levels[] = {
6658     "alert",
6659     "error",
6660     "warn",
6661     "notice",
6662     "info",
6663     "debug",
6664 };
6665 
6666 
6667 static char *
nxt_unit_snprint_prefix(char * p,char * end,pid_t pid,int level)6668 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6669 {
6670     struct tm        tm;
6671     struct timespec  ts;
6672 
6673     (void) clock_gettime(CLOCK_REALTIME, &ts);
6674 
6675 #if (NXT_HAVE_LOCALTIME_R)
6676     (void) localtime_r(&ts.tv_sec, &tm);
6677 #else
6678     tm = *localtime(&ts.tv_sec);
6679 #endif
6680 
6681 #if (NXT_DEBUG)
6682     p += snprintf(p, end - p,
6683                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6684                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6685                   tm.tm_hour, tm.tm_min, tm.tm_sec,
6686                   (int) ts.tv_nsec / 1000000);
6687 #else
6688     p += snprintf(p, end - p,
6689                   "%4d/%02d/%02d %02d:%02d:%02d ",
6690                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6691                   tm.tm_hour, tm.tm_min, tm.tm_sec);
6692 #endif
6693 
6694     p += snprintf(p, end - p,
6695                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6696                   (int) pid,
6697                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
6698 
6699     return p;
6700 }
6701 
6702 
6703 static void *
nxt_unit_lvlhsh_alloc(void * data,size_t size)6704 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6705 {
6706     int   err;
6707     void  *p;
6708 
6709     err = posix_memalign(&p, size, size);
6710 
6711     if (nxt_fast_path(err == 0)) {
6712         nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6713                        (int) size, (int) size, p);
6714         return p;
6715     }
6716 
6717     nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6718                    (int) size, (int) size, strerror(err), err);
6719     return NULL;
6720 }
6721 
6722 
6723 static void
nxt_unit_lvlhsh_free(void * data,void * p)6724 nxt_unit_lvlhsh_free(void *data, void *p)
6725 {
6726     nxt_unit_free(NULL, p);
6727 }
6728 
6729 
6730 void *
nxt_unit_malloc(nxt_unit_ctx_t * ctx,size_t size)6731 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6732 {
6733     void  *p;
6734 
6735     p = malloc(size);
6736 
6737     if (nxt_fast_path(p != NULL)) {
6738 #if (NXT_DEBUG_ALLOC)
6739         nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6740 #endif
6741 
6742     } else {
6743         nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6744                        (int) size, strerror(errno), errno);
6745     }
6746 
6747     return p;
6748 }
6749 
6750 
6751 void
nxt_unit_free(nxt_unit_ctx_t * ctx,void * p)6752 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6753 {
6754 #if (NXT_DEBUG_ALLOC)
6755     nxt_unit_debug(ctx, "free(%p)", p);
6756 #endif
6757 
6758     free(p);
6759 }
6760 
6761 
6762 static int
nxt_unit_memcasecmp(const void * p1,const void * p2,size_t length)6763 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6764 {
6765     u_char        c1, c2;
6766     nxt_int_t     n;
6767     const u_char  *s1, *s2;
6768 
6769     s1 = p1;
6770     s2 = p2;
6771 
6772     while (length-- != 0) {
6773         c1 = *s1++;
6774         c2 = *s2++;
6775 
6776         c1 = nxt_lowcase(c1);
6777         c2 = nxt_lowcase(c2);
6778 
6779         n = c1 - c2;
6780 
6781         if (n != 0) {
6782             return n;
6783         }
6784     }
6785 
6786     return 0;
6787 }
6788