1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  *	  Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to efficiently store and persist replication progress in an
16  *	 efficient and durable manner.
17  *
 * Replication origins consist of a descriptive, user defined, external
 * name and a short, thus space efficient, internal 2 byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory and long descriptors would be inefficient.  For now we only use 2
 * bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup; and
 * using only two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationStates) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows replication progress to be advanced during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows recovering the
 * precise replayed state after crash recovery, without requiring synchronous
 * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
 * backends generating changes concurrently.  For efficiency and simplicity
 * reasons a backend can set up one replication origin that is, from then on,
 * used as the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
 * * To create and drop replication origins an exclusive lock on
 *	 pg_replication_origin is required for the duration. That allows us to
 *	 safely and conflict free assign new origins using a dirty snapshot.
50  *
 * * When creating an in-memory replication progress slot the
 *	 ReplicationOriginLock LWLock has to be held exclusively; when iterating
 *	 over the replication progress a shared lock has to be held, the same when
 *	 advancing the replication progress of an individual backend that has not
 *	 been set up as the session's replication origin.
56  *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot that slot's lwlock has to be held. That's
 *	 primarily because we do not assume 8 byte writes (the LSN) are atomic on
 *	 all our platforms, but it also simplifies memory ordering concerns
 *	 between the remote and local lsn. We use a lwlock instead of a spinlock
 *	 so it's less harmful to hold the lock over a WAL write
 *	 (cf. replorigin_advance).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "funcapi.h"
74 #include "miscadmin.h"
75 
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80 
81 #include "catalog/indexing.h"
82 
83 #include "nodes/execnodes.h"
84 
85 #include "replication/origin.h"
86 #include "replication/logical.h"
87 
88 #include "storage/fd.h"
89 #include "storage/ipc.h"
90 #include "storage/lmgr.h"
91 #include "storage/copydir.h"
92 
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99 
/*
 * Replay progress of a single remote node.
 *
 * The shared replication_states array holds max_replication_slots of these.
 * An entry whose roident is InvalidRepOriginId is unused and may be claimed
 * for a new origin.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks a free
	 * entry.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
132 
/*
 * On disk version of ReplicationState.
 *
 * CheckPointReplicationOrigin() writes these structs verbatim (after zeroing
 * them, so padding bytes are deterministic) and StartupReplicationOrigin()
 * reads them back the same way; changing this layout changes the checkpoint
 * file format.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
141 
142 
/*
 * Layout of the shared memory block set up by ReplicationOriginShmemInit():
 * the LWLock tranche describing the per-state locks, followed by the states
 * themselves.
 */
