1 /*-------------------------------------------------------------------------
2 *
3 * shm_mq.c
4 * single-reader, single-writer shared memory message queue
5 *
6 * Both the sender and the receiver must have a PGPROC; their respective
7 * process latches are used for synchronization. Only the sender may send,
8 * and only the receiver may receive. This is intended to allow a user
9 * backend to communicate with worker backends that it has registered.
10 *
11 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
 * src/backend/storage/ipc/shm_mq.c
15 *
16 *-------------------------------------------------------------------------
17 */
18
19 #include "postgres.h"
20
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postmaster/bgworker.h"
24 #include "storage/procsignal.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 #include "utils/memutils.h"
28
/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
 * they are written atomically using 8 byte loads and stores. Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking. It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter. It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin. The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock. Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes. The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK. Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 *
 * Both byte counters are cumulative totals, not ring positions; a ring
 * position is obtained by reducing a counter modulo mq_ring_size.
 */
struct shm_mq
{
	slock_t		mq_mutex;		/* protects mq_receiver and mq_sender */
	PGPROC	   *mq_receiver;	/* receiving process; NULL until set */
	PGPROC	   *mq_sender;		/* sending process; NULL until set */
	pg_atomic_uint64 mq_bytes_read; /* total bytes released by receiver */
	pg_atomic_uint64 mq_bytes_written;	/* total bytes written by sender */
	Size		mq_ring_size;	/* usable bytes in the ring buffer */
	bool		mq_detached;	/* set (never cleared) when either side
								 * detaches */
	uint8		mq_ring_offset; /* padding between mq_ring and the first
								 * MAXALIGN'd data byte; computed by
								 * shm_mq_create */
	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
83
/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started. If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer. mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_consume_pending is used only by the receiver. It counts bytes (including
 * alignment padding) that we have finished with but have not yet reported to
 * the sender by advancing mq_bytes_read. shm_mq_receive adds to it as
 * messages are consumed, and flushes it through shm_mq_inc_bytes_read only
 * once it exceeds a quarter of the ring size, so that we don't wake the
 * sender after every small message.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations. When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point. This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq. The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
	shm_mq	   *mqh_queue;		/* queue this handle refers to */
	dsm_segment *mqh_segment;	/* containing DSM segment, or NULL */
	BackgroundWorkerHandle *mqh_handle; /* counterparty worker, or NULL */
	char	   *mqh_buffer;		/* local reassembly buffer, or NULL */
	Size		mqh_buflen;		/* allocated size of mqh_buffer */
	Size		mqh_consume_pending;	/* consumed bytes not yet reported */
	Size		mqh_partial_bytes;	/* progress within current word/payload */
	Size		mqh_expected_bytes; /* payload length being received */
	bool		mqh_length_word_complete;	/* length word fully handled? */
	bool		mqh_counterparty_attached;	/* counterparty known attached? */
	MemoryContext mqh_context;	/* context for all handle allocations */
};
145
/* File-local helpers; all are defined later in this file. */
static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
				  const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
					 Size bytes_needed, bool nowait, Size *nbytesp,
					 void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
						 BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
					 BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/*
 * Minimum queue size is enough for header and at least one chunk of data:
 * the MAXALIGN'd header plus MAXIMUM_ALIGNOF ring bytes.
 */
const Size	shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

