1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  *	  single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization.  Only the sender may send,
8  * and only the receiver may receive.  This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
 * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postmaster/bgworker.h"
24 #include "storage/procsignal.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 #include "utils/memutils.h"
28 
29 /*
30  * This structure represents the actual queue, stored in shared memory.
31  *
32  * Some notes on synchronization:
33  *
34  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
35  * mq_sender and mq_bytes_written can only be changed by the sender.
36  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
37  * they cannot change once set, and thus may be read without a lock once this
38  * is known to be the case.
39  *
40  * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
41  * they are written atomically using 8 byte loads and stores.  Memory barriers
42  * must be carefully used to synchronize reads and writes of these values with
43  * reads and writes of the actual data in mq_ring.
44  *
45  * mq_detached needs no locking.  It can be set by either the sender or the
46  * receiver, but only ever from false to true, so redundant writes don't
47  * matter.  It is important that if we set mq_detached and then set the
48  * counterparty's latch, the counterparty must be certain to see the change
49  * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
50  * ends with one, this should be OK.
51  *
52  * mq_ring_size and mq_ring_offset never change after initialization, and
53  * can therefore be read without the lock.
54  *
55  * Importantly, mq_ring can be safely read and written without a lock.
56  * At any given time, the difference between mq_bytes_read and
57  * mq_bytes_written defines the number of bytes within mq_ring that contain
58  * unread data, and mq_bytes_read defines the position where those bytes
59  * begin.  The sender can increase the number of unread bytes at any time,
60  * but only the receiver can give license to overwrite those bytes, by
61  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
62  * the unread bytes it knows to be present without the lock.  Conversely,
63  * the sender can write to the unused portion of the ring buffer without
64  * the lock, because nobody else can be reading or writing those bytes.  The
65  * receiver could be making more bytes unused by incrementing mq_bytes_read,
66  * but that's OK.  Note that it would be unsafe for the receiver to read any
67  * data it's already marked as read, or to write any data; and it would be
68  * unsafe for the sender to reread any data after incrementing
69  * mq_bytes_written, but fortunately there's no need for any of that.
70  */
71 struct shm_mq
72 {
73 	slock_t		mq_mutex;
74 	PGPROC	   *mq_receiver;
75 	PGPROC	   *mq_sender;
76 	pg_atomic_uint64 mq_bytes_read;
77 	pg_atomic_uint64 mq_bytes_written;
78 	Size		mq_ring_size;
79 	bool		mq_detached;
80 	uint8		mq_ring_offset;
81 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
82 };
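
/*
 * To make the accounting above concrete, here is a minimal sketch of how the
 * sender sizes a write; it mirrors the logic in shm_mq_send_bytes() below,
 * and the variable names are illustrative only:
 *
 *		uint64		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
 *		uint64		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
 *		uint64		used = wb - rb;				-- unread bytes in mq_ring
 *		Size		avail = mq->mq_ring_size - used;	-- free ring space
 *		Size		insert_at = wb % (uint64) mq->mq_ring_size; -- write offset
 *
 * Because both counters only ever increase and are accessed with 8-byte
 * atomic loads and stores, the sender can compute this without holding
 * mq_mutex; it only needs the memory barriers described above when it
 * subsequently reads or writes mq_ring itself.
 */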
83 
84 /*
85  * This structure is a backend-private handle for access to a queue.
86  *
87  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
88  * an optional pointer to the dynamic shared memory segment that contains it.
89  * (If mqh_segment is provided, we register an on_dsm_detach callback to
90  * make sure we detach from the queue before detaching from DSM.)
91  *
92  * If this queue is intended to connect the current process with a background
93  * worker that started it, the user can pass a pointer to the worker handle
94  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
95  * is to allow us to begin sending to or receiving from that queue before the
96  * process we'll be communicating with has even been started.  If it fails
97  * to start, the handle will allow us to notice that and fail cleanly, rather
98  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
99  * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, a process may not have a BackgroundWorkerHandle
101  * available, or may need to watch for the failure of more than one other
102  * process at a time.
103  *
104  * When a message exists as a contiguous chunk of bytes in the queue - that is,
105  * it is smaller than the size of the ring buffer and does not wrap around
106  * the end - we return the message to the caller as a pointer into the buffer.
107  * For messages that are larger or happen to wrap, we reassemble the message
108  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
109  * the buffer, and mqh_buflen is the number of bytes allocated for it.
110  *
111  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
112  * are used to track the state of non-blocking operations.  When the caller
113  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
114  * are expected to retry the call at a later time with the same argument;
115  * we need to retain enough state to pick up where we left off.
116  * mqh_length_word_complete tracks whether we are done sending or receiving
117  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
118  * the number of bytes read or written for either the length word or the
119  * message itself, and mqh_expected_bytes - which is used only for reads -
120  * tracks the expected total size of the payload.
121  *
122  * mqh_counterparty_attached tracks whether we know the counterparty to have
123  * attached to the queue at some previous point.  This lets us avoid some
124  * mutex acquisitions.
125  *
126  * mqh_context is the memory context in effect at the time we attached to
127  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
128  * we make sure any other allocations we do happen in this context as well,
129  * to avoid nasty surprises.
130  */
131 struct shm_mq_handle
132 {
133 	shm_mq	   *mqh_queue;
134 	dsm_segment *mqh_segment;
135 	BackgroundWorkerHandle *mqh_handle;
136 	char	   *mqh_buffer;
137 	Size		mqh_buflen;
138 	Size		mqh_consume_pending;
139 	Size		mqh_partial_bytes;
140 	Size		mqh_expected_bytes;
141 	bool		mqh_length_word_complete;
142 	bool		mqh_counterparty_attached;
143 	MemoryContext mqh_context;
144 };
145 
146 static void shm_mq_detach_internal(shm_mq *mq);
147 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
148 				  const void *data, bool nowait, Size *bytes_written);
149 static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
150 					 Size bytes_needed, bool nowait, Size *nbytesp,
151 					 void **datap);
152 static bool shm_mq_counterparty_gone(shm_mq *mq,
153 						 BackgroundWorkerHandle *handle);
154 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
155 					 BackgroundWorkerHandle *handle);
156 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
157 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
158 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
159 
160 /* Minimum queue size is enough for header and at least one chunk of data. */
161 const Size	shm_mq_minimum_size =
162 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
163 
164 #define MQH_INITIAL_BUFSIZE				8192
165 
166 /*
167  * Initialize a new shared message queue.
168  */
169 shm_mq *
shm_mq_create(void *address, Size size)
171 {
172 	shm_mq	   *mq = address;
173 	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 
175 	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176 	size = MAXALIGN_DOWN(size);
177 
178 	/* Queue size must be large enough to hold some data. */
179 	Assert(size > data_offset);
180 
181 	/* Initialize queue header. */
182 	SpinLockInit(&mq->mq_mutex);
183 	mq->mq_receiver = NULL;
184 	mq->mq_sender = NULL;
185 	pg_atomic_init_u64(&mq->mq_bytes_read, 0);
186 	pg_atomic_init_u64(&mq->mq_bytes_written, 0);
187 	mq->mq_ring_size = size - data_offset;
188 	mq->mq_detached = false;
189 	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 
191 	return mq;
192 }
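
/*
 * A minimal usage sketch for queue setup (assumptions: the queue occupies an
 * entire dynamic shared memory segment created here with dsm_create(), and a
 * hypothetical size of 64kB; real callers typically carve the space out of a
 * larger segment, e.g. via a shm_toc, and choose their own size):
 *
 *		Size		queue_size = 65536;
 *		dsm_segment *seg = dsm_create(queue_size, 0);
 *		shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), queue_size);
 *
 *		shm_mq_set_receiver(mq, MyProc);	-- this process will read
 *
 * The sending process, once it has attached to the same segment, calls
 * shm_mq_set_sender(mq, MyProc) on its copy of the mapping.
 */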
193 
194 /*
195  * Set the identity of the process that will receive from a shared message
196  * queue.
197  */
198 void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
200 {
201 	PGPROC	   *sender;
202 
203 	SpinLockAcquire(&mq->mq_mutex);
204 	Assert(mq->mq_receiver == NULL);
205 	mq->mq_receiver = proc;
206 	sender = mq->mq_sender;
207 	SpinLockRelease(&mq->mq_mutex);
208 
209 	if (sender != NULL)
210 		SetLatch(&sender->procLatch);
211 }
212 
213 /*
214  * Set the identity of the process that will send to a shared message queue.
215  */
216 void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
218 {
219 	PGPROC	   *receiver;
220 
221 	SpinLockAcquire(&mq->mq_mutex);
222 	Assert(mq->mq_sender == NULL);
223 	mq->mq_sender = proc;
224 	receiver = mq->mq_receiver;
225 	SpinLockRelease(&mq->mq_mutex);
226 
227 	if (receiver != NULL)
228 		SetLatch(&receiver->procLatch);
229 }
230 
231 /*
232  * Get the configured receiver.
233  */
234 PGPROC *
shm_mq_get_receiver(shm_mq *mq)
236 {
237 	PGPROC	   *receiver;
238 
239 	SpinLockAcquire(&mq->mq_mutex);
240 	receiver = mq->mq_receiver;
241 	SpinLockRelease(&mq->mq_mutex);
242 
243 	return receiver;
244 }
245 
246 /*
247  * Get the configured sender.
248  */
249 PGPROC *
shm_mq_get_sender(shm_mq *mq)
251 {
252 	PGPROC	   *sender;
253 
254 	SpinLockAcquire(&mq->mq_mutex);
255 	sender = mq->mq_sender;
256 	SpinLockRelease(&mq->mq_mutex);
257 
258 	return sender;
259 }
260 
261 /*
262  * Attach to a shared message queue so we can send or receive messages.
263  *
264  * The memory context in effect at the time this function is called should
265  * be one which will last for at least as long as the message queue itself.
266  * We'll allocate the handle in that context, and future allocations that
267  * are needed to buffer incoming data will happen in that context as well.
268  *
269  * If seg != NULL, the queue will be automatically detached when that dynamic
270  * shared memory segment is detached.
271  *
272  * If handle != NULL, the queue can be read or written even before the
273  * other process has attached.  We'll wait for it to do so if needed.  The
274  * handle must be for a background worker initialized with bgw_notify_pid
275  * equal to our PID.
276  *
277  * shm_mq_detach() should be called when done.  This will free the
278  * shm_mq_handle and mark the queue itself as detached, so that our
279  * counterpart won't get stuck waiting for us to fill or drain the queue
280  * after we've already lost interest.
281  */
282 shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
284 {
285 	shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
286 
287 	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
288 	mqh->mqh_queue = mq;
289 	mqh->mqh_segment = seg;
290 	mqh->mqh_handle = handle;
291 	mqh->mqh_buffer = NULL;
292 	mqh->mqh_buflen = 0;
293 	mqh->mqh_consume_pending = 0;
294 	mqh->mqh_partial_bytes = 0;
295 	mqh->mqh_expected_bytes = 0;
296 	mqh->mqh_length_word_complete = false;
297 	mqh->mqh_counterparty_attached = false;
298 	mqh->mqh_context = CurrentMemoryContext;
299 
300 	if (seg != NULL)
301 		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
302 
303 	return mqh;
304 }
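
/*
 * Sketch of the attach step on each side (hypothetical variables; "wh" is a
 * BackgroundWorkerHandle obtained when the leader registered the worker, and
 * passing it lets the leader notice if the worker never starts):
 *
 * Leader, acting as receiver:
 *		shm_mq_set_receiver(mq, MyProc);
 *		shm_mq_handle *inqh = shm_mq_attach(mq, seg, wh);
 *
 * Worker, acting as sender, after mapping the same segment:
 *		shm_mq_set_sender(mq, MyProc);
 *		shm_mq_handle *outqh = shm_mq_attach(mq, seg, NULL);
 */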
305 
306 /*
307  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
308  * been passed to shm_mq_attach.
309  */
310 void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
312 {
313 	Assert(mqh->mqh_handle == NULL);
314 	mqh->mqh_handle = handle;
315 }
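
/*
 * This is useful when the queue must be attached before the worker has been
 * registered, e.g. (hypothetical sketch; "wh" comes from registering the
 * worker afterwards):
 *
 *		shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);
 *		... register the background worker, obtaining "wh" ...
 *		shm_mq_set_handle(mqh, wh);
 */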
316 
317 /*
318  * Write a message into a shared message queue.
319  */
320 shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
322 {
323 	shm_mq_iovec iov;
324 
325 	iov.data = data;
326 	iov.len = nbytes;
327 
328 	return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
330 
331 /*
332  * Write a message into a shared message queue, gathered from multiple
333  * addresses.
334  *
335  * When nowait = false, we'll wait on our process latch when the ring buffer
336  * fills up, and then continue writing once the receiver has drained some data.
337  * The process latch is reset after each wait.
338  *
339  * When nowait = true, we do not manipulate the state of the process latch;
340  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
341  * this case, the caller should call this function again, with the same
342  * arguments, each time the process latch is set.  (Once begun, the sending
343  * of a message cannot be aborted except by detaching from the queue; changing
344  * the length or payload will corrupt the queue.)
345  */
346 shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
348 {
349 	shm_mq_result res;
350 	shm_mq	   *mq = mqh->mqh_queue;
351 	PGPROC	   *receiver;
352 	Size		nbytes = 0;
353 	Size		bytes_written;
354 	int			i;
355 	int			which_iov = 0;
356 	Size		offset;
357 
358 	Assert(mq->mq_sender == MyProc);
359 
360 	/* Compute total size of write. */
361 	for (i = 0; i < iovcnt; ++i)
362 		nbytes += iov[i].len;
363 
	/* Prevent writing messages that would overwhelm the receiver. */
365 	if (nbytes > MaxAllocSize)
366 		ereport(ERROR,
367 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
368 				 errmsg("cannot send a message of size %zu via shared memory queue",
369 						nbytes)));
370 
371 	/* Try to write, or finish writing, the length word into the buffer. */
372 	while (!mqh->mqh_length_word_complete)
373 	{
374 		Assert(mqh->mqh_partial_bytes < sizeof(Size));
375 		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
376 								((char *) &nbytes) + mqh->mqh_partial_bytes,
377 								nowait, &bytes_written);
378 
379 		if (res == SHM_MQ_DETACHED)
380 		{
381 			/* Reset state in case caller tries to send another message. */
382 			mqh->mqh_partial_bytes = 0;
383 			mqh->mqh_length_word_complete = false;
384 			return res;
385 		}
386 		mqh->mqh_partial_bytes += bytes_written;
387 
388 		if (mqh->mqh_partial_bytes >= sizeof(Size))
389 		{
390 			Assert(mqh->mqh_partial_bytes == sizeof(Size));
391 
392 			mqh->mqh_partial_bytes = 0;
393 			mqh->mqh_length_word_complete = true;
394 		}
395 
396 		if (res != SHM_MQ_SUCCESS)
397 			return res;
398 
399 		/* Length word can't be split unless bigger than required alignment. */
400 		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
401 	}
402 
403 	/* Write the actual data bytes into the buffer. */
404 	Assert(mqh->mqh_partial_bytes <= nbytes);
405 	offset = mqh->mqh_partial_bytes;
406 	do
407 	{
408 		Size		chunksize;
409 
410 		/* Figure out which bytes need to be sent next. */
411 		if (offset >= iov[which_iov].len)
412 		{
413 			offset -= iov[which_iov].len;
414 			++which_iov;
415 			if (which_iov >= iovcnt)
416 				break;
417 			continue;
418 		}
419 
420 		/*
421 		 * We want to avoid copying the data if at all possible, but every
422 		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
423 		 * the last.  Thus, if a chunk other than the last one ends on a
424 		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
425 		 * data with data from one or more following chunks until we either
426 		 * reach the last chunk or accumulate a number of bytes which is
427 		 * MAXALIGN'd.
428 		 */
429 		if (which_iov + 1 < iovcnt &&
430 			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
431 		{
432 			char		tmpbuf[MAXIMUM_ALIGNOF];
433 			int			j = 0;
434 
435 			for (;;)
436 			{
437 				if (offset < iov[which_iov].len)
438 				{
439 					tmpbuf[j] = iov[which_iov].data[offset];
440 					j++;
441 					offset++;
442 					if (j == MAXIMUM_ALIGNOF)
443 						break;
444 				}
445 				else
446 				{
447 					offset -= iov[which_iov].len;
448 					which_iov++;
449 					if (which_iov >= iovcnt)
450 						break;
451 				}
452 			}
453 
454 			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
455 
456 			if (res == SHM_MQ_DETACHED)
457 			{
458 				/* Reset state in case caller tries to send another message. */
459 				mqh->mqh_partial_bytes = 0;
460 				mqh->mqh_length_word_complete = false;
461 				return res;
462 			}
463 
464 			mqh->mqh_partial_bytes += bytes_written;
465 			if (res != SHM_MQ_SUCCESS)
466 				return res;
467 			continue;
468 		}
469 
470 		/*
471 		 * If this is the last chunk, we can write all the data, even if it
472 		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
473 		 * MAXALIGN_DOWN the write size.
474 		 */
475 		chunksize = iov[which_iov].len - offset;
476 		if (which_iov + 1 < iovcnt)
477 			chunksize = MAXALIGN_DOWN(chunksize);
478 		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
479 								nowait, &bytes_written);
480 
481 		if (res == SHM_MQ_DETACHED)
482 		{
483 			/* Reset state in case caller tries to send another message. */
484 			mqh->mqh_length_word_complete = false;
485 			mqh->mqh_partial_bytes = 0;
486 			return res;
487 		}
488 
489 		mqh->mqh_partial_bytes += bytes_written;
490 		offset += bytes_written;
491 		if (res != SHM_MQ_SUCCESS)
492 			return res;
493 	} while (mqh->mqh_partial_bytes < nbytes);
494 
495 	/* Reset for next message. */
496 	mqh->mqh_partial_bytes = 0;
497 	mqh->mqh_length_word_complete = false;
498 
499 	/* If queue has been detached, let caller know. */
500 	if (mq->mq_detached)
501 		return SHM_MQ_DETACHED;
502 
503 	/*
504 	 * If the counterparty is known to have attached, we can read mq_receiver
505 	 * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
506 	 * more caution is needed.
507 	 */
508 	if (mqh->mqh_counterparty_attached)
509 		receiver = mq->mq_receiver;
510 	else
511 	{
512 		SpinLockAcquire(&mq->mq_mutex);
513 		receiver = mq->mq_receiver;
514 		SpinLockRelease(&mq->mq_mutex);
515 		if (receiver == NULL)
516 			return SHM_MQ_SUCCESS;
517 		mqh->mqh_counterparty_attached = true;
518 	}
519 
520 	/* Notify receiver of the newly-written data, and return. */
521 	SetLatch(&receiver->procLatch);
522 	return SHM_MQ_SUCCESS;
523 }
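
/*
 * Sketch of the retry protocol described above for a nowait caller
 * (illustrative only; "mqh", "iov" and "iovcnt" are assumed to have been set
 * up by the caller, and the same iovec must be passed on every retry):
 *
 *		shm_mq_result res;
 *
 *		for (;;)
 *		{
 *			res = shm_mq_sendv(mqh, iov, iovcnt, true);
 *			if (res != SHM_MQ_WOULD_BLOCK)
 *				break;
 *			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
 *			ResetLatch(MyLatch);
 *			CHECK_FOR_INTERRUPTS();
 *		}
 *		-- res is now SHM_MQ_SUCCESS or SHM_MQ_DETACHED
 *
 * A real nowait caller would normally fold the wait into a larger latch or
 * WaitEventSet loop that also watches other conditions; that is the point of
 * using nowait in the first place.
 */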
524 
525 /*
526  * Receive a message from a shared message queue.
527  *
528  * We set *nbytes to the message length and *data to point to the message
529  * payload.  If the entire message exists in the queue as a single,
530  * contiguous chunk, *data will point directly into shared memory; otherwise,
531  * it will point to a temporary buffer.  This mostly avoids data copying in
532  * the hoped-for case where messages are short compared to the buffer size,
533  * while still allowing longer messages.  In either case, the return value
534  * remains valid until the next receive operation is performed on the queue.
535  *
536  * When nowait = false, we'll wait on our process latch when the ring buffer
537  * is empty and we have not yet received a full message.  The sender will
538  * set our process latch after more data has been written, and we'll resume
539  * processing.  Each call will therefore return a complete message
540  * (unless the sender detaches the queue).
541  *
542  * When nowait = true, we do not manipulate the state of the process latch;
543  * instead, whenever the buffer is empty and we need to read from it, we
544  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
545  * function again after the process latch has been set.
546  */
547 shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
549 {
550 	shm_mq	   *mq = mqh->mqh_queue;
551 	shm_mq_result res;
552 	Size		rb = 0;
553 	Size		nbytes;
554 	void	   *rawdata;
555 
556 	Assert(mq->mq_receiver == MyProc);
557 
558 	/* We can't receive data until the sender has attached. */
559 	if (!mqh->mqh_counterparty_attached)
560 	{
561 		if (nowait)
562 		{
563 			int			counterparty_gone;
564 
565 			/*
566 			 * We shouldn't return at this point at all unless the sender
567 			 * hasn't attached yet.  However, the correct return value depends
568 			 * on whether the sender is still attached.  If we first test
569 			 * whether the sender has ever attached and then test whether the
570 			 * sender has detached, there's a race condition: a sender that
571 			 * attaches and detaches very quickly might fool us into thinking
572 			 * the sender never attached at all.  So, test whether our
573 			 * counterparty is definitively gone first, and only afterwards
574 			 * check whether the sender ever attached in the first place.
575 			 */
576 			counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
577 			if (shm_mq_get_sender(mq) == NULL)
578 			{
579 				if (counterparty_gone)
580 					return SHM_MQ_DETACHED;
581 				else
582 					return SHM_MQ_WOULD_BLOCK;
583 			}
584 		}
585 		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
586 				 && shm_mq_get_sender(mq) == NULL)
587 		{
588 			mq->mq_detached = true;
589 			return SHM_MQ_DETACHED;
590 		}
591 		mqh->mqh_counterparty_attached = true;
592 	}
593 
594 	/*
595 	 * If we've consumed an amount of data greater than 1/4th of the ring
596 	 * size, mark it consumed in shared memory.  We try to avoid doing this
597 	 * unnecessarily when only a small amount of data has been consumed,
598 	 * because SetLatch() is fairly expensive and we don't want to do it too
599 	 * often.
600 	 */
601 	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
602 	{
603 		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
604 		mqh->mqh_consume_pending = 0;
605 	}
606 
607 	/* Try to read, or finish reading, the length word from the buffer. */
608 	while (!mqh->mqh_length_word_complete)
609 	{
610 		/* Try to receive the message length word. */
611 		Assert(mqh->mqh_partial_bytes < sizeof(Size));
612 		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
613 								   nowait, &rb, &rawdata);
614 		if (res != SHM_MQ_SUCCESS)
615 			return res;
616 
617 		/*
618 		 * Hopefully, we'll receive the entire message length word at once.
619 		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
620 		 * multiple reads.
621 		 */
622 		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
623 		{
624 			Size		needed;
625 
626 			nbytes = *(Size *) rawdata;
627 
628 			/* If we've already got the whole message, we're done. */
629 			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
630 			if (rb >= needed)
631 			{
632 				mqh->mqh_consume_pending += needed;
633 				*nbytesp = nbytes;
634 				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
635 				return SHM_MQ_SUCCESS;
636 			}
637 
638 			/*
639 			 * We don't have the whole message, but we at least have the whole
640 			 * length word.
641 			 */
642 			mqh->mqh_expected_bytes = nbytes;
643 			mqh->mqh_length_word_complete = true;
644 			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
645 			rb -= MAXALIGN(sizeof(Size));
646 		}
647 		else
648 		{
649 			Size		lengthbytes;
650 
651 			/* Can't be split unless bigger than required alignment. */
652 			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
653 
654 			/* Message word is split; need buffer to reassemble. */
655 			if (mqh->mqh_buffer == NULL)
656 			{
657 				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
658 													 MQH_INITIAL_BUFSIZE);
659 				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
660 			}
661 			Assert(mqh->mqh_buflen >= sizeof(Size));
662 
663 			/* Copy partial length word; remember to consume it. */
664 			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
665 				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
666 			else
667 				lengthbytes = rb;
668 			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
669 				   lengthbytes);
670 			mqh->mqh_partial_bytes += lengthbytes;
671 			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
672 			rb -= lengthbytes;
673 
674 			/* If we now have the whole word, we're ready to read payload. */
675 			if (mqh->mqh_partial_bytes >= sizeof(Size))
676 			{
677 				Assert(mqh->mqh_partial_bytes == sizeof(Size));
678 				mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
679 				mqh->mqh_length_word_complete = true;
680 				mqh->mqh_partial_bytes = 0;
681 			}
682 		}
683 	}
684 	nbytes = mqh->mqh_expected_bytes;
685 
686 	/*
687 	 * Should be disallowed on the sending side already, but better check and
688 	 * error out on the receiver side as well rather than trying to read a
689 	 * prohibitively large message.
690 	 */
691 	if (nbytes > MaxAllocSize)
692 		ereport(ERROR,
693 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
694 				 errmsg("invalid message size %zu in shared memory queue",
695 						nbytes)));
696 
697 	if (mqh->mqh_partial_bytes == 0)
698 	{
699 		/*
700 		 * Try to obtain the whole message in a single chunk.  If this works,
701 		 * we need not copy the data and can return a pointer directly into
702 		 * shared memory.
703 		 */
704 		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
705 		if (res != SHM_MQ_SUCCESS)
706 			return res;
707 		if (rb >= nbytes)
708 		{
709 			mqh->mqh_length_word_complete = false;
710 			mqh->mqh_consume_pending += MAXALIGN(nbytes);
711 			*nbytesp = nbytes;
712 			*datap = rawdata;
713 			return SHM_MQ_SUCCESS;
714 		}
715 
716 		/*
717 		 * The message has wrapped the buffer.  We'll need to copy it in order
718 		 * to return it to the client in one chunk.  First, make sure we have
719 		 * a large enough buffer available.
720 		 */
721 		if (mqh->mqh_buflen < nbytes)
722 		{
723 			Size		newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
724 
725 			/*
726 			 * Double the buffer size until the payload fits, but limit to
727 			 * MaxAllocSize.
728 			 */
729 			while (newbuflen < nbytes)
730 				newbuflen *= 2;
731 			newbuflen = Min(newbuflen, MaxAllocSize);
732 
733 			if (mqh->mqh_buffer != NULL)
734 			{
735 				pfree(mqh->mqh_buffer);
736 				mqh->mqh_buffer = NULL;
737 				mqh->mqh_buflen = 0;
738 			}
739 			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740 			mqh->mqh_buflen = newbuflen;
741 		}
742 	}
743 
744 	/* Loop until we've copied the entire message. */
745 	for (;;)
746 	{
747 		Size		still_needed;
748 
749 		/* Copy as much as we can. */
750 		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751 		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752 		mqh->mqh_partial_bytes += rb;
753 
754 		/*
755 		 * Update count of bytes that can be consumed, accounting for
756 		 * alignment padding.  Note that this will never actually insert any
757 		 * padding except at the end of a message, because the buffer size is
758 		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759 		 */
760 		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761 		mqh->mqh_consume_pending += MAXALIGN(rb);
762 
763 		/* If we got all the data, exit the loop. */
764 		if (mqh->mqh_partial_bytes >= nbytes)
765 			break;
766 
767 		/* Wait for some more data. */
768 		still_needed = nbytes - mqh->mqh_partial_bytes;
769 		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770 		if (res != SHM_MQ_SUCCESS)
771 			return res;
772 		if (rb > still_needed)
773 			rb = still_needed;
774 	}
775 
776 	/* Return the complete message, and reset for next message. */
777 	*nbytesp = nbytes;
778 	*datap = mqh->mqh_buffer;
779 	mqh->mqh_length_word_complete = false;
780 	mqh->mqh_partial_bytes = 0;
781 	return SHM_MQ_SUCCESS;
782 }
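
/*
 * Sketch of a simple blocking receive loop (illustrative; "mqh" must be
 * attached as the receiver, "handle_message" is a hypothetical consumer, and
 * the returned data pointer is only valid until the next receive or detach):
 *
 *		for (;;)
 *		{
 *			Size		len;
 *			void	   *data;
 *			shm_mq_result res;
 *
 *			res = shm_mq_receive(mqh, &len, &data, false);
 *			if (res != SHM_MQ_SUCCESS)
 *				break;			-- sender detached and queue is drained
 *			handle_message(data, len);
 *		}
 *		shm_mq_detach(mqh);
 *
 * With nowait = false the only failure return is SHM_MQ_DETACHED, and it is
 * reported only after any complete messages already in the ring have been
 * returned.
 */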
783 
784 /*
785  * Wait for the other process that's supposed to use this queue to attach
786  * to it.
787  *
788  * The return value is SHM_MQ_DETACHED if the worker has already detached or
789  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
790  * Note that we will only be able to detect that the worker has died before
791  * attaching if a background worker handle was passed to shm_mq_attach().
792  */
793 shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
795 {
796 	shm_mq	   *mq = mqh->mqh_queue;
797 	PGPROC	  **victim;
798 
799 	if (shm_mq_get_receiver(mq) == MyProc)
800 		victim = &mq->mq_sender;
801 	else
802 	{
803 		Assert(shm_mq_get_sender(mq) == MyProc);
804 		victim = &mq->mq_receiver;
805 	}
806 
807 	if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808 		return SHM_MQ_SUCCESS;
809 	else
810 		return SHM_MQ_DETACHED;
811 }
812 
813 /*
814  * Detach from a shared message queue, and destroy the shm_mq_handle.
815  */
816 void
shm_mq_detach(shm_mq_handle *mqh)
818 {
819 	/* Notify counterparty that we're outta here. */
820 	shm_mq_detach_internal(mqh->mqh_queue);
821 
822 	/* Cancel on_dsm_detach callback, if any. */
823 	if (mqh->mqh_segment)
824 		cancel_on_dsm_detach(mqh->mqh_segment,
825 							 shm_mq_detach_callback,
826 							 PointerGetDatum(mqh->mqh_queue));
827 
828 	/* Release local memory associated with handle. */
829 	if (mqh->mqh_buffer != NULL)
830 		pfree(mqh->mqh_buffer);
831 	pfree(mqh);
832 }
833 
834 /*
835  * Notify counterparty that we're detaching from shared message queue.
836  *
837  * The purpose of this function is to make sure that the process
838  * with which we're communicating doesn't block forever waiting for us to
839  * fill or drain the queue once we've lost interest.  When the sender
840  * detaches, the receiver can read any messages remaining in the queue;
841  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
842  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
843  *
844  * This is separated out from shm_mq_detach() because if the on_dsm_detach
845  * callback fires, we only want to do this much.  We do not try to touch
846  * the local shm_mq_handle, as it may have been pfree'd already.
847  */
848 static void
shm_mq_detach_internal(shm_mq *mq)
850 {
851 	PGPROC	   *victim;
852 
853 	SpinLockAcquire(&mq->mq_mutex);
854 	if (mq->mq_sender == MyProc)
855 		victim = mq->mq_receiver;
856 	else
857 	{
858 		Assert(mq->mq_receiver == MyProc);
859 		victim = mq->mq_sender;
860 	}
861 	mq->mq_detached = true;
862 	SpinLockRelease(&mq->mq_mutex);
863 
864 	if (victim != NULL)
865 		SetLatch(&victim->procLatch);
866 }
867 
868 /*
869  * Get the shm_mq from handle.
870  */
871 shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
873 {
874 	return mqh->mqh_queue;
875 }
876 
877 /*
878  * Write bytes into a shared message queue.
879  */
880 static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
882 				  bool nowait, Size *bytes_written)
883 {
884 	shm_mq	   *mq = mqh->mqh_queue;
885 	Size		sent = 0;
886 	uint64		used;
887 	Size		ringsize = mq->mq_ring_size;
888 	Size		available;
889 
890 	while (sent < nbytes)
891 	{
892 		uint64		rb;
893 		uint64		wb;
894 
895 		/* Compute number of ring buffer bytes used and available. */
896 		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
897 		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
898 		Assert(wb >= rb);
899 		used = wb - rb;
900 		Assert(used <= ringsize);
901 		available = Min(ringsize - used, nbytes - sent);
902 
903 		/*
904 		 * Bail out if the queue has been detached.  Note that we would be in
905 		 * trouble if the compiler decided to cache the value of
906 		 * mq->mq_detached in a register or on the stack across loop
907 		 * iterations.  It probably shouldn't do that anyway since we'll
908 		 * always return, call an external function that performs a system
909 		 * call, or reach a memory barrier at some point later in the loop,
910 		 * but just to be sure, insert a compiler barrier here.
911 		 */
912 		pg_compiler_barrier();
913 		if (mq->mq_detached)
914 		{
915 			*bytes_written = sent;
916 			return SHM_MQ_DETACHED;
917 		}
918 
919 		if (available == 0 && !mqh->mqh_counterparty_attached)
920 		{
921 			/*
922 			 * The queue is full, so if the receiver isn't yet known to be
923 			 * attached, we must wait for that to happen.
924 			 */
925 			if (nowait)
926 			{
927 				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
928 				{
929 					*bytes_written = sent;
930 					return SHM_MQ_DETACHED;
931 				}
932 				if (shm_mq_get_receiver(mq) == NULL)
933 				{
934 					*bytes_written = sent;
935 					return SHM_MQ_WOULD_BLOCK;
936 				}
937 			}
938 			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
939 										   mqh->mqh_handle))
940 			{
941 				mq->mq_detached = true;
942 				*bytes_written = sent;
943 				return SHM_MQ_DETACHED;
944 			}
945 			mqh->mqh_counterparty_attached = true;
946 
947 			/*
948 			 * The receiver may have read some data after attaching, so we
949 			 * must not wait without rechecking the queue state.
950 			 */
951 		}
952 		else if (available == 0)
953 		{
954 			/*
			 * Since mqh->mqh_counterparty_attached is known to be true at this
956 			 * point, mq_receiver has been set, and it can't change once set.
957 			 * Therefore, we can read it without acquiring the spinlock.
958 			 */
959 			Assert(mqh->mqh_counterparty_attached);
960 			SetLatch(&mq->mq_receiver->procLatch);
961 
962 			/* Skip manipulation of our latch if nowait = true. */
963 			if (nowait)
964 			{
965 				*bytes_written = sent;
966 				return SHM_MQ_WOULD_BLOCK;
967 			}
968 
969 			/*
970 			 * Wait for our latch to be set.  It might already be set for some
971 			 * unrelated reason, but that'll just result in one extra trip
972 			 * through the loop.  It's worth it to avoid resetting the latch
973 			 * at top of loop, because setting an already-set latch is much
974 			 * cheaper than setting one that has been reset.
975 			 */
976 			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
977 
978 			/* Reset the latch so we don't spin. */
979 			ResetLatch(MyLatch);
980 
981 			/* An interrupt may have occurred while we were waiting. */
982 			CHECK_FOR_INTERRUPTS();
983 		}
984 		else
985 		{
986 			Size		offset;
987 			Size		sendnow;
988 
989 			offset = wb % (uint64) ringsize;
990 			sendnow = Min(available, ringsize - offset);
991 
992 			/*
993 			 * Write as much data as we can via a single memcpy(). Make sure
994 			 * these writes happen after the read of mq_bytes_read, above.
995 			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
996 			 * (Since we're separating the read of mq_bytes_read from a
997 			 * subsequent write to mq_ring, we need a full barrier here.)
998 			 */
999 			pg_memory_barrier();
1000 			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1001 				   (char *) data + sent, sendnow);
1002 			sent += sendnow;
1003 
1004 			/*
1005 			 * Update count of bytes written, with alignment padding.  Note
1006 			 * that this will never actually insert any padding except at the
1007 			 * end of a run of bytes, because the buffer size is a multiple of
1008 			 * MAXIMUM_ALIGNOF, and each read is as well.
1009 			 */
1010 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1011 			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
1012 
1013 			/*
1014 			 * For efficiency, we don't set the reader's latch here.  We'll do
1015 			 * that only when the buffer fills up or after writing an entire
1016 			 * message.
1017 			 */
1018 		}
1019 	}
1020 
1021 	*bytes_written = sent;
1022 	return SHM_MQ_SUCCESS;
1023 }
1024 
1025 /*
 * Wait until at least bytes_needed bytes are available to be read from the
1027  * shared message queue, or until the buffer wraps around.  If the queue is
1028  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
1029  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
1030  * to the location at which data bytes can be read, *nbytesp is set to the
1031  * number of bytes which can be read at that address, and the return value
1032  * is SHM_MQ_SUCCESS.
1033  */
1034 static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1036 					 Size *nbytesp, void **datap)
1037 {
1038 	shm_mq	   *mq = mqh->mqh_queue;
1039 	Size		ringsize = mq->mq_ring_size;
1040 	uint64		used;
1041 	uint64		written;
1042 
1043 	for (;;)
1044 	{
1045 		Size		offset;
1046 		uint64		read;
1047 
1048 		/* Get bytes written, so we can compute what's available to read. */
1049 		written = pg_atomic_read_u64(&mq->mq_bytes_written);
1050 
1051 		/*
1052 		 * Get bytes read.  Include bytes we could consume but have not yet
1053 		 * consumed.
1054 		 */
1055 		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1056 			mqh->mqh_consume_pending;
1057 		used = written - read;
1058 		Assert(used <= ringsize);
1059 		offset = read % (uint64) ringsize;
1060 
1061 		/* If we have enough data or buffer has wrapped, we're done. */
1062 		if (used >= bytes_needed || offset + used >= ringsize)
1063 		{
1064 			*nbytesp = Min(used, ringsize - offset);
1065 			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1066 
1067 			/*
1068 			 * Separate the read of mq_bytes_written, above, from caller's
1069 			 * attempt to read the data itself.  Pairs with the barrier in
1070 			 * shm_mq_inc_bytes_written.
1071 			 */
1072 			pg_read_barrier();
1073 			return SHM_MQ_SUCCESS;
1074 		}
1075 
1076 		/*
1077 		 * Fall out before waiting if the queue has been detached.
1078 		 *
1079 		 * Note that we don't check for this until *after* considering whether
1080 		 * the data already available is enough, since the receiver can finish
1081 		 * receiving a message stored in the buffer even after the sender has
1082 		 * detached.
1083 		 */
1084 		if (mq->mq_detached)
1085 		{
1086 			/*
1087 			 * If the writer advanced mq_bytes_written and then set
1088 			 * mq_detached, we might not have read the final value of
1089 			 * mq_bytes_written above.  Insert a read barrier and then check
1090 			 * again if mq_bytes_written has advanced.
1091 			 */
1092 			pg_read_barrier();
1093 			if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1094 				continue;
1095 
1096 			return SHM_MQ_DETACHED;
1097 		}
1098 
1099 		/*
1100 		 * We didn't get enough data to satisfy the request, so mark any data
1101 		 * previously-consumed as read to make more buffer space.
1102 		 */
1103 		if (mqh->mqh_consume_pending > 0)
1104 		{
1105 			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1106 			mqh->mqh_consume_pending = 0;
1107 		}
1108 
1109 		/* Skip manipulation of our latch if nowait = true. */
1110 		if (nowait)
1111 			return SHM_MQ_WOULD_BLOCK;
1112 
1113 		/*
1114 		 * Wait for our latch to be set.  It might already be set for some
1115 		 * unrelated reason, but that'll just result in one extra trip through
1116 		 * the loop.  It's worth it to avoid resetting the latch at top of
1117 		 * loop, because setting an already-set latch is much cheaper than
1118 		 * setting one that has been reset.
1119 		 */
1120 		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
1121 
1122 		/* Reset the latch so we don't spin. */
1123 		ResetLatch(MyLatch);
1124 
1125 		/* An interrupt may have occurred while we were waiting. */
1126 		CHECK_FOR_INTERRUPTS();
1127 	}
1128 }
1129 
1130 /*
1131  * Test whether a counterparty who may not even be alive yet is definitely gone.
1132  */
1133 static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1135 {
1136 	pid_t		pid;
1137 
1138 	/* If the queue has been detached, counterparty is definitely gone. */
1139 	if (mq->mq_detached)
1140 		return true;
1141 
1142 	/* If there's a handle, check worker status. */
1143 	if (handle != NULL)
1144 	{
1145 		BgwHandleStatus status;
1146 
1147 		/* Check for unexpected worker death. */
1148 		status = GetBackgroundWorkerPid(handle, &pid);
1149 		if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1150 		{
1151 			/* Mark it detached, just to make it official. */
1152 			mq->mq_detached = true;
1153 			return true;
1154 		}
1155 	}
1156 
1157 	/* Counterparty is not definitively gone. */
1158 	return false;
1159 }
1160 
1161 /*
1162  * This is used when a process is waiting for its counterpart to attach to the
1163  * queue.  We exit when the other process attaches as expected, or, if
1164  * handle != NULL, when the referenced background process or the postmaster
1165  * dies.  Note that if handle == NULL, and the process fails to attach, we'll
1166  * potentially get stuck here forever waiting for a process that may never
1167  * start.  We do check for interrupts, though.
1168  *
1169  * ptr is a pointer to the memory address that we're expecting to become
1170  * non-NULL when our counterpart attaches to the queue.
1171  */
1172 static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1174 {
1175 	bool		result = false;
1176 
1177 	for (;;)
1178 	{
1179 		BgwHandleStatus status;
1180 		pid_t		pid;
1181 
1182 		/* Acquire the lock just long enough to check the pointer. */
1183 		SpinLockAcquire(&mq->mq_mutex);
1184 		result = (*ptr != NULL);
1185 		SpinLockRelease(&mq->mq_mutex);
1186 
1187 		/* Fail if detached; else succeed if initialized. */
1188 		if (mq->mq_detached)
1189 		{
1190 			result = false;
1191 			break;
1192 		}
1193 		if (result)
1194 			break;
1195 
1196 		if (handle != NULL)
1197 		{
1198 			/* Check for unexpected worker death. */
1199 			status = GetBackgroundWorkerPid(handle, &pid);
1200 			if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1201 			{
1202 				result = false;
1203 				break;
1204 			}
1205 		}
1206 
1207 		/* Wait to be signalled. */
1208 		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);
1209 
1210 		/* Reset the latch so we don't spin. */
1211 		ResetLatch(MyLatch);
1212 
1213 		/* An interrupt may have occurred while we were waiting. */
1214 		CHECK_FOR_INTERRUPTS();
1215 	}
1216 
1217 	return result;
1218 }
1219 
1220 /*
1221  * Increment the number of bytes read.
1222  */
1223 static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1225 {
1226 	PGPROC	   *sender;
1227 
1228 	/*
1229 	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
1230 	 * which follows.  This pairs with the full barrier in
1231 	 * shm_mq_send_bytes(). We only need a read barrier here because the
1232 	 * increment of mq_bytes_read is actually a read followed by a dependent
1233 	 * write.
1234 	 */
1235 	pg_read_barrier();
1236 
1237 	/*
1238 	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1239 	 * else can be changing this value.  This method should be cheaper.
1240 	 */
1241 	pg_atomic_write_u64(&mq->mq_bytes_read,
1242 						pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1243 
1244 	/*
1245 	 * We shouldn't have any bytes to read without a sender, so we can read
1246 	 * mq_sender here without a lock.  Once it's initialized, it can't change.
1247 	 */
1248 	sender = mq->mq_sender;
1249 	Assert(sender != NULL);
1250 	SetLatch(&sender->procLatch);
1251 }
1252 
1253 /*
1254  * Increment the number of bytes written.
1255  */
1256 static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1258 {
1259 	/*
1260 	 * Separate prior reads of mq_ring from the write of mq_bytes_written
1261 	 * which we're about to do.  Pairs with the read barrier found in
	 * shm_mq_receive_bytes().
1263 	 */
1264 	pg_write_barrier();
1265 
1266 	/*
1267 	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1268 	 * else can be changing this value.  This method avoids taking the bus
1269 	 * lock unnecessarily.
1270 	 */
1271 	pg_atomic_write_u64(&mq->mq_bytes_written,
1272 						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1273 }
1274 
/* Shim for on_dsm_detach callback. */
1276 static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1278 {
1279 	shm_mq	   *mq = (shm_mq *) DatumGetPointer(arg);
1280 
1281 	shm_mq_detach_internal(mq);
1282 }
1283