typedef struct ReplicationStateCtl
{
	int			tranche_id;		/* tranche id used for states[].lock */
	LWLockTranche tranche;		/* descriptor passed to
								 * LWLockRegisterTranche() */
	/* Array of length max_replication_slots */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
150 
/*
 * External variables: the replication origin the current session is
 * replaying changes from, plus the remote LSN and timestamp of the
 * transaction being replayed.  NOTE(review): these are read and written
 * outside this chunk (presumably by apply workers and commit-record code);
 * confirm against callers.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId;		/* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.  Stays NULL when max_replication_slots is 0, because
 * ReplicationOriginShmemInit() then returns early.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationStates for use in a backend
 * replaying remote commits, so we don't have to search ReplicationStates for
 * the backend's current RepOriginId.  NULL while no session origin is set up.
 */
static ReplicationState *session_replication_state = NULL;

/* Magic number at the start of the replication origin checkpoint file. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
179 
180 static void
replorigin_check_prerequisites(bool check_slots,bool recoveryOK)181 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
182 {
183 	if (!superuser())
184 		ereport(ERROR,
185 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
186 				 errmsg("only superusers can query or manipulate replication origins")));
187 
188 	if (check_slots && max_replication_slots == 0)
189 		ereport(ERROR,
190 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
191 				 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
192 
193 	if (!recoveryOK && RecoveryInProgress())
194 		ereport(ERROR,
195 				(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
196 		   errmsg("cannot manipulate replication origins during recovery")));
197 
198 }
199 
200 
201 /* ---------------------------------------------------------------------------
202  * Functions for working with replication origins themselves.
203  * ---------------------------------------------------------------------------
204  */
205 
206 /*
207  * Check for a persistent replication origin identified by name.
208  *
209  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
210  */
211 RepOriginId
replorigin_by_name(char * roname,bool missing_ok)212 replorigin_by_name(char *roname, bool missing_ok)
213 {
214 	Form_pg_replication_origin ident;
215 	Oid			roident = InvalidOid;
216 	HeapTuple	tuple;
217 	Datum		roname_d;
218 
219 	roname_d = CStringGetTextDatum(roname);
220 
221 	tuple = SearchSysCache1(REPLORIGNAME, roname_d);
222 	if (HeapTupleIsValid(tuple))
223 	{
224 		ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
225 		roident = ident->roident;
226 		ReleaseSysCache(tuple);
227 	}
228 	else if (!missing_ok)
229 		ereport(ERROR,
230 				(errcode(ERRCODE_UNDEFINED_OBJECT),
231 				 errmsg("replication origin \"%s\" does not exist",
232 						roname)));
233 
234 	return roident;
235 }
236 
237 /*
238  * Create a replication origin.
239  *
240  * Needs to be called in a transaction.
241  */
242 RepOriginId
replorigin_create(char * roname)243 replorigin_create(char *roname)
244 {
245 	Oid			roident;
246 	HeapTuple	tuple = NULL;
247 	Relation	rel;
248 	Datum		roname_d;
249 	SnapshotData SnapshotDirty;
250 	SysScanDesc scan;
251 	ScanKeyData key;
252 
253 	roname_d = CStringGetTextDatum(roname);
254 
255 	Assert(IsTransactionState());
256 
257 	/*
258 	 * We need the numeric replication origin to be 16bit wide, so we cannot
259 	 * rely on the normal oid allocation. Instead we simply scan
260 	 * pg_replication_origin for the first unused id. That's not particularly
261 	 * efficient, but this should be a fairly infrequent operation - we can
262 	 * easily spend a bit more code on this when it turns out it needs to be
263 	 * faster.
264 	 *
265 	 * We handle concurrency by taking an exclusive lock (allowing reads!)
266 	 * over the table for the duration of the search. Because we use a "dirty
267 	 * snapshot" we can read rows that other in-progress sessions have
268 	 * written, even though they would be invisible with normal snapshots. Due
269 	 * to the exclusive lock there's no danger that new rows can appear while
270 	 * we're checking.
271 	 */
272 	InitDirtySnapshot(SnapshotDirty);
273 
274 	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
275 
276 	for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
277 	{
278 		bool		nulls[Natts_pg_replication_origin];
279 		Datum		values[Natts_pg_replication_origin];
280 		bool		collides;
281 
282 		CHECK_FOR_INTERRUPTS();
283 
284 		ScanKeyInit(&key,
285 					Anum_pg_replication_origin_roident,
286 					BTEqualStrategyNumber, F_OIDEQ,
287 					ObjectIdGetDatum(roident));
288 
289 		scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
290 								  true /* indexOK */ ,
291 								  &SnapshotDirty,
292 								  1, &key);
293 
294 		collides = HeapTupleIsValid(systable_getnext(scan));
295 
296 		systable_endscan(scan);
297 
298 		if (!collides)
299 		{
300 			/*
301 			 * Ok, found an unused roident, insert the new row and do a CCI,
302 			 * so our callers can look it up if they want to.
303 			 */
304 			memset(&nulls, 0, sizeof(nulls));
305 
306 			values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
307 			values[Anum_pg_replication_origin_roname - 1] = roname_d;
308 
309 			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
310 			simple_heap_insert(rel, tuple);
311 			CatalogUpdateIndexes(rel, tuple);
312 			CommandCounterIncrement();
313 			break;
314 		}
315 	}
316 
317 	/* now release lock again,	*/
318 	heap_close(rel, ExclusiveLock);
319 
320 	if (tuple == NULL)
321 		ereport(ERROR,
322 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
323 				 errmsg("could not find free replication origin OID")));
324 
325 	heap_freetuple(tuple);
326 	return roident;
327 }
328 
329 
/*
 * Drop replication origin.
 *
 * Removes the origin's in-memory progress entry (WAL-logging the removal as
 * XLOG_REPLORIGIN_DROP), then deletes its row from pg_replication_origin.
 * Raises an ERROR if the progress entry is currently acquired by a backend,
 * or if no catalog row exists for roident.
 *
 * Needs to be called in a transaction.
 */
