1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  *	  Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient and
 *	 durable manner.
17  *
 * A replication origin consists of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2 byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory and long descriptors would be inefficient.  For now only use 2 bytes
 * for the internal id of a replication origin as it seems unlikely that there
 * soon will be more than 65k nodes in one replication setup; and using only
 * two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows to increase replication progress during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows to recover the
32  * precise replayed state after crash recovery; without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently.  For efficiency and simplicity
37  * reasons a backend can setup one replication origin that's from then used as
38  * the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
 * * To create and drop replication origins an exclusive lock on
 *	 pg_replication_origin is required for the duration. That allows us to
 *	 safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  *	 LWLock has to be held exclusively; when iterating over the replication
53  *	 progress a shared lock has to be held, the same when advancing the
54  *	 replication progress of an individual backend that has not setup as the
55  *	 session's replication origin.
56  *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot that slot's lwlock has to be held. That's
 *	 primarily because we do not assume that 8 byte writes (the LSN) are
 *	 atomic on all our platforms, but it also simplifies memory ordering
 *	 concerns between the remote and local lsn. We use a lwlock instead of a
 *	 spinlock so it's less harmful to hold the lock over a WAL write
 *	 (c.f. replorigin_advance).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "funcapi.h"
74 #include "miscadmin.h"
75 
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80 
81 #include "catalog/indexing.h"
82 #include "nodes/execnodes.h"
83 
84 #include "replication/origin.h"
85 #include "replication/logical.h"
86 #include "pgstat.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
90 #include "storage/condition_variable.h"
91 #include "storage/copydir.h"
92 
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99 
100 /*
101  * Replay progress of a single remote node.
102  */
103 typedef struct ReplicationState
104 {
105 	/*
106 	 * Local identifier for the remote node.
107 	 */
108 	RepOriginId roident;
109 
110 	/*
111 	 * Location of the latest commit from the remote side.
112 	 */
113 	XLogRecPtr	remote_lsn;
114 
115 	/*
116 	 * Remember the local lsn of the commit record so we can XLogFlush() to it
117 	 * during a checkpoint so we know the commit record actually is safe on
118 	 * disk.
119 	 */
120 	XLogRecPtr	local_lsn;
121 
122 	/*
123 	 * PID of backend that's acquired slot, or 0 if none.
124 	 */
125 	int			acquired_by;
126 
127 	/*
128 	 * Condition variable that's signalled when acquired_by changes.
129 	 */
130 	ConditionVariable origin_cv;
131 
132 	/*
133 	 * Lock protecting remote_lsn and local_lsn.
134 	 */
135 	LWLock		lock;
136 } ReplicationState;
137 
138 /*
139  * On disk version of ReplicationState.
140  */
141 typedef struct ReplicationStateOnDisk
142 {
143 	RepOriginId roident;
144 	XLogRecPtr	remote_lsn;
145 } ReplicationStateOnDisk;
146 
147 
148 typedef struct ReplicationStateCtl
149 {
150 	/* Tranche to use for per-origin LWLocks */
151 	int			tranche_id;
152 	/* Array of length max_replication_slots */
153 	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
154 } ReplicationStateCtl;
155 
156 /* external variables */
157 RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
158 XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
159 TimestampTz replorigin_session_origin_timestamp = 0;
160 
161 /*
162  * Base address into a shared memory array of replication states of size
163  * max_replication_slots.
164  *
165  * XXX: Should we use a separate variable to size this rather than
166  * max_replication_slots?
167  */
168 static ReplicationState *replication_states;
169 
170 /*
171  * Actual shared memory block (replication_states[] is now part of this).
172  */
173 static ReplicationStateCtl *replication_states_ctl;
174 
175 /*
176  * Backend-local, cached element from ReplicationState for use in a backend
177  * replaying remote commits, so we don't have to search ReplicationState for
178  * the backends current RepOriginId.
179  */
180 static ReplicationState *session_replication_state = NULL;
181 
182 /* Magic for on disk files. */
183 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
184 
185 static void
replorigin_check_prerequisites(bool check_slots,bool recoveryOK)186 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
187 {
188 	if (!superuser())
189 		ereport(ERROR,
190 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
191 				 errmsg("only superusers can query or manipulate replication origins")));
192 
193 	if (check_slots && max_replication_slots == 0)
194 		ereport(ERROR,
195 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
196 				 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
197 
198 	if (!recoveryOK && RecoveryInProgress())
199 		ereport(ERROR,
200 				(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
201 				 errmsg("cannot manipulate replication origins during recovery")));
202 
203 }
204 
205 
206 /* ---------------------------------------------------------------------------
207  * Functions for working with replication origins themselves.
208  * ---------------------------------------------------------------------------
209  */
210 
211 /*
212  * Check for a persistent replication origin identified by name.
213  *
214  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
215  */
216 RepOriginId
replorigin_by_name(char * roname,bool missing_ok)217 replorigin_by_name(char *roname, bool missing_ok)
218 {
219 	Form_pg_replication_origin ident;
220 	Oid			roident = InvalidOid;
221 	HeapTuple	tuple;
222 	Datum		roname_d;
223 
224 	roname_d = CStringGetTextDatum(roname);
225 
226 	tuple = SearchSysCache1(REPLORIGNAME, roname_d);
227 	if (HeapTupleIsValid(tuple))
228 	{
229 		ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
230 		roident = ident->roident;
231 		ReleaseSysCache(tuple);
232 	}
233 	else if (!missing_ok)
234 		ereport(ERROR,
235 				(errcode(ERRCODE_UNDEFINED_OBJECT),
236 				 errmsg("replication origin \"%s\" does not exist",
237 						roname)));
238 
239 	return roident;
240 }
241 
242 /*
243  * Create a replication origin.
244  *
245  * Needs to be called in a transaction.
246  */
247 RepOriginId
replorigin_create(char * roname)248 replorigin_create(char *roname)
249 {
250 	Oid			roident;
251 	HeapTuple	tuple = NULL;
252 	Relation	rel;
253 	Datum		roname_d;
254 	SnapshotData SnapshotDirty;
255 	SysScanDesc scan;
256 	ScanKeyData key;
257 
258 	roname_d = CStringGetTextDatum(roname);
259 
260 	Assert(IsTransactionState());
261 
262 	/*
263 	 * We need the numeric replication origin to be 16bit wide, so we cannot
264 	 * rely on the normal oid allocation. Instead we simply scan
265 	 * pg_replication_origin for the first unused id. That's not particularly
266 	 * efficient, but this should be a fairly infrequent operation - we can
267 	 * easily spend a bit more code on this when it turns out it needs to be
268 	 * faster.
269 	 *
270 	 * We handle concurrency by taking an exclusive lock (allowing reads!)
271 	 * over the table for the duration of the search. Because we use a "dirty
272 	 * snapshot" we can read rows that other in-progress sessions have
273 	 * written, even though they would be invisible with normal snapshots. Due
274 	 * to the exclusive lock there's no danger that new rows can appear while
275 	 * we're checking.
276 	 */
277 	InitDirtySnapshot(SnapshotDirty);
278 
279 	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
280 
281 	for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
282 	{
283 		bool		nulls[Natts_pg_replication_origin];
284 		Datum		values[Natts_pg_replication_origin];
285 		bool		collides;
286 
287 		CHECK_FOR_INTERRUPTS();
288 
289 		ScanKeyInit(&key,
290 					Anum_pg_replication_origin_roident,
291 					BTEqualStrategyNumber, F_OIDEQ,
292 					ObjectIdGetDatum(roident));
293 
294 		scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
295 								  true /* indexOK */ ,
296 								  &SnapshotDirty,
297 								  1, &key);
298 
299 		collides = HeapTupleIsValid(systable_getnext(scan));
300 
301 		systable_endscan(scan);
302 
303 		if (!collides)
304 		{
305 			/*
306 			 * Ok, found an unused roident, insert the new row and do a CCI,
307 			 * so our callers can look it up if they want to.
308 			 */
309 			memset(&nulls, 0, sizeof(nulls));
310 
311 			values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
312 			values[Anum_pg_replication_origin_roname - 1] = roname_d;
313 
314 			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
315 			CatalogTupleInsert(rel, tuple);
316 			CommandCounterIncrement();
317 			break;
318 		}
319 	}
320 
321 	/* now release lock again,	*/
322 	heap_close(rel, ExclusiveLock);
323 
324 	if (tuple == NULL)
325 		ereport(ERROR,
326 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
327 				 errmsg("could not find free replication origin OID")));
328 
329 	heap_freetuple(tuple);
330 	return roident;
331 }
332 
333 
334 /*
335  * Drop replication origin.
336  *
337  * Needs to be called in a transaction.
338  */
339 void
replorigin_drop(RepOriginId roident,bool nowait)340 replorigin_drop(RepOriginId roident, bool nowait)
341 {
342 	HeapTuple	tuple;
343 	Relation	rel;
344 	int			i;
345 
346 	Assert(IsTransactionState());
347 
348 	/*
349 	 * To interlock against concurrent drops, we hold ExclusiveLock on
350 	 * pg_replication_origin throughout this funcion.
351 	 */
352 	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
353 
354 	/*
355 	 * First, clean up the slot state info, if there is any matching slot.
356 	 */
357 restart:
358 	tuple = NULL;
359 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
360 
361 	for (i = 0; i < max_replication_slots; i++)
362 	{
363 		ReplicationState *state = &replication_states[i];
364 
365 		if (state->roident == roident)
366 		{
367 			/* found our slot, is it busy? */
368 			if (state->acquired_by != 0)
369 			{
370 				ConditionVariable *cv;
371 
372 				if (nowait)
373 					ereport(ERROR,
374 							(errcode(ERRCODE_OBJECT_IN_USE),
375 							 errmsg("could not drop replication origin with OID %d, in use by PID %d",
376 									state->roident,
377 									state->acquired_by)));
378 
379 				/*
380 				 * We must wait and then retry.  Since we don't know which CV
381 				 * to wait on until here, we can't readily use
382 				 * ConditionVariablePrepareToSleep (calling it here would be
383 				 * wrong, since we could miss the signal if we did so); just
384 				 * use ConditionVariableSleep directly.
385 				 */
386 				cv = &state->origin_cv;
387 
388 				LWLockRelease(ReplicationOriginLock);
389 
390 				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
391 				goto restart;
392 			}
393 
394 			/* first make a WAL log entry */
395 			{
396 				xl_replorigin_drop xlrec;
397 
398 				xlrec.node_id = roident;
399 				XLogBeginInsert();
400 				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
401 				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
402 			}
403 
404 			/* then clear the in-memory slot */
405 			state->roident = InvalidRepOriginId;
406 			state->remote_lsn = InvalidXLogRecPtr;
407 			state->local_lsn = InvalidXLogRecPtr;
408 			break;
409 		}
410 	}
411 	LWLockRelease(ReplicationOriginLock);
412 	ConditionVariableCancelSleep();
413 
414 	/*
415 	 * Now, we can delete the catalog entry.
416 	 */
417 	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
418 	if (!HeapTupleIsValid(tuple))
419 		elog(ERROR, "cache lookup failed for replication origin with oid %u",
420 			 roident);
421 
422 	CatalogTupleDelete(rel, &tuple->t_self);
423 	ReleaseSysCache(tuple);
424 
425 	CommandCounterIncrement();
426 
427 	/* now release lock again */
428 	heap_close(rel, ExclusiveLock);
429 }
430 
431 
432 /*
433  * Lookup replication origin via it's oid and return the name.
434  *
435  * The external name is palloc'd in the calling context.
436  *
437  * Returns true if the origin is known, false otherwise.
438  */
439 bool
replorigin_by_oid(RepOriginId roident,bool missing_ok,char ** roname)440 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
441 {
442 	HeapTuple	tuple;
443 	Form_pg_replication_origin ric;
444 
445 	Assert(OidIsValid((Oid) roident));
446 	Assert(roident != InvalidRepOriginId);
447 	Assert(roident != DoNotReplicateId);
448 
449 	tuple = SearchSysCache1(REPLORIGIDENT,
450 							ObjectIdGetDatum((Oid) roident));
451 
452 	if (HeapTupleIsValid(tuple))
453 	{
454 		ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
455 		*roname = text_to_cstring(&ric->roname);
456 		ReleaseSysCache(tuple);
457 
458 		return true;
459 	}
460 	else
461 	{
462 		*roname = NULL;
463 
464 		if (!missing_ok)
465 			ereport(ERROR,
466 					(errcode(ERRCODE_UNDEFINED_OBJECT),
467 					 errmsg("replication origin with OID %u does not exist",
468 							roident)));
469 
470 		return false;
471 	}
472 }
473 
474 
475 /* ---------------------------------------------------------------------------
476  * Functions for handling replication progress.
477  * ---------------------------------------------------------------------------
478  */
479 
480 Size
ReplicationOriginShmemSize(void)481 ReplicationOriginShmemSize(void)
482 {
483 	Size		size = 0;
484 
485 	/*
486 	 * XXX: max_replication_slots is arguably the wrong thing to use, as here
487 	 * we keep the replay state of *remote* transactions. But for now it seems
488 	 * sufficient to reuse it, rather than introduce a separate GUC.
489 	 */
490 	if (max_replication_slots == 0)
491 		return size;
492 
493 	size = add_size(size, offsetof(ReplicationStateCtl, states));
494 
495 	size = add_size(size,
496 					mul_size(max_replication_slots, sizeof(ReplicationState)));
497 	return size;
498 }
499 
500 void
ReplicationOriginShmemInit(void)501 ReplicationOriginShmemInit(void)
502 {
503 	bool		found;
504 
505 	if (max_replication_slots == 0)
506 		return;
507 
508 	replication_states_ctl = (ReplicationStateCtl *)
509 		ShmemInitStruct("ReplicationOriginState",
510 						ReplicationOriginShmemSize(),
511 						&found);
512 	replication_states = replication_states_ctl->states;
513 
514 	if (!found)
515 	{
516 		int			i;
517 
518 		MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
519 
520 		replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
521 
522 		for (i = 0; i < max_replication_slots; i++)
523 		{
524 			LWLockInitialize(&replication_states[i].lock,
525 							 replication_states_ctl->tranche_id);
526 			ConditionVariableInit(&replication_states[i].origin_cv);
527 		}
528 	}
529 
530 	LWLockRegisterTranche(replication_states_ctl->tranche_id,
531 						  "replication_origin");
532 }
533 
534 /* ---------------------------------------------------------------------------
535  * Perform a checkpoint of each replication origin's progress with respect to
536  * the replayed remote_lsn. Make sure that all transactions we refer to in the
537  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
538  * if the transactions were originally committed asynchronously.
539  *
540  * We store checkpoints in the following format:
541  * +-------+------------------------+------------------+-----+--------+
542  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
543  * +-------+------------------------+------------------+-----+--------+
544  *
545  * So its just the magic, followed by the statically sized
546  * ReplicationStateOnDisk structs. Note that the maximum number of
547  * ReplicationState is determined by max_replication_slots.
548  * ---------------------------------------------------------------------------
549  */
550 void
CheckPointReplicationOrigin(void)551 CheckPointReplicationOrigin(void)
552 {
553 	const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
554 	const char *path = "pg_logical/replorigin_checkpoint";
555 	int			tmpfd;
556 	int			i;
557 	uint32		magic = REPLICATION_STATE_MAGIC;
558 	pg_crc32c	crc;
559 
560 	if (max_replication_slots == 0)
561 		return;
562 
563 	INIT_CRC32C(crc);
564 
565 	/* make sure no old temp file is remaining */
566 	if (unlink(tmppath) < 0 && errno != ENOENT)
567 		ereport(PANIC,
568 				(errcode_for_file_access(),
569 				 errmsg("could not remove file \"%s\": %m",
570 						tmppath)));
571 
572 	/*
573 	 * no other backend can perform this at the same time, we're protected by
574 	 * CheckpointLock.
575 	 */
576 	tmpfd = OpenTransientFile((char *) tmppath,
577 							  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
578 							  S_IRUSR | S_IWUSR);
579 	if (tmpfd < 0)
580 		ereport(PANIC,
581 				(errcode_for_file_access(),
582 				 errmsg("could not create file \"%s\": %m",
583 						tmppath)));
584 
585 	/* write magic */
586 	errno = 0;
587 	if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
588 	{
589 		int			save_errno = errno;
590 
591 		CloseTransientFile(tmpfd);
592 
593 		/* if write didn't set errno, assume problem is no disk space */
594 		errno = save_errno ? save_errno : ENOSPC;
595 		ereport(PANIC,
596 				(errcode_for_file_access(),
597 				 errmsg("could not write to file \"%s\": %m",
598 						tmppath)));
599 	}
600 	COMP_CRC32C(crc, &magic, sizeof(magic));
601 
602 	/* prevent concurrent creations/drops */
603 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
604 
605 	/* write actual data */
606 	for (i = 0; i < max_replication_slots; i++)
607 	{
608 		ReplicationStateOnDisk disk_state;
609 		ReplicationState *curstate = &replication_states[i];
610 		XLogRecPtr	local_lsn;
611 
612 		if (curstate->roident == InvalidRepOriginId)
613 			continue;
614 
615 		/* zero, to avoid uninitialized padding bytes */
616 		memset(&disk_state, 0, sizeof(disk_state));
617 
618 		LWLockAcquire(&curstate->lock, LW_SHARED);
619 
620 		disk_state.roident = curstate->roident;
621 
622 		disk_state.remote_lsn = curstate->remote_lsn;
623 		local_lsn = curstate->local_lsn;
624 
625 		LWLockRelease(&curstate->lock);
626 
627 		/* make sure we only write out a commit that's persistent */
628 		XLogFlush(local_lsn);
629 
630 		errno = 0;
631 		if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
632 			sizeof(disk_state))
633 		{
634 			int			save_errno = errno;
635 
636 			CloseTransientFile(tmpfd);
637 
638 			/* if write didn't set errno, assume problem is no disk space */
639 			errno = save_errno ? save_errno : ENOSPC;
640 			ereport(PANIC,
641 					(errcode_for_file_access(),
642 					 errmsg("could not write to file \"%s\": %m",
643 							tmppath)));
644 		}
645 
646 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
647 	}
648 
649 	LWLockRelease(ReplicationOriginLock);
650 
651 	/* write out the CRC */
652 	FIN_CRC32C(crc);
653 	errno = 0;
654 	if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
655 	{
656 		int			save_errno = errno;
657 
658 		CloseTransientFile(tmpfd);
659 
660 		/* if write didn't set errno, assume problem is no disk space */
661 		errno = save_errno ? save_errno : ENOSPC;
662 		ereport(PANIC,
663 				(errcode_for_file_access(),
664 				 errmsg("could not write to file \"%s\": %m",
665 						tmppath)));
666 	}
667 
668 	CloseTransientFile(tmpfd);
669 
670 	/* fsync, rename to permanent file, fsync file and directory */
671 	durable_rename(tmppath, path, PANIC);
672 }
673 
674 /*
675  * Recover replication replay status from checkpoint data saved earlier by
676  * CheckPointReplicationOrigin.
677  *
678  * This only needs to be called at startup and *not* during every checkpoint
679  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
680  * state thereafter can be recovered by looking at commit records.
681  */
682 void
StartupReplicationOrigin(void)683 StartupReplicationOrigin(void)
684 {
685 	const char *path = "pg_logical/replorigin_checkpoint";
686 	int			fd;
687 	int			readBytes;
688 	uint32		magic = REPLICATION_STATE_MAGIC;
689 	int			last_state = 0;
690 	pg_crc32c	file_crc;
691 	pg_crc32c	crc;
692 
693 	/* don't want to overwrite already existing state */
694 #ifdef USE_ASSERT_CHECKING
695 	static bool already_started = false;
696 
697 	Assert(!already_started);
698 	already_started = true;
699 #endif
700 
701 	if (max_replication_slots == 0)
702 		return;
703 
704 	INIT_CRC32C(crc);
705 
706 	elog(DEBUG2, "starting up replication origin progress state");
707 
708 	fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
709 
710 	/*
711 	 * might have had max_replication_slots == 0 last run, or we just brought
712 	 * up a standby.
713 	 */
714 	if (fd < 0 && errno == ENOENT)
715 		return;
716 	else if (fd < 0)
717 		ereport(PANIC,
718 				(errcode_for_file_access(),
719 				 errmsg("could not open file \"%s\": %m",
720 						path)));
721 
722 	/* verify magic, that is written even if nothing was active */
723 	readBytes = read(fd, &magic, sizeof(magic));
724 	if (readBytes != sizeof(magic))
725 		ereport(PANIC,
726 				(errmsg("could not read file \"%s\": %m",
727 						path)));
728 	COMP_CRC32C(crc, &magic, sizeof(magic));
729 
730 	if (magic != REPLICATION_STATE_MAGIC)
731 		ereport(PANIC,
732 				(errmsg("replication checkpoint has wrong magic %u instead of %u",
733 						magic, REPLICATION_STATE_MAGIC)));
734 
735 	/* we can skip locking here, no other access is possible */
736 
737 	/* recover individual states, until there are no more to be found */
738 	while (true)
739 	{
740 		ReplicationStateOnDisk disk_state;
741 
742 		readBytes = read(fd, &disk_state, sizeof(disk_state));
743 
744 		/* no further data */
745 		if (readBytes == sizeof(crc))
746 		{
747 			/* not pretty, but simple ... */
748 			file_crc = *(pg_crc32c *) &disk_state;
749 			break;
750 		}
751 
752 		if (readBytes < 0)
753 		{
754 			ereport(PANIC,
755 					(errcode_for_file_access(),
756 					 errmsg("could not read file \"%s\": %m",
757 							path)));
758 		}
759 
760 		if (readBytes != sizeof(disk_state))
761 		{
762 			ereport(PANIC,
763 					(errcode_for_file_access(),
764 					 errmsg("could not read file \"%s\": read %d of %zu",
765 							path, readBytes, sizeof(disk_state))));
766 		}
767 
768 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
769 
770 		if (last_state == max_replication_slots)
771 			ereport(PANIC,
772 					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
773 					 errmsg("could not find free replication state, increase max_replication_slots")));
774 
775 		/* copy data to shared memory */
776 		replication_states[last_state].roident = disk_state.roident;
777 		replication_states[last_state].remote_lsn = disk_state.remote_lsn;
778 		last_state++;
779 
780 		elog(LOG, "recovered replication state of node %u to %X/%X",
781 			 disk_state.roident,
782 			 (uint32) (disk_state.remote_lsn >> 32),
783 			 (uint32) disk_state.remote_lsn);
784 	}
785 
786 	/* now check checksum */
787 	FIN_CRC32C(crc);
788 	if (file_crc != crc)
789 		ereport(PANIC,
790 				(errcode(ERRCODE_DATA_CORRUPTED),
791 				 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
792 						crc, file_crc)));
793 
794 	CloseTransientFile(fd);
795 }
796 
797 void
replorigin_redo(XLogReaderState * record)798 replorigin_redo(XLogReaderState *record)
799 {
800 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
801 
802 	switch (info)
803 	{
804 		case XLOG_REPLORIGIN_SET:
805 			{
806 				xl_replorigin_set *xlrec =
807 				(xl_replorigin_set *) XLogRecGetData(record);
808 
809 				replorigin_advance(xlrec->node_id,
810 								   xlrec->remote_lsn, record->EndRecPtr,
811 								   xlrec->force /* backward */ ,
812 								   false /* WAL log */ );
813 				break;
814 			}
815 		case XLOG_REPLORIGIN_DROP:
816 			{
817 				xl_replorigin_drop *xlrec;
818 				int			i;
819 
820 				xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
821 
822 				for (i = 0; i < max_replication_slots; i++)
823 				{
824 					ReplicationState *state = &replication_states[i];
825 
826 					/* found our slot */
827 					if (state->roident == xlrec->node_id)
828 					{
829 						/* reset entry */
830 						state->roident = InvalidRepOriginId;
831 						state->remote_lsn = InvalidXLogRecPtr;
832 						state->local_lsn = InvalidXLogRecPtr;
833 						break;
834 					}
835 				}
836 				break;
837 			}
838 		default:
839 			elog(PANIC, "replorigin_redo: unknown op code %u", info);
840 	}
841 }
842 
843 
844 /*
845  * Tell the replication origin progress machinery that a commit from 'node'
846  * that originated at the LSN remote_commit on the remote node was replayed
847  * successfully and that we don't need to do so again. In combination with
848  * setting up replorigin_session_origin_lsn and replorigin_session_origin
849  * that ensures we won't loose knowledge about that after a crash if the
850  * transaction had a persistent effect (think of asynchronous commits).
851  *
852  * local_commit needs to be a local LSN of the commit so that we can make sure
853  * upon a checkpoint that enough WAL has been persisted to disk.
854  *
855  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
856  * unless running in recovery.
857  */
858 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)859 replorigin_advance(RepOriginId node,
860 				   XLogRecPtr remote_commit, XLogRecPtr local_commit,
861 				   bool go_backward, bool wal_log)
862 {
863 	int			i;
864 	ReplicationState *replication_state = NULL;
865 	ReplicationState *free_state = NULL;
866 
867 	Assert(node != InvalidRepOriginId);
868 
869 	/* we don't track DoNotReplicateId */
870 	if (node == DoNotReplicateId)
871 		return;
872 
873 	/*
874 	 * XXX: For the case where this is called by WAL replay, it'd be more
875 	 * efficient to restore into a backend local hashtable and only dump into
876 	 * shmem after recovery is finished. Let's wait with implementing that
877 	 * till it's shown to be a measurable expense
878 	 */
879 
880 	/* Lock exclusively, as we may have to create a new table entry. */
881 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
882 
883 	/*
884 	 * Search for either an existing slot for the origin, or a free one we can
885 	 * use.
886 	 */
887 	for (i = 0; i < max_replication_slots; i++)
888 	{
889 		ReplicationState *curstate = &replication_states[i];
890 
891 		/* remember where to insert if necessary */
892 		if (curstate->roident == InvalidRepOriginId &&
893 			free_state == NULL)
894 		{
895 			free_state = curstate;
896 			continue;
897 		}
898 
899 		/* not our slot */
900 		if (curstate->roident != node)
901 		{
902 			continue;
903 		}
904 
905 		/* ok, found slot */
906 		replication_state = curstate;
907 
908 		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
909 
910 		/* Make sure it's not used by somebody else */
911 		if (replication_state->acquired_by != 0)
912 		{
913 			ereport(ERROR,
914 					(errcode(ERRCODE_OBJECT_IN_USE),
915 					 errmsg("replication origin with OID %d is already active for PID %d",
916 							replication_state->roident,
917 							replication_state->acquired_by)));
918 		}
919 
920 		break;
921 	}
922 
923 	if (replication_state == NULL && free_state == NULL)
924 		ereport(ERROR,
925 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
926 				 errmsg("could not find free replication state slot for replication origin with OID %u",
927 						node),
928 				 errhint("Increase max_replication_slots and try again.")));
929 
930 	if (replication_state == NULL)
931 	{
932 		/* initialize new slot */
933 		LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
934 		replication_state = free_state;
935 		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
936 		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
937 		replication_state->roident = node;
938 	}
939 
940 	Assert(replication_state->roident != InvalidRepOriginId);
941 
942 	/*
943 	 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
944 	 * and the standby gets the message. Primarily this will be called during
945 	 * WAL replay (of commit records) where no WAL logging is necessary.
946 	 */
947 	if (wal_log)
948 	{
949 		xl_replorigin_set xlrec;
950 
951 		xlrec.remote_lsn = remote_commit;
952 		xlrec.node_id = node;
953 		xlrec.force = go_backward;
954 
955 		XLogBeginInsert();
956 		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
957 
958 		XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
959 	}
960 
961 	/*
962 	 * Due to - harmless - race conditions during a checkpoint we could see
963 	 * values here that are older than the ones we already have in memory.
964 	 * Don't overwrite those.
965 	 */
966 	if (go_backward || replication_state->remote_lsn < remote_commit)
967 		replication_state->remote_lsn = remote_commit;
968 	if (local_commit != InvalidXLogRecPtr &&
969 		(go_backward || replication_state->local_lsn < local_commit))
970 		replication_state->local_lsn = local_commit;
971 	LWLockRelease(&replication_state->lock);
972 
973 	/*
974 	 * Release *after* changing the LSNs, slot isn't acquired and thus could
975 	 * otherwise be dropped anytime.
976 	 */
977 	LWLockRelease(ReplicationOriginLock);
978 }
979 
980 
981 XLogRecPtr
replorigin_get_progress(RepOriginId node,bool flush)982 replorigin_get_progress(RepOriginId node, bool flush)
983 {
984 	int			i;
985 	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
986 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
987 
988 	/* prevent slots from being concurrently dropped */
989 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
990 
991 	for (i = 0; i < max_replication_slots; i++)
992 	{
993 		ReplicationState *state;
994 
995 		state = &replication_states[i];
996 
997 		if (state->roident == node)
998 		{
999 			LWLockAcquire(&state->lock, LW_SHARED);
1000 
1001 			remote_lsn = state->remote_lsn;
1002 			local_lsn = state->local_lsn;
1003 
1004 			LWLockRelease(&state->lock);
1005 
1006 			break;
1007 		}
1008 	}
1009 
1010 	LWLockRelease(ReplicationOriginLock);
1011 
1012 	if (flush && local_lsn != InvalidXLogRecPtr)
1013 		XLogFlush(local_lsn);
1014 
1015 	return remote_lsn;
1016 }
1017 
1018 /*
1019  * Tear down a (possibly) configured session replication origin during process
1020  * exit.
1021  */
1022 static void
ReplicationOriginExitCleanup(int code,Datum arg)1023 ReplicationOriginExitCleanup(int code, Datum arg)
1024 {
1025 	ConditionVariable *cv = NULL;
1026 
1027 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1028 
1029 	if (session_replication_state != NULL &&
1030 		session_replication_state->acquired_by == MyProcPid)
1031 	{
1032 		cv = &session_replication_state->origin_cv;
1033 
1034 		session_replication_state->acquired_by = 0;
1035 		session_replication_state = NULL;
1036 	}
1037 
1038 	LWLockRelease(ReplicationOriginLock);
1039 
1040 	if (cv)
1041 		ConditionVariableBroadcast(cv);
1042 }
1043 
1044 /*
1045  * Setup a replication origin in the shared memory struct if it doesn't
1046  * already exists and cache access to the specific ReplicationSlot so the
1047  * array doesn't have to be searched when calling
1048  * replorigin_session_advance().
1049  *
1050  * Obviously only one such cached origin can exist per process and the current
1051  * cached value can only be set again after the previous value is torn down
1052  * with replorigin_session_reset().
1053  */
1054 void
replorigin_session_setup(RepOriginId node)1055 replorigin_session_setup(RepOriginId node)
1056 {
1057 	static bool registered_cleanup;
1058 	int			i;
1059 	int			free_slot = -1;
1060 
1061 	if (!registered_cleanup)
1062 	{
1063 		on_shmem_exit(ReplicationOriginExitCleanup, 0);
1064 		registered_cleanup = true;
1065 	}
1066 
1067 	Assert(max_replication_slots > 0);
1068 
1069 	if (session_replication_state != NULL)
1070 		ereport(ERROR,
1071 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1072 				 errmsg("cannot setup replication origin when one is already setup")));
1073 
1074 	/* Lock exclusively, as we may have to create a new table entry. */
1075 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1076 
1077 	/*
1078 	 * Search for either an existing slot for the origin, or a free one we can
1079 	 * use.
1080 	 */
1081 	for (i = 0; i < max_replication_slots; i++)
1082 	{
1083 		ReplicationState *curstate = &replication_states[i];
1084 
1085 		/* remember where to insert if necessary */
1086 		if (curstate->roident == InvalidRepOriginId &&
1087 			free_slot == -1)
1088 		{
1089 			free_slot = i;
1090 			continue;
1091 		}
1092 
1093 		/* not our slot */
1094 		if (curstate->roident != node)
1095 			continue;
1096 
1097 		else if (curstate->acquired_by != 0)
1098 		{
1099 			ereport(ERROR,
1100 					(errcode(ERRCODE_OBJECT_IN_USE),
1101 					 errmsg("replication origin with OID %d is already active for PID %d",
1102 							curstate->roident, curstate->acquired_by)));
1103 		}
1104 
1105 		/* ok, found slot */
1106 		session_replication_state = curstate;
1107 	}
1108 
1109 
1110 	if (session_replication_state == NULL && free_slot == -1)
1111 		ereport(ERROR,
1112 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1113 				 errmsg("could not find free replication state slot for replication origin with OID %u",
1114 						node),
1115 				 errhint("Increase max_replication_slots and try again.")));
1116 	else if (session_replication_state == NULL)
1117 	{
1118 		/* initialize new slot */
1119 		session_replication_state = &replication_states[free_slot];
1120 		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1121 		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1122 		session_replication_state->roident = node;
1123 	}
1124 
1125 
1126 	Assert(session_replication_state->roident != InvalidRepOriginId);
1127 
1128 	session_replication_state->acquired_by = MyProcPid;
1129 
1130 	LWLockRelease(ReplicationOriginLock);
1131 
1132 	/* probably this one is pointless */
1133 	ConditionVariableBroadcast(&session_replication_state->origin_cv);
1134 }
1135 
1136 /*
1137  * Reset replay state previously setup in this session.
1138  *
1139  * This function may only be called if an origin was setup with
1140  * replorigin_session_setup().
1141  */
1142 void
replorigin_session_reset(void)1143 replorigin_session_reset(void)
1144 {
1145 	ConditionVariable *cv;
1146 
1147 	Assert(max_replication_slots != 0);
1148 
1149 	if (session_replication_state == NULL)
1150 		ereport(ERROR,
1151 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1152 				 errmsg("no replication origin is configured")));
1153 
1154 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1155 
1156 	session_replication_state->acquired_by = 0;
1157 	cv = &session_replication_state->origin_cv;
1158 	session_replication_state = NULL;
1159 
1160 	LWLockRelease(ReplicationOriginLock);
1161 
1162 	ConditionVariableBroadcast(cv);
1163 }
1164 
1165 /*
1166  * Do the same work replorigin_advance() does, just on the session's
1167  * configured origin.
1168  *
1169  * This is noticeably cheaper than using replorigin_advance().
1170  */
1171 void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)1172 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1173 {
1174 	Assert(session_replication_state != NULL);
1175 	Assert(session_replication_state->roident != InvalidRepOriginId);
1176 
1177 	LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1178 	if (session_replication_state->local_lsn < local_commit)
1179 		session_replication_state->local_lsn = local_commit;
1180 	if (session_replication_state->remote_lsn < remote_commit)
1181 		session_replication_state->remote_lsn = remote_commit;
1182 	LWLockRelease(&session_replication_state->lock);
1183 }
1184 
1185 /*
1186  * Ask the machinery about the point up to which we successfully replayed
1187  * changes from an already setup replication origin.
1188  */
1189 XLogRecPtr
replorigin_session_get_progress(bool flush)1190 replorigin_session_get_progress(bool flush)
1191 {
1192 	XLogRecPtr	remote_lsn;
1193 	XLogRecPtr	local_lsn;
1194 
1195 	Assert(session_replication_state != NULL);
1196 
1197 	LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1198 	remote_lsn = session_replication_state->remote_lsn;
1199 	local_lsn = session_replication_state->local_lsn;
1200 	LWLockRelease(&session_replication_state->lock);
1201 
1202 	if (flush && local_lsn != InvalidXLogRecPtr)
1203 		XLogFlush(local_lsn);
1204 
1205 	return remote_lsn;
1206 }
1207 
1208 
1209 
/* ---------------------------------------------------------------------------
 * SQL functions for working with replication origins.
 *
 * These should mostly be fairly short wrappers around more generic functions.
 * ---------------------------------------------------------------------------
 */
