/*
 * Copyright (C) Max Romanov
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>

#if (NXT_HAVE_MEMFD_CREATE)

#include <linux/memfd.h>
#include <unistd.h>
#include <sys/syscall.h>

#endif

#include <nxt_port_memory_int.h>


static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
    void *data);


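/*
 * Adjusts the mmap handler reference count by "i".  When the count drops
 * to zero, the shared memory segment is unmapped, its descriptor is
 * closed, and the handler itself is freed.
 */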
nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
    int  c;

    c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);

    if (i < 0 && c == -i) {
        if (mmap_handler->hdr != NULL) {
            nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
            mmap_handler->hdr = NULL;
        }

        if (mmap_handler->fd != -1) {
            nxt_fd_close(mmap_handler->fd);
        }

        nxt_free(mmap_handler);
    }
}


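/*
 * Returns the port_mmaps element at index "i", growing the elts array
 * as needed.  Newly allocated elements are zeroed; the capacity doubles
 * while small and then grows by half.
 */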
static nxt_port_mmap_t *
nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
{
    uint32_t  cap;

    cap = port_mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != port_mmaps->cap) {

        port_mmaps->elts = nxt_realloc(port_mmaps->elts,
                                       cap * sizeof(nxt_port_mmap_t));
        if (nxt_slow_path(port_mmaps->elts == NULL)) {
            return NULL;
        }

        nxt_memzero(port_mmaps->elts + port_mmaps->cap,
                    sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));

        port_mmaps->cap = cap;
    }

    if (i + 1 > port_mmaps->size) {
        port_mmaps->size = i + 1;
    }

    return port_mmaps->elts + i;
}


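/*
 * Releases every mmap handler referenced by "port_mmaps" and resets the
 * array size; the elts array itself is freed only when "free_elts" is set.
 */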
void
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
{
    uint32_t         i;
    nxt_port_mmap_t  *port_mmap;

    if (port_mmaps == NULL) {
        return;
    }

    port_mmap = port_mmaps->elts;

    for (i = 0; i < port_mmaps->size; i++) {
        nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
    }

    port_mmaps->size = 0;

    if (free_elts != 0) {
        nxt_free(port_mmaps->elts);
    }
}


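/*
 * Fills released chunks with a 0xA5 pattern, presumably as a debugging
 * aid to make stale reads of freed shared memory easier to spot.
 */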
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)


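/*
 * Completion handler for shared memory buffers.  Returns the chunks
 * covered by each buffer in the chain to the segment's free map, junk-
 * fills them, and notifies the source process if it ran out of shared
 * memory in the meantime.
 */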
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b, *next;
    nxt_process_t            *process;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    nxt_assert(data == b->parent);

    mmap_handler = data;

complete_buf:

    hdr = mmap_handler->hdr;

    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks up to b->mem.pos have been sent to the other side;
         * release the rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    if (hdr->dst_pid == nxt_pid
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);

        nxt_process_broadcast_shm_ack(task, process);
    }

release_buf:

    nxt_port_mmap_handler_use(mmap_handler, -1);

    next = b->next;
    mp = b->data;

    nxt_mp_free(mp, b);
    nxt_mp_release(mp);

    if (next != NULL) {
        b = next;
        mmap_handler = b->parent;

        goto complete_buf;
    }
}


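/*
 * Maps a shared memory segment received from another process over "fd",
 * validates the pids recorded in its header, and registers the mapping
 * in the process's incoming array under the id chosen by the sender.
 */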
nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
    nxt_fd_t fd)
{
    void                     *mem;
    struct stat              mmap_stat;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "got new mmap fd #%FD from process %PI",
              fd, process->pid);

    port_mmap = NULL;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);

        return NULL;
    }

    mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);

        return NULL;
    }

    hdr = mem;

    if (nxt_slow_path(hdr->src_pid != process->pid
                      || hdr->dst_pid != nxt_pid))
    {
        nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
                "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
                hdr->dst_pid, nxt_pid);

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler->hdr = hdr;
    mmap_handler->fd = -1;

    nxt_thread_mutex_lock(&process->incoming.mutex);

    port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        nxt_free(mmap_handler);
        mmap_handler = NULL;

        goto fail;
    }

    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr->sent_over = 0xFFFFu;

fail:

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}


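/*
 * Creates a fresh shared memory segment, maps it, appends it to "mmaps",
 * and initializes the header: all chunks are marked free except the
 * first "n" chunks, which are reserved for the caller.
 */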
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
    nxt_bool_t tracking, nxt_int_t n)
{
    void                     *mem;
    nxt_fd_t                 fd;
    nxt_int_t                i;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_alert(task, "failed to allocate mmap_handler");

        return NULL;
    }

    port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_alert(task, "failed to add port mmap to mmaps array");

        nxt_free(mmap_handler);
        return NULL;
    }

    fd = nxt_shm_open(task, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
                       MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_fd_close(fd);
        goto remove_fail;
    }

    mmap_handler->hdr = mem;
    mmap_handler->fd = fd;
    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    /* Init segment header. */
    hdr = mmap_handler->hdr;

    nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = mmaps->size - 1;
    hdr->src_pid = nxt_pid;
    hdr->sent_over = 0xFFFFu;

    /* Mark the first n chunks as busy. */
    free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(free_map, i);
    }

    /* Mark the chunk that follows the last available chunk as busy. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
            hdr->id, nxt_pid);

    return mmap_handler;

remove_fail:

    nxt_free(mmap_handler);

    mmaps->size--;

    return NULL;
}


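/*
 * Creates an anonymous shared memory object of "size" bytes and returns
 * its descriptor, using memfd_create(), shm_open(SHM_ANON), or a named
 * shm_open() followed by shm_unlink(), whichever the platform provides.
 */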
nxt_int_t
nxt_shm_open(nxt_task_t *task, size_t size)
{
    nxt_fd_t  fd;

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)

    u_char  *p, name[64];

    p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
                    nxt_pid, nxt_random(&task->thread->random));
    *p = '\0';

#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "memfd_create(%s): %FD", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);

#elif (NXT_HAVE_SHM_OPEN)

    /* Unlink a stale object possibly left over from a previous run. */
    shm_unlink((char *) name);

    fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(%s): %FD", name, fd);

    if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
                nxt_errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, size) == -1)) {
        nxt_alert(task, "ftruncate() failed %E", nxt_errno);

        nxt_fd_close(fd);

        return -1;
    }

    return fd;
}


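/*
 * Finds "n" consecutive free chunks in one of the existing outgoing
 * segments and returns its handler with *c set to the first chunk id.
 * Segments already pinned to a particular port are skipped; exhausted
 * segments are flagged "oosm" (out of shared memory) so the consumer
 * can acknowledge freed chunks, and a new segment is created when no
 * suitable run is found.
 */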
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
    nxt_int_t n, nxt_bool_t tracking)
{
    nxt_int_t                i, res, nchunks;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_thread_mutex_lock(&mmaps->mutex);

    end_port_mmap = mmaps->elts + mmaps->size;

    for (port_mmap = mmaps->elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        if (hdr->sent_over != 0xFFFFu) {
            continue;
        }

        *c = 0;

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
            nchunks = 1;

            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);

                if (res == 0) {
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock_return;
            }
        }

        hdr->oosm = 1;
    }

    /* TODO introduce port_mmap limit and release wait. */

    *c = 0;
    mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);

unlock_return:

    nxt_thread_mutex_unlock(&mmaps->mutex);

    return mmap_handler;
}


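/*
 * Looks up an incoming mmap handler by sender pid and segment id under
 * the incoming array mutex; returns NULL for unknown processes or ids.
 */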
static nxt_port_mmap_handler_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
    nxt_process_t            *process;
    nxt_port_mmap_handler_t  *mmap_handler;

    process = nxt_runtime_process_find(task->thread->runtime, spid);
    if (nxt_slow_path(process == NULL)) {
        return NULL;
    }

    nxt_thread_mutex_lock(&process->incoming.mutex);

    if (nxt_fast_path(process->incoming.size > id)) {
        mmap_handler = process->incoming.elts[id].mmap_handler;

    } else {
        mmap_handler = NULL;

        nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
    }

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}


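/*
 * Allocates one tracking chunk, stores "stream" in it, and records the
 * segment and slot in "tracking" so the receiver can later match the
 * stream or the sender can cancel it.
 */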
nxt_int_t
nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request tracking for stream #%uD", stream);

    mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
    if (nxt_slow_path(mmap_handler == NULL)) {
        return NXT_ERROR;
    }

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    tracking->mmap_handler = mmap_handler;
    tracking->tracking = hdr->tracking + c;

    *tracking->tracking = stream;

    nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    return NXT_OK;
}


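/*
 * Atomically clears the tracking slot for "stream".  Returns 1 if the
 * cancellation won the race; otherwise the receiver has already matched
 * the stream, so the tracking chunk is returned to the free map here.
 */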
nxt_bool_t
nxt_port_mmap_tracking_cancel(nxt_task_t *task,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_bool_t               res;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = tracking->mmap_handler;

    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);

    nxt_debug(task, "%s tracking for stream #%uD",
              (res ? "cancelled" : "failed to cancel"), stream);

    if (!res) {
        c = tracking->tracking - hdr->tracking;
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    nxt_port_mmap_handler_use(mmap_handler, -1);

    return res;
}


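/*
 * Serializes a tracking reference as two 32-bit values: the segment id
 * and the tracking chunk index within that segment.
 */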
nxt_int_t
nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
{
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = t->mmap_handler;

#if (NXT_DEBUG)
    {
        nxt_atomic_t  *tracking;

        tracking = mmap_handler->hdr->tracking;

        nxt_assert(t->tracking >= tracking);
        nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
    }
#endif

    buf[0] = mmap_handler->hdr->id;
    buf[1] = t->tracking - mmap_handler->hdr->tracking;

    return NXT_OK;
}


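/*
 * Validates an incoming tracking message: the tracking slot must still
 * hold the message's stream id.  Returns 0 when the message is too
 * short, the mmap is unknown, or the sender has already cancelled the
 * stream, in which case the tracking chunk is released.
 */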
nxt_bool_t
nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t                     *b;
    nxt_bool_t                    res;
    nxt_chunk_id_t                c;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_handler_t       *mmap_handler;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    b = msg->buf;

    if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
        return 0;
    }

    tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;

    b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
    mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
                                                   tracking_msg->mmap_id);

    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    c = tracking_msg->tracking_id;
    res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);

    nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
              (res ? "received" : "already cancelled"));

    if (!res) {
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    return res;
}


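/*
 * Allocates a buffer backed by outgoing shared memory, rounding "size"
 * up to whole chunks.  The buffer's completion handler returns the
 * chunks to the segment when the buffer is released.
 */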
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_int_t                nchunks;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request %z bytes shm buffer", size);

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
        nxt_alert(task, "requested buffer (%z) too big", size);

        return NULL;
    }

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;
    nxt_buf_set_port_mmap(b);

    mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
    if (nxt_slow_path(mmap_handler == NULL)) {
        mp = task->thread->engine->mem_pool;
        nxt_mp_free(mp, b);
        nxt_mp_release(mp);
        return NULL;
    }

    b->parent = mmap_handler;

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start;
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;

    nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    return b;
}


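/*
 * Tries to grow a shared memory buffer in place by claiming the chunks
 * that immediately follow b->mem.end.  Succeeds if at least "min_size"
 * additional bytes become available; otherwise the claimed chunks are
 * released again and NXT_ERROR is returned.
 */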
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
    size_t min_size)
{
    size_t                   nchunks, free_size;
    nxt_chunk_id_t           c, start;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request increase %z bytes shm buffer", size);

    if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to increase, not a mmap buffer");
        return NXT_ERROR;
    }

    free_size = nxt_buf_mem_free_size(&b->mem);

    if (nxt_slow_path(size <= free_size)) {
        return NXT_OK;
    }

    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;

    start = nxt_port_mmap_chunk_id(hdr, b->mem.end);

    size -= free_size;

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    c = start;

    /* Try to acquire as many chunks as required. */
    while (nchunks > 0) {

        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        c++;
        nchunks--;
    }

    if (nchunks != 0
        && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
    {
        c--;
        while (c >= start) {
            nxt_port_mmap_set_chunk_free(hdr->free_map, c);
            c--;
        }

        nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);

        return NXT_ERROR;

    } else {
        b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);

        return NXT_OK;
    }
}


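/*
 * Builds a buffer over the incoming shared memory region described by
 * "mmap_msg": the receiving side's counterpart of an outgoing mmap buf.
 */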
static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
    nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
    size_t                   nchunks;
    nxt_buf_t                *b;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
                                                   mmap_msg->mmap_id);
    if (nxt_slow_path(mmap_handler == NULL)) {
        return NULL;
    }

    b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;

    nxt_buf_set_port_mmap(b);

    nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
    if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
        nchunks++;
    }

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start + mmap_msg->size;
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;

    b->parent = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);

    return b;
}


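/*
 * Rewrites an outgoing message in place: each iovec that points into a
 * shared memory buffer is replaced by an nxt_port_mmap_msg_t triplet
 * (mmap id, chunk id, size), and the message is flagged as mmap so the
 * receiver decodes it with nxt_port_mmap_read().
 */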
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
              "via shared memory", sb->size, port->pid);

    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Look up the buffer that starts at the current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    sb->iobuf[0].iov_base = mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    msg->port_msg.mmap = 1;
}


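/*
 * Decodes a received mmap message: every nxt_port_mmap_msg_t in the
 * buffer chain is replaced by a buffer mapped over the corresponding
 * incoming shared memory chunks, and msg->size is recalculated.
 */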
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    pb = &msg->buf;
    msg->size = 0;

    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Mark this mmap_msg in the original buf as consumed. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}


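/*
 * Inspects a buffer chain and decides how it should be transferred:
 * NXT_PORT_METHOD_MMAP if all non-empty buffers are shared memory
 * buffers, NXT_PORT_METHOD_PLAIN otherwise; mixed chains fall back to
 * plain mode with an error logged.
 */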
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
    nxt_port_method_t  m;

    m = NXT_PORT_METHOD_ANY;

    for (/* void */; b != NULL; b = b->next) {
        if (nxt_buf_used_size(b) == 0) {
            /* Empty buffers do not affect the method. */
            continue;
        }

        if (nxt_buf_is_port_mmap(b)) {
            if (m == NXT_PORT_METHOD_PLAIN) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "mixing plain and mmap buffers, "
                              "using plain mode");

                break;
            }

            if (m == NXT_PORT_METHOD_ANY) {
                nxt_debug(task, "using mmap mode");

                m = NXT_PORT_METHOD_MMAP;
            }
        } else {
            if (m == NXT_PORT_METHOD_MMAP) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "mixing mmap and plain buffers, "
                              "switching to plain mode");

                m = NXT_PORT_METHOD_PLAIN;

                break;
            }

            if (m == NXT_PORT_METHOD_ANY) {
                nxt_debug(task, "using plain mode");

                m = NXT_PORT_METHOD_PLAIN;
            }
        }
    }

    return m;
}


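/*
 * Posts an SHM_ACK broadcast to an application process so it learns
 * that shared memory chunks have been released and may retry sending.
 */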
void
nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
{
    nxt_port_t  *port;

    if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
    {
        return;
    }

    port = nxt_process_port_first(process);

    if (port->type == NXT_PROCESS_APP) {
        nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
    }
}


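/*
 * Port post handler: sends an NXT_PORT_MSG_SHM_ACK notification over
 * every port of the target process.
 */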
static void
nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_process_t  *process;

    process = data;

    nxt_queue_each(port, &process->ports, nxt_port_t, link) {
        (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
                                     -1, 0, 0, NULL);
    } nxt_queue_loop;
}