void
replorigin_drop(RepOriginId roident)
{
	HeapTuple	tuple = NULL;
	Relation	rel;
	int			i;

	Assert(IsTransactionState());

	/*
	 * Hold an exclusive lock on pg_replication_origin for the whole
	 * operation; replorigin_create() takes the same lock.
	 */
	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);

	/* cleanup the slot state info */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *state = &replication_states[i];

		/* found our slot */
		if (state->roident == roident)
		{
			/* refuse to drop an origin that a backend has acquired */
			if (state->acquired_by != 0)
			{
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_IN_USE),
						 errmsg("could not drop replication origin with OID %d, in use by PID %d",
								state->roident,
								state->acquired_by)));
			}

			/* first WAL log */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then reset the in-memory entry */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);

	/* finally remove the catalog row */
	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for replication origin with oid %u",
			 roident);

	simple_heap_delete(rel, &tuple->t_self);
	ReleaseSysCache(tuple);

	/* make the deletion visible to later commands of this transaction */
	CommandCounterIncrement();

	/* now release lock again,	*/
	heap_close(rel, ExclusiveLock);
}
397 
398 
399 /*
400  * Lookup replication origin via it's oid and return the name.
401  *
402  * The external name is palloc'd in the calling context.
403  *
404  * Returns true if the origin is known, false otherwise.
405  */
406 bool
replorigin_by_oid(RepOriginId roident,bool missing_ok,char ** roname)407 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
408 {
409 	HeapTuple	tuple;
410 	Form_pg_replication_origin ric;
411 
412 	Assert(OidIsValid((Oid) roident));
413 	Assert(roident != InvalidRepOriginId);
414 	Assert(roident != DoNotReplicateId);
415 
416 	tuple = SearchSysCache1(REPLORIGIDENT,
417 							ObjectIdGetDatum((Oid) roident));
418 
419 	if (HeapTupleIsValid(tuple))
420 	{
421 		ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
422 		*roname = text_to_cstring(&ric->roname);
423 		ReleaseSysCache(tuple);
424 
425 		return true;
426 	}
427 	else
428 	{
429 		*roname = NULL;
430 
431 		if (!missing_ok)
432 			ereport(ERROR,
433 					(errcode(ERRCODE_UNDEFINED_OBJECT),
434 					 errmsg("replication origin with OID %u does not exist",
435 							roident)));
436 
437 		return false;
438 	}
439 }
440 
441 
442 /* ---------------------------------------------------------------------------
443  * Functions for handling replication progress.
444  * ---------------------------------------------------------------------------
445  */
446 
447 Size
ReplicationOriginShmemSize(void)448 ReplicationOriginShmemSize(void)
449 {
450 	Size		size = 0;
451 
452 	/*
453 	 * XXX: max_replication_slots is arguably the wrong thing to use, as here
454 	 * we keep the replay state of *remote* transactions. But for now it seems
455 	 * sufficient to reuse it, rather than introduce a separate GUC.
456 	 */
457 	if (max_replication_slots == 0)
458 		return size;
459 
460 	size = add_size(size, offsetof(ReplicationStateCtl, states));
461 
462 	size = add_size(size,
463 				  mul_size(max_replication_slots, sizeof(ReplicationState)));
464 	return size;
465 }
466 
467 void
ReplicationOriginShmemInit(void)468 ReplicationOriginShmemInit(void)
469 {
470 	bool		found;
471 
472 	if (max_replication_slots == 0)
473 		return;
474 
475 	replication_states_ctl = (ReplicationStateCtl *)
476 		ShmemInitStruct("ReplicationOriginState",
477 						ReplicationOriginShmemSize(),
478 						&found);
479 	replication_states = replication_states_ctl->states;
480 
481 	if (!found)
482 	{
483 		int			i;
484 
485 		MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
486 
487 		replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
488 		replication_states_ctl->tranche.name = "replication_origin";
489 		replication_states_ctl->tranche.array_base =
490 			&replication_states[0].lock;
491 		replication_states_ctl->tranche.array_stride =
492 			sizeof(ReplicationState);
493 
494 		for (i = 0; i < max_replication_slots; i++)
495 			LWLockInitialize(&replication_states[i].lock,
496 							 replication_states_ctl->tranche_id);
497 	}
498 
499 	LWLockRegisterTranche(replication_states_ctl->tranche_id,
500 						  &replication_states_ctl->tranche);
501 }
502 
503 /* ---------------------------------------------------------------------------
504  * Perform a checkpoint of each replication origin's progress with respect to
505  * the replayed remote_lsn. Make sure that all transactions we refer to in the
506  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
507  * if the transactions were originally committed asynchronously.
508  *
509  * We store checkpoints in the following format:
510  * +-------+------------------------+------------------+-----+--------+
511  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
512  * +-------+------------------------+------------------+-----+--------+
513  *
514  * So its just the magic, followed by the statically sized
515  * ReplicationStateOnDisk structs. Note that the maximum number of
516  * ReplicationStates is determined by max_replication_slots.
517  * ---------------------------------------------------------------------------
518  */
519 void
CheckPointReplicationOrigin(void)520 CheckPointReplicationOrigin(void)
521 {
522 	const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
523 	const char *path = "pg_logical/replorigin_checkpoint";
524 	int			tmpfd;
525 	int			i;
526 	uint32		magic = REPLICATION_STATE_MAGIC;
527 	pg_crc32c	crc;
528 
529 	if (max_replication_slots == 0)
530 		return;
531 
532 	INIT_CRC32C(crc);
533 
534 	/* make sure no old temp file is remaining */
535 	if (unlink(tmppath) < 0 && errno != ENOENT)
536 		ereport(PANIC,
537 				(errcode_for_file_access(),
538 				 errmsg("could not remove file \"%s\": %m",
539 						tmppath)));
540 
541 	/*
542 	 * no other backend can perform this at the same time, we're protected by
543 	 * CheckpointLock.
544 	 */
545 	tmpfd = OpenTransientFile((char *) tmppath,
546 							  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
547 							  S_IRUSR | S_IWUSR);
548 	if (tmpfd < 0)
549 		ereport(PANIC,
550 				(errcode_for_file_access(),
551 				 errmsg("could not create file \"%s\": %m",
552 						tmppath)));
553 
554 	/* write magic */
555 	errno = 0;
556 	if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
557 	{
558 		int			save_errno = errno;
559 
560 		CloseTransientFile(tmpfd);
561 
562 		/* if write didn't set errno, assume problem is no disk space */
563 		errno = save_errno ? save_errno : ENOSPC;
564 		ereport(PANIC,
565 				(errcode_for_file_access(),
566 				 errmsg("could not write to file \"%s\": %m",
567 						tmppath)));
568 	}
569 	COMP_CRC32C(crc, &magic, sizeof(magic));
570 
571 	/* prevent concurrent creations/drops */
572 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
573 
574 	/* write actual data */
575 	for (i = 0; i < max_replication_slots; i++)
576 	{
577 		ReplicationStateOnDisk disk_state;
578 		ReplicationState *curstate = &replication_states[i];
579 		XLogRecPtr	local_lsn;
580 
581 		if (curstate->roident == InvalidRepOriginId)
582 			continue;
583 
584 		/* zero, to avoid uninitialized padding bytes */
585 		memset(&disk_state, 0, sizeof(disk_state));
586 
587 		LWLockAcquire(&curstate->lock, LW_SHARED);
588 
589 		disk_state.roident = curstate->roident;
590 
591 		disk_state.remote_lsn = curstate->remote_lsn;
592 		local_lsn = curstate->local_lsn;
593 
594 		LWLockRelease(&curstate->lock);
595 
596 		/* make sure we only write out a commit that's persistent */
597 		XLogFlush(local_lsn);
598 
599 		errno = 0;
600 		if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
601 			sizeof(disk_state))
602 		{
603 			int			save_errno = errno;
604 
605 			CloseTransientFile(tmpfd);
606 
607 			/* if write didn't set errno, assume problem is no disk space */
608 			errno = save_errno ? save_errno : ENOSPC;
609 			ereport(PANIC,
610 					(errcode_for_file_access(),
611 					 errmsg("could not write to file \"%s\": %m",
612 							tmppath)));
613 		}
614 
615 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
616 	}
617 
618 	LWLockRelease(ReplicationOriginLock);
619 
620 	/* write out the CRC */
621 	FIN_CRC32C(crc);
622 	errno = 0;
623 	if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
624 	{
625 		int			save_errno = errno;
626 
627 		CloseTransientFile(tmpfd);
628 
629 		/* if write didn't set errno, assume problem is no disk space */
630 		errno = save_errno ? save_errno : ENOSPC;
631 		ereport(PANIC,
632 				(errcode_for_file_access(),
633 				 errmsg("could not write to file \"%s\": %m",
634 						tmppath)));
635 	}
636 
637 	CloseTransientFile(tmpfd);
638 
639 	/* fsync, rename to permanent file, fsync file and directory */
640 	durable_rename(tmppath, path, PANIC);
641 }
642 
643 /*
644  * Recover replication replay status from checkpoint data saved earlier by
645  * CheckPointReplicationOrigin.
646  *
647  * This only needs to be called at startup and *not* during every checkpoint
648  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
649  * state thereafter can be recovered by looking at commit records.
650  */
651 void
StartupReplicationOrigin(void)652 StartupReplicationOrigin(void)
653 {
654 	const char *path = "pg_logical/replorigin_checkpoint";
655 	int			fd;
656 	int			readBytes;
657 	uint32		magic = REPLICATION_STATE_MAGIC;
658 	int			last_state = 0;
659 	pg_crc32c	file_crc;
660 	pg_crc32c	crc;
661 
662 	/* don't want to overwrite already existing state */
663 #ifdef USE_ASSERT_CHECKING
664 	static bool already_started = false;
665 
666 	Assert(!already_started);
667 	already_started = true;
668 #endif
669 
670 	if (max_replication_slots == 0)
671 		return;
672 
673 	INIT_CRC32C(crc);
674 
675 	elog(DEBUG2, "starting up replication origin progress state");
676 
677 	fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
678 
679 	/*
680 	 * might have had max_replication_slots == 0 last run, or we just brought
681 	 * up a standby.
682 	 */
683 	if (fd < 0 && errno == ENOENT)
684 		return;
685 	else if (fd < 0)
686 		ereport(PANIC,
687 				(errcode_for_file_access(),
688 				 errmsg("could not open file \"%s\": %m",
689 						path)));
690 
691 	/* verify magic, that is written even if nothing was active */
692 	readBytes = read(fd, &magic, sizeof(magic));
693 	if (readBytes != sizeof(magic))
694 		ereport(PANIC,
695 				(errmsg("could not read file \"%s\": %m",
696 						path)));
697 	COMP_CRC32C(crc, &magic, sizeof(magic));
698 
699 	if (magic != REPLICATION_STATE_MAGIC)
700 		ereport(PANIC,
701 		   (errmsg("replication checkpoint has wrong magic %u instead of %u",
702 				   magic, REPLICATION_STATE_MAGIC)));
703 
704 	/* we can skip locking here, no other access is possible */
705 
706 	/* recover individual states, until there are no more to be found */
707 	while (true)
708 	{
709 		ReplicationStateOnDisk disk_state;
710 
711 		readBytes = read(fd, &disk_state, sizeof(disk_state));
712 
713 		/* no further data */
714 		if (readBytes == sizeof(crc))
715 		{
716 			/* not pretty, but simple ... */
717 			file_crc = *(pg_crc32c *) &disk_state;
718 			break;
719 		}
720 
721 		if (readBytes < 0)
722 		{
723 			ereport(PANIC,
724 					(errcode_for_file_access(),
725 					 errmsg("could not read file \"%s\": %m",
726 							path)));
727 		}
728 
729 		if (readBytes != sizeof(disk_state))
730 		{
731 			ereport(PANIC,
732 					(errcode_for_file_access(),
733 					 errmsg("could not read file \"%s\": read %d of %zu",
734 							path, readBytes, sizeof(disk_state))));
735 		}
736 
737 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
738 
739 		if (last_state == max_replication_slots)
740 			ereport(PANIC,
741 					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
742 					 errmsg("could not find free replication state, increase max_replication_slots")));
743 
744 		/* copy data to shared memory */
745 		replication_states[last_state].roident = disk_state.roident;
746 		replication_states[last_state].remote_lsn = disk_state.remote_lsn;
747 		last_state++;
748 
749 		elog(LOG, "recovered replication state of node %u to %X/%X",
750 			 disk_state.roident,
751 			 (uint32) (disk_state.remote_lsn >> 32),
752 			 (uint32) disk_state.remote_lsn);
753 	}
754 
755 	/* now check checksum */
756 	FIN_CRC32C(crc);
757 	if (file_crc != crc)
758 		ereport(PANIC,
759 				(errcode(ERRCODE_DATA_CORRUPTED),
760 				 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
761 						crc, file_crc)));
762 
763 	CloseTransientFile(fd);
764 }
765 
766 void
replorigin_redo(XLogReaderState * record)767 replorigin_redo(XLogReaderState *record)
768 {
769 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
770 
771 	switch (info)
772 	{
773 		case XLOG_REPLORIGIN_SET:
774 			{
775 				xl_replorigin_set *xlrec =
776 				(xl_replorigin_set *) XLogRecGetData(record);
777 
778 				replorigin_advance(xlrec->node_id,
779 								   xlrec->remote_lsn, record->EndRecPtr,
780 								   xlrec->force /* backward */ ,
781 								   false /* WAL log */ );
782 				break;
783 			}
784 		case XLOG_REPLORIGIN_DROP:
785 			{
786 				xl_replorigin_drop *xlrec;
787 				int			i;
788 
789 				xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
790 
791 				for (i = 0; i < max_replication_slots; i++)
792 				{
793 					ReplicationState *state = &replication_states[i];
794 
795 					/* found our slot */
796 					if (state->roident == xlrec->node_id)
797 					{
798 						/* reset entry */
799 						state->roident = InvalidRepOriginId;
800 						state->remote_lsn = InvalidXLogRecPtr;
801 						state->local_lsn = InvalidXLogRecPtr;
802 						break;
803 					}
804 				}
805 				break;
806 			}
807 		default:
808 			elog(PANIC, "replorigin_redo: unknown op code %u", info);
809 	}
810 }
811 
812 
/*
 * Tell the replication origin progress machinery that a commit from 'node'
 * that originated at the LSN remote_commit on the remote node was replayed
 * successfully and that we don't need to do so again. In combination with
 * setting up replorigin_session_origin_lsn and replorigin_session_origin
 * that ensures we won't lose knowledge about that after a crash if the
 * transaction had a persistent effect (think of asynchronous commits).
 *
 * local_commit needs to be a local LSN of the commit so that we can make sure
 * upon a checkpoint that enough WAL has been persisted to disk.  If it is
 * InvalidXLogRecPtr the stored local_lsn is left untouched.
 *
 * go_backward: if true, store the given LSNs even when they are older than
 * the stored ones; otherwise only ever move them forward.
 *
 * wal_log: if true, emit an XLOG_REPLORIGIN_SET record describing this change
 * (replorigin_redo passes false).
 *
 * A progress entry for 'node' is created if none exists.  Raises an ERROR if
 * the entry is acquired by a backend, or if there is no existing or free
 * entry.  Calls for DoNotReplicateId are silently ignored.
 *
 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
 * unless running in recovery.
 */
