/*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *	  single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization.  Only the sender may send,
 * and only the receiver may receive.  This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"

/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.  However,
 * because most of these fields are 8 bytes and we don't assume that 8 byte
 * reads and writes are atomic, the spinlock must be taken whenever the field
 * is updated, and whenever it is read by a process other than the one allowed
 * to modify it.  But the process that is allowed to modify it is also allowed
 * to read it without the lock.  On architectures where 8-byte writes are
 * atomic, we could replace these spinlocks with memory barriers, but
 * testing found no performance benefit, so it seems best to keep things
 * simple for now.
 *
 * mq_detached can be set by either the sender or the receiver, so the mutex
 * must be held to read or write it.  Memory barriers could be used here as
 * well, if needed.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.  Were
 * this not the case, we'd have to hold the spinlock for much longer
 * intervals, and performance might suffer.  Fortunately, that's not
 * necessary.  At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin.  The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock.  Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes.  The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK.  Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
    slock_t     mq_mutex;
    PGPROC     *mq_receiver;
    PGPROC     *mq_sender;
    uint64      mq_bytes_read;
    uint64      mq_bytes_written;
    Size        mq_ring_size;
    bool        mq_detached;
    uint8       mq_ring_offset;
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
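
/*
 * Illustration only (this sketch is not part of the file's code): the
 * lockless protocol above boils down to modular arithmetic on the two
 * monotonically increasing counters.  Positions in the ring are derived by
 * reducing the counters modulo mq_ring_size:
 *
 *		uint64	used = mq_bytes_written - mq_bytes_read;
 *		Size	read_offset = mq_bytes_read % (uint64) mq_ring_size;
 *		Size	write_offset = mq_bytes_written % (uint64) mq_ring_size;
 *		Size	free_space = mq_ring_size - used;
 *
 * Without taking the spinlock, the receiver may read from the range
 * [read_offset, read_offset + used) and the sender may write to
 * [write_offset, write_offset + free_space), each range wrapping around at
 * mq_ring_size.
 */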

/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started.  If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point.  This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
    shm_mq     *mqh_queue;
    dsm_segment *mqh_segment;
    BackgroundWorkerHandle *mqh_handle;
    char       *mqh_buffer;
    Size        mqh_buflen;
    Size        mqh_consume_pending;
    Size        mqh_partial_bytes;
    Size        mqh_expected_bytes;
    bool        mqh_length_word_complete;
    bool        mqh_counterparty_attached;
    MemoryContext mqh_context;
};

static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
                  const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
                     bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
                         BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
                     BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size  shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

#define MQH_INITIAL_BUFSIZE             8192
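
/*
 * Callers allocating shared memory for a queue must leave room for the
 * header as well as the ring.  A hypothetical sizing check (not used in
 * this file):
 *
 *		#define MY_QUEUE_SIZE	16384
 *		Assert(MY_QUEUE_SIZE >= shm_mq_minimum_size);
 *
 * Anything smaller than shm_mq_minimum_size will trip the assertion in
 * shm_mq_create().
 */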

/*
 * Initialize a new shared message queue.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
    shm_mq     *mq = address;
    Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

    /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
    size = MAXALIGN_DOWN(size);

    /* Queue size must be large enough to hold some data. */
    Assert(size > data_offset);

    /* Initialize queue header. */
    SpinLockInit(&mq->mq_mutex);
    mq->mq_receiver = NULL;
    mq->mq_sender = NULL;
    mq->mq_bytes_read = 0;
    mq->mq_bytes_written = 0;
    mq->mq_ring_size = size - data_offset;
    mq->mq_detached = false;
    mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

    return mq;
}
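
/*
 * A minimal setup sketch (illustration only; the segment size and variable
 * names are this example's own assumptions): a user backend carves a queue
 * out of a dynamic shared memory segment it creates, claims the receiver
 * slot for itself, and leaves the sender slot for a worker to claim:
 *
 *		dsm_segment *seg = dsm_create(16384, 0);
 *		shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), 16384);
 *
 *		shm_mq_set_receiver(mq, MyProc);
 *
 * The worker, after dsm_attach()ing to the same segment, would call
 * shm_mq_set_sender(mq, MyProc) from its own process.
 */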

/*
 * Set the identity of the process that will receive from a shared message
 * queue.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
    volatile shm_mq *vmq = mq;
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(vmq->mq_receiver == NULL);
    vmq->mq_receiver = proc;
    sender = vmq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    if (sender != NULL)
        SetLatch(&sender->procLatch);
}

/*
 * Set the identity of the process that will send to a shared message queue.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
    volatile shm_mq *vmq = mq;
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(vmq->mq_sender == NULL);
    vmq->mq_sender = proc;
    receiver = vmq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    if (receiver != NULL)
        SetLatch(&receiver->procLatch);
}

/*
 * Get the configured receiver.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
    volatile shm_mq *vmq = mq;
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    receiver = vmq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    return receiver;
}

/*
 * Get the configured sender.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
    volatile shm_mq *vmq = mq;
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    sender = vmq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    return sender;
}

/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached.  We'll wait for it to do so if needed.  The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done.  This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
    shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));

    Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
    mqh->mqh_queue = mq;
    mqh->mqh_segment = seg;
    mqh->mqh_handle = handle;
    mqh->mqh_buffer = NULL;
    mqh->mqh_buflen = 0;
    mqh->mqh_consume_pending = 0;
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_expected_bytes = 0;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_counterparty_attached = false;
    mqh->mqh_context = CurrentMemoryContext;

    if (seg != NULL)
        on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));

    return mqh;
}
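
/*
 * Continuing the sketch above (illustration only): each side attaches from
 * its own process.  Here "wh" is assumed to be the BackgroundWorkerHandle
 * obtained when the user backend registered the worker with bgw_notify_pid
 * set to its own PID:
 *
 *		// In the user backend (receiver):
 *		shm_mq_handle *inqh = shm_mq_attach(mq, seg, wh);
 *
 *		// In the worker (sender), after shm_mq_set_sender(mq, MyProc):
 *		shm_mq_handle *outqh = shm_mq_attach(mq, seg, NULL);
 *
 * Passing wh lets the backend fail cleanly if the worker never starts,
 * rather than waiting forever.
 */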

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
    Assert(mqh->mqh_handle == NULL);
    mqh->mqh_handle = handle;
}

/*
 * Write a message into a shared message queue.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
    shm_mq_iovec iov;

    iov.data = data;
    iov.len = nbytes;

    return shm_mq_sendv(mqh, &iov, 1, nowait);
}

/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set.  (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
    shm_mq_result res;
    shm_mq     *mq = mqh->mqh_queue;
    Size        nbytes = 0;
    Size        bytes_written;
    int         i;
    int         which_iov = 0;
    Size        offset;

    Assert(mq->mq_sender == MyProc);

    /* Compute total size of write. */
    for (i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /* Prevent writing messages overwhelming the receiver. */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot send a message of size %zu via shared memory queue",
                        nbytes)));

    /* Try to write, or finish writing, the length word into the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                ((char *) &nbytes) + mqh->mqh_partial_bytes,
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = false;
            return res;
        }
        mqh->mqh_partial_bytes += bytes_written;

        if (mqh->mqh_partial_bytes >= sizeof(Size))
        {
            Assert(mqh->mqh_partial_bytes == sizeof(Size));

            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = true;
        }

        if (res != SHM_MQ_SUCCESS)
            return res;

        /* Length word can't be split unless bigger than required alignment. */
        Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }

    /* Write the actual data bytes into the buffer. */
    Assert(mqh->mqh_partial_bytes <= nbytes);
    offset = mqh->mqh_partial_bytes;
    do
    {
        Size        chunksize;

        /* Figure out which bytes need to be sent next. */
        if (offset >= iov[which_iov].len)
        {
            offset -= iov[which_iov].len;
            ++which_iov;
            if (which_iov >= iovcnt)
                break;
            continue;
        }

        /*
         * We want to avoid copying the data if at all possible, but every
         * chunk of bytes we write into the queue has to be MAXALIGN'd, except
         * the last.  Thus, if a chunk other than the last one ends on a
         * non-MAXALIGN'd boundary, we have to combine the tail end of its
         * data with data from one or more following chunks until we either
         * reach the last chunk or accumulate a number of bytes which is
         * MAXALIGN'd.
         */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char        tmpbuf[MAXIMUM_ALIGNOF];
            int         j = 0;

            for (;;)
            {
                if (offset < iov[which_iov].len)
                {
                    tmpbuf[j] = iov[which_iov].data[offset];
                    j++;
                    offset++;
                    if (j == MAXIMUM_ALIGNOF)
                        break;
                }
                else
                {
                    offset -= iov[which_iov].len;
                    which_iov++;
                    if (which_iov >= iovcnt)
                        break;
                }
            }

            res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

            if (res == SHM_MQ_DETACHED)
            {
                /* Reset state in case caller tries to send another message. */
                mqh->mqh_partial_bytes = 0;
                mqh->mqh_length_word_complete = false;
                return res;
            }

            mqh->mqh_partial_bytes += bytes_written;
            if (res != SHM_MQ_SUCCESS)
                return res;
            continue;
        }

        /*
         * If this is the last chunk, we can write all the data, even if it
         * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
         * MAXALIGN_DOWN the write size.
         */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_length_word_complete = false;
            mqh->mqh_partial_bytes = 0;
            return res;
        }

        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;
        if (res != SHM_MQ_SUCCESS)
            return res;
    } while (mqh->mqh_partial_bytes < nbytes);

    /* Reset for next message. */
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_length_word_complete = false;

    /* Notify receiver of the newly-written data, and return. */
    return shm_mq_notify_receiver(mq);
}
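
/*
 * A sketch of the nowait retry protocol described above (illustrative
 * only): the caller must repeat the identical call each time its latch is
 * set, until the result is something other than SHM_MQ_WOULD_BLOCK:
 *
 *		for (;;)
 *		{
 *			shm_mq_result res = shm_mq_sendv(outqh, iov, iovcnt, true);
 *
 *			if (res != SHM_MQ_WOULD_BLOCK)
 *				break;			// SHM_MQ_SUCCESS or SHM_MQ_DETACHED
 *			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
 *			ResetLatch(MyLatch);
 *			CHECK_FOR_INTERRUPTS();
 *		}
 */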

/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytes to the message length and *data to point to the message
 * payload.  If the entire message exists in the queue as a single,
 * contiguous chunk, *data will point directly into shared memory; otherwise,
 * it will point to a temporary buffer.  This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages.  In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message.  The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing.  Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
    shm_mq     *mq = mqh->mqh_queue;
    shm_mq_result res;
    Size        rb = 0;
    Size        nbytes;
    void       *rawdata;

    Assert(mq->mq_receiver == MyProc);

    /* We can't receive data until the sender has attached. */
    if (!mqh->mqh_counterparty_attached)
    {
        if (nowait)
        {
            bool        counterparty_gone;

            /*
             * We shouldn't return at this point at all unless the sender
             * hasn't attached yet.  However, the correct return value depends
             * on whether the sender is still attached.  If we first test
             * whether the sender has ever attached and then test whether the
             * sender has detached, there's a race condition: a sender that
             * attaches and detaches very quickly might fool us into thinking
             * the sender never attached at all.  So, test whether our
             * counterparty is definitively gone first, and only afterwards
             * check whether the sender ever attached in the first place.
             */
            counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
            if (shm_mq_get_sender(mq) == NULL)
            {
                if (counterparty_gone)
                    return SHM_MQ_DETACHED;
                else
                    return SHM_MQ_WOULD_BLOCK;
            }
        }
        else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
                 && shm_mq_get_sender(mq) == NULL)
        {
            mq->mq_detached = true;
            return SHM_MQ_DETACHED;
        }
        mqh->mqh_counterparty_attached = true;
    }

    /* Consume any zero-copy data from previous receive operation. */
    if (mqh->mqh_consume_pending > 0)
    {
        shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
        mqh->mqh_consume_pending = 0;
    }

    /* Try to read, or finish reading, the length word from the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        /* Try to receive the message length word. */
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
                                   nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;

        /*
         * Hopefully, we'll receive the entire message length word at once.
         * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
         * multiple reads.
         */
        if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
        {
            Size        needed;

            nbytes = *(Size *) rawdata;

            /* If we've already got the whole message, we're done. */
            needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
            if (rb >= needed)
            {
                /*
                 * Technically, we could consume the message length
                 * information at this point, but the extra write to shared
                 * memory wouldn't be free and in most cases we would reap no
                 * benefit.
                 */
                mqh->mqh_consume_pending = needed;
                *nbytesp = nbytes;
                *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                return SHM_MQ_SUCCESS;
            }

            /*
             * We don't have the whole message, but we at least have the whole
             * length word.
             */
            mqh->mqh_expected_bytes = nbytes;
            mqh->mqh_length_word_complete = true;
            shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
            rb -= MAXALIGN(sizeof(Size));
        }
        else
        {
            Size        lengthbytes;

            /* Can't be split unless bigger than required alignment. */
            Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

            /* Message word is split; need buffer to reassemble. */
            if (mqh->mqh_buffer == NULL)
            {
                mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                                                     MQH_INITIAL_BUFSIZE);
                mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
            }
            Assert(mqh->mqh_buflen >= sizeof(Size));

            /* Copy and consume partial length word. */
            if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
            else
                lengthbytes = rb;
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                   lengthbytes);
            mqh->mqh_partial_bytes += lengthbytes;
            shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
            rb -= lengthbytes;

            /* If we now have the whole word, we're ready to read payload. */
            if (mqh->mqh_partial_bytes >= sizeof(Size))
            {
                Assert(mqh->mqh_partial_bytes == sizeof(Size));
                mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
                mqh->mqh_length_word_complete = true;
                mqh->mqh_partial_bytes = 0;
            }
        }
    }
    nbytes = mqh->mqh_expected_bytes;

    /*
     * Should be disallowed on the sending side already, but better check and
     * error out on the receiver side as well rather than trying to read a
     * prohibitively large message.
     */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("invalid message size %zu in shared memory queue",
                        nbytes)));

    if (mqh->mqh_partial_bytes == 0)
    {
        /*
         * Try to obtain the whole message in a single chunk.  If this works,
         * we need not copy the data and can return a pointer directly into
         * shared memory.
         */
        res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb >= nbytes)
        {
            mqh->mqh_length_word_complete = false;
            mqh->mqh_consume_pending = MAXALIGN(nbytes);
            *nbytesp = nbytes;
            *datap = rawdata;
            return SHM_MQ_SUCCESS;
        }

        /*
         * The message has wrapped the buffer.  We'll need to copy it in order
         * to return it to the client in one chunk.  First, make sure we have
         * a large enough buffer available.
         */
        if (mqh->mqh_buflen < nbytes)
        {
            Size        newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);

            /*
             * Double the buffer size until the payload fits, but limit to
             * MaxAllocSize.
             */
            while (newbuflen < nbytes)
                newbuflen *= 2;
            newbuflen = Min(newbuflen, MaxAllocSize);

            if (mqh->mqh_buffer != NULL)
            {
                pfree(mqh->mqh_buffer);
                mqh->mqh_buffer = NULL;
                mqh->mqh_buflen = 0;
            }
            mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
            mqh->mqh_buflen = newbuflen;
        }
    }

    /* Loop until we've copied the entire message. */
    for (;;)
    {
        Size        still_needed;

        /* Copy as much as we can. */
        Assert(mqh->mqh_partial_bytes + rb <= nbytes);
        memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
        mqh->mqh_partial_bytes += rb;

        /*
         * Update count of bytes read, with alignment padding.  Note that this
         * will never actually insert any padding except at the end of a
         * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
         * and each read and write is as well.
         */
        Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
        shm_mq_inc_bytes_read(mq, MAXALIGN(rb));

        /* If we got all the data, exit the loop. */
        if (mqh->mqh_partial_bytes >= nbytes)
            break;

        /* Wait for some more data. */
        still_needed = nbytes - mqh->mqh_partial_bytes;
        res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb > still_needed)
            rb = still_needed;
    }

    /* Return the complete message, and reset for next message. */
    *nbytesp = nbytes;
    *datap = mqh->mqh_buffer;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_partial_bytes = 0;
    return SHM_MQ_SUCCESS;
}
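
/*
 * An illustrative consumption loop (not part of this file's API; the
 * process_message() consumer is hypothetical): a blocking receiver drains
 * messages until the sender detaches.  The returned pointer is only valid
 * until the next receive operation, so the payload must be used or copied
 * before looping:
 *
 *		for (;;)
 *		{
 *			Size	len;
 *			void   *data;
 *			shm_mq_result res = shm_mq_receive(inqh, &len, &data, false);
 *
 *			if (res != SHM_MQ_SUCCESS)
 *				break;			// SHM_MQ_DETACHED: sender is gone
 *			process_message(data, len);
 *		}
 *		shm_mq_detach(inqh);
 */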

/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC    **victim;

    if (shm_mq_get_receiver(mq) == MyProc)
        victim = &mq->mq_sender;
    else
    {
        Assert(shm_mq_get_sender(mq) == MyProc);
        victim = &mq->mq_receiver;
    }

    if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
        return SHM_MQ_SUCCESS;
    else
        return SHM_MQ_DETACHED;
}

/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
    /* Notify counterparty that we're outta here. */
    shm_mq_detach_internal(mqh->mqh_queue);

    /* Cancel on_dsm_detach callback, if any. */
    if (mqh->mqh_segment)
        cancel_on_dsm_detach(mqh->mqh_segment,
                             shm_mq_detach_callback,
                             PointerGetDatum(mqh->mqh_queue));

    /* Release local memory associated with handle. */
    if (mqh->mqh_buffer != NULL)
        pfree(mqh->mqh_buffer);
    pfree(mqh);
}

/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
    volatile shm_mq *vmq = mq;
    PGPROC     *victim;

    SpinLockAcquire(&mq->mq_mutex);
    if (vmq->mq_sender == MyProc)
        victim = vmq->mq_receiver;
    else
    {
        Assert(vmq->mq_receiver == MyProc);
        victim = vmq->mq_sender;
    }
    vmq->mq_detached = true;
    SpinLockRelease(&mq->mq_mutex);

    if (victim != NULL)
        SetLatch(&victim->procLatch);
}

/*
 * Get the shm_mq from handle.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
    return mqh->mqh_queue;
}

/*
 * Write bytes into a shared message queue.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                  bool nowait, Size *bytes_written)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        sent = 0;
    uint64      used;
    Size        ringsize = mq->mq_ring_size;
    Size        available;

    while (sent < nbytes)
    {
        bool        detached;
        uint64      rb;

        /* Compute number of ring buffer bytes used and available. */
        rb = shm_mq_get_bytes_read(mq, &detached);
        Assert(mq->mq_bytes_written >= rb);
        used = mq->mq_bytes_written - rb;
        Assert(used <= ringsize);
        available = Min(ringsize - used, nbytes - sent);

        /* Bail out if the queue has been detached. */
        if (detached)
        {
            *bytes_written = sent;
            return SHM_MQ_DETACHED;
        }

        if (available == 0 && !mqh->mqh_counterparty_attached)
        {
            /*
             * The queue is full, so if the receiver isn't yet known to be
             * attached, we must wait for that to happen.
             */
            if (nowait)
            {
                if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
                {
                    *bytes_written = sent;
                    return SHM_MQ_DETACHED;
                }
                if (shm_mq_get_receiver(mq) == NULL)
                {
                    *bytes_written = sent;
                    return SHM_MQ_WOULD_BLOCK;
                }
            }
            else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                                           mqh->mqh_handle))
            {
                mq->mq_detached = true;
                *bytes_written = sent;
                return SHM_MQ_DETACHED;
            }
            mqh->mqh_counterparty_attached = true;

            /*
             * The receiver may have read some data after attaching, so we
             * must not wait without rechecking the queue state.
             */
        }
        else if (available == 0)
        {
            shm_mq_result res;

            /* Let the receiver know that we need them to read some data. */
            res = shm_mq_notify_receiver(mq);
            if (res != SHM_MQ_SUCCESS)
            {
                *bytes_written = sent;
                return res;
            }

            /* Skip manipulation of our latch if nowait = true. */
            if (nowait)
            {
                *bytes_written = sent;
                return SHM_MQ_WOULD_BLOCK;
            }

            /*
             * Wait for our latch to be set.  It might already be set for some
             * unrelated reason, but that'll just result in one extra trip
             * through the loop.  It's worth it to avoid resetting the latch
             * at top of loop, because setting an already-set latch is much
             * cheaper than setting one that has been reset.
             */
            WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);

            /* Reset the latch so we don't spin. */
            ResetLatch(MyLatch);

            /* An interrupt may have occurred while we were waiting. */
            CHECK_FOR_INTERRUPTS();
        }
        else
        {
            Size        offset = mq->mq_bytes_written % (uint64) ringsize;
            Size        sendnow = Min(available, ringsize - offset);

            /* Write as much data as we can via a single memcpy(). */
            memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                   (char *) data + sent, sendnow);
            sent += sendnow;

            /*
             * Update count of bytes written, with alignment padding.  Note
             * that this will never actually insert any padding except at the
             * end of a run of bytes, because the buffer size is a multiple of
             * MAXIMUM_ALIGNOF, and each read is as well.
             */
            Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
            shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));

            /*
             * For efficiency, we don't set the reader's latch here.  We'll do
             * that only when the buffer fills up or after writing an entire
             * message.
             */
        }
    }

    *bytes_written = sent;
    return SHM_MQ_SUCCESS;
}

/*
 * Wait until at least *nbytesp bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
                     Size *nbytesp, void **datap)
{
    Size        ringsize = mq->mq_ring_size;
    uint64      used;
    uint64      written;

    for (;;)
    {
        Size        offset;
        bool        detached;

        /* Get bytes written, so we can compute what's available to read. */
        written = shm_mq_get_bytes_written(mq, &detached);
        used = written - mq->mq_bytes_read;
        Assert(used <= ringsize);
        offset = mq->mq_bytes_read % (uint64) ringsize;

        /* If we have enough data or buffer has wrapped, we're done. */
        if (used >= bytes_needed || offset + used >= ringsize)
        {
            *nbytesp = Min(used, ringsize - offset);
            *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
            return SHM_MQ_SUCCESS;
        }

        /*
         * Fall out before waiting if the queue has been detached.
         *
         * Note that we don't check for this until *after* considering whether
         * the data already available is enough, since the receiver can finish
         * receiving a message stored in the buffer even after the sender has
         * detached.
         */
        if (detached)
            return SHM_MQ_DETACHED;

        /* Skip manipulation of our latch if nowait = true. */
        if (nowait)
            return SHM_MQ_WOULD_BLOCK;

        /*
         * Wait for our latch to be set.  It might already be set for some
         * unrelated reason, but that'll just result in one extra trip through
         * the loop.  It's worth it to avoid resetting the latch at top of
         * loop, because setting an already-set latch is much cheaper than
         * setting one that has been reset.
         */
        WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Test whether a counterparty who may not even be alive yet is definitely gone.
 */
static bool
shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
{
    bool        detached;
    pid_t       pid;

    /* Acquire the lock just long enough to check the pointer. */
    SpinLockAcquire(&mq->mq_mutex);
    detached = mq->mq_detached;
    SpinLockRelease(&mq->mq_mutex);

    /* If the queue has been detached, counterparty is definitely gone. */
    if (detached)
        return true;

    /* If there's a handle, check worker status. */
    if (handle != NULL)
    {
        BgwHandleStatus status;

        /* Check for unexpected worker death. */
        status = GetBackgroundWorkerPid(handle, &pid);
        if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
        {
            /* Mark it detached, just to make it official. */
            SpinLockAcquire(&mq->mq_mutex);
            mq->mq_detached = true;
            SpinLockRelease(&mq->mq_mutex);
            return true;
        }
    }

    /* Counterparty is not definitively gone. */
    return false;
}

/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue.  We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies.  Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start.  We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 */
static bool
shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
                     BackgroundWorkerHandle *handle)
{
    bool        result = false;

    for (;;)
    {
        BgwHandleStatus status;
        pid_t       pid;
        bool        detached;

        /* Acquire the lock just long enough to check the pointer. */
        SpinLockAcquire(&mq->mq_mutex);
        detached = mq->mq_detached;
        result = (*ptr != NULL);
        SpinLockRelease(&mq->mq_mutex);

        /* Fail if detached; else succeed if initialized. */
        if (detached)
        {
            result = false;
            break;
        }
        if (result)
            break;

        if (handle != NULL)
        {
            /* Check for unexpected worker death. */
            status = GetBackgroundWorkerPid(handle, &pid);
            if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
            {
                result = false;
                break;
            }
        }

        /* Wait to be signalled. */
        WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }

    return result;
}

/*
 * Get the number of bytes read.  The receiver need not use this to access
 * the count of bytes read, but the sender must.
 */
static uint64
shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
{
    uint64      v;

    SpinLockAcquire(&mq->mq_mutex);
    v = mq->mq_bytes_read;
    *detached = mq->mq_detached;
    SpinLockRelease(&mq->mq_mutex);

    return v;
}

/*
 * Increment the number of bytes read.
 */
static void
shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    mq->mq_bytes_read += n;
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    /* We shouldn't have any bytes to read without a sender. */
    Assert(sender != NULL);
    SetLatch(&sender->procLatch);
}

/*
 * Get the number of bytes written.  The sender need not use this to access
 * the count of bytes written, but the receiver must.
 */
static uint64
shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
{
    uint64      v;

    SpinLockAcquire(&mq->mq_mutex);
    v = mq->mq_bytes_written;
    *detached = mq->mq_detached;
    SpinLockRelease(&mq->mq_mutex);

    return v;
}

/*
 * Increment the number of bytes written.
 */
static void
shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
{
    SpinLockAcquire(&mq->mq_mutex);
    mq->mq_bytes_written += n;
    SpinLockRelease(&mq->mq_mutex);
}

/*
 * Set receiver's latch, unless queue is detached.
 */
static shm_mq_result
shm_mq_notify_receiver(volatile shm_mq *mq)
{
    PGPROC     *receiver;
    bool        detached;

    SpinLockAcquire(&mq->mq_mutex);
    detached = mq->mq_detached;
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    if (detached)
        return SHM_MQ_DETACHED;
    if (receiver)
        SetLatch(&receiver->procLatch);
    return SHM_MQ_SUCCESS;
}

/* Shim for the on_dsm_detach callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
    shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);

    shm_mq_detach_internal(mq);
}