1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  *	  Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
 * This file provides the following:
 * * An infrastructure to name nodes in a replication setup.
 * * A facility to store and persist replication progress in an efficient
 *	 and durable manner.
17  *
 * Replication origins consist of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, and long descriptors would be inefficient.  For now we only use
 * 2 bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup; and
 * using only two bytes allows us to be more space efficient.
25  *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows us to advance replication progress during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows us to recover
 * the precise replayed state after crash recovery without requiring
 * synchronous commits. Allowing logical replication to use asynchronous
 * commit is generally good for performance, but especially important as it
 * allows a single threaded replay process to keep up with a source that has
 * multiple backends generating changes concurrently.  For efficiency and
 * simplicity reasons a backend can set up one replication origin that is
 * from then on used as the source of changes produced by the backend, until
 * reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
 * * To create and drop replication origins an exclusive lock on
 *	 pg_replication_origin is required for the duration. That allows us to
 *	 safely and conflict-free assign new origins using a dirty snapshot.
 *
 * * When creating an in-memory replication progress slot the ReplicationOrigin
 *	 LWLock has to be held exclusively; when iterating over the replication
 *	 progress a shared lock has to be held. The lock is also needed when
 *	 advancing the progress of an origin that has not been set up as the
 *	 session's replication origin (replorigin_advance takes it exclusively,
 *	 since it may have to claim a new slot).
56  *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot, that slot's lwlock has to be held. That's
 *	 primarily because we do not assume 8-byte writes (the LSN) are atomic on
 *	 all our platforms, but it also simplifies memory ordering concerns
 *	 between the remote and local lsn. We use an lwlock instead of a spinlock
 *	 so it's less harmful to hold the lock over a WAL write
 *	 (cf. replorigin_advance).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "catalog/catalog.h"
78 #include "catalog/indexing.h"
79 #include "funcapi.h"
80 #include "miscadmin.h"
81 #include "nodes/execnodes.h"
82 #include "pgstat.h"
83 #include "replication/logical.h"
84 #include "replication/origin.h"
85 #include "storage/condition_variable.h"
86 #include "storage/copydir.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
90 #include "utils/builtins.h"
91 #include "utils/fmgroids.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/rel.h"
94 #include "utils/snapmgr.h"
95 #include "utils/syscache.h"
96 
/*
 * Replay progress of a single remote node.
 *
 * One entry ("slot") of the shared-memory array of max_replication_slots
 * entries.  A slot whose roident is InvalidRepOriginId is unused and may be
 * claimed by replorigin_advance().
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId when the slot
	 * is free.  In normal operation the code in this file assigns and clears
	 * it while holding ReplicationOriginLock exclusively.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.  Not restored from the checkpoint file at startup.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.  While non-zero,
	 * replorigin_advance() refuses to touch the slot and dropping the origin
	 * has to wait (or fail, with nowait).
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes; a backend
	 * dropping the origin sleeps on it while the slot is busy.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.  Initialized in the tranche
	 * recorded in ReplicationStateCtl.tranche_id.
	 */
	LWLock		lock;
} ReplicationState;
134 
/*
 * On disk version of ReplicationState.
 *
 * CheckPointReplicationOrigin() writes these structs raw (zeroed first, so
 * padding bytes are deterministic and the CRC is stable), and
 * StartupReplicationOrigin() reads them back sizeof(ReplicationStateOnDisk)
 * bytes at a time.  Changing the field layout therefore changes the
 * checkpoint file format.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
143 
144 
/*
 * Layout of the "ReplicationOriginState" shared memory block: a small header
 * followed by the array of per-origin replication states.
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_replication_slots */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
152 
/* external variables */

/*
 * Replication origin assumed for changes produced by this backend (the
 * session's origin); InvalidRepOriginId while none is set up.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */

/*
 * Remote LSN and commit timestamp associated with the session origin's
 * current transaction.  NOTE(review): presumably these are what gets stored
 * in the local commit record, per the file header comment -- confirm at the
 * commit-record code.
 */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;
157 
/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.  Left unset when max_replication_slots is 0.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.  NULL when no session origin is set up.
 */