1216 
1217 /*
1218  * Create replication origin for the passed in name, and return the assigned
1219  * oid.
1220  */
1221 Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)1222 pg_replication_origin_create(PG_FUNCTION_ARGS)
1223 {
1224 	char	   *name;
1225 	RepOriginId roident;
1226 
1227 	replorigin_check_prerequisites(false, false);
1228 
1229 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1230 	roident = replorigin_create(name);
1231 
1232 	pfree(name);
1233 
1234 	PG_RETURN_OID(roident);
1235 }
1236 
1237 /*
1238  * Drop replication origin.
1239  */
1240 Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)1241 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1242 {
1243 	char	   *name;
1244 	RepOriginId roident;
1245 
1246 	replorigin_check_prerequisites(false, false);
1247 
1248 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1249 
1250 	roident = replorigin_by_name(name, false);
1251 	Assert(OidIsValid(roident));
1252 
1253 	replorigin_drop(roident, true);
1254 
1255 	pfree(name);
1256 
1257 	PG_RETURN_VOID();
1258 }
1259 
1260 /*
1261  * Return oid of a replication origin.
1262  */
1263 Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)1264 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1265 {
1266 	char	   *name;
1267 	RepOriginId roident;
1268 
1269 	replorigin_check_prerequisites(false, false);
1270 
1271 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1272 	roident = replorigin_by_name(name, true);
1273 
1274 	pfree(name);
1275 
1276 	if (OidIsValid(roident))
1277 		PG_RETURN_OID(roident);
1278 	PG_RETURN_NULL();
1279 }
1280 
1281 /*
1282  * Setup a replication origin for this session.
1283  */
1284 Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)1285 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1286 {
1287 	char	   *name;
1288 	RepOriginId origin;
1289 
1290 	replorigin_check_prerequisites(true, false);
1291 
1292 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1293 	origin = replorigin_by_name(name, false);
1294 	replorigin_session_setup(origin);
1295 
1296 	replorigin_session_origin = origin;
1297 
1298 	pfree(name);
1299 
1300 	PG_RETURN_VOID();
1301 }
1302 
1303 /*
1304  * Reset previously setup origin in this session
1305  */
1306 Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)1307 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1308 {
1309 	replorigin_check_prerequisites(true, false);
1310 
1311 	replorigin_session_reset();
1312 
1313 	replorigin_session_origin = InvalidRepOriginId;
1314 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
1315 	replorigin_session_origin_timestamp = 0;
1316 
1317 	PG_RETURN_VOID();
1318 }
1319 
1320 /*
1321  * Has a replication origin been setup for this session.
1322  */
1323 Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)1324 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1325 {
1326 	replorigin_check_prerequisites(false, false);
1327 
1328 	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1329 }
1330 
1331 
1332 /*
1333  * Return the replication progress for origin setup in the current session.
1334  *
1335  * If 'flush' is set to true it is ensured that the returned value corresponds
1336  * to a local transaction that has been flushed. This is useful if asynchronous
1337  * commits are used when replaying replicated transactions.
1338  */
1339 Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)1340 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1341 {
1342 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1343 	bool		flush = PG_GETARG_BOOL(0);
1344 
1345 	replorigin_check_prerequisites(true, false);
1346 
1347 	if (session_replication_state == NULL)
1348 		ereport(ERROR,
1349 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1350 				 errmsg("no replication origin is configured")));
1351 
1352 	remote_lsn = replorigin_session_get_progress(flush);
1353 
1354 	if (remote_lsn == InvalidXLogRecPtr)
1355 		PG_RETURN_NULL();
1356 
1357 	PG_RETURN_LSN(remote_lsn);
1358 }
1359 
1360 Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)1361 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1362 {
1363 	XLogRecPtr	location = PG_GETARG_LSN(0);
1364 
1365 	replorigin_check_prerequisites(true, false);
1366 
1367 	if (session_replication_state == NULL)
1368 		ereport(ERROR,
1369 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1370 				 errmsg("no replication origin is configured")));
1371 
1372 	replorigin_session_origin_lsn = location;
1373 	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1374 
1375 	PG_RETURN_VOID();
1376 }
1377 
1378 Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)1379 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1380 {
1381 	replorigin_check_prerequisites(true, false);
1382 
1383 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
1384 	replorigin_session_origin_timestamp = 0;
1385 
1386 	PG_RETURN_VOID();
1387 }
1388 
1389 
1390 Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)1391 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1392 {
1393 	text	   *name = PG_GETARG_TEXT_PP(0);
1394 	XLogRecPtr	remote_commit = PG_GETARG_LSN(1);
1395 	RepOriginId node;
1396 
1397 	replorigin_check_prerequisites(true, false);
1398 
1399 	/* lock to prevent the replication origin from vanishing */
1400 	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1401 
1402 	node = replorigin_by_name(text_to_cstring(name), false);
1403 
1404 	/*
1405 	 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1406 	 * xact hasn't committed yet. This is why this function should be used to
1407 	 * set up the initial replication state, but not for replay.
1408 	 */
1409 	replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1410 					   true /* go backward */ , true /* WAL log */ );
1411 
1412 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1413 
1414 	PG_RETURN_VOID();
1415 }
1416 
1417 
1418 /*
1419  * Return the replication progress for an individual replication origin.
1420  *
1421  * If 'flush' is set to true it is ensured that the returned value corresponds
1422  * to a local transaction that has been flushed. This is useful if asynchronous
1423  * commits are used when replaying replicated transactions.
1424  */
1425 Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)1426 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1427 {
1428 	char	   *name;
1429 	bool		flush;
1430 	RepOriginId roident;
1431 	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
1432 
1433 	replorigin_check_prerequisites(true, true);
1434 
1435 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1436 	flush = PG_GETARG_BOOL(1);
1437 
1438 	roident = replorigin_by_name(name, false);
1439 	Assert(OidIsValid(roident));
1440 
1441 	remote_lsn = replorigin_get_progress(roident, flush);
1442 
1443 	if (remote_lsn == InvalidXLogRecPtr)
1444 		PG_RETURN_NULL();
1445 
1446 	PG_RETURN_LSN(remote_lsn);
1447 }
1448 
1449 
1450 Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)1451 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1452 {
1453 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1454 	TupleDesc	tupdesc;
1455 	Tuplestorestate *tupstore;
1456 	MemoryContext per_query_ctx;
1457 	MemoryContext oldcontext;
1458 	int			i;
1459 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1460 
1461 	/* we want to return 0 rows if slot is set to zero */
1462 	replorigin_check_prerequisites(false, true);
1463 
1464 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1465 		ereport(ERROR,
1466 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1467 				 errmsg("set-valued function called in context that cannot accept a set")));
1468 	if (!(rsinfo->allowedModes & SFRM_Materialize))
1469 		ereport(ERROR,
1470 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1471 				 errmsg("materialize mode required, but it is not allowed in this context")));
1472 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1473 		elog(ERROR, "return type must be a row type");
1474 
1475 	if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1476 		elog(ERROR, "wrong function definition");
1477 
1478 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1479 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
1480 
1481 	tupstore = tuplestore_begin_heap(true, false, work_mem);
1482 	rsinfo->returnMode = SFRM_Materialize;
1483 	rsinfo->setResult = tupstore;
1484 	rsinfo->setDesc = tupdesc;
1485 
1486 	MemoryContextSwitchTo(oldcontext);
1487 
1488 
1489 	/* prevent slots from being concurrently dropped */
1490 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1491 
1492 	/*
1493 	 * Iterate through all possible replication_states, display if they are
1494 	 * filled. Note that we do not take any locks, so slightly corrupted/out
1495 	 * of date values are a possibility.
1496 	 */
1497 	for (i = 0; i < max_replication_slots; i++)
1498 	{
1499 		ReplicationState *state;
1500 		Datum		values[REPLICATION_ORIGIN_PROGRESS_COLS];
1501 		bool		nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1502 		char	   *roname;
1503 
1504 		state = &replication_states[i];
1505 
1506 		/* unused slot, nothing to display */
1507 		if (state->roident == InvalidRepOriginId)
1508 			continue;
1509 
1510 		memset(values, 0, sizeof(values));
1511 		memset(nulls, 1, sizeof(nulls));
1512 
1513 		values[0] = ObjectIdGetDatum(state->roident);
1514 		nulls[0] = false;
1515 
1516 		/*
1517 		 * We're not preventing the origin to be dropped concurrently, so
1518 		 * silently accept that it might be gone.
1519 		 */
1520 		if (replorigin_by_oid(state->roident, true,
1521 							  &roname))
1522 		{
1523 			values[1] = CStringGetTextDatum(roname);
1524 			nulls[1] = false;
1525 		}
1526 
1527 		LWLockAcquire(&state->lock, LW_SHARED);
1528 
1529 		values[2] = LSNGetDatum(state->remote_lsn);
1530 		nulls[2] = false;
1531 
1532 		values[3] = LSNGetDatum(state->local_lsn);
1533 		nulls[3] = false;
1534 
1535 		LWLockRelease(&state->lock);
1536 
1537 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1538 	}
1539 
1540 	tuplestore_donestoring(tupstore);
1541 
1542 	LWLockRelease(ReplicationOriginLock);
1543 
1544 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1545 
1546 	return (Datum) 0;
1547 }
1548