/* Initial size, in bytes, of a handle's local message-reassembly buffer. */
#define MQH_INITIAL_BUFSIZE 8192
165
166 /*
167 * Initialize a new shared message queue.
168 */
169 shm_mq *
shm_mq_create(void * address,Size size)170 shm_mq_create(void *address, Size size)
171 {
172 shm_mq *mq = address;
173 Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174
175 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176 size = MAXALIGN_DOWN(size);
177
178 /* Queue size must be large enough to hold some data. */
179 Assert(size > data_offset);
180
181 /* Initialize queue header. */
182 SpinLockInit(&mq->mq_mutex);
183 mq->mq_receiver = NULL;
184 mq->mq_sender = NULL;
185 pg_atomic_init_u64(&mq->mq_bytes_read, 0);
186 pg_atomic_init_u64(&mq->mq_bytes_written, 0);
187 mq->mq_ring_size = size - data_offset;
188 mq->mq_detached = false;
189 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190
191 return mq;
192 }
193
194 /*
195 * Set the identity of the process that will receive from a shared message
196 * queue.
197 */
198 void
shm_mq_set_receiver(shm_mq * mq,PGPROC * proc)199 shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
200 {
201 PGPROC *sender;
202
203 SpinLockAcquire(&mq->mq_mutex);
204 Assert(mq->mq_receiver == NULL);
205 mq->mq_receiver = proc;
206 sender = mq->mq_sender;
207 SpinLockRelease(&mq->mq_mutex);
208
209 if (sender != NULL)
210 SetLatch(&sender->procLatch);
211 }
212
213 /*
214 * Set the identity of the process that will send to a shared message queue.
215 */
216 void
shm_mq_set_sender(shm_mq * mq,PGPROC * proc)217 shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
218 {
219 PGPROC *receiver;
220
221 SpinLockAcquire(&mq->mq_mutex);
222 Assert(mq->mq_sender == NULL);
223 mq->mq_sender = proc;
224 receiver = mq->mq_receiver;
225 SpinLockRelease(&mq->mq_mutex);
226
227 if (receiver != NULL)
228 SetLatch(&receiver->procLatch);
229 }
230
231 /*
232 * Get the configured receiver.
233 */
234 PGPROC *
shm_mq_get_receiver(shm_mq * mq)235 shm_mq_get_receiver(shm_mq *mq)
236 {
237 PGPROC *receiver;
238
239 SpinLockAcquire(&mq->mq_mutex);
240 receiver = mq->mq_receiver;
241 SpinLockRelease(&mq->mq_mutex);
242
243 return receiver;
244 }
245
246 /*
247 * Get the configured sender.
248 */
249 PGPROC *
shm_mq_get_sender(shm_mq * mq)250 shm_mq_get_sender(shm_mq *mq)
251 {
252 PGPROC *sender;
253
254 SpinLockAcquire(&mq->mq_mutex);
255 sender = mq->mq_sender;
256 SpinLockRelease(&mq->mq_mutex);
257
258 return sender;
259 }
260
261 /*
262 * Attach to a shared message queue so we can send or receive messages.
263 *
264 * The memory context in effect at the time this function is called should
265 * be one which will last for at least as long as the message queue itself.
266 * We'll allocate the handle in that context, and future allocations that
267 * are needed to buffer incoming data will happen in that context as well.
268 *
269 * If seg != NULL, the queue will be automatically detached when that dynamic
270 * shared memory segment is detached.
271 *
272 * If handle != NULL, the queue can be read or written even before the
273 * other process has attached. We'll wait for it to do so if needed. The
274 * handle must be for a background worker initialized with bgw_notify_pid
275 * equal to our PID.
276 *
277 * shm_mq_detach() should be called when done. This will free the
278 * shm_mq_handle and mark the queue itself as detached, so that our
279 * counterpart won't get stuck waiting for us to fill or drain the queue
280 * after we've already lost interest.
281 */
282 shm_mq_handle *
shm_mq_attach(shm_mq * mq,dsm_segment * seg,BackgroundWorkerHandle * handle)283 shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
284 {
285 shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
286
287 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
288 mqh->mqh_queue = mq;
289 mqh->mqh_segment = seg;
290 mqh->mqh_handle = handle;
291 mqh->mqh_buffer = NULL;
292 mqh->mqh_buflen = 0;
293 mqh->mqh_consume_pending = 0;
294 mqh->mqh_partial_bytes = 0;
295 mqh->mqh_expected_bytes = 0;
296 mqh->mqh_length_word_complete = false;
297 mqh->mqh_counterparty_attached = false;
298 mqh->mqh_context = CurrentMemoryContext;
299
300 if (seg != NULL)
301 on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
302
303 return mqh;
304 }
305
306 /*
307 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
308 * been passed to shm_mq_attach.
309 */
310 void
shm_mq_set_handle(shm_mq_handle * mqh,BackgroundWorkerHandle * handle)311 shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
312 {
313 Assert(mqh->mqh_handle == NULL);
314 mqh->mqh_handle = handle;
315 }
316
317 /*
318 * Write a message into a shared message queue.
319 */
320 shm_mq_result
shm_mq_send(shm_mq_handle * mqh,Size nbytes,const void * data,bool nowait)321 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
322 {
323 shm_mq_iovec iov;
324
325 iov.data = data;
326 iov.len = nbytes;
327
328 return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
330
/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * On the wire, a message is a Size length word followed by the payload.
 * The length word holds the sum of iov[i].len.  The payload is the
 * concatenation of iov[0] .. iov[iovcnt - 1], in order.  A message larger
 * than MaxAllocSize is rejected with ERROR before anything is written.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set. (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 *
 * Return values:
 * - SHM_MQ_SUCCESS: the whole message has been written.
 * - SHM_MQ_WOULD_BLOCK: only with nowait = true, when the buffer is full.
 *   Progress is kept in mqh for the retry.
 * - SHM_MQ_DETACHED: the queue has been detached.  The handle's
 *   partial-send state is reset, so a later call starts a new message.
 *
 * NOTE(review): the payload loop reads iov[0] unconditionally, so callers
 * are presumably expected to pass iovcnt >= 1; confirm at call sites.
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
	shm_mq_result res;
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	   *receiver;
	Size		nbytes = 0;
	Size		bytes_written;
	int			i;
	int			which_iov = 0;	/* iovec entry currently being sent */
	Size		offset;			/* position within iov[which_iov] */

	Assert(mq->mq_sender == MyProc);

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Prevent writing messages overwhelming the receiver. */
	if (nbytes > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot send a message of size %zu via shared memory queue",
						nbytes)));

	/*
	 * Try to write, or finish writing, the length word into the buffer.
	 * While this loop runs, mqh_partial_bytes counts length-word bytes
	 * already written, possibly by an earlier call that would have blocked.
	 */
	while (!mqh->mqh_length_word_complete)
	{
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								((char *) &nbytes) + mqh->mqh_partial_bytes,
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = false;
			return res;
		}
		mqh->mqh_partial_bytes += bytes_written;

		if (mqh->mqh_partial_bytes >= sizeof(Size))
		{
			Assert(mqh->mqh_partial_bytes == sizeof(Size));

			/* From here on, mqh_partial_bytes counts payload bytes. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = true;
		}

		if (res != SHM_MQ_SUCCESS)
			return res;

		/* Length word can't be split unless bigger than required alignment. */
		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
	}

	/*
	 * Write the actual data bytes into the buffer.  On a retry,
	 * mqh_partial_bytes is the number of payload bytes already written.  We
	 * start with offset equal to that count at iov[0], and the skip step at
	 * the top of the loop walks forward to the right iovec entry.
	 */
	Assert(mqh->mqh_partial_bytes <= nbytes);
	offset = mqh->mqh_partial_bytes;
	do
	{
		Size		chunksize;

		/* Figure out which bytes need to be sent next. */
		if (offset >= iov[which_iov].len)
		{
			offset -= iov[which_iov].len;
			++which_iov;
			if (which_iov >= iovcnt)
				break;
			continue;
		}

		/*
		 * We want to avoid copying the data if at all possible, but every
		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
		 * the last. Thus, if a chunk other than the last one ends on a
		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
		 * data with data from one or more following chunks until we either
		 * reach the last chunk or accumulate a number of bytes which is
		 * MAXALIGN'd.
		 */
		if (which_iov + 1 < iovcnt &&
			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
		{
			char		tmpbuf[MAXIMUM_ALIGNOF];
			int			j = 0;

			/*
			 * Gather up to MAXIMUM_ALIGNOF bytes, advancing across iovec
			 * entries (including empty ones) as each one runs out.
			 */
			for (;;)
			{
				if (offset < iov[which_iov].len)
				{
					tmpbuf[j] = iov[which_iov].data[offset];
					j++;
					offset++;
					if (j == MAXIMUM_ALIGNOF)
						break;
				}
				else
				{
					offset -= iov[which_iov].len;
					which_iov++;
					if (which_iov >= iovcnt)
						break;
				}
			}

			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

			if (res == SHM_MQ_DETACHED)
			{
				/* Reset state in case caller tries to send another message. */
				mqh->mqh_partial_bytes = 0;
				mqh->mqh_length_word_complete = false;
				return res;
			}

			mqh->mqh_partial_bytes += bytes_written;
			if (res != SHM_MQ_SUCCESS)
				return res;
			continue;
		}

		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = iov[which_iov].len - offset;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);
		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_length_word_complete = false;
			mqh->mqh_partial_bytes = 0;
			return res;
		}

		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (res != SHM_MQ_SUCCESS)
			return res;
	} while (mqh->mqh_partial_bytes < nbytes);

	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;

	/* If queue has been detached, let caller know. */
	if (mq->mq_detached)
		return SHM_MQ_DETACHED;

	/*
	 * If the counterparty is known to have attached, we can read mq_receiver
	 * without acquiring the spinlock and assume it isn't NULL. Otherwise,
	 * more caution is needed.
	 */
	if (mqh->mqh_counterparty_attached)
		receiver = mq->mq_receiver;
	else
	{
		SpinLockAcquire(&mq->mq_mutex);
		receiver = mq->mq_receiver;
		SpinLockRelease(&mq->mq_mutex);
		/* No receiver yet: the data is queued, and there is nobody to wake. */
		if (receiver == NULL)
			return SHM_MQ_SUCCESS;
		mqh->mqh_counterparty_attached = true;
	}

	/* Notify receiver of the newly-written data, and return. */
	SetLatch(&receiver->procLatch);
	return SHM_MQ_SUCCESS;
}
524
525 /*
526 * Receive a message from a shared message queue.
527 *
528 * We set *nbytes to the message length and *data to point to the message
529 * payload. If the entire message exists in the queue as a single,
530 * contiguous chunk, *data will point directly into shared memory; otherwise,
531 * it will point to a temporary buffer. This mostly avoids data copying in
532 * the hoped-for case where messages are short compared to the buffer size,
533 * while still allowing longer messages. In either case, the return value
534 * remains valid until the next receive operation is performed on the queue.
535 *
536 * When nowait = false, we'll wait on our process latch when the ring buffer
537 * is empty and we have not yet received a full message. The sender will
538 * set our process latch after more data has been written, and we'll resume
539 * processing. Each call will therefore return a complete message
540 * (unless the sender detaches the queue).
541 *
542 * When nowait = true, we do not manipulate the state of the process latch;
543 * instead, whenever the buffer is empty and we need to read from it, we
544 * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
545 * function again after the process latch has been set.
546 */
547 shm_mq_result
shm_mq_receive(shm_mq_handle * mqh,Size * nbytesp,void ** datap,bool nowait)548 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
549 {
550 shm_mq *mq = mqh->mqh_queue;
551 shm_mq_result res;
552 Size rb = 0;
553 Size nbytes;
554 void *rawdata;
555
556 Assert(mq->mq_receiver == MyProc);
557
558 /* We can't receive data until the sender has attached. */
559 if (!mqh->mqh_counterparty_attached)
560 {
561 if (nowait)
562 {
563 int counterparty_gone;
564
565 /*
566 * We shouldn't return at this point at all unless the sender
567 * hasn't attached yet. However, the correct return value depends
568 * on whether the sender is still attached. If we first test
569 * whether the sender has ever attached and then test whether the
570 * sender has detached, there's a race condition: a sender that
571 * attaches and detaches very quickly might fool us into thinking
572 * the sender never attached at all. So, test whether our
573 * counterparty is definitively gone first, and only afterwards
574 * check whether the sender ever attached in the first place.
575 */
576 counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
577 if (shm_mq_get_sender(mq) == NULL)
578 {
579 if (counterparty_gone)
580 return SHM_MQ_DETACHED;
581 else
582 return SHM_MQ_WOULD_BLOCK;
583 }
584 }
585 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
586 && shm_mq_get_sender(mq) == NULL)
587 {
588 mq->mq_detached = true;
589 return SHM_MQ_DETACHED;
590 }
591 mqh->mqh_counterparty_attached = true;
592 }
593
594 /*
595 * If we've consumed an amount of data greater than 1/4th of the ring
596 * size, mark it consumed in shared memory. We try to avoid doing this
597 * unnecessarily when only a small amount of data has been consumed,
598 * because SetLatch() is fairly expensive and we don't want to do it too
599 * often.
600 */
601 if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
602 {
603 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
604 mqh->mqh_consume_pending = 0;
605 }
606
607 /* Try to read, or finish reading, the length word from the buffer. */
608 while (!mqh->mqh_length_word_complete)
609 {
610 /* Try to receive the message length word. */
611 Assert(mqh->mqh_partial_bytes < sizeof(Size));
612 res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
613 nowait, &rb, &rawdata);
614 if (res != SHM_MQ_SUCCESS)
615 return res;
616
617 /*
618 * Hopefully, we'll receive the entire message length word at once.
619 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
620 * multiple reads.
621 */
622 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
623 {
624 Size needed;
625
626 nbytes = *(Size *) rawdata;
627
628 /* If we've already got the whole message, we're done. */
629 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
630 if (rb >= needed)
631 {
632 mqh->mqh_consume_pending += needed;
633 *nbytesp = nbytes;
634 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
635 return SHM_MQ_SUCCESS;
636 }
637
638 /*
639 * We don't have the whole message, but we at least have the whole
640 * length word.
641 */
642 mqh->mqh_expected_bytes = nbytes;
643 mqh->mqh_length_word_complete = true;
644 mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
645 rb -= MAXALIGN(sizeof(Size));
646 }
647 else
648 {
649 Size lengthbytes;
650
651 /* Can't be split unless bigger than required alignment. */
652 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
653
654 /* Message word is split; need buffer to reassemble. */
655 if (mqh->mqh_buffer == NULL)
656 {
657 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
658 MQH_INITIAL_BUFSIZE);
659 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
660 }
661 Assert(mqh->mqh_buflen >= sizeof(Size));
662
663 /* Copy partial length word; remember to consume it. */
664 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
665 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
666 else
667 lengthbytes = rb;
668 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
669 lengthbytes);
670 mqh->mqh_partial_bytes += lengthbytes;
671 mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
672 rb -= lengthbytes;
673
674 /* If we now have the whole word, we're ready to read payload. */
675 if (mqh->mqh_partial_bytes >= sizeof(Size))
676 {
677 Assert(mqh->mqh_partial_bytes == sizeof(Size));
678 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
679 mqh->mqh_length_word_complete = true;
680 mqh->mqh_partial_bytes = 0;
681 }
682 }
683 }
684 nbytes = mqh->mqh_expected_bytes;
685
686 /*
687 * Should be disallowed on the sending side already, but better check and
688 * error out on the receiver side as well rather than trying to read a
689 * prohibitively large message.
690 */
691 if (nbytes > MaxAllocSize)
692 ereport(ERROR,
693 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
694 errmsg("invalid message size %zu in shared memory queue",
695 nbytes)));
696
697 if (mqh->mqh_partial_bytes == 0)
698 {
699 /*
700 * Try to obtain the whole message in a single chunk. If this works,
701 * we need not copy the data and can return a pointer directly into
702 * shared memory.
703 */
704 res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
705 if (res != SHM_MQ_SUCCESS)
706 return res;
707 if (rb >= nbytes)
708 {
709 mqh->mqh_length_word_complete = false;
710 mqh->mqh_consume_pending += MAXALIGN(nbytes);
711 *nbytesp = nbytes;
712 *datap = rawdata;
713 return SHM_MQ_SUCCESS;
714 }
715
716 /*
717 * The message has wrapped the buffer. We'll need to copy it in order
718 * to return it to the client in one chunk. First, make sure we have
719 * a large enough buffer available.
720 */
721 if (mqh->mqh_buflen < nbytes)
722 {
723 Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
724
725 /*
726 * Double the buffer size until the payload fits, but limit to
727 * MaxAllocSize.
728 */
729 while (newbuflen < nbytes)
730 newbuflen *= 2;
731 newbuflen = Min(newbuflen, MaxAllocSize);
732
733 if (mqh->mqh_buffer != NULL)
734 {
735 pfree(mqh->mqh_buffer);
736 mqh->mqh_buffer = NULL;
737 mqh->mqh_buflen = 0;
738 }
739 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740 mqh->mqh_buflen = newbuflen;
741 }
742 }
743
744 /* Loop until we've copied the entire message. */
745 for (;;)
746 {
747 Size still_needed;
748
749 /* Copy as much as we can. */
750 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752 mqh->mqh_partial_bytes += rb;
753
754 /*
755 * Update count of bytes that can be consumed, accounting for
756 * alignment padding. Note that this will never actually insert any
757 * padding except at the end of a message, because the buffer size is
758 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759 */
760 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761 mqh->mqh_consume_pending += MAXALIGN(rb);
762
763 /* If we got all the data, exit the loop. */
764 if (mqh->mqh_partial_bytes >= nbytes)
765 break;
766
767 /* Wait for some more data. */
768 still_needed = nbytes - mqh->mqh_partial_bytes;
769 res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770 if (res != SHM_MQ_SUCCESS)
771 return res;
772 if (rb > still_needed)
773 rb = still_needed;
774 }
775
776 /* Return the complete message, and reset for next message. */
777 *nbytesp = nbytes;
778 *datap = mqh->mqh_buffer;
779 mqh->mqh_length_word_complete = false;
780 mqh->mqh_partial_bytes = 0;
781 return SHM_MQ_SUCCESS;
782 }
783
784 /*
785 * Wait for the other process that's supposed to use this queue to attach
786 * to it.
787 *
788 * The return value is SHM_MQ_DETACHED if the worker has already detached or
789 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
790 * Note that we will only be able to detect that the worker has died before
791 * attaching if a background worker handle was passed to shm_mq_attach().
792 */
793 shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle * mqh)794 shm_mq_wait_for_attach(shm_mq_handle *mqh)
795 {
796 shm_mq *mq = mqh->mqh_queue;
797 PGPROC **victim;
798
799 if (shm_mq_get_receiver(mq) == MyProc)
800 victim = &mq->mq_sender;
801 else
802 {
803 Assert(shm_mq_get_sender(mq) == MyProc);
804 victim = &mq->mq_receiver;
805 }
806
807 if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808 return SHM_MQ_SUCCESS;
809 else
810 return SHM_MQ_DETACHED;
811 }
812
813 /*
814 * Detach from a shared message queue, and destroy the shm_mq_handle.
815 */
816 void
shm_mq_detach(shm_mq_handle * mqh)817 shm_mq_detach(shm_mq_handle *mqh)
818 {
819 /* Notify counterparty that we're outta here. */
820 shm_mq_detach_internal(mqh->mqh_queue);
821
822 /* Cancel on_dsm_detach callback, if any. */
823 if (mqh->mqh_segment)
824 cancel_on_dsm_detach(mqh->mqh_segment,
825 shm_mq_detach_callback,
826 PointerGetDatum(mqh->mqh_queue));
827
828 /* Release local memory associated with handle. */
829 if (mqh->mqh_buffer != NULL)
830 pfree(mqh->mqh_buffer);
831 pfree(mqh);
832 }
833
834 /*
835 * Notify counterparty that we're detaching from shared message queue.
836 *
837 * The purpose of this function is to make sure that the process
838 * with which we're communicating doesn't block forever waiting for us to
839 * fill or drain the queue once we've lost interest. When the sender
840 * detaches, the receiver can read any messages remaining in the queue;
841 * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
842 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
843 *
844 * This is separated out from shm_mq_detach() because if the on_dsm_detach
845 * callback fires, we only want to do this much. We do not try to touch
846 * the local shm_mq_handle, as it may have been pfree'd already.
847 */
848 static void
shm_mq_detach_internal(shm_mq * mq)849 shm_mq_detach_internal(shm_mq *mq)
850 {
851 PGPROC *victim;
852
853 SpinLockAcquire(&mq->mq_mutex);
854 if (mq->mq_sender == MyProc)
855 victim = mq->mq_receiver;
856 else
857 {
858 Assert(mq->mq_receiver == MyProc);
859 victim = mq->mq_sender;
860 }
861 mq->mq_detached = true;
862 SpinLockRelease(&mq->mq_mutex);
863
864 if (victim != NULL)
865 SetLatch(&victim->procLatch);
866 }
867
868 /*
869 * Get the shm_mq from handle.
870 */
871 shm_mq *
shm_mq_get_queue(shm_mq_handle * mqh)872 shm_mq_get_queue(shm_mq_handle *mqh)
873 {
874 return mqh->mqh_queue;
875 }
876
877 /*
878 * Write bytes into a shared message queue.
879 */
880 static shm_mq_result
shm_mq_send_bytes(shm_mq_handle * mqh,Size nbytes,const void * data,bool nowait,Size * bytes_written)881 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
882 bool nowait, Size *bytes_written)
883 {
884 shm_mq *mq = mqh->mqh_queue;
885 Size sent = 0;
886 uint64 used;
887 Size ringsize = mq->mq_ring_size;
888 Size available;
889
890 while (sent < nbytes)
891 {
892 uint64 rb;
893 uint64 wb;
894
895 /* Compute number of ring buffer bytes used and available. */
896 rb = pg_atomic_read_u64(&mq->mq_bytes_read);
897 wb = pg_atomic_read_u64(&mq->mq_bytes_written);
898 Assert(wb >= rb);
899 used = wb - rb;
900 Assert(used <= ringsize);
901 available = Min(ringsize - used, nbytes - sent);
902
903 /*
904 * Bail out if the queue has been detached. Note that we would be in
905 * trouble if the compiler decided to cache the value of
906 * mq->mq_detached in a register or on the stack across loop
907 * iterations. It probably shouldn't do that anyway since we'll
908 * always return, call an external function that performs a system
909 * call, or reach a memory barrier at some point later in the loop,
910 * but just to be sure, insert a compiler barrier here.
911 */
912 pg_compiler_barrier();
913 if (mq->mq_detached)
914 {
915 *bytes_written = sent;
916 return SHM_MQ_DETACHED;
917 }
918
919 if (available == 0 && !mqh->mqh_counterparty_attached)
920 {
921 /*
922 * The queue is full, so if the receiver isn't yet known to be
923 * attached, we must wait for that to happen.
924 */
925 if (nowait)
926 {
927 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
928 {
929 *bytes_written = sent;
930 return SHM_MQ_DETACHED;
931 }
932 if (shm_mq_get_receiver(mq) == NULL)
933 {
934 *bytes_written = sent;
935 return SHM_MQ_WOULD_BLOCK;
936 }
937 }
938 else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
939 mqh->mqh_handle))
940 {
941 mq->mq_detached = true;
942 *bytes_written = sent;
943 return SHM_MQ_DETACHED;
944 }
945 mqh->mqh_counterparty_attached = true;
946
947 /*
948 * The receiver may have read some data after attaching, so we
949 * must not wait without rechecking the queue state.
950 */
951 }
952 else if (available == 0)
953 {
954 /*
955 * Since mq->mqh_counterparty_attached is known to be true at this
956 * point, mq_receiver has been set, and it can't change once set.
957 * Therefore, we can read it without acquiring the spinlock.
958 */
959 Assert(mqh->mqh_counterparty_attached);
960 SetLatch(&mq->mq_receiver->procLatch);
961
962 /* Skip manipulation of our latch if nowait = true. */
963 if (nowait)
964 {
965 *bytes_written = sent;
966 return SHM_MQ_WOULD_BLOCK;
967 }
968
969 /*
970 * Wait for our latch to be set. It might already be set for some
971 * unrelated reason, but that'll just result in one extra trip
972 * through the loop. It's worth it to avoid resetting the latch
973 * at top of loop, because setting an already-set latch is much
974 * cheaper than setting one that has been reset.
975 */
976 WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
977
978 /* Reset the latch so we don't spin. */
979 ResetLatch(MyLatch);
980
981 /* An interrupt may have occurred while we were waiting. */
982 CHECK_FOR_INTERRUPTS();
983 }
984 else
985 {
986 Size offset;
987 Size sendnow;
988
989 offset = wb % (uint64) ringsize;
990 sendnow = Min(available, ringsize - offset);
991
992 /*
993 * Write as much data as we can via a single memcpy(). Make sure
994 * these writes happen after the read of mq_bytes_read, above.
995 * This barrier pairs with the one in shm_mq_inc_bytes_read.
996 * (Since we're separating the read of mq_bytes_read from a
997 * subsequent write to mq_ring, we need a full barrier here.)
998 */
999 pg_memory_barrier();
1000 memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1001 (char *) data + sent, sendnow);
1002 sent += sendnow;
1003
1004 /*
1005 * Update count of bytes written, with alignment padding. Note
1006 * that this will never actually insert any padding except at the
1007 * end of a run of bytes, because the buffer size is a multiple of
1008 * MAXIMUM_ALIGNOF, and each read is as well.
1009 */
1010 Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1011 shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
1012
1013 /*
1014 * For efficiency, we don't set the reader's latch here. We'll do
1015 * that only when the buffer fills up or after writing an entire
1016 * message.
1017 */
1018 }
1019 }
1020
1021 *bytes_written = sent;
1022 return SHM_MQ_SUCCESS;
1023 }
1024
1025 /*
1026 * Wait until at least *nbytesp bytes are available to be read from the
1027 * shared message queue, or until the buffer wraps around. If the queue is
1028 * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1029 * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1030 * to the location at which data bytes can be read, *nbytesp is set to the
1031 * number of bytes which can be read at that address, and the return value
1032 * is SHM_MQ_SUCCESS.
1033 */
1034 static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle * mqh,Size bytes_needed,bool nowait,Size * nbytesp,void ** datap)1035 shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1036 Size *nbytesp, void **datap)
1037 {
1038 shm_mq *mq = mqh->mqh_queue;
1039 Size ringsize = mq->mq_ring_size;
1040 uint64 used;
1041 uint64 written;
1042
1043 for (;;)
1044 {
1045 Size offset;
1046 uint64 read;
1047
1048 /* Get bytes written, so we can compute what's available to read. */
1049 written = pg_atomic_read_u64(&mq->mq_bytes_written);
1050
1051 /*
1052 * Get bytes read. Include bytes we could consume but have not yet
1053 * consumed.
1054 */
1055 read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1056 mqh->mqh_consume_pending;
1057 used = written - read;
1058 Assert(used <= ringsize);
1059 offset = read % (uint64) ringsize;
1060
1061 /* If we have enough data or buffer has wrapped, we're done. */
1062 if (used >= bytes_needed || offset + used >= ringsize)
1063 {
1064 *nbytesp = Min(used, ringsize - offset);
1065 *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1066
1067 /*
1068 * Separate the read of mq_bytes_written, above, from caller's
1069 * attempt to read the data itself. Pairs with the barrier in
1070 * shm_mq_inc_bytes_written.
1071 */
1072 pg_read_barrier();
1073 return SHM_MQ_SUCCESS;
1074 }
1075
1076 /*
1077 * Fall out before waiting if the queue has been detached.
1078 *
1079 * Note that we don't check for this until *after* considering whether
1080 * the data already available is enough, since the receiver can finish
1081 * receiving a message stored in the buffer even after the sender has
1082 * detached.
1083 */
1084 if (mq->mq_detached)
1085 {
1086 /*
1087 * If the writer advanced mq_bytes_written and then set
1088 * mq_detached, we might not have read the final value of
1089 * mq_bytes_written above. Insert a read barrier and then check
1090 * again if mq_bytes_written has advanced.
1091 */
1092 pg_read_barrier();
1093 if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1094 continue;
1095
1096 return SHM_MQ_DETACHED;
1097 }
1098
1099 /*
1100 * We didn't get enough data to satisfy the request, so mark any data
1101 * previously-consumed as read to make more buffer space.
1102 */
1103 if (mqh->mqh_consume_pending > 0)
1104 {
1105 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1106 mqh->mqh_consume_pending = 0;
1107 }
1108
1109 /* Skip manipulation of our latch if nowait = true. */
1110 if (nowait)
1111 return SHM_MQ_WOULD_BLOCK;
1112
1113 /*
1114 * Wait for our latch to be set. It might already be set for some
1115 * unrelated reason, but that'll just result in one extra trip through
1116 * the loop. It's worth it to avoid resetting the latch at top of
1117 * loop, because setting an already-set latch is much cheaper than
1118 * setting one that has been reset.
1119 */
1120 WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
1121
1122 /* Reset the latch so we don't spin. */
1123 ResetLatch(MyLatch);
1124
1125 /* An interrupt may have occurred while we were waiting. */
1126 CHECK_FOR_INTERRUPTS();
1127 }
1128 }
1129
1130 /*
1131 * Test whether a counterparty who may not even be alive yet is definitely gone.
1132 */
1133 static bool
shm_mq_counterparty_gone(shm_mq * mq,BackgroundWorkerHandle * handle)1134 shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1135 {
1136 pid_t pid;
1137
1138 /* If the queue has been detached, counterparty is definitely gone. */
1139 if (mq->mq_detached)
1140 return true;
1141
1142 /* If there's a handle, check worker status. */
1143 if (handle != NULL)
1144 {
1145 BgwHandleStatus status;
1146
1147 /* Check for unexpected worker death. */
1148 status = GetBackgroundWorkerPid(handle, &pid);
1149 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1150 {
1151 /* Mark it detached, just to make it official. */
1152 mq->mq_detached = true;
1153 return true;
1154 }
1155 }
1156
1157 /* Counterparty is not definitively gone. */
1158 return false;
1159 }
1160
1161 /*
1162 * This is used when a process is waiting for its counterpart to attach to the
1163 * queue. We exit when the other process attaches as expected, or, if
1164 * handle != NULL, when the referenced background process or the postmaster
1165 * dies. Note that if handle == NULL, and the process fails to attach, we'll
1166 * potentially get stuck here forever waiting for a process that may never
1167 * start. We do check for interrupts, though.
1168 *
1169 * ptr is a pointer to the memory address that we're expecting to become
1170 * non-NULL when our counterpart attaches to the queue.
1171 */
1172 static bool
shm_mq_wait_internal(shm_mq * mq,PGPROC ** ptr,BackgroundWorkerHandle * handle)1173 shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1174 {
1175 bool result = false;
1176
1177 for (;;)
1178 {
1179 BgwHandleStatus status;
1180 pid_t pid;
1181
1182 /* Acquire the lock just long enough to check the pointer. */
1183 SpinLockAcquire(&mq->mq_mutex);
1184 result = (*ptr != NULL);
1185 SpinLockRelease(&mq->mq_mutex);
1186
1187 /* Fail if detached; else succeed if initialized. */
1188 if (mq->mq_detached)
1189 {
1190 result = false;
1191 break;
1192 }
1193 if (result)
1194 break;
1195
1196 if (handle != NULL)
1197 {
1198 /* Check for unexpected worker death. */
1199 status = GetBackgroundWorkerPid(handle, &pid);
1200 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1201 {
1202 result = false;
1203 break;
1204 }
1205 }
1206
1207 /* Wait to be signalled. */
1208 WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);
1209
1210 /* Reset the latch so we don't spin. */
1211 ResetLatch(MyLatch);
1212
1213 /* An interrupt may have occurred while we were waiting. */
1214 CHECK_FOR_INTERRUPTS();
1215 }
1216
1217 return result;
1218 }
1219
1220 /*
1221 * Increment the number of bytes read.
1222 */
1223 static void
shm_mq_inc_bytes_read(shm_mq * mq,Size n)1224 shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1225 {
1226 PGPROC *sender;
1227
1228 /*
1229 * Separate prior reads of mq_ring from the increment of mq_bytes_read
1230 * which follows. This pairs with the full barrier in
1231 * shm_mq_send_bytes(). We only need a read barrier here because the
1232 * increment of mq_bytes_read is actually a read followed by a dependent
1233 * write.
1234 */
1235 pg_read_barrier();
1236
1237 /*
1238 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1239 * else can be changing this value. This method should be cheaper.
1240 */
1241 pg_atomic_write_u64(&mq->mq_bytes_read,
1242 pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1243
1244 /*
1245 * We shouldn't have any bytes to read without a sender, so we can read
1246 * mq_sender here without a lock. Once it's initialized, it can't change.
1247 */
1248 sender = mq->mq_sender;
1249 Assert(sender != NULL);
1250 SetLatch(&sender->procLatch);
1251 }
1252
1253 /*
1254 * Increment the number of bytes written.
1255 */
1256 static void
shm_mq_inc_bytes_written(shm_mq * mq,Size n)1257 shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1258 {
1259 /*
1260 * Separate prior reads of mq_ring from the write of mq_bytes_written
1261 * which we're about to do. Pairs with the read barrier found in
1262 * shm_mq_get_receive_bytes.
1263 */
1264 pg_write_barrier();
1265
1266 /*
1267 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1268 * else can be changing this value. This method avoids taking the bus
1269 * lock unnecessarily.
1270 */
1271 pg_atomic_write_u64(&mq->mq_bytes_written,
1272 pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1273 }
1274
1275 /* Shim for on_dsm_callback. */
1276 static void
shm_mq_detach_callback(dsm_segment * seg,Datum arg)1277 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1278 {
1279 shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1280
1281 shm_mq_detach_internal(mq);
1282 }
1283