static ReplicationState *session_replication_state = NULL;
178 
/*
 * Magic for on disk files: the first word of the replication origin
 * checkpoint file, verified by StartupReplicationOrigin().
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
181 
182 static void
replorigin_check_prerequisites(bool check_slots,bool recoveryOK)183 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
184 {
185 	if (check_slots && max_replication_slots == 0)
186 		ereport(ERROR,
187 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 				 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
189 
190 	if (!recoveryOK && RecoveryInProgress())
191 		ereport(ERROR,
192 				(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
193 				 errmsg("cannot manipulate replication origins during recovery")));
194 
195 }
196 
197 
198 /* ---------------------------------------------------------------------------
199  * Functions for working with replication origins themselves.
200  * ---------------------------------------------------------------------------
201  */
202 
203 /*
204  * Check for a persistent replication origin identified by name.
205  *
206  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
207  */
208 RepOriginId
replorigin_by_name(const char * roname,bool missing_ok)209 replorigin_by_name(const char *roname, bool missing_ok)
210 {
211 	Form_pg_replication_origin ident;
212 	Oid			roident = InvalidOid;
213 	HeapTuple	tuple;
214 	Datum		roname_d;
215 
216 	roname_d = CStringGetTextDatum(roname);
217 
218 	tuple = SearchSysCache1(REPLORIGNAME, roname_d);
219 	if (HeapTupleIsValid(tuple))
220 	{
221 		ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
222 		roident = ident->roident;
223 		ReleaseSysCache(tuple);
224 	}
225 	else if (!missing_ok)
226 		ereport(ERROR,
227 				(errcode(ERRCODE_UNDEFINED_OBJECT),
228 				 errmsg("replication origin \"%s\" does not exist",
229 						roname)));
230 
231 	return roident;
232 }
233 
234 /*
235  * Create a replication origin.
236  *
237  * Needs to be called in a transaction.
238  */
239 RepOriginId
replorigin_create(const char * roname)240 replorigin_create(const char *roname)
241 {
242 	Oid			roident;
243 	HeapTuple	tuple = NULL;
244 	Relation	rel;
245 	Datum		roname_d;
246 	SnapshotData SnapshotDirty;
247 	SysScanDesc scan;
248 	ScanKeyData key;
249 
250 	roname_d = CStringGetTextDatum(roname);
251 
252 	Assert(IsTransactionState());
253 
254 	/*
255 	 * We need the numeric replication origin to be 16bit wide, so we cannot
256 	 * rely on the normal oid allocation. Instead we simply scan
257 	 * pg_replication_origin for the first unused id. That's not particularly
258 	 * efficient, but this should be a fairly infrequent operation - we can
259 	 * easily spend a bit more code on this when it turns out it needs to be
260 	 * faster.
261 	 *
262 	 * We handle concurrency by taking an exclusive lock (allowing reads!)
263 	 * over the table for the duration of the search. Because we use a "dirty
264 	 * snapshot" we can read rows that other in-progress sessions have
265 	 * written, even though they would be invisible with normal snapshots. Due
266 	 * to the exclusive lock there's no danger that new rows can appear while
267 	 * we're checking.
268 	 */
269 	InitDirtySnapshot(SnapshotDirty);
270 
271 	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
272 
273 	for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274 	{
275 		bool		nulls[Natts_pg_replication_origin];
276 		Datum		values[Natts_pg_replication_origin];
277 		bool		collides;
278 
279 		CHECK_FOR_INTERRUPTS();
280 
281 		ScanKeyInit(&key,
282 					Anum_pg_replication_origin_roident,
283 					BTEqualStrategyNumber, F_OIDEQ,
284 					ObjectIdGetDatum(roident));
285 
286 		scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
287 								  true /* indexOK */ ,
288 								  &SnapshotDirty,
289 								  1, &key);
290 
291 		collides = HeapTupleIsValid(systable_getnext(scan));
292 
293 		systable_endscan(scan);
294 
295 		if (!collides)
296 		{
297 			/*
298 			 * Ok, found an unused roident, insert the new row and do a CCI,
299 			 * so our callers can look it up if they want to.
300 			 */
301 			memset(&nulls, 0, sizeof(nulls));
302 
303 			values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
304 			values[Anum_pg_replication_origin_roname - 1] = roname_d;
305 
306 			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307 			CatalogTupleInsert(rel, tuple);
308 			CommandCounterIncrement();
309 			break;
310 		}
311 	}
312 
313 	/* now release lock again,	*/
314 	table_close(rel, ExclusiveLock);
315 
316 	if (tuple == NULL)
317 		ereport(ERROR,
318 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319 				 errmsg("could not find free replication origin OID")));
320 
321 	heap_freetuple(tuple);
322 	return roident;
323 }
324 
325 /*
326  * Helper function to drop a replication origin.
327  */
328 static void
replorigin_drop_guts(Relation rel,RepOriginId roident,bool nowait)329 replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
330 {
331 	HeapTuple	tuple;
332 	int			i;
333 
334 	/*
335 	 * First, clean up the slot state info, if there is any matching slot.
336 	 */
337 restart:
338 	tuple = NULL;
339 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
340 
341 	for (i = 0; i < max_replication_slots; i++)
342 	{
343 		ReplicationState *state = &replication_states[i];
344 
345 		if (state->roident == roident)
346 		{
347 			/* found our slot, is it busy? */
348 			if (state->acquired_by != 0)
349 			{
350 				ConditionVariable *cv;
351 
352 				if (nowait)
353 					ereport(ERROR,
354 							(errcode(ERRCODE_OBJECT_IN_USE),
355 							 errmsg("could not drop replication origin with OID %d, in use by PID %d",
356 									state->roident,
357 									state->acquired_by)));
358 
359 				/*
360 				 * We must wait and then retry.  Since we don't know which CV
361 				 * to wait on until here, we can't readily use
362 				 * ConditionVariablePrepareToSleep (calling it here would be
363 				 * wrong, since we could miss the signal if we did so); just
364 				 * use ConditionVariableSleep directly.
365 				 */
366 				cv = &state->origin_cv;
367 
368 				LWLockRelease(ReplicationOriginLock);
369 
370 				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
371 				goto restart;
372 			}
373 
374 			/* first make a WAL log entry */
375 			{
376 				xl_replorigin_drop xlrec;
377 
378 				xlrec.node_id = roident;
379 				XLogBeginInsert();
380 				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
381 				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
382 			}
383 
384 			/* then clear the in-memory slot */
385 			state->roident = InvalidRepOriginId;
386 			state->remote_lsn = InvalidXLogRecPtr;
387 			state->local_lsn = InvalidXLogRecPtr;
388 			break;
389 		}
390 	}
391 	LWLockRelease(ReplicationOriginLock);
392 	ConditionVariableCancelSleep();
393 
394 	/*
395 	 * Now, we can delete the catalog entry.
396 	 */
397 	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
398 	if (!HeapTupleIsValid(tuple))
399 		elog(ERROR, "cache lookup failed for replication origin with oid %u",
400 			 roident);
401 
402 	CatalogTupleDelete(rel, &tuple->t_self);
403 	ReleaseSysCache(tuple);
404 
405 	CommandCounterIncrement();
406 }
407 
408 /*
409  * Drop replication origin (by name).
410  *
411  * Needs to be called in a transaction.
412  */
413 void
replorigin_drop_by_name(const char * name,bool missing_ok,bool nowait)414 replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
415 {
416 	RepOriginId roident;
417 	Relation	rel;
418 
419 	Assert(IsTransactionState());
420 
421 	/*
422 	 * To interlock against concurrent drops, we hold ExclusiveLock on
423 	 * pg_replication_origin till xact commit.
424 	 *
425 	 * XXX We can optimize this by acquiring the lock on a specific origin by
426 	 * using LockSharedObject if required. However, for that, we first to
427 	 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
428 	 * the specific origin and then re-check if the origin still exists.
429 	 */
430 	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
431 
432 	roident = replorigin_by_name(name, missing_ok);
433 
434 	if (OidIsValid(roident))
435 		replorigin_drop_guts(rel, roident, nowait);
436 
437 	/* We keep the lock on pg_replication_origin until commit */
438 	table_close(rel, NoLock);
439 }
440 
441 /*
442  * Lookup replication origin via its oid and return the name.
443  *
444  * The external name is palloc'd in the calling context.
445  *
446  * Returns true if the origin is known, false otherwise.
447  */
448 bool
replorigin_by_oid(RepOriginId roident,bool missing_ok,char ** roname)449 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
450 {
451 	HeapTuple	tuple;
452 	Form_pg_replication_origin ric;
453 
454 	Assert(OidIsValid((Oid) roident));
455 	Assert(roident != InvalidRepOriginId);
456 	Assert(roident != DoNotReplicateId);
457 
458 	tuple = SearchSysCache1(REPLORIGIDENT,
459 							ObjectIdGetDatum((Oid) roident));
460 
461 	if (HeapTupleIsValid(tuple))
462 	{
463 		ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
464 		*roname = text_to_cstring(&ric->roname);
465 		ReleaseSysCache(tuple);
466 
467 		return true;
468 	}
469 	else
470 	{
471 		*roname = NULL;
472 
473 		if (!missing_ok)
474 			ereport(ERROR,
475 					(errcode(ERRCODE_UNDEFINED_OBJECT),
476 					 errmsg("replication origin with OID %u does not exist",
477 							roident)));
478 
479 		return false;
480 	}
481 }
482 
483 
484 /* ---------------------------------------------------------------------------
485  * Functions for handling replication progress.
486  * ---------------------------------------------------------------------------
487  */
488 
489 Size
ReplicationOriginShmemSize(void)490 ReplicationOriginShmemSize(void)
491 {
492 	Size		size = 0;
493 
494 	/*
495 	 * XXX: max_replication_slots is arguably the wrong thing to use, as here
496 	 * we keep the replay state of *remote* transactions. But for now it seems
497 	 * sufficient to reuse it, rather than introduce a separate GUC.
498 	 */
499 	if (max_replication_slots == 0)
500 		return size;
501 
502 	size = add_size(size, offsetof(ReplicationStateCtl, states));
503 
504 	size = add_size(size,
505 					mul_size(max_replication_slots, sizeof(ReplicationState)));
506 	return size;
507 }
508 
509 void
ReplicationOriginShmemInit(void)510 ReplicationOriginShmemInit(void)
511 {
512 	bool		found;
513 
514 	if (max_replication_slots == 0)
515 		return;
516 
517 	replication_states_ctl = (ReplicationStateCtl *)
518 		ShmemInitStruct("ReplicationOriginState",
519 						ReplicationOriginShmemSize(),
520 						&found);
521 	replication_states = replication_states_ctl->states;
522 
523 	if (!found)
524 	{
525 		int			i;
526 
527 		MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
528 
529 		replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
530 
531 		for (i = 0; i < max_replication_slots; i++)
532 		{
533 			LWLockInitialize(&replication_states[i].lock,
534 							 replication_states_ctl->tranche_id);
535 			ConditionVariableInit(&replication_states[i].origin_cv);
536 		}
537 	}
538 }
539 
540 /* ---------------------------------------------------------------------------
541  * Perform a checkpoint of each replication origin's progress with respect to
542  * the replayed remote_lsn. Make sure that all transactions we refer to in the
543  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
544  * if the transactions were originally committed asynchronously.
545  *
546  * We store checkpoints in the following format:
547  * +-------+------------------------+------------------+-----+--------+
548  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
549  * +-------+------------------------+------------------+-----+--------+
550  *
551  * So its just the magic, followed by the statically sized
552  * ReplicationStateOnDisk structs. Note that the maximum number of
553  * ReplicationState is determined by max_replication_slots.
554  * ---------------------------------------------------------------------------
555  */
556 void
CheckPointReplicationOrigin(void)557 CheckPointReplicationOrigin(void)
558 {
559 	const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
560 	const char *path = "pg_logical/replorigin_checkpoint";
561 	int			tmpfd;
562 	int			i;
563 	uint32		magic = REPLICATION_STATE_MAGIC;
564 	pg_crc32c	crc;
565 
566 	if (max_replication_slots == 0)
567 		return;
568 
569 	INIT_CRC32C(crc);
570 
571 	/* make sure no old temp file is remaining */
572 	if (unlink(tmppath) < 0 && errno != ENOENT)
573 		ereport(PANIC,
574 				(errcode_for_file_access(),
575 				 errmsg("could not remove file \"%s\": %m",
576 						tmppath)));
577 
578 	/*
579 	 * no other backend can perform this at the same time; only one checkpoint
580 	 * can happen at a time.
581 	 */
582 	tmpfd = OpenTransientFile(tmppath,
583 							  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
584 	if (tmpfd < 0)
585 		ereport(PANIC,
586 				(errcode_for_file_access(),
587 				 errmsg("could not create file \"%s\": %m",
588 						tmppath)));
589 
590 	/* write magic */
591 	errno = 0;
592 	if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
593 	{
594 		/* if write didn't set errno, assume problem is no disk space */
595 		if (errno == 0)
596 			errno = ENOSPC;
597 		ereport(PANIC,
598 				(errcode_for_file_access(),
599 				 errmsg("could not write to file \"%s\": %m",
600 						tmppath)));
601 	}
602 	COMP_CRC32C(crc, &magic, sizeof(magic));
603 
604 	/* prevent concurrent creations/drops */
605 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
606 
607 	/* write actual data */
608 	for (i = 0; i < max_replication_slots; i++)
609 	{
610 		ReplicationStateOnDisk disk_state;
611 		ReplicationState *curstate = &replication_states[i];
612 		XLogRecPtr	local_lsn;
613 
614 		if (curstate->roident == InvalidRepOriginId)
615 			continue;
616 
617 		/* zero, to avoid uninitialized padding bytes */
618 		memset(&disk_state, 0, sizeof(disk_state));
619 
620 		LWLockAcquire(&curstate->lock, LW_SHARED);
621 
622 		disk_state.roident = curstate->roident;
623 
624 		disk_state.remote_lsn = curstate->remote_lsn;
625 		local_lsn = curstate->local_lsn;
626 
627 		LWLockRelease(&curstate->lock);
628 
629 		/* make sure we only write out a commit that's persistent */
630 		XLogFlush(local_lsn);
631 
632 		errno = 0;
633 		if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
634 			sizeof(disk_state))
635 		{
636 			/* if write didn't set errno, assume problem is no disk space */
637 			if (errno == 0)
638 				errno = ENOSPC;
639 			ereport(PANIC,
640 					(errcode_for_file_access(),
641 					 errmsg("could not write to file \"%s\": %m",
642 							tmppath)));
643 		}
644 
645 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
646 	}
647 
648 	LWLockRelease(ReplicationOriginLock);
649 
650 	/* write out the CRC */
651 	FIN_CRC32C(crc);
652 	errno = 0;
653 	if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
654 	{
655 		/* if write didn't set errno, assume problem is no disk space */
656 		if (errno == 0)
657 			errno = ENOSPC;
658 		ereport(PANIC,
659 				(errcode_for_file_access(),
660 				 errmsg("could not write to file \"%s\": %m",
661 						tmppath)));
662 	}
663 
664 	if (CloseTransientFile(tmpfd) != 0)
665 		ereport(PANIC,
666 				(errcode_for_file_access(),
667 				 errmsg("could not close file \"%s\": %m",
668 						tmppath)));
669 
670 	/* fsync, rename to permanent file, fsync file and directory */
671 	durable_rename(tmppath, path, PANIC);
672 }
673 
674 /*
675  * Recover replication replay status from checkpoint data saved earlier by
676  * CheckPointReplicationOrigin.
677  *
678  * This only needs to be called at startup and *not* during every checkpoint
679  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
680  * state thereafter can be recovered by looking at commit records.
681  */
682 void
StartupReplicationOrigin(void)683 StartupReplicationOrigin(void)
684 {
685 	const char *path = "pg_logical/replorigin_checkpoint";
686 	int			fd;
687 	int			readBytes;
688 	uint32		magic = REPLICATION_STATE_MAGIC;
689 	int			last_state = 0;
690 	pg_crc32c	file_crc;
691 	pg_crc32c	crc;
692 
693 	/* don't want to overwrite already existing state */
694 #ifdef USE_ASSERT_CHECKING
695 	static bool already_started = false;
696 
697 	Assert(!already_started);
698 	already_started = true;
699 #endif
700 
701 	if (max_replication_slots == 0)
702 		return;
703 
704 	INIT_CRC32C(crc);
705 
706 	elog(DEBUG2, "starting up replication origin progress state");
707 
708 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
709 
710 	/*
711 	 * might have had max_replication_slots == 0 last run, or we just brought
712 	 * up a standby.
713 	 */
714 	if (fd < 0 && errno == ENOENT)
715 		return;
716 	else if (fd < 0)
717 		ereport(PANIC,
718 				(errcode_for_file_access(),
719 				 errmsg("could not open file \"%s\": %m",
720 						path)));
721 
722 	/* verify magic, that is written even if nothing was active */
723 	readBytes = read(fd, &magic, sizeof(magic));
724 	if (readBytes != sizeof(magic))
725 	{
726 		if (readBytes < 0)
727 			ereport(PANIC,
728 					(errcode_for_file_access(),
729 					 errmsg("could not read file \"%s\": %m",
730 							path)));
731 		else
732 			ereport(PANIC,
733 					(errcode(ERRCODE_DATA_CORRUPTED),
734 					 errmsg("could not read file \"%s\": read %d of %zu",
735 							path, readBytes, sizeof(magic))));
736 	}
737 	COMP_CRC32C(crc, &magic, sizeof(magic));
738 
739 	if (magic != REPLICATION_STATE_MAGIC)
740 		ereport(PANIC,
741 				(errmsg("replication checkpoint has wrong magic %u instead of %u",
742 						magic, REPLICATION_STATE_MAGIC)));
743 
744 	/* we can skip locking here, no other access is possible */
745 
746 	/* recover individual states, until there are no more to be found */
747 	while (true)
748 	{
749 		ReplicationStateOnDisk disk_state;
750 
751 		readBytes = read(fd, &disk_state, sizeof(disk_state));
752 
753 		/* no further data */
754 		if (readBytes == sizeof(crc))
755 		{
756 			/* not pretty, but simple ... */
757 			file_crc = *(pg_crc32c *) &disk_state;
758 			break;
759 		}
760 
761 		if (readBytes < 0)
762 		{
763 			ereport(PANIC,
764 					(errcode_for_file_access(),
765 					 errmsg("could not read file \"%s\": %m",
766 							path)));
767 		}
768 
769 		if (readBytes != sizeof(disk_state))
770 		{
771 			ereport(PANIC,
772 					(errcode_for_file_access(),
773 					 errmsg("could not read file \"%s\": read %d of %zu",
774 							path, readBytes, sizeof(disk_state))));
775 		}
776 
777 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
778 
779 		if (last_state == max_replication_slots)
780 			ereport(PANIC,
781 					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
782 					 errmsg("could not find free replication state, increase max_replication_slots")));
783 
784 		/* copy data to shared memory */
785 		replication_states[last_state].roident = disk_state.roident;
786 		replication_states[last_state].remote_lsn = disk_state.remote_lsn;
787 		last_state++;
788 
789 		ereport(LOG,
790 				(errmsg("recovered replication state of node %u to %X/%X",
791 						disk_state.roident,
792 						LSN_FORMAT_ARGS(disk_state.remote_lsn))));
793 	}
794 
795 	/* now check checksum */
796 	FIN_CRC32C(crc);
797 	if (file_crc != crc)
798 		ereport(PANIC,
799 				(errcode(ERRCODE_DATA_CORRUPTED),
800 				 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
801 						crc, file_crc)));
802 
803 	if (CloseTransientFile(fd) != 0)
804 		ereport(PANIC,
805 				(errcode_for_file_access(),
806 				 errmsg("could not close file \"%s\": %m",
807 						path)));
808 }
809 
810 void
replorigin_redo(XLogReaderState * record)811 replorigin_redo(XLogReaderState *record)
812 {
813 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
814 
815 	switch (info)
816 	{
817 		case XLOG_REPLORIGIN_SET:
818 			{
819 				xl_replorigin_set *xlrec =
820 				(xl_replorigin_set *) XLogRecGetData(record);
821 
822 				replorigin_advance(xlrec->node_id,
823 								   xlrec->remote_lsn, record->EndRecPtr,
824 								   xlrec->force /* backward */ ,
825 								   false /* WAL log */ );
826 				break;
827 			}
828 		case XLOG_REPLORIGIN_DROP:
829 			{
830 				xl_replorigin_drop *xlrec;
831 				int			i;
832 
833 				xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
834 
835 				for (i = 0; i < max_replication_slots; i++)
836 				{
837 					ReplicationState *state = &replication_states[i];
838 
839 					/* found our slot */
840 					if (state->roident == xlrec->node_id)
841 					{
842 						/* reset entry */
843 						state->roident = InvalidRepOriginId;
844 						state->remote_lsn = InvalidXLogRecPtr;
845 						state->local_lsn = InvalidXLogRecPtr;
846 						break;
847 					}
848 				}
849 				break;
850 			}
851 		default:
852 			elog(PANIC, "replorigin_redo: unknown op code %u", info);
853 	}
854 }
855 
856 
857 /*
858  * Tell the replication origin progress machinery that a commit from 'node'
859  * that originated at the LSN remote_commit on the remote node was replayed
860  * successfully and that we don't need to do so again. In combination with
861  * setting up replorigin_session_origin_lsn and replorigin_session_origin
862  * that ensures we won't lose knowledge about that after a crash if the
863  * transaction had a persistent effect (think of asynchronous commits).
864  *
865  * local_commit needs to be a local LSN of the commit so that we can make sure
866  * upon a checkpoint that enough WAL has been persisted to disk.
867  *
868  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
869  * unless running in recovery.
870  */
871 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)872 replorigin_advance(RepOriginId node,
873 				   XLogRecPtr remote_commit, XLogRecPtr local_commit,
874 				   bool go_backward, bool wal_log)
875 {
876 	int			i;
877 	ReplicationState *replication_state = NULL;
878 	ReplicationState *free_state = NULL;
879 
880 	Assert(node != InvalidRepOriginId);
881 
882 	/* we don't track DoNotReplicateId */
883 	if (node == DoNotReplicateId)
884 		return;
885 
886 	/*
887 	 * XXX: For the case where this is called by WAL replay, it'd be more
888 	 * efficient to restore into a backend local hashtable and only dump into
889 	 * shmem after recovery is finished. Let's wait with implementing that
890 	 * till it's shown to be a measurable expense
891 	 */
892 
893 	/* Lock exclusively, as we may have to create a new table entry. */
894 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
895 
896 	/*
897 	 * Search for either an existing slot for the origin, or a free one we can
898 	 * use.
899 	 */
900 	for (i = 0; i < max_replication_slots; i++)
901 	{
902 		ReplicationState *curstate = &replication_states[i];
903 
904 		/* remember where to insert if necessary */
905 		if (curstate->roident == InvalidRepOriginId &&
906 			free_state == NULL)
907 		{
908 			free_state = curstate;
909 			continue;
910 		}
911 
912 		/* not our slot */
913 		if (curstate->roident != node)
914 		{
915 			continue;
916 		}
917 
918 		/* ok, found slot */
919 		replication_state = curstate;
920 
921 		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
922 
923 		/* Make sure it's not used by somebody else */
924 		if (replication_state->acquired_by != 0)
925 		{
926 			ereport(ERROR,
927 					(errcode(ERRCODE_OBJECT_IN_USE),
928 					 errmsg("replication origin with OID %d is already active for PID %d",
929 							replication_state->roident,
930 							replication_state->acquired_by)));
931 		}
932 
933 		break;
934 	}
935 
936 	if (replication_state == NULL && free_state == NULL)
937 		ereport(ERROR,
938 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
939 				 errmsg("could not find free replication state slot for replication origin with OID %u",
940 						node),
941 				 errhint("Increase max_replication_slots and try again.")));
942 
943 	if (replication_state == NULL)
944 	{
945 		/* initialize new slot */
946 		LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
947 		replication_state = free_state;
948 		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
949 		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
950 		replication_state->roident = node;
951 	}
952 
953 	Assert(replication_state->roident != InvalidRepOriginId);
954 
955 	/*
956 	 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
957 	 * and the standby gets the message. Primarily this will be called during
958 	 * WAL replay (of commit records) where no WAL logging is necessary.
959 	 */
960 	if (wal_log)
961 	{
962 		xl_replorigin_set xlrec;
963 
964 		xlrec.remote_lsn = remote_commit;
965 		xlrec.node_id = node;
966 		xlrec.force = go_backward;
967 
968 		XLogBeginInsert();
969 		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
970 
971 		XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
972 	}
973 
974 	/*
975 	 * Due to - harmless - race conditions during a checkpoint we could see
976 	 * values here that are older than the ones we already have in memory.
977 	 * Don't overwrite those.
978 	 */
979 	if (go_backward || replication_state->remote_lsn < remote_commit)
980 		replication_state->remote_lsn = remote_commit;
981 	if (local_commit != InvalidXLogRecPtr &&
982 		(go_backward || replication_state->local_lsn < local_commit))
983 		replication_state->local_lsn = local_commit;
984 	LWLockRelease(&replication_state->lock);
985 
986 	/*
987 	 * Release *after* changing the LSNs, slot isn't acquired and thus could
988 	 * otherwise be dropped anytime.
989 	 */
990 	LWLockRelease(ReplicationOriginLock);
991 }
992 
993 
994 XLogRecPtr
replorigin_get_progress(RepOriginId node,bool flush)995 replorigin_get_progress(RepOriginId node, bool flush)
996 {
997 	int			i;
998 	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
999 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1000 
1001 	/* prevent slots from being concurrently dropped */
1002 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1003 
1004 	for (i = 0; i < max_replication_slots; i++)
1005 	{
1006 		ReplicationState *state;
1007 
1008 		state = &replication_states[i];
1009 
1010 		if (state->roident == node)
1011 		{
1012 			LWLockAcquire(&state->lock, LW_SHARED);
1013 
1014 			remote_lsn = state->remote_lsn;
1015 			local_lsn = state->local_lsn;
1016 
1017 			LWLockRelease(&state->lock);
1018 
1019 			break;
1020 		}
1021 	}
1022 
1023 	LWLockRelease(ReplicationOriginLock);
1024 
1025 	if (flush && local_lsn != InvalidXLogRecPtr)
1026 		XLogFlush(local_lsn);
1027 
1028 	return remote_lsn;
1029 }
1030 
1031 /*
1032  * Tear down a (possibly) configured session replication origin during process
1033  * exit.
1034  */
1035 static void
ReplicationOriginExitCleanup(int code,Datum arg)1036 ReplicationOriginExitCleanup(int code, Datum arg)
1037 {
1038 	ConditionVariable *cv = NULL;
1039 
1040 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1041 
1042 	if (session_replication_state != NULL &&
1043 		session_replication_state->acquired_by == MyProcPid)
1044 	{
1045 		cv = &session_replication_state->origin_cv;
1046 
1047 		session_replication_state->acquired_by = 0;
1048 		session_replication_state = NULL;
1049 	}
1050 
1051 	LWLockRelease(ReplicationOriginLock);
1052 
1053 	if (cv)
1054 		ConditionVariableBroadcast(cv);
1055 }
1056 
1057 /*
1058  * Setup a replication origin in the shared memory struct if it doesn't
1059  * already exists and cache access to the specific ReplicationSlot so the
1060  * array doesn't have to be searched when calling
1061  * replorigin_session_advance().
1062  *
1063  * Obviously only one such cached origin can exist per process and the current
1064  * cached value can only be set again after the previous value is torn down
1065  * with replorigin_session_reset().
1066  */
1067 void
replorigin_session_setup(RepOriginId node)1068 replorigin_session_setup(RepOriginId node)
1069 {
1070 	static bool registered_cleanup;
1071 	int			i;
1072 	int			free_slot = -1;
1073 
1074 	if (!registered_cleanup)
1075 	{
1076 		on_shmem_exit(ReplicationOriginExitCleanup, 0);
1077 		registered_cleanup = true;
1078 	}
1079 
1080 	Assert(max_replication_slots > 0);
1081 
1082 	if (session_replication_state != NULL)
1083 		ereport(ERROR,
1084 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1085 				 errmsg("cannot setup replication origin when one is already setup")));
1086 
1087 	/* Lock exclusively, as we may have to create a new table entry. */
1088 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1089 
1090 	/*
1091 	 * Search for either an existing slot for the origin, or a free one we can
1092 	 * use.
1093 	 */
1094 	for (i = 0; i < max_replication_slots; i++)
1095 	{
1096 		ReplicationState *curstate = &replication_states[i];
1097 
1098 		/* remember where to insert if necessary */
1099 		if (curstate->roident == InvalidRepOriginId &&
1100 			free_slot == -1)
1101 		{
1102 			free_slot = i;
1103 			continue;
1104 		}
1105 
1106 		/* not our slot */
1107 		if (curstate->roident != node)
1108 			continue;
1109 
1110 		else if (curstate->acquired_by != 0)
1111 		{
1112 			ereport(ERROR,
1113 					(errcode(ERRCODE_OBJECT_IN_USE),
1114 					 errmsg("replication origin with OID %d is already active for PID %d",
1115 							curstate->roident, curstate->acquired_by)));
1116 		}
1117 
1118 		/* ok, found slot */
1119 		session_replication_state = curstate;
1120 	}
1121 
1122 
1123 	if (session_replication_state == NULL && free_slot == -1)
1124 		ereport(ERROR,
1125 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1126 				 errmsg("could not find free replication state slot for replication origin with OID %u",
1127 						node),
1128 				 errhint("Increase max_replication_slots and try again.")));
1129 	else if (session_replication_state == NULL)
1130 	{
1131 		/* initialize new slot */
1132 		session_replication_state = &replication_states[free_slot];
1133 		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1134 		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1135 		session_replication_state->roident = node;
1136 	}
1137 
1138 
1139 	Assert(session_replication_state->roident != InvalidRepOriginId);
1140 
1141 	session_replication_state->acquired_by = MyProcPid;
1142 
1143 	LWLockRelease(ReplicationOriginLock);
1144 
1145 	/* probably this one is pointless */
1146 	ConditionVariableBroadcast(&session_replication_state->origin_cv);
1147 }
1148 
1149 /*
1150  * Reset replay state previously setup in this session.
1151  *
1152  * This function may only be called if an origin was setup with
1153  * replorigin_session_setup().
1154  */
1155 void
replorigin_session_reset(void)1156 replorigin_session_reset(void)
1157 {
1158 	ConditionVariable *cv;
1159 
1160 	Assert(max_replication_slots != 0);
1161 
1162 	if (session_replication_state == NULL)
1163 		ereport(ERROR,
1164 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1165 				 errmsg("no replication origin is configured")));
1166 
1167 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1168 
1169 	session_replication_state->acquired_by = 0;
1170 	cv = &session_replication_state->origin_cv;
1171 	session_replication_state = NULL;
1172 
1173 	LWLockRelease(ReplicationOriginLock);
1174 
1175 	ConditionVariableBroadcast(cv);
1176 }
1177 
1178 /*
1179  * Do the same work replorigin_advance() does, just on the session's
1180  * configured origin.
1181  *
1182  * This is noticeably cheaper than using replorigin_advance().
1183  */
1184 void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)1185 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1186 {
1187 	Assert(session_replication_state != NULL);
1188 	Assert(session_replication_state->roident != InvalidRepOriginId);
1189 
1190 	LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1191 	if (session_replication_state->local_lsn < local_commit)
1192 		session_replication_state->local_lsn = local_commit;
1193 	if (session_replication_state->remote_lsn < remote_commit)
1194 		session_replication_state->remote_lsn = remote_commit;
1195 	LWLockRelease(&session_replication_state->lock);
1196 }
1197 
1198 /*
1199  * Ask the machinery about the point up to which we successfully replayed
1200  * changes from an already setup replication origin.
1201  */
1202 XLogRecPtr
replorigin_session_get_progress(bool flush)1203 replorigin_session_get_progress(bool flush)
1204 {
1205 	XLogRecPtr	remote_lsn;
1206 	XLogRecPtr	local_lsn;
1207 
1208 	Assert(session_replication_state != NULL);
1209 
1210 	LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1211 	remote_lsn = session_replication_state->remote_lsn;
1212 	local_lsn = session_replication_state->local_lsn;
1213 	LWLockRelease(&session_replication_state->lock);
1214 
1215 	if (flush && local_lsn != InvalidXLogRecPtr)
1216 		XLogFlush(local_lsn);
1217 
1218 	return remote_lsn;
1219 }
1220 
1221 
1222 
1223 /* ---------------------------------------------------------------------------
 * SQL functions for working with replication origins.
1225  *
1226  * These mostly should be fairly short wrappers around more generic functions.
1227  * ---------------------------------------------------------------------------
1228  */
1229 
1230 /*
1231  * Create replication origin for the passed in name, and return the assigned
1232  * oid.
1233  */
1234 Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)1235 pg_replication_origin_create(PG_FUNCTION_ARGS)
1236 {
1237 	char	   *name;
1238 	RepOriginId roident;
1239 
1240 	replorigin_check_prerequisites(false, false);
1241 
1242 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1243 
1244 	/* Replication origins "pg_xxx" are reserved for internal use */
1245 	if (IsReservedName(name))
1246 		ereport(ERROR,
1247 				(errcode(ERRCODE_RESERVED_NAME),
1248 				 errmsg("replication origin name \"%s\" is reserved",
1249 						name),
1250 				 errdetail("Origin names starting with \"pg_\" are reserved.")));
1251 
1252 	/*
1253 	 * If built with appropriate switch, whine when regression-testing
1254 	 * conventions for replication origin names are violated.
1255 	 */
1256 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1257 	if (strncmp(name, "regress_", 8) != 0)
1258 		elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1259 #endif
1260 
1261 	roident = replorigin_create(name);
1262 
1263 	pfree(name);
1264 
1265 	PG_RETURN_OID(roident);
1266 }
1267 
1268 /*
1269  * Drop replication origin.
1270  */
1271 Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)1272 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1273 {
1274 	char	   *name;
1275 
1276 	replorigin_check_prerequisites(false, false);
1277 
1278 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1279 
1280 	replorigin_drop_by_name(name, false, true);
1281 
1282 	pfree(name);
1283 
1284 	PG_RETURN_VOID();
1285 }
1286 
1287 /*
1288  * Return oid of a replication origin.
1289  */
1290 Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)1291 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1292 {
1293 	char	   *name;
1294 	RepOriginId roident;
1295 
1296 	replorigin_check_prerequisites(false, false);
1297 
1298 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1299 	roident = replorigin_by_name(name, true);
1300 
1301 	pfree(name);
1302 
1303 	if (OidIsValid(roident))
1304 		PG_RETURN_OID(roident);
1305 	PG_RETURN_NULL();
1306 }
1307 
1308 /*
1309  * Setup a replication origin for this session.
1310  */
1311 Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)1312 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1313 {
1314 	char	   *name;
1315 	RepOriginId origin;
1316 
1317 	replorigin_check_prerequisites(true, false);
1318 
1319 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1320 	origin = replorigin_by_name(name, false);
1321 	replorigin_session_setup(origin);
1322 
1323 	replorigin_session_origin = origin;
1324 
1325 	pfree(name);
1326 
1327 	PG_RETURN_VOID();
1328 }
1329 
1330 /*
1331  * Reset previously setup origin in this session
1332  */
1333 Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)1334 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1335 {
1336 	replorigin_check_prerequisites(true, false);
1337 
1338 	replorigin_session_reset();
1339 
1340 	replorigin_session_origin = InvalidRepOriginId;
1341 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
1342 	replorigin_session_origin_timestamp = 0;
1343 
1344 	PG_RETURN_VOID();
1345 }
1346 
1347 /*
1348  * Has a replication origin been setup for this session.
1349  */
1350 Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)1351 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1352 {
1353 	replorigin_check_prerequisites(false, false);
1354 
1355 	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1356 }
1357 
1358 
1359 /*
1360  * Return the replication progress for origin setup in the current session.
1361  *
1362  * If 'flush' is set to true it is ensured that the returned value corresponds
1363  * to a local transaction that has been flushed. This is useful if asynchronous
1364  * commits are used when replaying replicated transactions.
1365  */
1366 Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)1367 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1368 {
1369 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1370 	bool		flush = PG_GETARG_BOOL(0);
1371 
1372 	replorigin_check_prerequisites(true, false);
1373 
1374 	if (session_replication_state == NULL)
1375 		ereport(ERROR,
1376 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1377 				 errmsg("no replication origin is configured")));
1378 
1379 	remote_lsn = replorigin_session_get_progress(flush);
1380 
1381 	if (remote_lsn == InvalidXLogRecPtr)
1382 		PG_RETURN_NULL();
1383 
1384 	PG_RETURN_LSN(remote_lsn);
1385 }
1386 
1387 Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)1388 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1389 {
1390 	XLogRecPtr	location = PG_GETARG_LSN(0);
1391 
1392 	replorigin_check_prerequisites(true, false);
1393 
1394 	if (session_replication_state == NULL)
1395 		ereport(ERROR,
1396 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1397 				 errmsg("no replication origin is configured")));
1398 
1399 	replorigin_session_origin_lsn = location;
1400 	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1401 
1402 	PG_RETURN_VOID();
1403 }
1404 
1405 Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)1406 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1407 {
1408 	replorigin_check_prerequisites(true, false);
1409 
1410 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
1411 	replorigin_session_origin_timestamp = 0;
1412 
1413 	PG_RETURN_VOID();
1414 }
1415 
1416 
1417 Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)1418 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1419 {
1420 	text	   *name = PG_GETARG_TEXT_PP(0);
1421 	XLogRecPtr	remote_commit = PG_GETARG_LSN(1);
1422 	RepOriginId node;
1423 
1424 	replorigin_check_prerequisites(true, false);
1425 
1426 	/* lock to prevent the replication origin from vanishing */
1427 	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1428 
1429 	node = replorigin_by_name(text_to_cstring(name), false);
1430 
1431 	/*
1432 	 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1433 	 * xact hasn't committed yet. This is why this function should be used to
1434 	 * set up the initial replication state, but not for replay.
1435 	 */
1436 	replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1437 					   true /* go backward */ , true /* WAL log */ );
1438 
1439 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1440 
1441 	PG_RETURN_VOID();
1442 }
1443 
1444 
1445 /*
1446  * Return the replication progress for an individual replication origin.
1447  *
1448  * If 'flush' is set to true it is ensured that the returned value corresponds
1449  * to a local transaction that has been flushed. This is useful if asynchronous
1450  * commits are used when replaying replicated transactions.
1451  */
1452 Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)1453 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1454 {
1455 	char	   *name;
1456 	bool		flush;
1457 	RepOriginId roident;
1458 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1459 
1460 	replorigin_check_prerequisites(true, true);
1461 
1462 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1463 	flush = PG_GETARG_BOOL(1);
1464 
1465 	roident = replorigin_by_name(name, false);
1466 	Assert(OidIsValid(roident));
1467 
1468 	remote_lsn = replorigin_get_progress(roident, flush);
1469 
1470 	if (remote_lsn == InvalidXLogRecPtr)
1471 		PG_RETURN_NULL();
1472 
1473 	PG_RETURN_LSN(remote_lsn);
1474 }
1475 
1476 
1477 Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)1478 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1479 {
1480 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1481 	TupleDesc	tupdesc;
1482 	Tuplestorestate *tupstore;
1483 	MemoryContext per_query_ctx;
1484 	MemoryContext oldcontext;
1485 	int			i;
1486 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1487 
1488 	/* we want to return 0 rows if slot is set to zero */
1489 	replorigin_check_prerequisites(false, true);
1490 
1491 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1492 		ereport(ERROR,
1493 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1494 				 errmsg("set-valued function called in context that cannot accept a set")));
1495 	if (!(rsinfo->allowedModes & SFRM_Materialize))
1496 		ereport(ERROR,
1497 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1498 				 errmsg("materialize mode required, but it is not allowed in this context")));
1499 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1500 		elog(ERROR, "return type must be a row type");
1501 
1502 	if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1503 		elog(ERROR, "wrong function definition");
1504 
1505 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1506 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
1507 
1508 	tupstore = tuplestore_begin_heap(true, false, work_mem);
1509 	rsinfo->returnMode = SFRM_Materialize;
1510 	rsinfo->setResult = tupstore;
1511 	rsinfo->setDesc = tupdesc;
1512 
1513 	MemoryContextSwitchTo(oldcontext);
1514 
1515 
1516 	/* prevent slots from being concurrently dropped */
1517 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1518 
1519 	/*
1520 	 * Iterate through all possible replication_states, display if they are
1521 	 * filled. Note that we do not take any locks, so slightly corrupted/out
1522 	 * of date values are a possibility.
1523 	 */
1524 	for (i = 0; i < max_replication_slots; i++)
1525 	{
1526 		ReplicationState *state;
1527 		Datum		values[REPLICATION_ORIGIN_PROGRESS_COLS];
1528 		bool		nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1529 		char	   *roname;
1530 
1531 		state = &replication_states[i];
1532 
1533 		/* unused slot, nothing to display */
1534 		if (state->roident == InvalidRepOriginId)
1535 			continue;
1536 
1537 		memset(values, 0, sizeof(values));
1538 		memset(nulls, 1, sizeof(nulls));
1539 
1540 		values[0] = ObjectIdGetDatum(state->roident);
1541 		nulls[0] = false;
1542 
1543 		/*
1544 		 * We're not preventing the origin to be dropped concurrently, so
1545 		 * silently accept that it might be gone.
1546 		 */
1547 		if (replorigin_by_oid(state->roident, true,
1548 							  &roname))
1549 		{
1550 			values[1] = CStringGetTextDatum(roname);
1551 			nulls[1] = false;
1552 		}
1553 
1554 		LWLockAcquire(&state->lock, LW_SHARED);
1555 
1556 		values[2] = LSNGetDatum(state->remote_lsn);
1557 		nulls[2] = false;
1558 
1559 		values[3] = LSNGetDatum(state->local_lsn);
1560 		nulls[3] = false;
1561 
1562 		LWLockRelease(&state->lock);
1563 
1564 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1565 	}
1566 
1567 	tuplestore_donestoring(tupstore);
1568 
1569 	LWLockRelease(ReplicationOriginLock);
1570 
1571 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1572 
1573 	return (Datum) 0;
1574 }
1575