/*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *	  single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization.  Only the sender may send,
 * and only the receiver may receive.  This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"

/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.  However,
 * because most of these fields are 8 bytes and we don't assume that 8 byte
 * reads and writes are atomic, the spinlock must be taken whenever the field
 * is updated, and whenever it is read by a process other than the one allowed
 * to modify it.  But the process that is allowed to modify it is also allowed
 * to read it without the lock.  On architectures where 8-byte writes are
 * atomic, we could replace these spinlocks with memory barriers, but
 * testing found no performance benefit, so it seems best to keep things
 * simple for now.
 *
 * mq_detached can be set by either the sender or the receiver, so the mutex
 * must be held to read or write it.  Memory barriers could be used here as
 * well, if needed.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.  Were
 * this not the case, we'd have to hold the spinlock for much longer
 * intervals, and performance might suffer.  Fortunately, that's not
 * necessary.  At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin.  The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock.  Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes.  The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK.  Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
	slock_t		mq_mutex;
	PGPROC	   *mq_receiver;
	PGPROC	   *mq_sender;
	uint64		mq_bytes_read;
	uint64		mq_bytes_written;
	Size		mq_ring_size;
	bool		mq_detached;
	uint8		mq_ring_offset;
	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
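
/*
 * To make the counting scheme above concrete: mq_bytes_read and
 * mq_bytes_written are absolute counters that only ever increase, and ring
 * positions are derived from them modulo mq_ring_size.  A worked example
 * with assumed numbers (not part of the API): if mq_ring_size = 1024,
 * mq_bytes_written = 1500, and mq_bytes_read = 900, then
 *
 *	used = mq_bytes_written - mq_bytes_read = 600 bytes of unread data,
 *	read position = 900 % 1024 = 900, and
 *	write position = 1500 % 1024 = 476,
 *
 * so the unread data occupies bytes 900..1023 of mq_ring followed by bytes
 * 0..475.  Because the counters never decrease, the subtraction is
 * well-defined even after the positions have wrapped many times.
 */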

/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started.  If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point.  This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
	shm_mq	   *mqh_queue;
	dsm_segment *mqh_segment;
	BackgroundWorkerHandle *mqh_handle;
	char	   *mqh_buffer;
	Size		mqh_buflen;
	Size		mqh_consume_pending;
	Size		mqh_partial_bytes;
	Size		mqh_expected_bytes;
	bool		mqh_length_word_complete;
	bool		mqh_counterparty_attached;
	MemoryContext mqh_context;
};

static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
				  const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
					 bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
						 BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
					 BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size	shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

#define MQH_INITIAL_BUFSIZE				8192

/*
 * Initialize a new shared message queue.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
	shm_mq	   *mq = address;
	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
	size = MAXALIGN_DOWN(size);

	/* Queue size must be large enough to hold some data. */
	Assert(size > data_offset);

	/* Initialize queue header. */
	SpinLockInit(&mq->mq_mutex);
	mq->mq_receiver = NULL;
	mq->mq_sender = NULL;
	mq->mq_bytes_read = 0;
	mq->mq_bytes_written = 0;
	mq->mq_ring_size = size - data_offset;
	mq->mq_detached = false;
	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

	return mq;
}
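
/*
 * A minimal usage sketch, assuming the queue lives in a dynamic shared
 * memory segment created for the purpose (the size below is an arbitrary
 * example, not a recommendation):
 *
 *	Size		queue_size = 16384;
 *	dsm_segment *seg = dsm_create(queue_size, 0);
 *	shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), queue_size);
 *
 *	shm_mq_set_sender(mq, MyProc);
 *	shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);
 *
 * The counterparty would call shm_mq_set_receiver() and attach likewise;
 * see shm_mq_attach() below.
 */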

/*
 * Set the identity of the process that will receive from a shared message
 * queue.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
	volatile shm_mq *vmq = mq;
	PGPROC	   *sender;

	SpinLockAcquire(&mq->mq_mutex);
	Assert(vmq->mq_receiver == NULL);
	vmq->mq_receiver = proc;
	sender = vmq->mq_sender;
	SpinLockRelease(&mq->mq_mutex);

	if (sender != NULL)
		SetLatch(&sender->procLatch);
}

/*
 * Set the identity of the process that will send to a shared message queue.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
	volatile shm_mq *vmq = mq;
	PGPROC	   *receiver;

	SpinLockAcquire(&mq->mq_mutex);
	Assert(vmq->mq_sender == NULL);
	vmq->mq_sender = proc;
	receiver = vmq->mq_receiver;
	SpinLockRelease(&mq->mq_mutex);

	if (receiver != NULL)
		SetLatch(&receiver->procLatch);
}

/*
 * Get the configured receiver.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
	volatile shm_mq *vmq = mq;
	PGPROC	   *receiver;

	SpinLockAcquire(&mq->mq_mutex);
	receiver = vmq->mq_receiver;
	SpinLockRelease(&mq->mq_mutex);

	return receiver;
}

/*
 * Get the configured sender.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
	volatile shm_mq *vmq = mq;
	PGPROC	   *sender;

	SpinLockAcquire(&mq->mq_mutex);
	sender = vmq->mq_sender;
	SpinLockRelease(&mq->mq_mutex);

	return sender;
}

/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached.  We'll wait for it to do so if needed.  The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done.  This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
	shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));

	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
	mqh->mqh_queue = mq;
	mqh->mqh_segment = seg;
	mqh->mqh_handle = handle;
	mqh->mqh_buffer = NULL;
	mqh->mqh_buflen = 0;
	mqh->mqh_consume_pending = 0;
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_expected_bytes = 0;
	mqh->mqh_length_word_complete = false;
	mqh->mqh_counterparty_attached = false;
	mqh->mqh_context = CurrentMemoryContext;

	if (seg != NULL)
		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));

	return mqh;
}
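
/*
 * A sketch of the receiving side, assuming a background worker that has
 * been handed the DSM handle (segment_handle is a hypothetical variable
 * name, not part of this API):
 *
 *	dsm_segment *seg = dsm_attach(segment_handle);
 *	shm_mq	   *mq = (shm_mq *) dsm_segment_address(seg);
 *
 *	shm_mq_set_receiver(mq, MyProc);
 *	shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);
 *
 * Note that each side must declare its role with shm_mq_set_sender() or
 * shm_mq_set_receiver() before attaching, as the Assert above requires.
 */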

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
	Assert(mqh->mqh_handle == NULL);
	mqh->mqh_handle = handle;
}

/*
 * Write a message into a shared message queue.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
	shm_mq_iovec iov;

	iov.data = data;
	iov.len = nbytes;

	return shm_mq_sendv(mqh, &iov, 1, nowait);
}

/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set.  (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
	shm_mq_result res;
	shm_mq	   *mq = mqh->mqh_queue;
	Size		nbytes = 0;
	Size		bytes_written;
	int			i;
	int			which_iov = 0;
	Size		offset;

	Assert(mq->mq_sender == MyProc);

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Prevent writing messages overwhelming the receiver. */
	if (nbytes > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot send a message of size %zu via shared memory queue",
						nbytes)));

	/* Try to write, or finish writing, the length word into the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								((char *) &nbytes) + mqh->mqh_partial_bytes,
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = false;
			return res;
		}
		mqh->mqh_partial_bytes += bytes_written;

		if (mqh->mqh_partial_bytes >= sizeof(Size))
		{
			Assert(mqh->mqh_partial_bytes == sizeof(Size));

			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = true;
		}

		if (res != SHM_MQ_SUCCESS)
			return res;

		/* Length word can't be split unless bigger than required alignment. */
		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
	}

	/* Write the actual data bytes into the buffer. */
	Assert(mqh->mqh_partial_bytes <= nbytes);
	offset = mqh->mqh_partial_bytes;
	do
	{
		Size		chunksize;

		/* Figure out which bytes need to be sent next. */
		if (offset >= iov[which_iov].len)
		{
			offset -= iov[which_iov].len;
			++which_iov;
			if (which_iov >= iovcnt)
				break;
			continue;
		}

		/*
		 * We want to avoid copying the data if at all possible, but every
		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
		 * the last.  Thus, if a chunk other than the last one ends on a
		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
		 * data with data from one or more following chunks until we either
		 * reach the last chunk or accumulate a number of bytes which is
		 * MAXALIGN'd.
		 */
		if (which_iov + 1 < iovcnt &&
			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
		{
			char		tmpbuf[MAXIMUM_ALIGNOF];
			int			j = 0;

			for (;;)
			{
				if (offset < iov[which_iov].len)
				{
					tmpbuf[j] = iov[which_iov].data[offset];
					j++;
					offset++;
					if (j == MAXIMUM_ALIGNOF)
						break;
				}
				else
				{
					offset -= iov[which_iov].len;
					which_iov++;
					if (which_iov >= iovcnt)
						break;
				}
			}

			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

			if (res == SHM_MQ_DETACHED)
			{
				/* Reset state in case caller tries to send another message. */
				mqh->mqh_partial_bytes = 0;
				mqh->mqh_length_word_complete = false;
				return res;
			}

			mqh->mqh_partial_bytes += bytes_written;
			if (res != SHM_MQ_SUCCESS)
				return res;
			continue;
		}

		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = iov[which_iov].len - offset;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);
		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_length_word_complete = false;
			mqh->mqh_partial_bytes = 0;
			return res;
		}

		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (res != SHM_MQ_SUCCESS)
			return res;
	} while (mqh->mqh_partial_bytes < nbytes);

	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;

	/* Notify receiver of the newly-written data, and return. */
	return shm_mq_notify_receiver(mq);
}
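
/*
 * A sketch of the nowait retry protocol described above (hypothetical
 * caller code, not taken from any caller in the tree): keep presenting the
 * same message until the queue accepts it or the receiver detaches.
 *
 *	for (;;)
 *	{
 *		shm_mq_result res = shm_mq_send(mqh, nbytes, data, true);
 *
 *		if (res != SHM_MQ_WOULD_BLOCK)
 *			break;		// SHM_MQ_SUCCESS or SHM_MQ_DETACHED
 *		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
 *		ResetLatch(MyLatch);
 *		CHECK_FOR_INTERRUPTS();
 *	}
 *
 * A real caller would typically fold the latch wait into a larger event
 * loop rather than blocking solely on the queue.
 */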

/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytesp to the message length and *datap to point to the message
 * payload.  If the entire message exists in the queue as a single,
 * contiguous chunk, *datap will point directly into shared memory; otherwise,
 * it will point to a temporary buffer.  This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages.  In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message.  The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing.  Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
	shm_mq	   *mq = mqh->mqh_queue;
	shm_mq_result res;
	Size		rb = 0;
	Size		nbytes;
	void	   *rawdata;

	Assert(mq->mq_receiver == MyProc);

	/* We can't receive data until the sender has attached. */
	if (!mqh->mqh_counterparty_attached)
	{
		if (nowait)
		{
			int			counterparty_gone;

			/*
			 * We shouldn't return at this point at all unless the sender
			 * hasn't attached yet.  However, the correct return value depends
			 * on whether the sender is still attached.  If we first test
			 * whether the sender has ever attached and then test whether the
			 * sender has detached, there's a race condition: a sender that
			 * attaches and detaches very quickly might fool us into thinking
			 * the sender never attached at all.  So, test whether our
			 * counterparty is definitively gone first, and only afterwards
			 * check whether the sender ever attached in the first place.
			 */
			counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
			if (shm_mq_get_sender(mq) == NULL)
			{
				if (counterparty_gone)
					return SHM_MQ_DETACHED;
				else
					return SHM_MQ_WOULD_BLOCK;
			}
		}
		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
				 && shm_mq_get_sender(mq) == NULL)
		{
			mq->mq_detached = true;
			return SHM_MQ_DETACHED;
		}
		mqh->mqh_counterparty_attached = true;
	}

	/* Consume any zero-copy data from previous receive operation. */
	if (mqh->mqh_consume_pending > 0)
	{
		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
		mqh->mqh_consume_pending = 0;
	}

	/* Try to read, or finish reading, the length word from the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		/* Try to receive the message length word. */
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
								   nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;

		/*
		 * Hopefully, we'll receive the entire message length word at once.
		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
		 * multiple reads.
		 */
		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
		{
			Size		needed;

			nbytes = *(Size *) rawdata;

			/* If we've already got the whole message, we're done. */
			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
			if (rb >= needed)
			{
				/*
				 * Technically, we could consume the message length
				 * information at this point, but the extra write to shared
				 * memory wouldn't be free and in most cases we would reap no
				 * benefit.
				 */
				mqh->mqh_consume_pending = needed;
				*nbytesp = nbytes;
				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
				return SHM_MQ_SUCCESS;
			}

			/*
			 * We don't have the whole message, but we at least have the whole
			 * length word.
			 */
			mqh->mqh_expected_bytes = nbytes;
			mqh->mqh_length_word_complete = true;
			shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
			rb -= MAXALIGN(sizeof(Size));
		}
		else
		{
			Size		lengthbytes;

			/* Can't be split unless bigger than required alignment. */
			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

			/* Message word is split; need buffer to reassemble. */
			if (mqh->mqh_buffer == NULL)
			{
				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
													 MQH_INITIAL_BUFSIZE);
				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
			}
			Assert(mqh->mqh_buflen >= sizeof(Size));

			/* Copy and consume partial length word. */
			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
			else
				lengthbytes = rb;
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
				   lengthbytes);
			mqh->mqh_partial_bytes += lengthbytes;
			shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
			rb -= lengthbytes;

			/* If we now have the whole word, we're ready to read payload. */
			if (mqh->mqh_partial_bytes >= sizeof(Size))
			{
				Assert(mqh->mqh_partial_bytes == sizeof(Size));
				mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
				mqh->mqh_length_word_complete = true;
				mqh->mqh_partial_bytes = 0;
			}
		}
	}
	nbytes = mqh->mqh_expected_bytes;

	/*
	 * Should be disallowed on the sending side already, but better check and
	 * error out on the receiver side as well rather than trying to read a
	 * prohibitively large message.
	 */
	if (nbytes > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("invalid message size %zu in shared memory queue",
						nbytes)));

	if (mqh->mqh_partial_bytes == 0)
	{
		/*
		 * Try to obtain the whole message in a single chunk.  If this works,
		 * we need not copy the data and can return a pointer directly into
		 * shared memory.
		 */
		res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb >= nbytes)
		{
			mqh->mqh_length_word_complete = false;
			mqh->mqh_consume_pending = MAXALIGN(nbytes);
			*nbytesp = nbytes;
			*datap = rawdata;
			return SHM_MQ_SUCCESS;
		}

		/*
		 * The message has wrapped the buffer.  We'll need to copy it in order
		 * to return it to the client in one chunk.  First, make sure we have
		 * a large enough buffer available.
		 */
		if (mqh->mqh_buflen < nbytes)
		{
			Size		newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);

			/*
			 * Double the buffer size until the payload fits, but limit to
			 * MaxAllocSize.
			 */
			while (newbuflen < nbytes)
				newbuflen *= 2;
			newbuflen = Min(newbuflen, MaxAllocSize);

			if (mqh->mqh_buffer != NULL)
			{
				pfree(mqh->mqh_buffer);
				mqh->mqh_buffer = NULL;
				mqh->mqh_buflen = 0;
			}
			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
			mqh->mqh_buflen = newbuflen;
		}
	}

	/* Loop until we've copied the entire message. */
	for (;;)
	{
		Size		still_needed;

		/* Copy as much as we can. */
		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
		mqh->mqh_partial_bytes += rb;

		/*
		 * Update count of bytes read, with alignment padding.  Note that this
		 * will never actually insert any padding except at the end of a
		 * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
		 * and each read and write is as well.
		 */
		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
		shm_mq_inc_bytes_read(mq, MAXALIGN(rb));

		/* If we got all the data, exit the loop. */
		if (mqh->mqh_partial_bytes >= nbytes)
			break;

		/* Wait for some more data. */
		still_needed = nbytes - mqh->mqh_partial_bytes;
		res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb > still_needed)
			rb = still_needed;
	}

	/* Return the complete message, and reset for next message. */
	*nbytesp = nbytes;
	*datap = mqh->mqh_buffer;
	mqh->mqh_length_word_complete = false;
	mqh->mqh_partial_bytes = 0;
	return SHM_MQ_SUCCESS;
}
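
/*
 * A typical blocking receive loop might look like this sketch (the
 * process_message callback is hypothetical):
 *
 *	for (;;)
 *	{
 *		Size		nbytes;
 *		void	   *data;
 *		shm_mq_result res;
 *
 *		res = shm_mq_receive(mqh, &nbytes, &data, false);
 *		if (res != SHM_MQ_SUCCESS)
 *			break;		// SHM_MQ_DETACHED: sender has gone away
 *		process_message(data, nbytes);
 *	}
 *
 * Because *data may point directly into the ring buffer, it must not be
 * used after the next call to shm_mq_receive().
 */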

/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	  **victim;

	if (shm_mq_get_receiver(mq) == MyProc)
		victim = &mq->mq_sender;
	else
	{
		Assert(shm_mq_get_sender(mq) == MyProc);
		victim = &mq->mq_receiver;
	}

	if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
		return SHM_MQ_SUCCESS;
	else
		return SHM_MQ_DETACHED;
}

/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
	/* Notify counterparty that we're outta here. */
	shm_mq_detach_internal(mqh->mqh_queue);

	/* Cancel on_dsm_detach callback, if any. */
	if (mqh->mqh_segment)
		cancel_on_dsm_detach(mqh->mqh_segment,
							 shm_mq_detach_callback,
							 PointerGetDatum(mqh->mqh_queue));

	/* Release local memory associated with handle. */
	if (mqh->mqh_buffer != NULL)
		pfree(mqh->mqh_buffer);
	pfree(mqh);
}

/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
	volatile shm_mq *vmq = mq;
	PGPROC	   *victim;

	SpinLockAcquire(&mq->mq_mutex);
	if (vmq->mq_sender == MyProc)
		victim = vmq->mq_receiver;
	else
	{
		Assert(vmq->mq_receiver == MyProc);
		victim = vmq->mq_sender;
	}
	vmq->mq_detached = true;
	SpinLockRelease(&mq->mq_mutex);

	if (victim != NULL)
		SetLatch(&victim->procLatch);
}

/*
 * Get the shm_mq from handle.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
	return mqh->mqh_queue;
}

/*
 * Write bytes into a shared message queue.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
				  bool nowait, Size *bytes_written)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		sent = 0;
	uint64		used;
	Size		ringsize = mq->mq_ring_size;
	Size		available;

	while (sent < nbytes)
	{
		bool		detached;
		uint64		rb;

		/* Compute number of ring buffer bytes used and available. */
		rb = shm_mq_get_bytes_read(mq, &detached);
		Assert(mq->mq_bytes_written >= rb);
		used = mq->mq_bytes_written - rb;
		Assert(used <= ringsize);
		available = Min(ringsize - used, nbytes - sent);

		/* Bail out if the queue has been detached. */
		if (detached)
		{
			*bytes_written = sent;
			return SHM_MQ_DETACHED;
		}

		if (available == 0 && !mqh->mqh_counterparty_attached)
		{
			/*
			 * The queue is full, so if the receiver isn't yet known to be
			 * attached, we must wait for that to happen.
			 */
			if (nowait)
			{
				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
				{
					*bytes_written = sent;
					return SHM_MQ_DETACHED;
				}
				if (shm_mq_get_receiver(mq) == NULL)
				{
					*bytes_written = sent;
					return SHM_MQ_WOULD_BLOCK;
				}
			}
			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
										   mqh->mqh_handle))
			{
				mq->mq_detached = true;
				*bytes_written = sent;
				return SHM_MQ_DETACHED;
			}
			mqh->mqh_counterparty_attached = true;

			/*
			 * The receiver may have read some data after attaching, so we
			 * must not wait without rechecking the queue state.
			 */
		}
		else if (available == 0)
		{
			shm_mq_result res;

			/* Let the receiver know that we need them to read some data. */
			res = shm_mq_notify_receiver(mq);
			if (res != SHM_MQ_SUCCESS)
			{
				*bytes_written = sent;
				return res;
			}

			/* Skip manipulation of our latch if nowait = true. */
			if (nowait)
			{
				*bytes_written = sent;
				return SHM_MQ_WOULD_BLOCK;
			}

			/*
			 * Wait for our latch to be set.  It might already be set for some
			 * unrelated reason, but that'll just result in one extra trip
			 * through the loop.  It's worth it to avoid resetting the latch
			 * at top of loop, because setting an already-set latch is much
			 * cheaper than setting one that has been reset.
			 */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);

			/* Reset the latch so we don't spin. */
			ResetLatch(MyLatch);

			/* An interrupt may have occurred while we were waiting. */
			CHECK_FOR_INTERRUPTS();
		}
		else
		{
			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
			Size		sendnow = Min(available, ringsize - offset);

			/* Write as much data as we can via a single memcpy(). */
			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
				   (char *) data + sent, sendnow);
			sent += sendnow;

			/*
			 * Update count of bytes written, with alignment padding.  Note
			 * that this will never actually insert any padding except at the
			 * end of a run of bytes, because the buffer size is a multiple of
			 * MAXIMUM_ALIGNOF, and each read is as well.
			 */
			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));

			/*
			 * For efficiency, we don't set the reader's latch here.  We'll do
			 * that only when the buffer fills up or after writing an entire
			 * message.
			 */
		}
	}

	*bytes_written = sent;
	return SHM_MQ_SUCCESS;
}
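
/*
 * To illustrate the wraparound arithmetic above with assumed numbers: if
 * ringsize = 1024, mq_bytes_written = 1000, and mq_bytes_read = 500, then
 * used = 500 and up to 524 bytes may be written.  Since offset =
 * 1000 % 1024 = 1000, the first memcpy() can cover at most
 * ringsize - offset = 24 bytes at the end of mq_ring; on the next trip
 * through the loop, offset wraps to 0 and writing continues at the start
 * of the ring.
 */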

/*
 * Wait until at least bytes_needed bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
					 Size *nbytesp, void **datap)
{
	Size		ringsize = mq->mq_ring_size;
	uint64		used;
	uint64		written;

	for (;;)
	{
		Size		offset;
		bool		detached;

		/* Get bytes written, so we can compute what's available to read. */
		written = shm_mq_get_bytes_written(mq, &detached);
		used = written - mq->mq_bytes_read;
		Assert(used <= ringsize);
		offset = mq->mq_bytes_read % (uint64) ringsize;

		/* If we have enough data or buffer has wrapped, we're done. */
		if (used >= bytes_needed || offset + used >= ringsize)
		{
			*nbytesp = Min(used, ringsize - offset);
			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
			return SHM_MQ_SUCCESS;
		}

		/*
		 * Fall out before waiting if the queue has been detached.
		 *
		 * Note that we don't check for this until *after* considering whether
		 * the data already available is enough, since the receiver can finish
		 * receiving a message stored in the buffer even after the sender has
		 * detached.
		 */
		if (detached)
			return SHM_MQ_DETACHED;

		/* Skip manipulation of our latch if nowait = true. */
		if (nowait)
			return SHM_MQ_WOULD_BLOCK;

		/*
		 * Wait for our latch to be set.  It might already be set for some
		 * unrelated reason, but that'll just result in one extra trip through
		 * the loop.  It's worth it to avoid resetting the latch at top of
		 * loop, because setting an already-set latch is much cheaper than
		 * setting one that has been reset.
		 */
		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}
}

/*
 * Test whether a counterparty who may not even be alive yet is definitely gone.
 */
static bool
shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
{
	bool		detached;
	pid_t		pid;

	/* Acquire the lock just long enough to check the pointer. */
	SpinLockAcquire(&mq->mq_mutex);
	detached = mq->mq_detached;
	SpinLockRelease(&mq->mq_mutex);

	/* If the queue has been detached, counterparty is definitely gone. */
	if (detached)
		return true;

	/* If there's a handle, check worker status. */
	if (handle != NULL)
	{
		BgwHandleStatus status;

		/* Check for unexpected worker death. */
		status = GetBackgroundWorkerPid(handle, &pid);
		if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
		{
			/* Mark it detached, just to make it official. */
			SpinLockAcquire(&mq->mq_mutex);
			mq->mq_detached = true;
			SpinLockRelease(&mq->mq_mutex);
			return true;
		}
	}

	/* Counterparty is not definitively gone. */
	return false;
}

/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue.  We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies.  Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start.  We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 */
static bool
shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
					 BackgroundWorkerHandle *handle)
{
	bool		result = false;

	for (;;)
	{
		BgwHandleStatus status;
		pid_t		pid;
		bool		detached;

		/* Acquire the lock just long enough to check the pointer. */
		SpinLockAcquire(&mq->mq_mutex);
		detached = mq->mq_detached;
		result = (*ptr != NULL);
		SpinLockRelease(&mq->mq_mutex);

		/* Fail if detached; else succeed if initialized. */
		if (detached)
		{
			result = false;
			break;
		}
		if (result)
			break;

		if (handle != NULL)
		{
			/* Check for unexpected worker death. */
			status = GetBackgroundWorkerPid(handle, &pid);
			if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
			{
				result = false;
				break;
			}
		}

		/* Wait to be signalled. */
		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}

	return result;
}

/*
 * Get the number of bytes read.  The receiver need not use this to access
 * the count of bytes read, but the sender must.
 */
static uint64
shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
{
	uint64		v;

	SpinLockAcquire(&mq->mq_mutex);
	v = mq->mq_bytes_read;
	*detached = mq->mq_detached;
	SpinLockRelease(&mq->mq_mutex);

	return v;
}

/*
 * Increment the number of bytes read.
 */
static void
shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
{
	PGPROC	   *sender;

	SpinLockAcquire(&mq->mq_mutex);
	mq->mq_bytes_read += n;
	sender = mq->mq_sender;
	SpinLockRelease(&mq->mq_mutex);

	/* We shouldn't have any bytes to read without a sender. */
	Assert(sender != NULL);
	SetLatch(&sender->procLatch);
}

/*
 * Get the number of bytes written.  The sender need not use this to access
 * the count of bytes written, but the receiver must.
 */
static uint64
shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
{
	uint64		v;

	SpinLockAcquire(&mq->mq_mutex);
	v = mq->mq_bytes_written;
	*detached = mq->mq_detached;
	SpinLockRelease(&mq->mq_mutex);

	return v;
}

/*
 * Increment the number of bytes written.
 */
static void
shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
{
	SpinLockAcquire(&mq->mq_mutex);
	mq->mq_bytes_written += n;
	SpinLockRelease(&mq->mq_mutex);
}

/*
 * Set receiver's latch, unless queue is detached.
 */
static shm_mq_result
shm_mq_notify_receiver(volatile shm_mq *mq)
{
	PGPROC	   *receiver;
	bool		detached;

	SpinLockAcquire(&mq->mq_mutex);
	detached = mq->mq_detached;
	receiver = mq->mq_receiver;
	SpinLockRelease(&mq->mq_mutex);

	if (detached)
		return SHM_MQ_DETACHED;
	if (receiver)
		SetLatch(&receiver->procLatch);
	return SHM_MQ_SUCCESS;
}

/* Shim for the on_dsm_detach callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
	shm_mq	   *mq = (shm_mq *) DatumGetPointer(arg);

	shm_mq_detach_internal(mq);
}