void
replorigin_advance(RepOriginId node,
				   XLogRecPtr remote_commit, XLogRecPtr local_commit,
				   bool go_backward, bool wal_log)
{
	int			i;
	ReplicationState *replication_state = NULL;
	ReplicationState *free_state = NULL;

	Assert(node != InvalidRepOriginId);

	/* we don't track DoNotReplicateId */
	if (node == DoNotReplicateId)
		return;

	/*
	 * XXX: For the case where this is called by WAL replay, it'd be more
	 * efficient to restore into a backend local hashtable and only dump into
	 * shmem after recovery is finished. Let's wait with implementing that
	 * till it's shown to be a measurable expense
	 */

	/* Lock exclusively, as we may have to create a new table entry. */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	/*
	 * Search for either an existing slot for the origin, or a free one we can
	 * use.
	 */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *curstate = &replication_states[i];

		/* remember where to insert if necessary */
		if (curstate->roident == InvalidRepOriginId &&
			free_state == NULL)
		{
			free_state = curstate;
			continue;
		}

		/* not our slot */
		if (curstate->roident != node)
		{
			continue;
		}

		/* ok, found slot */
		replication_state = curstate;

		/* the entry's own lock stays held until the LSNs are updated below */
		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);

		/* Make sure it's not used by somebody else */
		if (replication_state->acquired_by != 0)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication origin with OID %d is already active for PID %d",
							replication_state->roident,
							replication_state->acquired_by)));
		}

		break;
	}

	if (replication_state == NULL && free_state == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("could not find free replication state slot for replication origin with OID %u",
						node),
				 errhint("Increase max_replication_slots and try again.")));

	if (replication_state == NULL)
	{
		/* initialize new slot */
		LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
		replication_state = free_state;
		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
		replication_state->roident = node;
	}

	/* from here on we hold replication_state->lock exclusively */
	Assert(replication_state->roident != InvalidRepOriginId);

	/*
	 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
	 * and the standby gets the message. Primarily this will be called during
	 * WAL replay (of commit records) where no WAL logging is necessary.
	 */
	if (wal_log)
	{
		xl_replorigin_set xlrec;

		xlrec.remote_lsn = remote_commit;
		xlrec.node_id = node;
		xlrec.force = go_backward;

		XLogBeginInsert();
		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));

		XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
	}

	/*
	 * Due to - harmless - race conditions during a checkpoint we could see
	 * values here that are older than the ones we already have in memory.
	 * Don't overwrite those.
	 */
	if (go_backward || replication_state->remote_lsn < remote_commit)
		replication_state->remote_lsn = remote_commit;
	if (local_commit != InvalidXLogRecPtr &&
		(go_backward || replication_state->local_lsn < local_commit))
		replication_state->local_lsn = local_commit;
	LWLockRelease(&replication_state->lock);

	/*
	 * Release *after* changing the LSNs, slot isn't acquired and thus could
	 * otherwise be dropped anytime.
	 */
	LWLockRelease(ReplicationOriginLock);
}
948 
949 
950 XLogRecPtr
replorigin_get_progress(RepOriginId node,bool flush)951 replorigin_get_progress(RepOriginId node, bool flush)
952 {
953 	int			i;
954 	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
955 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
956 
957 	/* prevent slots from being concurrently dropped */
958 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
959 
960 	for (i = 0; i < max_replication_slots; i++)
961 	{
962 		ReplicationState *state;
963 
964 		state = &replication_states[i];
965 
966 		if (state->roident == node)
967 		{
968 			LWLockAcquire(&state->lock, LW_SHARED);
969 
970 			remote_lsn = state->remote_lsn;
971 			local_lsn = state->local_lsn;
972 
973 			LWLockRelease(&state->lock);
974 
975 			break;
976 		}
977 	}
978 
979 	LWLockRelease(ReplicationOriginLock);
980 
981 	if (flush && local_lsn != InvalidXLogRecPtr)
982 		XLogFlush(local_lsn);
983 
984 	return remote_lsn;
985 }
986 
987 /*
988  * Tear down a (possibly) configured session replication origin during process
989  * exit.
990  */
991 static void
ReplicationOriginExitCleanup(int code,Datum arg)992 ReplicationOriginExitCleanup(int code, Datum arg)
993 {
994 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
995 
996 	if (session_replication_state != NULL &&
997 		session_replication_state->acquired_by == MyProcPid)
998 	{
999 		session_replication_state->acquired_by = 0;
1000 		session_replication_state = NULL;
1001 	}
1002 
1003 	LWLockRelease(ReplicationOriginLock);
1004 }
1005 
1006 /*
1007  * Setup a replication origin in the shared memory struct if it doesn't
1008  * already exists and cache access to the specific ReplicationSlot so the
1009  * array doesn't have to be searched when calling
1010  * replorigin_session_advance().
1011  *
1012  * Obviously only one such cached origin can exist per process and the current
1013  * cached value can only be set again after the previous value is torn down
1014  * with replorigin_session_reset().
1015  */
1016 void
replorigin_session_setup(RepOriginId node)1017 replorigin_session_setup(RepOriginId node)
1018 {
1019 	static bool registered_cleanup;
1020 	int			i;
1021 	int			free_slot = -1;
1022 
1023 	if (!registered_cleanup)
1024 	{
1025 		on_shmem_exit(ReplicationOriginExitCleanup, 0);
1026 		registered_cleanup = true;
1027 	}
1028 
1029 	Assert(max_replication_slots > 0);
1030 
1031 	if (session_replication_state != NULL)
1032 		ereport(ERROR,
1033 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1034 		errmsg("cannot setup replication origin when one is already setup")));
1035 
1036 	/* Lock exclusively, as we may have to create a new table entry. */
1037 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1038 
1039 	/*
1040 	 * Search for either an existing slot for the origin, or a free one we can
1041 	 * use.
1042 	 */
1043 	for (i = 0; i < max_replication_slots; i++)
1044 	{
1045 		ReplicationState *curstate = &replication_states[i];
1046 
1047 		/* remember where to insert if necessary */
1048 		if (curstate->roident == InvalidRepOriginId &&
1049 			free_slot == -1)
1050 		{
1051 			free_slot = i;
1052 			continue;
1053 		}
1054 
1055 		/* not our slot */
1056 		if (curstate->roident != node)
1057 			continue;
1058 
1059 		else if (curstate->acquired_by != 0)
1060 		{
1061 			ereport(ERROR,
1062 					(errcode(ERRCODE_OBJECT_IN_USE),
1063 			 errmsg("replication origin with OID %d is already active for PID %d",
1064 					curstate->roident, curstate->acquired_by)));
1065 		}
1066 
1067 		/* ok, found slot */
1068 		session_replication_state = curstate;
1069 	}
1070 
1071 
1072 	if (session_replication_state == NULL && free_slot == -1)
1073 		ereport(ERROR,
1074 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1075 				 errmsg("could not find free replication state slot for replication origin with OID %u",
1076 						node),
1077 				 errhint("Increase max_replication_slots and try again.")));
1078 	else if (session_replication_state == NULL)
1079 	{
1080 		/* initialize new slot */
1081 		session_replication_state = &replication_states[free_slot];
1082 		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1083 		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1084 		session_replication_state->roident = node;
1085 	}
1086 
1087 
1088 	Assert(session_replication_state->roident != InvalidRepOriginId);
1089 
1090 	session_replication_state->acquired_by = MyProcPid;
1091 
1092 	LWLockRelease(ReplicationOriginLock);
1093 }
1094 
1095 /*
1096  * Reset replay state previously setup in this session.
1097  *
1098  * This function may only be called if an origin was setup with
1099  * replorigin_session_setup().
1100  */
1101 void
replorigin_session_reset(void)1102 replorigin_session_reset(void)
1103 {
1104 	Assert(max_replication_slots != 0);
1105 
1106 	if (session_replication_state == NULL)
1107 		ereport(ERROR,
1108 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1109 				 errmsg("no replication origin is configured")));
1110 
1111 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1112 
1113 	session_replication_state->acquired_by = 0;
1114 	session_replication_state = NULL;
1115 
1116 	LWLockRelease(ReplicationOriginLock);
1117 }
1118 
1119 /*
1120  * Do the same work replorigin_advance() does, just on the session's
1121  * configured origin.
1122  *
1123  * This is noticeably cheaper than using replorigin_advance().
1124  */
1125 void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)1126 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1127 {
1128 	Assert(session_replication_state != NULL);
1129 	Assert(session_replication_state->roident != InvalidRepOriginId);
1130 
1131 	LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1132 	if (session_replication_state->local_lsn < local_commit)
1133 		session_replication_state->local_lsn = local_commit;
1134 	if (session_replication_state->remote_lsn < remote_commit)
1135 		session_replication_state->remote_lsn = remote_commit;
1136 	LWLockRelease(&session_replication_state->lock);
1137 }
1138 
1139 /*
1140  * Ask the machinery about the point up to which we successfully replayed
1141  * changes from an already setup replication origin.
1142  */
1143 XLogRecPtr
replorigin_session_get_progress(bool flush)1144 replorigin_session_get_progress(bool flush)
1145 {
1146 	XLogRecPtr	remote_lsn;
1147 	XLogRecPtr	local_lsn;
1148 
1149 	Assert(session_replication_state != NULL);
1150 
1151 	LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1152 	remote_lsn = session_replication_state->remote_lsn;
1153 	local_lsn = session_replication_state->local_lsn;
1154 	LWLockRelease(&session_replication_state->lock);
1155 
1156 	if (flush && local_lsn != InvalidXLogRecPtr)
1157 		XLogFlush(local_lsn);
1158 
1159 	return remote_lsn;
1160 }
1161 
1162 
1163 
1164 /* ---------------------------------------------------------------------------
 * SQL functions for working with replication origins.
 *
 * These should mostly be fairly short wrappers around more generic functions.
1168  * ---------------------------------------------------------------------------
1169  */
1170 
1171 /*
1172  * Create replication origin for the passed in name, and return the assigned
1173  * oid.
1174  */
1175 Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)1176 pg_replication_origin_create(PG_FUNCTION_ARGS)
1177 {
1178 	char	   *name;
1179 	RepOriginId roident;
1180 
1181 	replorigin_check_prerequisites(false, false);
1182 
1183 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1184 	roident = replorigin_create(name);
1185 
1186 	pfree(name);
1187 
1188 	PG_RETURN_OID(roident);
1189 }
1190 
1191 /*
1192  * Drop replication origin.
1193  */
1194 Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)1195 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1196 {
1197 	char	   *name;
1198 	RepOriginId roident;
1199 
1200 	replorigin_check_prerequisites(false, false);
1201 
1202 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1203 
1204 	roident = replorigin_by_name(name, false);
1205 	Assert(OidIsValid(roident));
1206 
1207 	replorigin_drop(roident);
1208 
1209 	pfree(name);
1210 
1211 	PG_RETURN_VOID();
1212 }
1213 
1214 /*
1215  * Return oid of a replication origin.
1216  */
1217 Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)1218 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1219 {
1220 	char	   *name;
1221 	RepOriginId roident;
1222 
1223 	replorigin_check_prerequisites(false, false);
1224 
1225 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1226 	roident = replorigin_by_name(name, true);
1227 
1228 	pfree(name);
1229 
1230 	if (OidIsValid(roident))
1231 		PG_RETURN_OID(roident);
1232 	PG_RETURN_NULL();
1233 }
1234 
1235 /*
1236  * Setup a replication origin for this session.
1237  */
1238 Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)1239 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1240 {
1241 	char	   *name;
1242 	RepOriginId origin;
1243 
1244 	replorigin_check_prerequisites(true, false);
1245 
1246 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1247 	origin = replorigin_by_name(name, false);
1248 	replorigin_session_setup(origin);
1249 
1250 	replorigin_session_origin = origin;
1251 
1252 	pfree(name);
1253 
1254 	PG_RETURN_VOID();
1255 }
1256 
1257 /*
1258  * Reset previously setup origin in this session
1259  */
1260 Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)1261 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1262 {
1263 	replorigin_check_prerequisites(true, false);
1264 
1265 	replorigin_session_reset();
1266 
1267 	replorigin_session_origin = InvalidRepOriginId;
1268 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
1269 	replorigin_session_origin_timestamp = 0;
1270 
1271 	PG_RETURN_VOID();
1272 }
1273 
1274 /*
1275  * Has a replication origin been setup for this session.
1276  */
1277 Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)1278 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1279 {
1280 	replorigin_check_prerequisites(false, false);
1281 
1282 	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1283 }
1284 
1285 
1286 /*
1287  * Return the replication progress for origin setup in the current session.
1288  *
1289  * If 'flush' is set to true it is ensured that the returned value corresponds
1290  * to a local transaction that has been flushed. this is useful if asynchronous
1291  * commits are used when replaying replicated transactions.
1292  */
1293 Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)1294 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1295 {
1296 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1297 	bool		flush = PG_GETARG_BOOL(0);
1298 
1299 	replorigin_check_prerequisites(true, false);
1300 
1301 	if (session_replication_state == NULL)
1302 		ereport(ERROR,
1303 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1304 				 errmsg("no replication origin is configured")));
1305 
1306 	remote_lsn = replorigin_session_get_progress(flush);
1307 
1308 	if (remote_lsn == InvalidXLogRecPtr)
1309 		PG_RETURN_NULL();
1310 
1311 	PG_RETURN_LSN(remote_lsn);
1312 }
1313 
1314 Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)1315 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1316 {
1317 	XLogRecPtr	location = PG_GETARG_LSN(0);
1318 
1319 	replorigin_check_prerequisites(true, false);
1320 
1321 	if (session_replication_state == NULL)
1322 		ereport(ERROR,
1323 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1324 				 errmsg("no replication origin is configured")));
1325 
1326 	replorigin_session_origin_lsn = location;
1327 	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1328 
1329 	PG_RETURN_VOID();
1330 }
1331 
1332 Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)1333 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1334 {
1335 	replorigin_check_prerequisites(true, false);
1336 
1337 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
1338 	replorigin_session_origin_timestamp = 0;
1339 
1340 	PG_RETURN_VOID();
1341 }
1342 
1343 
1344 Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)1345 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1346 {
1347 	text	   *name = PG_GETARG_TEXT_P(0);
1348 	XLogRecPtr	remote_commit = PG_GETARG_LSN(1);
1349 	RepOriginId node;
1350 
1351 	replorigin_check_prerequisites(true, false);
1352 
1353 	/* lock to prevent the replication origin from vanishing */
1354 	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1355 
1356 	node = replorigin_by_name(text_to_cstring(name), false);
1357 
1358 	/*
1359 	 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1360 	 * xact hasn't committed yet. This is why this function should be used to
1361 	 * set up the initial replication state, but not for replay.
1362 	 */
1363 	replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1364 					   true /* go backward */ , true /* wal log */ );
1365 
1366 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1367 
1368 	PG_RETURN_VOID();
1369 }
1370 
1371 
1372 /*
1373  * Return the replication progress for an individual replication origin.
1374  *
1375  * If 'flush' is set to true it is ensured that the returned value corresponds
1376  * to a local transaction that has been flushed. this is useful if asynchronous
1377  * commits are used when replaying replicated transactions.
1378  */
1379 Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)1380 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1381 {
1382 	char	   *name;
1383 	bool		flush;
1384 	RepOriginId roident;
1385 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1386 
1387 	replorigin_check_prerequisites(true, true);
1388 
1389 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1390 	flush = PG_GETARG_BOOL(1);
1391 
1392 	roident = replorigin_by_name(name, false);
1393 	Assert(OidIsValid(roident));
1394 
1395 	remote_lsn = replorigin_get_progress(roident, flush);
1396 
1397 	if (remote_lsn == InvalidXLogRecPtr)
1398 		PG_RETURN_NULL();
1399 
1400 	PG_RETURN_LSN(remote_lsn);
1401 }
1402 
1403 
/*
 * SQL-callable set-returning function (materialize mode): emits one row per
 * in-use replication state slot.  Columns, in order: the origin's internal
 * id, its external name (NULL if the name lookup fails), the slot's remote
 * LSN and the slot's local LSN.
 */
Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	int			i;
#define REPLICATION_ORIGIN_PROGRESS_COLS 4

	/*
	 * We want to return 0 rows if max_replication_slots is zero, so don't
	 * error out here; the scan below then simply finds nothing.
	 */
	replorigin_check_prerequisites(false, true);

	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not allowed in this context")));
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	/* guard against a catalog definition out of sync with this code */
	if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
		elog(ERROR, "wrong function definition");

	/* the result tuplestore must live in per-query memory to outlive us */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);


	/* prevent slots from being concurrently dropped */
	LWLockAcquire(ReplicationOriginLock, LW_SHARED);

	/*
	 * Iterate through all possible replication_states, display if they are
	 * filled.  ReplicationOriginLock (shared) is held for the whole scan, and
	 * each slot's LSNs are read under that slot's own lock.  The origin name
	 * lookup, however, is not protected against a concurrent drop.
	 */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *state;
		Datum		values[REPLICATION_ORIGIN_PROGRESS_COLS];
		bool		nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
		char	   *roname;

		state = &replication_states[i];

		/* unused slot, nothing to display */
		if (state->roident == InvalidRepOriginId)
			continue;

		/* start with every column NULL, then fill in what we have */
		memset(values, 0, sizeof(values));
		memset(nulls, 1, sizeof(nulls));

		values[0] = ObjectIdGetDatum(state->roident);
		nulls[0] = false;

		/*
		 * We're not preventing the origin to be dropped concurrently, so
		 * silently accept that it might be gone.
		 */
		if (replorigin_by_oid(state->roident, true,
							  &roname))
		{
			values[1] = CStringGetTextDatum(roname);
			nulls[1] = false;
		}

		LWLockAcquire(&state->lock, LW_SHARED);

		values[2] = LSNGetDatum(state->remote_lsn);
		nulls[2] = false;

		values[3] = LSNGetDatum(state->local_lsn);
		nulls[3] = false;

		LWLockRelease(&state->lock);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	tuplestore_donestoring(tupstore);

	LWLockRelease(ReplicationOriginLock);

#undef REPLICATION_ORIGIN_PROGRESS_COLS

	/* rows are returned via rsinfo->setResult, not the Datum */
	return (Datum) 0;
}
1502