1 /*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
 * This file provides the following:
 * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *   and durable manner.
17 *
 * Replication origins consist of a descriptive, user defined, external
 * name and a short, thus space efficient, internal 2 byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory and long descriptors would be inefficient. For now only use 2 bytes
 * for the internal id of a replication origin as it seems unlikely that there
 * soon will be more than 65k nodes in one replication setup; and using only
 * two bytes allows us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
 * * To create and drop replication origins an exclusive lock on
 *   pg_replication_origin is required for the duration. That allows us to
 *   safely and conflict free assign new origins using a dirty snapshot.
50 *
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
56 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *   replication progress slot that slot's lwlock has to be held. That's
 *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
 *   all our platforms, but it also simplifies memory ordering concerns
 *   between the remote and local lsn. We use a lwlock instead of a spinlock
 *   so it's less harmful to hold the lock over a WAL write
 *   (cf. replorigin_advance).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68 #include "postgres.h"
69
70 #include <unistd.h>
71 #include <sys/stat.h>
72
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "catalog/catalog.h"
78 #include "catalog/indexing.h"
79 #include "funcapi.h"
80 #include "miscadmin.h"
81 #include "nodes/execnodes.h"
82 #include "pgstat.h"
83 #include "replication/logical.h"
84 #include "replication/origin.h"
85 #include "storage/condition_variable.h"
86 #include "storage/copydir.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
90 #include "utils/builtins.h"
91 #include "utils/fmgroids.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/rel.h"
94 #include "utils/snapmgr.h"
95 #include "utils/syscache.h"
96
97 /*
98 * Replay progress of a single remote node.
99 */
100 typedef struct ReplicationState
101 {
102 /*
103 * Local identifier for the remote node.
104 */
105 RepOriginId roident;
106
107 /*
108 * Location of the latest commit from the remote side.
109 */
110 XLogRecPtr remote_lsn;
111
112 /*
113 * Remember the local lsn of the commit record so we can XLogFlush() to it
114 * during a checkpoint so we know the commit record actually is safe on
115 * disk.
116 */
117 XLogRecPtr local_lsn;
118
119 /*
120 * PID of backend that's acquired slot, or 0 if none.
121 */
122 int acquired_by;
123
124 /*
125 * Condition variable that's signaled when acquired_by changes.
126 */
127 ConditionVariable origin_cv;
128
129 /*
130 * Lock protecting remote_lsn and local_lsn.
131 */
132 LWLock lock;
133 } ReplicationState;
134
135 /*
136 * On disk version of ReplicationState.
137 */
138 typedef struct ReplicationStateOnDisk
139 {
140 RepOriginId roident;
141 XLogRecPtr remote_lsn;
142 } ReplicationStateOnDisk;
143
144
145 typedef struct ReplicationStateCtl
146 {
147 /* Tranche to use for per-origin LWLocks */
148 int tranche_id;
149 /* Array of length max_replication_slots */
150 ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
151 } ReplicationStateCtl;
152
153 /* external variables */
154 RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
155 XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
156 TimestampTz replorigin_session_origin_timestamp = 0;
157
158 /*
159 * Base address into a shared memory array of replication states of size
160 * max_replication_slots.
161 *
162 * XXX: Should we use a separate variable to size this rather than
163 * max_replication_slots?
164 */
165 static ReplicationState *replication_states;
166
167 /*
168 * Actual shared memory block (replication_states[] is now part of this).
169 */
170 static ReplicationStateCtl *replication_states_ctl;
171
172 /*
173 * Backend-local, cached element from ReplicationState for use in a backend
174 * replaying remote commits, so we don't have to search ReplicationState for
175 * the backends current RepOriginId.
176 */
177 static ReplicationState *session_replication_state = NULL;
178
179 /* Magic for on disk files. */
180 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
181
182 static void
replorigin_check_prerequisites(bool check_slots,bool recoveryOK)183 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
184 {
185 if (check_slots && max_replication_slots == 0)
186 ereport(ERROR,
187 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
189
190 if (!recoveryOK && RecoveryInProgress())
191 ereport(ERROR,
192 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
193 errmsg("cannot manipulate replication origins during recovery")));
194
195 }
196
197
198 /* ---------------------------------------------------------------------------
199 * Functions for working with replication origins themselves.
200 * ---------------------------------------------------------------------------
201 */
202
203 /*
204 * Check for a persistent replication origin identified by name.
205 *
206 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
207 */
208 RepOriginId
replorigin_by_name(const char * roname,bool missing_ok)209 replorigin_by_name(const char *roname, bool missing_ok)
210 {
211 Form_pg_replication_origin ident;
212 Oid roident = InvalidOid;
213 HeapTuple tuple;
214 Datum roname_d;
215
216 roname_d = CStringGetTextDatum(roname);
217
218 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
219 if (HeapTupleIsValid(tuple))
220 {
221 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
222 roident = ident->roident;
223 ReleaseSysCache(tuple);
224 }
225 else if (!missing_ok)
226 ereport(ERROR,
227 (errcode(ERRCODE_UNDEFINED_OBJECT),
228 errmsg("replication origin \"%s\" does not exist",
229 roname)));
230
231 return roident;
232 }
233
234 /*
235 * Create a replication origin.
236 *
237 * Needs to be called in a transaction.
238 */
239 RepOriginId
replorigin_create(const char * roname)240 replorigin_create(const char *roname)
241 {
242 Oid roident;
243 HeapTuple tuple = NULL;
244 Relation rel;
245 Datum roname_d;
246 SnapshotData SnapshotDirty;
247 SysScanDesc scan;
248 ScanKeyData key;
249
250 roname_d = CStringGetTextDatum(roname);
251
252 Assert(IsTransactionState());
253
254 /*
255 * We need the numeric replication origin to be 16bit wide, so we cannot
256 * rely on the normal oid allocation. Instead we simply scan
257 * pg_replication_origin for the first unused id. That's not particularly
258 * efficient, but this should be a fairly infrequent operation - we can
259 * easily spend a bit more code on this when it turns out it needs to be
260 * faster.
261 *
262 * We handle concurrency by taking an exclusive lock (allowing reads!)
263 * over the table for the duration of the search. Because we use a "dirty
264 * snapshot" we can read rows that other in-progress sessions have
265 * written, even though they would be invisible with normal snapshots. Due
266 * to the exclusive lock there's no danger that new rows can appear while
267 * we're checking.
268 */
269 InitDirtySnapshot(SnapshotDirty);
270
271 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
272
273 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274 {
275 bool nulls[Natts_pg_replication_origin];
276 Datum values[Natts_pg_replication_origin];
277 bool collides;
278
279 CHECK_FOR_INTERRUPTS();
280
281 ScanKeyInit(&key,
282 Anum_pg_replication_origin_roident,
283 BTEqualStrategyNumber, F_OIDEQ,
284 ObjectIdGetDatum(roident));
285
286 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
287 true /* indexOK */ ,
288 &SnapshotDirty,
289 1, &key);
290
291 collides = HeapTupleIsValid(systable_getnext(scan));
292
293 systable_endscan(scan);
294
295 if (!collides)
296 {
297 /*
298 * Ok, found an unused roident, insert the new row and do a CCI,
299 * so our callers can look it up if they want to.
300 */
301 memset(&nulls, 0, sizeof(nulls));
302
303 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
304 values[Anum_pg_replication_origin_roname - 1] = roname_d;
305
306 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307 CatalogTupleInsert(rel, tuple);
308 CommandCounterIncrement();
309 break;
310 }
311 }
312
313 /* now release lock again, */
314 table_close(rel, ExclusiveLock);
315
316 if (tuple == NULL)
317 ereport(ERROR,
318 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319 errmsg("could not find free replication origin OID")));
320
321 heap_freetuple(tuple);
322 return roident;
323 }
324
325 /*
326 * Helper function to drop a replication origin.
327 */
328 static void
replorigin_drop_guts(Relation rel,RepOriginId roident,bool nowait)329 replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
330 {
331 HeapTuple tuple;
332 int i;
333
334 /*
335 * First, clean up the slot state info, if there is any matching slot.
336 */
337 restart:
338 tuple = NULL;
339 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
340
341 for (i = 0; i < max_replication_slots; i++)
342 {
343 ReplicationState *state = &replication_states[i];
344
345 if (state->roident == roident)
346 {
347 /* found our slot, is it busy? */
348 if (state->acquired_by != 0)
349 {
350 ConditionVariable *cv;
351
352 if (nowait)
353 ereport(ERROR,
354 (errcode(ERRCODE_OBJECT_IN_USE),
355 errmsg("could not drop replication origin with OID %d, in use by PID %d",
356 state->roident,
357 state->acquired_by)));
358
359 /*
360 * We must wait and then retry. Since we don't know which CV
361 * to wait on until here, we can't readily use
362 * ConditionVariablePrepareToSleep (calling it here would be
363 * wrong, since we could miss the signal if we did so); just
364 * use ConditionVariableSleep directly.
365 */
366 cv = &state->origin_cv;
367
368 LWLockRelease(ReplicationOriginLock);
369
370 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
371 goto restart;
372 }
373
374 /* first make a WAL log entry */
375 {
376 xl_replorigin_drop xlrec;
377
378 xlrec.node_id = roident;
379 XLogBeginInsert();
380 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
381 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
382 }
383
384 /* then clear the in-memory slot */
385 state->roident = InvalidRepOriginId;
386 state->remote_lsn = InvalidXLogRecPtr;
387 state->local_lsn = InvalidXLogRecPtr;
388 break;
389 }
390 }
391 LWLockRelease(ReplicationOriginLock);
392 ConditionVariableCancelSleep();
393
394 /*
395 * Now, we can delete the catalog entry.
396 */
397 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
398 if (!HeapTupleIsValid(tuple))
399 elog(ERROR, "cache lookup failed for replication origin with oid %u",
400 roident);
401
402 CatalogTupleDelete(rel, &tuple->t_self);
403 ReleaseSysCache(tuple);
404
405 CommandCounterIncrement();
406 }
407
408 /*
409 * Drop replication origin (by name).
410 *
411 * Needs to be called in a transaction.
412 */
413 void
replorigin_drop_by_name(const char * name,bool missing_ok,bool nowait)414 replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
415 {
416 RepOriginId roident;
417 Relation rel;
418
419 Assert(IsTransactionState());
420
421 /*
422 * To interlock against concurrent drops, we hold ExclusiveLock on
423 * pg_replication_origin till xact commit.
424 *
425 * XXX We can optimize this by acquiring the lock on a specific origin by
426 * using LockSharedObject if required. However, for that, we first to
427 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
428 * the specific origin and then re-check if the origin still exists.
429 */
430 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
431
432 roident = replorigin_by_name(name, missing_ok);
433
434 if (OidIsValid(roident))
435 replorigin_drop_guts(rel, roident, nowait);
436
437 /* We keep the lock on pg_replication_origin until commit */
438 table_close(rel, NoLock);
439 }
440
441 /*
442 * Lookup replication origin via its oid and return the name.
443 *
444 * The external name is palloc'd in the calling context.
445 *
446 * Returns true if the origin is known, false otherwise.
447 */
448 bool
replorigin_by_oid(RepOriginId roident,bool missing_ok,char ** roname)449 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
450 {
451 HeapTuple tuple;
452 Form_pg_replication_origin ric;
453
454 Assert(OidIsValid((Oid) roident));
455 Assert(roident != InvalidRepOriginId);
456 Assert(roident != DoNotReplicateId);
457
458 tuple = SearchSysCache1(REPLORIGIDENT,
459 ObjectIdGetDatum((Oid) roident));
460
461 if (HeapTupleIsValid(tuple))
462 {
463 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
464 *roname = text_to_cstring(&ric->roname);
465 ReleaseSysCache(tuple);
466
467 return true;
468 }
469 else
470 {
471 *roname = NULL;
472
473 if (!missing_ok)
474 ereport(ERROR,
475 (errcode(ERRCODE_UNDEFINED_OBJECT),
476 errmsg("replication origin with OID %u does not exist",
477 roident)));
478
479 return false;
480 }
481 }
482
483
484 /* ---------------------------------------------------------------------------
485 * Functions for handling replication progress.
486 * ---------------------------------------------------------------------------
487 */
488
489 Size
ReplicationOriginShmemSize(void)490 ReplicationOriginShmemSize(void)
491 {
492 Size size = 0;
493
494 /*
495 * XXX: max_replication_slots is arguably the wrong thing to use, as here
496 * we keep the replay state of *remote* transactions. But for now it seems
497 * sufficient to reuse it, rather than introduce a separate GUC.
498 */
499 if (max_replication_slots == 0)
500 return size;
501
502 size = add_size(size, offsetof(ReplicationStateCtl, states));
503
504 size = add_size(size,
505 mul_size(max_replication_slots, sizeof(ReplicationState)));
506 return size;
507 }
508
509 void
ReplicationOriginShmemInit(void)510 ReplicationOriginShmemInit(void)
511 {
512 bool found;
513
514 if (max_replication_slots == 0)
515 return;
516
517 replication_states_ctl = (ReplicationStateCtl *)
518 ShmemInitStruct("ReplicationOriginState",
519 ReplicationOriginShmemSize(),
520 &found);
521 replication_states = replication_states_ctl->states;
522
523 if (!found)
524 {
525 int i;
526
527 MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
528
529 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
530
531 for (i = 0; i < max_replication_slots; i++)
532 {
533 LWLockInitialize(&replication_states[i].lock,
534 replication_states_ctl->tranche_id);
535 ConditionVariableInit(&replication_states[i].origin_cv);
536 }
537 }
538 }
539
540 /* ---------------------------------------------------------------------------
541 * Perform a checkpoint of each replication origin's progress with respect to
542 * the replayed remote_lsn. Make sure that all transactions we refer to in the
543 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
544 * if the transactions were originally committed asynchronously.
545 *
546 * We store checkpoints in the following format:
547 * +-------+------------------------+------------------+-----+--------+
548 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
549 * +-------+------------------------+------------------+-----+--------+
550 *
551 * So its just the magic, followed by the statically sized
552 * ReplicationStateOnDisk structs. Note that the maximum number of
553 * ReplicationState is determined by max_replication_slots.
554 * ---------------------------------------------------------------------------
555 */
556 void
CheckPointReplicationOrigin(void)557 CheckPointReplicationOrigin(void)
558 {
559 const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
560 const char *path = "pg_logical/replorigin_checkpoint";
561 int tmpfd;
562 int i;
563 uint32 magic = REPLICATION_STATE_MAGIC;
564 pg_crc32c crc;
565
566 if (max_replication_slots == 0)
567 return;
568
569 INIT_CRC32C(crc);
570
571 /* make sure no old temp file is remaining */
572 if (unlink(tmppath) < 0 && errno != ENOENT)
573 ereport(PANIC,
574 (errcode_for_file_access(),
575 errmsg("could not remove file \"%s\": %m",
576 tmppath)));
577
578 /*
579 * no other backend can perform this at the same time; only one checkpoint
580 * can happen at a time.
581 */
582 tmpfd = OpenTransientFile(tmppath,
583 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
584 if (tmpfd < 0)
585 ereport(PANIC,
586 (errcode_for_file_access(),
587 errmsg("could not create file \"%s\": %m",
588 tmppath)));
589
590 /* write magic */
591 errno = 0;
592 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
593 {
594 /* if write didn't set errno, assume problem is no disk space */
595 if (errno == 0)
596 errno = ENOSPC;
597 ereport(PANIC,
598 (errcode_for_file_access(),
599 errmsg("could not write to file \"%s\": %m",
600 tmppath)));
601 }
602 COMP_CRC32C(crc, &magic, sizeof(magic));
603
604 /* prevent concurrent creations/drops */
605 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
606
607 /* write actual data */
608 for (i = 0; i < max_replication_slots; i++)
609 {
610 ReplicationStateOnDisk disk_state;
611 ReplicationState *curstate = &replication_states[i];
612 XLogRecPtr local_lsn;
613
614 if (curstate->roident == InvalidRepOriginId)
615 continue;
616
617 /* zero, to avoid uninitialized padding bytes */
618 memset(&disk_state, 0, sizeof(disk_state));
619
620 LWLockAcquire(&curstate->lock, LW_SHARED);
621
622 disk_state.roident = curstate->roident;
623
624 disk_state.remote_lsn = curstate->remote_lsn;
625 local_lsn = curstate->local_lsn;
626
627 LWLockRelease(&curstate->lock);
628
629 /* make sure we only write out a commit that's persistent */
630 XLogFlush(local_lsn);
631
632 errno = 0;
633 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
634 sizeof(disk_state))
635 {
636 /* if write didn't set errno, assume problem is no disk space */
637 if (errno == 0)
638 errno = ENOSPC;
639 ereport(PANIC,
640 (errcode_for_file_access(),
641 errmsg("could not write to file \"%s\": %m",
642 tmppath)));
643 }
644
645 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
646 }
647
648 LWLockRelease(ReplicationOriginLock);
649
650 /* write out the CRC */
651 FIN_CRC32C(crc);
652 errno = 0;
653 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
654 {
655 /* if write didn't set errno, assume problem is no disk space */
656 if (errno == 0)
657 errno = ENOSPC;
658 ereport(PANIC,
659 (errcode_for_file_access(),
660 errmsg("could not write to file \"%s\": %m",
661 tmppath)));
662 }
663
664 if (CloseTransientFile(tmpfd) != 0)
665 ereport(PANIC,
666 (errcode_for_file_access(),
667 errmsg("could not close file \"%s\": %m",
668 tmppath)));
669
670 /* fsync, rename to permanent file, fsync file and directory */
671 durable_rename(tmppath, path, PANIC);
672 }
673
674 /*
675 * Recover replication replay status from checkpoint data saved earlier by
676 * CheckPointReplicationOrigin.
677 *
678 * This only needs to be called at startup and *not* during every checkpoint
679 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
680 * state thereafter can be recovered by looking at commit records.
681 */
682 void
StartupReplicationOrigin(void)683 StartupReplicationOrigin(void)
684 {
685 const char *path = "pg_logical/replorigin_checkpoint";
686 int fd;
687 int readBytes;
688 uint32 magic = REPLICATION_STATE_MAGIC;
689 int last_state = 0;
690 pg_crc32c file_crc;
691 pg_crc32c crc;
692
693 /* don't want to overwrite already existing state */
694 #ifdef USE_ASSERT_CHECKING
695 static bool already_started = false;
696
697 Assert(!already_started);
698 already_started = true;
699 #endif
700
701 if (max_replication_slots == 0)
702 return;
703
704 INIT_CRC32C(crc);
705
706 elog(DEBUG2, "starting up replication origin progress state");
707
708 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
709
710 /*
711 * might have had max_replication_slots == 0 last run, or we just brought
712 * up a standby.
713 */
714 if (fd < 0 && errno == ENOENT)
715 return;
716 else if (fd < 0)
717 ereport(PANIC,
718 (errcode_for_file_access(),
719 errmsg("could not open file \"%s\": %m",
720 path)));
721
722 /* verify magic, that is written even if nothing was active */
723 readBytes = read(fd, &magic, sizeof(magic));
724 if (readBytes != sizeof(magic))
725 {
726 if (readBytes < 0)
727 ereport(PANIC,
728 (errcode_for_file_access(),
729 errmsg("could not read file \"%s\": %m",
730 path)));
731 else
732 ereport(PANIC,
733 (errcode(ERRCODE_DATA_CORRUPTED),
734 errmsg("could not read file \"%s\": read %d of %zu",
735 path, readBytes, sizeof(magic))));
736 }
737 COMP_CRC32C(crc, &magic, sizeof(magic));
738
739 if (magic != REPLICATION_STATE_MAGIC)
740 ereport(PANIC,
741 (errmsg("replication checkpoint has wrong magic %u instead of %u",
742 magic, REPLICATION_STATE_MAGIC)));
743
744 /* we can skip locking here, no other access is possible */
745
746 /* recover individual states, until there are no more to be found */
747 while (true)
748 {
749 ReplicationStateOnDisk disk_state;
750
751 readBytes = read(fd, &disk_state, sizeof(disk_state));
752
753 /* no further data */
754 if (readBytes == sizeof(crc))
755 {
756 /* not pretty, but simple ... */
757 file_crc = *(pg_crc32c *) &disk_state;
758 break;
759 }
760
761 if (readBytes < 0)
762 {
763 ereport(PANIC,
764 (errcode_for_file_access(),
765 errmsg("could not read file \"%s\": %m",
766 path)));
767 }
768
769 if (readBytes != sizeof(disk_state))
770 {
771 ereport(PANIC,
772 (errcode_for_file_access(),
773 errmsg("could not read file \"%s\": read %d of %zu",
774 path, readBytes, sizeof(disk_state))));
775 }
776
777 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
778
779 if (last_state == max_replication_slots)
780 ereport(PANIC,
781 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
782 errmsg("could not find free replication state, increase max_replication_slots")));
783
784 /* copy data to shared memory */
785 replication_states[last_state].roident = disk_state.roident;
786 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
787 last_state++;
788
789 ereport(LOG,
790 (errmsg("recovered replication state of node %u to %X/%X",
791 disk_state.roident,
792 LSN_FORMAT_ARGS(disk_state.remote_lsn))));
793 }
794
795 /* now check checksum */
796 FIN_CRC32C(crc);
797 if (file_crc != crc)
798 ereport(PANIC,
799 (errcode(ERRCODE_DATA_CORRUPTED),
800 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
801 crc, file_crc)));
802
803 if (CloseTransientFile(fd) != 0)
804 ereport(PANIC,
805 (errcode_for_file_access(),
806 errmsg("could not close file \"%s\": %m",
807 path)));
808 }
809
810 void
replorigin_redo(XLogReaderState * record)811 replorigin_redo(XLogReaderState *record)
812 {
813 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
814
815 switch (info)
816 {
817 case XLOG_REPLORIGIN_SET:
818 {
819 xl_replorigin_set *xlrec =
820 (xl_replorigin_set *) XLogRecGetData(record);
821
822 replorigin_advance(xlrec->node_id,
823 xlrec->remote_lsn, record->EndRecPtr,
824 xlrec->force /* backward */ ,
825 false /* WAL log */ );
826 break;
827 }
828 case XLOG_REPLORIGIN_DROP:
829 {
830 xl_replorigin_drop *xlrec;
831 int i;
832
833 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
834
835 for (i = 0; i < max_replication_slots; i++)
836 {
837 ReplicationState *state = &replication_states[i];
838
839 /* found our slot */
840 if (state->roident == xlrec->node_id)
841 {
842 /* reset entry */
843 state->roident = InvalidRepOriginId;
844 state->remote_lsn = InvalidXLogRecPtr;
845 state->local_lsn = InvalidXLogRecPtr;
846 break;
847 }
848 }
849 break;
850 }
851 default:
852 elog(PANIC, "replorigin_redo: unknown op code %u", info);
853 }
854 }
855
856
857 /*
858 * Tell the replication origin progress machinery that a commit from 'node'
859 * that originated at the LSN remote_commit on the remote node was replayed
860 * successfully and that we don't need to do so again. In combination with
861 * setting up replorigin_session_origin_lsn and replorigin_session_origin
862 * that ensures we won't lose knowledge about that after a crash if the
863 * transaction had a persistent effect (think of asynchronous commits).
864 *
865 * local_commit needs to be a local LSN of the commit so that we can make sure
866 * upon a checkpoint that enough WAL has been persisted to disk.
867 *
868 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
869 * unless running in recovery.
870 */
871 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)872 replorigin_advance(RepOriginId node,
873 XLogRecPtr remote_commit, XLogRecPtr local_commit,
874 bool go_backward, bool wal_log)
875 {
876 int i;
877 ReplicationState *replication_state = NULL;
878 ReplicationState *free_state = NULL;
879
880 Assert(node != InvalidRepOriginId);
881
882 /* we don't track DoNotReplicateId */
883 if (node == DoNotReplicateId)
884 return;
885
886 /*
887 * XXX: For the case where this is called by WAL replay, it'd be more
888 * efficient to restore into a backend local hashtable and only dump into
889 * shmem after recovery is finished. Let's wait with implementing that
890 * till it's shown to be a measurable expense
891 */
892
893 /* Lock exclusively, as we may have to create a new table entry. */
894 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
895
896 /*
897 * Search for either an existing slot for the origin, or a free one we can
898 * use.
899 */
900 for (i = 0; i < max_replication_slots; i++)
901 {
902 ReplicationState *curstate = &replication_states[i];
903
904 /* remember where to insert if necessary */
905 if (curstate->roident == InvalidRepOriginId &&
906 free_state == NULL)
907 {
908 free_state = curstate;
909 continue;
910 }
911
912 /* not our slot */
913 if (curstate->roident != node)
914 {
915 continue;
916 }
917
918 /* ok, found slot */
919 replication_state = curstate;
920
921 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
922
923 /* Make sure it's not used by somebody else */
924 if (replication_state->acquired_by != 0)
925 {
926 ereport(ERROR,
927 (errcode(ERRCODE_OBJECT_IN_USE),
928 errmsg("replication origin with OID %d is already active for PID %d",
929 replication_state->roident,
930 replication_state->acquired_by)));
931 }
932
933 break;
934 }
935
936 if (replication_state == NULL && free_state == NULL)
937 ereport(ERROR,
938 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
939 errmsg("could not find free replication state slot for replication origin with OID %u",
940 node),
941 errhint("Increase max_replication_slots and try again.")));
942
943 if (replication_state == NULL)
944 {
945 /* initialize new slot */
946 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
947 replication_state = free_state;
948 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
949 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
950 replication_state->roident = node;
951 }
952
953 Assert(replication_state->roident != InvalidRepOriginId);
954
955 /*
956 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
957 * and the standby gets the message. Primarily this will be called during
958 * WAL replay (of commit records) where no WAL logging is necessary.
959 */
960 if (wal_log)
961 {
962 xl_replorigin_set xlrec;
963
964 xlrec.remote_lsn = remote_commit;
965 xlrec.node_id = node;
966 xlrec.force = go_backward;
967
968 XLogBeginInsert();
969 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
970
971 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
972 }
973
974 /*
975 * Due to - harmless - race conditions during a checkpoint we could see
976 * values here that are older than the ones we already have in memory.
977 * Don't overwrite those.
978 */
979 if (go_backward || replication_state->remote_lsn < remote_commit)
980 replication_state->remote_lsn = remote_commit;
981 if (local_commit != InvalidXLogRecPtr &&
982 (go_backward || replication_state->local_lsn < local_commit))
983 replication_state->local_lsn = local_commit;
984 LWLockRelease(&replication_state->lock);
985
986 /*
987 * Release *after* changing the LSNs, slot isn't acquired and thus could
988 * otherwise be dropped anytime.
989 */
990 LWLockRelease(ReplicationOriginLock);
991 }
992
993
994 XLogRecPtr
replorigin_get_progress(RepOriginId node,bool flush)995 replorigin_get_progress(RepOriginId node, bool flush)
996 {
997 int i;
998 XLogRecPtr local_lsn = InvalidXLogRecPtr;
999 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1000
1001 /* prevent slots from being concurrently dropped */
1002 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1003
1004 for (i = 0; i < max_replication_slots; i++)
1005 {
1006 ReplicationState *state;
1007
1008 state = &replication_states[i];
1009
1010 if (state->roident == node)
1011 {
1012 LWLockAcquire(&state->lock, LW_SHARED);
1013
1014 remote_lsn = state->remote_lsn;
1015 local_lsn = state->local_lsn;
1016
1017 LWLockRelease(&state->lock);
1018
1019 break;
1020 }
1021 }
1022
1023 LWLockRelease(ReplicationOriginLock);
1024
1025 if (flush && local_lsn != InvalidXLogRecPtr)
1026 XLogFlush(local_lsn);
1027
1028 return remote_lsn;
1029 }
1030
1031 /*
1032 * Tear down a (possibly) configured session replication origin during process
1033 * exit.
1034 */
1035 static void
ReplicationOriginExitCleanup(int code,Datum arg)1036 ReplicationOriginExitCleanup(int code, Datum arg)
1037 {
1038 ConditionVariable *cv = NULL;
1039
1040 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1041
1042 if (session_replication_state != NULL &&
1043 session_replication_state->acquired_by == MyProcPid)
1044 {
1045 cv = &session_replication_state->origin_cv;
1046
1047 session_replication_state->acquired_by = 0;
1048 session_replication_state = NULL;
1049 }
1050
1051 LWLockRelease(ReplicationOriginLock);
1052
1053 if (cv)
1054 ConditionVariableBroadcast(cv);
1055 }
1056
1057 /*
1058 * Setup a replication origin in the shared memory struct if it doesn't
1059 * already exists and cache access to the specific ReplicationSlot so the
1060 * array doesn't have to be searched when calling
1061 * replorigin_session_advance().
1062 *
1063 * Obviously only one such cached origin can exist per process and the current
1064 * cached value can only be set again after the previous value is torn down
1065 * with replorigin_session_reset().
1066 */
1067 void
replorigin_session_setup(RepOriginId node)1068 replorigin_session_setup(RepOriginId node)
1069 {
1070 static bool registered_cleanup;
1071 int i;
1072 int free_slot = -1;
1073
1074 if (!registered_cleanup)
1075 {
1076 on_shmem_exit(ReplicationOriginExitCleanup, 0);
1077 registered_cleanup = true;
1078 }
1079
1080 Assert(max_replication_slots > 0);
1081
1082 if (session_replication_state != NULL)
1083 ereport(ERROR,
1084 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1085 errmsg("cannot setup replication origin when one is already setup")));
1086
1087 /* Lock exclusively, as we may have to create a new table entry. */
1088 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1089
1090 /*
1091 * Search for either an existing slot for the origin, or a free one we can
1092 * use.
1093 */
1094 for (i = 0; i < max_replication_slots; i++)
1095 {
1096 ReplicationState *curstate = &replication_states[i];
1097
1098 /* remember where to insert if necessary */
1099 if (curstate->roident == InvalidRepOriginId &&
1100 free_slot == -1)
1101 {
1102 free_slot = i;
1103 continue;
1104 }
1105
1106 /* not our slot */
1107 if (curstate->roident != node)
1108 continue;
1109
1110 else if (curstate->acquired_by != 0)
1111 {
1112 ereport(ERROR,
1113 (errcode(ERRCODE_OBJECT_IN_USE),
1114 errmsg("replication origin with OID %d is already active for PID %d",
1115 curstate->roident, curstate->acquired_by)));
1116 }
1117
1118 /* ok, found slot */
1119 session_replication_state = curstate;
1120 }
1121
1122
1123 if (session_replication_state == NULL && free_slot == -1)
1124 ereport(ERROR,
1125 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1126 errmsg("could not find free replication state slot for replication origin with OID %u",
1127 node),
1128 errhint("Increase max_replication_slots and try again.")));
1129 else if (session_replication_state == NULL)
1130 {
1131 /* initialize new slot */
1132 session_replication_state = &replication_states[free_slot];
1133 Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1134 Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1135 session_replication_state->roident = node;
1136 }
1137
1138
1139 Assert(session_replication_state->roident != InvalidRepOriginId);
1140
1141 session_replication_state->acquired_by = MyProcPid;
1142
1143 LWLockRelease(ReplicationOriginLock);
1144
1145 /* probably this one is pointless */
1146 ConditionVariableBroadcast(&session_replication_state->origin_cv);
1147 }
1148
1149 /*
1150 * Reset replay state previously setup in this session.
1151 *
1152 * This function may only be called if an origin was setup with
1153 * replorigin_session_setup().
1154 */
1155 void
replorigin_session_reset(void)1156 replorigin_session_reset(void)
1157 {
1158 ConditionVariable *cv;
1159
1160 Assert(max_replication_slots != 0);
1161
1162 if (session_replication_state == NULL)
1163 ereport(ERROR,
1164 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1165 errmsg("no replication origin is configured")));
1166
1167 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1168
1169 session_replication_state->acquired_by = 0;
1170 cv = &session_replication_state->origin_cv;
1171 session_replication_state = NULL;
1172
1173 LWLockRelease(ReplicationOriginLock);
1174
1175 ConditionVariableBroadcast(cv);
1176 }
1177
1178 /*
1179 * Do the same work replorigin_advance() does, just on the session's
1180 * configured origin.
1181 *
1182 * This is noticeably cheaper than using replorigin_advance().
1183 */
1184 void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)1185 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1186 {
1187 Assert(session_replication_state != NULL);
1188 Assert(session_replication_state->roident != InvalidRepOriginId);
1189
1190 LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1191 if (session_replication_state->local_lsn < local_commit)
1192 session_replication_state->local_lsn = local_commit;
1193 if (session_replication_state->remote_lsn < remote_commit)
1194 session_replication_state->remote_lsn = remote_commit;
1195 LWLockRelease(&session_replication_state->lock);
1196 }
1197
1198 /*
1199 * Ask the machinery about the point up to which we successfully replayed
1200 * changes from an already setup replication origin.
1201 */
1202 XLogRecPtr
replorigin_session_get_progress(bool flush)1203 replorigin_session_get_progress(bool flush)
1204 {
1205 XLogRecPtr remote_lsn;
1206 XLogRecPtr local_lsn;
1207
1208 Assert(session_replication_state != NULL);
1209
1210 LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1211 remote_lsn = session_replication_state->remote_lsn;
1212 local_lsn = session_replication_state->local_lsn;
1213 LWLockRelease(&session_replication_state->lock);
1214
1215 if (flush && local_lsn != InvalidXLogRecPtr)
1216 XLogFlush(local_lsn);
1217
1218 return remote_lsn;
1219 }
1220
1221
1222
1223 /* ---------------------------------------------------------------------------
1224 * SQL functions for working with replication origin.
1225 *
1226 * These mostly should be fairly short wrappers around more generic functions.
1227 * ---------------------------------------------------------------------------
1228 */
1229
1230 /*
1231 * Create replication origin for the passed in name, and return the assigned
1232 * oid.
1233 */
1234 Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)1235 pg_replication_origin_create(PG_FUNCTION_ARGS)
1236 {
1237 char *name;
1238 RepOriginId roident;
1239
1240 replorigin_check_prerequisites(false, false);
1241
1242 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1243
1244 /* Replication origins "pg_xxx" are reserved for internal use */
1245 if (IsReservedName(name))
1246 ereport(ERROR,
1247 (errcode(ERRCODE_RESERVED_NAME),
1248 errmsg("replication origin name \"%s\" is reserved",
1249 name),
1250 errdetail("Origin names starting with \"pg_\" are reserved.")));
1251
1252 /*
1253 * If built with appropriate switch, whine when regression-testing
1254 * conventions for replication origin names are violated.
1255 */
1256 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1257 if (strncmp(name, "regress_", 8) != 0)
1258 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1259 #endif
1260
1261 roident = replorigin_create(name);
1262
1263 pfree(name);
1264
1265 PG_RETURN_OID(roident);
1266 }
1267
1268 /*
1269 * Drop replication origin.
1270 */
1271 Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)1272 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1273 {
1274 char *name;
1275
1276 replorigin_check_prerequisites(false, false);
1277
1278 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1279
1280 replorigin_drop_by_name(name, false, true);
1281
1282 pfree(name);
1283
1284 PG_RETURN_VOID();
1285 }
1286
1287 /*
1288 * Return oid of a replication origin.
1289 */
1290 Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)1291 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1292 {
1293 char *name;
1294 RepOriginId roident;
1295
1296 replorigin_check_prerequisites(false, false);
1297
1298 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1299 roident = replorigin_by_name(name, true);
1300
1301 pfree(name);
1302
1303 if (OidIsValid(roident))
1304 PG_RETURN_OID(roident);
1305 PG_RETURN_NULL();
1306 }
1307
1308 /*
1309 * Setup a replication origin for this session.
1310 */
1311 Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)1312 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1313 {
1314 char *name;
1315 RepOriginId origin;
1316
1317 replorigin_check_prerequisites(true, false);
1318
1319 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1320 origin = replorigin_by_name(name, false);
1321 replorigin_session_setup(origin);
1322
1323 replorigin_session_origin = origin;
1324
1325 pfree(name);
1326
1327 PG_RETURN_VOID();
1328 }
1329
1330 /*
1331 * Reset previously setup origin in this session
1332 */
1333 Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)1334 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1335 {
1336 replorigin_check_prerequisites(true, false);
1337
1338 replorigin_session_reset();
1339
1340 replorigin_session_origin = InvalidRepOriginId;
1341 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1342 replorigin_session_origin_timestamp = 0;
1343
1344 PG_RETURN_VOID();
1345 }
1346
1347 /*
1348 * Has a replication origin been setup for this session.
1349 */
1350 Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)1351 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1352 {
1353 replorigin_check_prerequisites(false, false);
1354
1355 PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1356 }
1357
1358
1359 /*
1360 * Return the replication progress for origin setup in the current session.
1361 *
1362 * If 'flush' is set to true it is ensured that the returned value corresponds
1363 * to a local transaction that has been flushed. This is useful if asynchronous
1364 * commits are used when replaying replicated transactions.
1365 */
1366 Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)1367 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1368 {
1369 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1370 bool flush = PG_GETARG_BOOL(0);
1371
1372 replorigin_check_prerequisites(true, false);
1373
1374 if (session_replication_state == NULL)
1375 ereport(ERROR,
1376 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1377 errmsg("no replication origin is configured")));
1378
1379 remote_lsn = replorigin_session_get_progress(flush);
1380
1381 if (remote_lsn == InvalidXLogRecPtr)
1382 PG_RETURN_NULL();
1383
1384 PG_RETURN_LSN(remote_lsn);
1385 }
1386
1387 Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)1388 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1389 {
1390 XLogRecPtr location = PG_GETARG_LSN(0);
1391
1392 replorigin_check_prerequisites(true, false);
1393
1394 if (session_replication_state == NULL)
1395 ereport(ERROR,
1396 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1397 errmsg("no replication origin is configured")));
1398
1399 replorigin_session_origin_lsn = location;
1400 replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1401
1402 PG_RETURN_VOID();
1403 }
1404
1405 Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)1406 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1407 {
1408 replorigin_check_prerequisites(true, false);
1409
1410 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1411 replorigin_session_origin_timestamp = 0;
1412
1413 PG_RETURN_VOID();
1414 }
1415
1416
1417 Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)1418 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1419 {
1420 text *name = PG_GETARG_TEXT_PP(0);
1421 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1422 RepOriginId node;
1423
1424 replorigin_check_prerequisites(true, false);
1425
1426 /* lock to prevent the replication origin from vanishing */
1427 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1428
1429 node = replorigin_by_name(text_to_cstring(name), false);
1430
1431 /*
1432 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1433 * xact hasn't committed yet. This is why this function should be used to
1434 * set up the initial replication state, but not for replay.
1435 */
1436 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1437 true /* go backward */ , true /* WAL log */ );
1438
1439 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1440
1441 PG_RETURN_VOID();
1442 }
1443
1444
1445 /*
1446 * Return the replication progress for an individual replication origin.
1447 *
1448 * If 'flush' is set to true it is ensured that the returned value corresponds
1449 * to a local transaction that has been flushed. This is useful if asynchronous
1450 * commits are used when replaying replicated transactions.
1451 */
1452 Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)1453 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1454 {
1455 char *name;
1456 bool flush;
1457 RepOriginId roident;
1458 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1459
1460 replorigin_check_prerequisites(true, true);
1461
1462 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1463 flush = PG_GETARG_BOOL(1);
1464
1465 roident = replorigin_by_name(name, false);
1466 Assert(OidIsValid(roident));
1467
1468 remote_lsn = replorigin_get_progress(roident, flush);
1469
1470 if (remote_lsn == InvalidXLogRecPtr)
1471 PG_RETURN_NULL();
1472
1473 PG_RETURN_LSN(remote_lsn);
1474 }
1475
1476
1477 Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)1478 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1479 {
1480 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1481 TupleDesc tupdesc;
1482 Tuplestorestate *tupstore;
1483 MemoryContext per_query_ctx;
1484 MemoryContext oldcontext;
1485 int i;
1486 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1487
1488 /* we want to return 0 rows if slot is set to zero */
1489 replorigin_check_prerequisites(false, true);
1490
1491 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1492 ereport(ERROR,
1493 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1494 errmsg("set-valued function called in context that cannot accept a set")));
1495 if (!(rsinfo->allowedModes & SFRM_Materialize))
1496 ereport(ERROR,
1497 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1498 errmsg("materialize mode required, but it is not allowed in this context")));
1499 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1500 elog(ERROR, "return type must be a row type");
1501
1502 if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1503 elog(ERROR, "wrong function definition");
1504
1505 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1506 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1507
1508 tupstore = tuplestore_begin_heap(true, false, work_mem);
1509 rsinfo->returnMode = SFRM_Materialize;
1510 rsinfo->setResult = tupstore;
1511 rsinfo->setDesc = tupdesc;
1512
1513 MemoryContextSwitchTo(oldcontext);
1514
1515
1516 /* prevent slots from being concurrently dropped */
1517 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1518
1519 /*
1520 * Iterate through all possible replication_states, display if they are
1521 * filled. Note that we do not take any locks, so slightly corrupted/out
1522 * of date values are a possibility.
1523 */
1524 for (i = 0; i < max_replication_slots; i++)
1525 {
1526 ReplicationState *state;
1527 Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1528 bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1529 char *roname;
1530
1531 state = &replication_states[i];
1532
1533 /* unused slot, nothing to display */
1534 if (state->roident == InvalidRepOriginId)
1535 continue;
1536
1537 memset(values, 0, sizeof(values));
1538 memset(nulls, 1, sizeof(nulls));
1539
1540 values[0] = ObjectIdGetDatum(state->roident);
1541 nulls[0] = false;
1542
1543 /*
1544 * We're not preventing the origin to be dropped concurrently, so
1545 * silently accept that it might be gone.
1546 */
1547 if (replorigin_by_oid(state->roident, true,
1548 &roname))
1549 {
1550 values[1] = CStringGetTextDatum(roname);
1551 nulls[1] = false;
1552 }
1553
1554 LWLockAcquire(&state->lock, LW_SHARED);
1555
1556 values[2] = LSNGetDatum(state->remote_lsn);
1557 nulls[2] = false;
1558
1559 values[3] = LSNGetDatum(state->local_lsn);
1560 nulls[3] = false;
1561
1562 LWLockRelease(&state->lock);
1563
1564 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1565 }
1566
1567 tuplestore_donestoring(tupstore);
1568
1569 LWLockRelease(ReplicationOriginLock);
1570
1571 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1572
1573 return (Datum) 0;
1574 }
1575