1 /*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2016, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
 * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *   and durable manner.
17 *
 * A replication origin consists of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte id. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory and long descriptors would be inefficient. For now only 2 bytes are
 * used for the internal id of a replication origin, as it seems unlikely that
 * there soon will be more than 65k nodes in one replication setup; and using
 * only two bytes allows us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationStates) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
 * * To create and drop replication origins an exclusive lock on
 *   pg_replication_origin is required for the duration. That allows us to
 *   safely and conflict free assign new origins using a dirty snapshot.
50 *
 * * When creating an in-memory replication progress slot the ReplicationOrigin
 *   LWLock has to be held exclusively; when iterating over the replication
 *   progress a shared lock has to be held, the same when advancing the
 *   replication progress of an individual backend that has not been set up as
 *   the session's replication origin.
56 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *   replication progress slot that slot's lwlock has to be held. That's
 *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
 *   all our platforms, but it also simplifies memory ordering concerns
 *   between the remote and local lsn. We use a lwlock instead of a spinlock
 *   so it's less harmful to hold the lock over a WAL write
 *   (c.f. replorigin_advance).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68 #include "postgres.h"
69
70 #include <unistd.h>
71 #include <sys/stat.h>
72
73 #include "funcapi.h"
74 #include "miscadmin.h"
75
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80
81 #include "catalog/indexing.h"
82
83 #include "nodes/execnodes.h"
84
85 #include "replication/origin.h"
86 #include "replication/logical.h"
87
88 #include "storage/fd.h"
89 #include "storage/ipc.h"
90 #include "storage/lmgr.h"
91 #include "storage/copydir.h"
92
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99
100 /*
101 * Replay progress of a single remote node.
102 */
103 typedef struct ReplicationState
104 {
105 /*
106 * Local identifier for the remote node.
107 */
108 RepOriginId roident;
109
110 /*
111 * Location of the latest commit from the remote side.
112 */
113 XLogRecPtr remote_lsn;
114
115 /*
116 * Remember the local lsn of the commit record so we can XLogFlush() to it
117 * during a checkpoint so we know the commit record actually is safe on
118 * disk.
119 */
120 XLogRecPtr local_lsn;
121
122 /*
123 * PID of backend that's acquired slot, or 0 if none.
124 */
125 int acquired_by;
126
127 /*
128 * Lock protecting remote_lsn and local_lsn.
129 */
130 LWLock lock;
131 } ReplicationState;
132
133 /*
134 * On disk version of ReplicationState.
135 */
136 typedef struct ReplicationStateOnDisk
137 {
138 RepOriginId roident;
139 XLogRecPtr remote_lsn;
140 } ReplicationStateOnDisk;
141
142
143 typedef struct ReplicationStateCtl
144 {
145 int tranche_id;
146 LWLockTranche tranche;
147 /* Array of length max_replication_slots */
148 ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
149 } ReplicationStateCtl;
150
151 /* external variables */
152 RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
153 XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
154 TimestampTz replorigin_session_origin_timestamp = 0;
155
156 /*
157 * Base address into a shared memory array of replication states of size
158 * max_replication_slots.
159 *
160 * XXX: Should we use a separate variable to size this rather than
161 * max_replication_slots?
162 */
163 static ReplicationState *replication_states;
164
165 /*
166 * Actual shared memory block (replication_states[] is now part of this).
167 */
168 static ReplicationStateCtl *replication_states_ctl;
169
170 /*
171 * Backend-local, cached element from ReplicationStates for use in a backend
172 * replaying remote commits, so we don't have to search ReplicationStates for
173 * the backends current RepOriginId.
174 */
175 static ReplicationState *session_replication_state = NULL;
176
177 /* Magic for on disk files. */
178 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
179
180 static void
replorigin_check_prerequisites(bool check_slots,bool recoveryOK)181 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
182 {
183 if (!superuser())
184 ereport(ERROR,
185 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
186 errmsg("only superusers can query or manipulate replication origins")));
187
188 if (check_slots && max_replication_slots == 0)
189 ereport(ERROR,
190 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
191 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
192
193 if (!recoveryOK && RecoveryInProgress())
194 ereport(ERROR,
195 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
196 errmsg("cannot manipulate replication origins during recovery")));
197
198 }
199
200
201 /* ---------------------------------------------------------------------------
202 * Functions for working with replication origins themselves.
203 * ---------------------------------------------------------------------------
204 */
205
206 /*
207 * Check for a persistent replication origin identified by name.
208 *
209 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
210 */
211 RepOriginId
replorigin_by_name(char * roname,bool missing_ok)212 replorigin_by_name(char *roname, bool missing_ok)
213 {
214 Form_pg_replication_origin ident;
215 Oid roident = InvalidOid;
216 HeapTuple tuple;
217 Datum roname_d;
218
219 roname_d = CStringGetTextDatum(roname);
220
221 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
222 if (HeapTupleIsValid(tuple))
223 {
224 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
225 roident = ident->roident;
226 ReleaseSysCache(tuple);
227 }
228 else if (!missing_ok)
229 ereport(ERROR,
230 (errcode(ERRCODE_UNDEFINED_OBJECT),
231 errmsg("replication origin \"%s\" does not exist",
232 roname)));
233
234 return roident;
235 }
236
237 /*
238 * Create a replication origin.
239 *
240 * Needs to be called in a transaction.
241 */
242 RepOriginId
replorigin_create(char * roname)243 replorigin_create(char *roname)
244 {
245 Oid roident;
246 HeapTuple tuple = NULL;
247 Relation rel;
248 Datum roname_d;
249 SnapshotData SnapshotDirty;
250 SysScanDesc scan;
251 ScanKeyData key;
252
253 roname_d = CStringGetTextDatum(roname);
254
255 Assert(IsTransactionState());
256
257 /*
258 * We need the numeric replication origin to be 16bit wide, so we cannot
259 * rely on the normal oid allocation. Instead we simply scan
260 * pg_replication_origin for the first unused id. That's not particularly
261 * efficient, but this should be a fairly infrequent operation - we can
262 * easily spend a bit more code on this when it turns out it needs to be
263 * faster.
264 *
265 * We handle concurrency by taking an exclusive lock (allowing reads!)
266 * over the table for the duration of the search. Because we use a "dirty
267 * snapshot" we can read rows that other in-progress sessions have
268 * written, even though they would be invisible with normal snapshots. Due
269 * to the exclusive lock there's no danger that new rows can appear while
270 * we're checking.
271 */
272 InitDirtySnapshot(SnapshotDirty);
273
274 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
275
276 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
277 {
278 bool nulls[Natts_pg_replication_origin];
279 Datum values[Natts_pg_replication_origin];
280 bool collides;
281
282 CHECK_FOR_INTERRUPTS();
283
284 ScanKeyInit(&key,
285 Anum_pg_replication_origin_roident,
286 BTEqualStrategyNumber, F_OIDEQ,
287 ObjectIdGetDatum(roident));
288
289 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
290 true /* indexOK */ ,
291 &SnapshotDirty,
292 1, &key);
293
294 collides = HeapTupleIsValid(systable_getnext(scan));
295
296 systable_endscan(scan);
297
298 if (!collides)
299 {
300 /*
301 * Ok, found an unused roident, insert the new row and do a CCI,
302 * so our callers can look it up if they want to.
303 */
304 memset(&nulls, 0, sizeof(nulls));
305
306 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
307 values[Anum_pg_replication_origin_roname - 1] = roname_d;
308
309 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
310 simple_heap_insert(rel, tuple);
311 CatalogUpdateIndexes(rel, tuple);
312 CommandCounterIncrement();
313 break;
314 }
315 }
316
317 /* now release lock again, */
318 heap_close(rel, ExclusiveLock);
319
320 if (tuple == NULL)
321 ereport(ERROR,
322 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
323 errmsg("could not find free replication origin OID")));
324
325 heap_freetuple(tuple);
326 return roident;
327 }
328
329
330 /*
331 * Drop replication origin.
332 *
333 * Needs to be called in a transaction.
334 */
335 void
replorigin_drop(RepOriginId roident)336 replorigin_drop(RepOriginId roident)
337 {
338 HeapTuple tuple = NULL;
339 Relation rel;
340 int i;
341
342 Assert(IsTransactionState());
343
344 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
345
346 /* cleanup the slot state info */
347 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
348
349 for (i = 0; i < max_replication_slots; i++)
350 {
351 ReplicationState *state = &replication_states[i];
352
353 /* found our slot */
354 if (state->roident == roident)
355 {
356 if (state->acquired_by != 0)
357 {
358 ereport(ERROR,
359 (errcode(ERRCODE_OBJECT_IN_USE),
360 errmsg("could not drop replication origin with OID %d, in use by PID %d",
361 state->roident,
362 state->acquired_by)));
363 }
364
365 /* first WAL log */
366 {
367 xl_replorigin_drop xlrec;
368
369 xlrec.node_id = roident;
370 XLogBeginInsert();
371 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
372 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
373 }
374
375 /* then reset the in-memory entry */
376 state->roident = InvalidRepOriginId;
377 state->remote_lsn = InvalidXLogRecPtr;
378 state->local_lsn = InvalidXLogRecPtr;
379 break;
380 }
381 }
382 LWLockRelease(ReplicationOriginLock);
383
384 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
385 if (!HeapTupleIsValid(tuple))
386 elog(ERROR, "cache lookup failed for replication origin with oid %u",
387 roident);
388
389 simple_heap_delete(rel, &tuple->t_self);
390 ReleaseSysCache(tuple);
391
392 CommandCounterIncrement();
393
394 /* now release lock again, */
395 heap_close(rel, ExclusiveLock);
396 }
397
398
399 /*
400 * Lookup replication origin via it's oid and return the name.
401 *
402 * The external name is palloc'd in the calling context.
403 *
404 * Returns true if the origin is known, false otherwise.
405 */
406 bool
replorigin_by_oid(RepOriginId roident,bool missing_ok,char ** roname)407 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
408 {
409 HeapTuple tuple;
410 Form_pg_replication_origin ric;
411
412 Assert(OidIsValid((Oid) roident));
413 Assert(roident != InvalidRepOriginId);
414 Assert(roident != DoNotReplicateId);
415
416 tuple = SearchSysCache1(REPLORIGIDENT,
417 ObjectIdGetDatum((Oid) roident));
418
419 if (HeapTupleIsValid(tuple))
420 {
421 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
422 *roname = text_to_cstring(&ric->roname);
423 ReleaseSysCache(tuple);
424
425 return true;
426 }
427 else
428 {
429 *roname = NULL;
430
431 if (!missing_ok)
432 ereport(ERROR,
433 (errcode(ERRCODE_UNDEFINED_OBJECT),
434 errmsg("replication origin with OID %u does not exist",
435 roident)));
436
437 return false;
438 }
439 }
440
441
442 /* ---------------------------------------------------------------------------
443 * Functions for handling replication progress.
444 * ---------------------------------------------------------------------------
445 */
446
447 Size
ReplicationOriginShmemSize(void)448 ReplicationOriginShmemSize(void)
449 {
450 Size size = 0;
451
452 /*
453 * XXX: max_replication_slots is arguably the wrong thing to use, as here
454 * we keep the replay state of *remote* transactions. But for now it seems
455 * sufficient to reuse it, rather than introduce a separate GUC.
456 */
457 if (max_replication_slots == 0)
458 return size;
459
460 size = add_size(size, offsetof(ReplicationStateCtl, states));
461
462 size = add_size(size,
463 mul_size(max_replication_slots, sizeof(ReplicationState)));
464 return size;
465 }
466
467 void
ReplicationOriginShmemInit(void)468 ReplicationOriginShmemInit(void)
469 {
470 bool found;
471
472 if (max_replication_slots == 0)
473 return;
474
475 replication_states_ctl = (ReplicationStateCtl *)
476 ShmemInitStruct("ReplicationOriginState",
477 ReplicationOriginShmemSize(),
478 &found);
479 replication_states = replication_states_ctl->states;
480
481 if (!found)
482 {
483 int i;
484
485 MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
486
487 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
488 replication_states_ctl->tranche.name = "replication_origin";
489 replication_states_ctl->tranche.array_base =
490 &replication_states[0].lock;
491 replication_states_ctl->tranche.array_stride =
492 sizeof(ReplicationState);
493
494 for (i = 0; i < max_replication_slots; i++)
495 LWLockInitialize(&replication_states[i].lock,
496 replication_states_ctl->tranche_id);
497 }
498
499 LWLockRegisterTranche(replication_states_ctl->tranche_id,
500 &replication_states_ctl->tranche);
501 }
502
503 /* ---------------------------------------------------------------------------
504 * Perform a checkpoint of each replication origin's progress with respect to
505 * the replayed remote_lsn. Make sure that all transactions we refer to in the
506 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
507 * if the transactions were originally committed asynchronously.
508 *
509 * We store checkpoints in the following format:
510 * +-------+------------------------+------------------+-----+--------+
511 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
512 * +-------+------------------------+------------------+-----+--------+
513 *
514 * So its just the magic, followed by the statically sized
515 * ReplicationStateOnDisk structs. Note that the maximum number of
516 * ReplicationStates is determined by max_replication_slots.
517 * ---------------------------------------------------------------------------
518 */
519 void
CheckPointReplicationOrigin(void)520 CheckPointReplicationOrigin(void)
521 {
522 const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
523 const char *path = "pg_logical/replorigin_checkpoint";
524 int tmpfd;
525 int i;
526 uint32 magic = REPLICATION_STATE_MAGIC;
527 pg_crc32c crc;
528
529 if (max_replication_slots == 0)
530 return;
531
532 INIT_CRC32C(crc);
533
534 /* make sure no old temp file is remaining */
535 if (unlink(tmppath) < 0 && errno != ENOENT)
536 ereport(PANIC,
537 (errcode_for_file_access(),
538 errmsg("could not remove file \"%s\": %m",
539 tmppath)));
540
541 /*
542 * no other backend can perform this at the same time, we're protected by
543 * CheckpointLock.
544 */
545 tmpfd = OpenTransientFile((char *) tmppath,
546 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
547 S_IRUSR | S_IWUSR);
548 if (tmpfd < 0)
549 ereport(PANIC,
550 (errcode_for_file_access(),
551 errmsg("could not create file \"%s\": %m",
552 tmppath)));
553
554 /* write magic */
555 errno = 0;
556 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
557 {
558 int save_errno = errno;
559
560 CloseTransientFile(tmpfd);
561
562 /* if write didn't set errno, assume problem is no disk space */
563 errno = save_errno ? save_errno : ENOSPC;
564 ereport(PANIC,
565 (errcode_for_file_access(),
566 errmsg("could not write to file \"%s\": %m",
567 tmppath)));
568 }
569 COMP_CRC32C(crc, &magic, sizeof(magic));
570
571 /* prevent concurrent creations/drops */
572 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
573
574 /* write actual data */
575 for (i = 0; i < max_replication_slots; i++)
576 {
577 ReplicationStateOnDisk disk_state;
578 ReplicationState *curstate = &replication_states[i];
579 XLogRecPtr local_lsn;
580
581 if (curstate->roident == InvalidRepOriginId)
582 continue;
583
584 /* zero, to avoid uninitialized padding bytes */
585 memset(&disk_state, 0, sizeof(disk_state));
586
587 LWLockAcquire(&curstate->lock, LW_SHARED);
588
589 disk_state.roident = curstate->roident;
590
591 disk_state.remote_lsn = curstate->remote_lsn;
592 local_lsn = curstate->local_lsn;
593
594 LWLockRelease(&curstate->lock);
595
596 /* make sure we only write out a commit that's persistent */
597 XLogFlush(local_lsn);
598
599 errno = 0;
600 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
601 sizeof(disk_state))
602 {
603 int save_errno = errno;
604
605 CloseTransientFile(tmpfd);
606
607 /* if write didn't set errno, assume problem is no disk space */
608 errno = save_errno ? save_errno : ENOSPC;
609 ereport(PANIC,
610 (errcode_for_file_access(),
611 errmsg("could not write to file \"%s\": %m",
612 tmppath)));
613 }
614
615 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
616 }
617
618 LWLockRelease(ReplicationOriginLock);
619
620 /* write out the CRC */
621 FIN_CRC32C(crc);
622 errno = 0;
623 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
624 {
625 int save_errno = errno;
626
627 CloseTransientFile(tmpfd);
628
629 /* if write didn't set errno, assume problem is no disk space */
630 errno = save_errno ? save_errno : ENOSPC;
631 ereport(PANIC,
632 (errcode_for_file_access(),
633 errmsg("could not write to file \"%s\": %m",
634 tmppath)));
635 }
636
637 CloseTransientFile(tmpfd);
638
639 /* fsync, rename to permanent file, fsync file and directory */
640 durable_rename(tmppath, path, PANIC);
641 }
642
643 /*
644 * Recover replication replay status from checkpoint data saved earlier by
645 * CheckPointReplicationOrigin.
646 *
647 * This only needs to be called at startup and *not* during every checkpoint
648 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
649 * state thereafter can be recovered by looking at commit records.
650 */
651 void
StartupReplicationOrigin(void)652 StartupReplicationOrigin(void)
653 {
654 const char *path = "pg_logical/replorigin_checkpoint";
655 int fd;
656 int readBytes;
657 uint32 magic = REPLICATION_STATE_MAGIC;
658 int last_state = 0;
659 pg_crc32c file_crc;
660 pg_crc32c crc;
661
662 /* don't want to overwrite already existing state */
663 #ifdef USE_ASSERT_CHECKING
664 static bool already_started = false;
665
666 Assert(!already_started);
667 already_started = true;
668 #endif
669
670 if (max_replication_slots == 0)
671 return;
672
673 INIT_CRC32C(crc);
674
675 elog(DEBUG2, "starting up replication origin progress state");
676
677 fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
678
679 /*
680 * might have had max_replication_slots == 0 last run, or we just brought
681 * up a standby.
682 */
683 if (fd < 0 && errno == ENOENT)
684 return;
685 else if (fd < 0)
686 ereport(PANIC,
687 (errcode_for_file_access(),
688 errmsg("could not open file \"%s\": %m",
689 path)));
690
691 /* verify magic, that is written even if nothing was active */
692 readBytes = read(fd, &magic, sizeof(magic));
693 if (readBytes != sizeof(magic))
694 ereport(PANIC,
695 (errmsg("could not read file \"%s\": %m",
696 path)));
697 COMP_CRC32C(crc, &magic, sizeof(magic));
698
699 if (magic != REPLICATION_STATE_MAGIC)
700 ereport(PANIC,
701 (errmsg("replication checkpoint has wrong magic %u instead of %u",
702 magic, REPLICATION_STATE_MAGIC)));
703
704 /* we can skip locking here, no other access is possible */
705
706 /* recover individual states, until there are no more to be found */
707 while (true)
708 {
709 ReplicationStateOnDisk disk_state;
710
711 readBytes = read(fd, &disk_state, sizeof(disk_state));
712
713 /* no further data */
714 if (readBytes == sizeof(crc))
715 {
716 /* not pretty, but simple ... */
717 file_crc = *(pg_crc32c *) &disk_state;
718 break;
719 }
720
721 if (readBytes < 0)
722 {
723 ereport(PANIC,
724 (errcode_for_file_access(),
725 errmsg("could not read file \"%s\": %m",
726 path)));
727 }
728
729 if (readBytes != sizeof(disk_state))
730 {
731 ereport(PANIC,
732 (errcode_for_file_access(),
733 errmsg("could not read file \"%s\": read %d of %zu",
734 path, readBytes, sizeof(disk_state))));
735 }
736
737 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
738
739 if (last_state == max_replication_slots)
740 ereport(PANIC,
741 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
742 errmsg("could not find free replication state, increase max_replication_slots")));
743
744 /* copy data to shared memory */
745 replication_states[last_state].roident = disk_state.roident;
746 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
747 last_state++;
748
749 elog(LOG, "recovered replication state of node %u to %X/%X",
750 disk_state.roident,
751 (uint32) (disk_state.remote_lsn >> 32),
752 (uint32) disk_state.remote_lsn);
753 }
754
755 /* now check checksum */
756 FIN_CRC32C(crc);
757 if (file_crc != crc)
758 ereport(PANIC,
759 (errcode(ERRCODE_DATA_CORRUPTED),
760 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
761 crc, file_crc)));
762
763 CloseTransientFile(fd);
764 }
765
766 void
replorigin_redo(XLogReaderState * record)767 replorigin_redo(XLogReaderState *record)
768 {
769 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
770
771 switch (info)
772 {
773 case XLOG_REPLORIGIN_SET:
774 {
775 xl_replorigin_set *xlrec =
776 (xl_replorigin_set *) XLogRecGetData(record);
777
778 replorigin_advance(xlrec->node_id,
779 xlrec->remote_lsn, record->EndRecPtr,
780 xlrec->force /* backward */ ,
781 false /* WAL log */ );
782 break;
783 }
784 case XLOG_REPLORIGIN_DROP:
785 {
786 xl_replorigin_drop *xlrec;
787 int i;
788
789 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
790
791 for (i = 0; i < max_replication_slots; i++)
792 {
793 ReplicationState *state = &replication_states[i];
794
795 /* found our slot */
796 if (state->roident == xlrec->node_id)
797 {
798 /* reset entry */
799 state->roident = InvalidRepOriginId;
800 state->remote_lsn = InvalidXLogRecPtr;
801 state->local_lsn = InvalidXLogRecPtr;
802 break;
803 }
804 }
805 break;
806 }
807 default:
808 elog(PANIC, "replorigin_redo: unknown op code %u", info);
809 }
810 }
811
812
813 /*
814 * Tell the replication origin progress machinery that a commit from 'node'
815 * that originated at the LSN remote_commit on the remote node was replayed
816 * successfully and that we don't need to do so again. In combination with
817 * setting up replorigin_session_origin_lsn and replorigin_session_origin
818 * that ensures we won't loose knowledge about that after a crash if the
819 * transaction had a persistent effect (think of asynchronous commits).
820 *
821 * local_commit needs to be a local LSN of the commit so that we can make sure
822 * upon a checkpoint that enough WAL has been persisted to disk.
823 *
824 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
825 * unless running in recovery.
826 */
827 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)828 replorigin_advance(RepOriginId node,
829 XLogRecPtr remote_commit, XLogRecPtr local_commit,
830 bool go_backward, bool wal_log)
831 {
832 int i;
833 ReplicationState *replication_state = NULL;
834 ReplicationState *free_state = NULL;
835
836 Assert(node != InvalidRepOriginId);
837
838 /* we don't track DoNotReplicateId */
839 if (node == DoNotReplicateId)
840 return;
841
842 /*
843 * XXX: For the case where this is called by WAL replay, it'd be more
844 * efficient to restore into a backend local hashtable and only dump into
845 * shmem after recovery is finished. Let's wait with implementing that
846 * till it's shown to be a measurable expense
847 */
848
849 /* Lock exclusively, as we may have to create a new table entry. */
850 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
851
852 /*
853 * Search for either an existing slot for the origin, or a free one we can
854 * use.
855 */
856 for (i = 0; i < max_replication_slots; i++)
857 {
858 ReplicationState *curstate = &replication_states[i];
859
860 /* remember where to insert if necessary */
861 if (curstate->roident == InvalidRepOriginId &&
862 free_state == NULL)
863 {
864 free_state = curstate;
865 continue;
866 }
867
868 /* not our slot */
869 if (curstate->roident != node)
870 {
871 continue;
872 }
873
874 /* ok, found slot */
875 replication_state = curstate;
876
877 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
878
879 /* Make sure it's not used by somebody else */
880 if (replication_state->acquired_by != 0)
881 {
882 ereport(ERROR,
883 (errcode(ERRCODE_OBJECT_IN_USE),
884 errmsg("replication origin with OID %d is already active for PID %d",
885 replication_state->roident,
886 replication_state->acquired_by)));
887 }
888
889 break;
890 }
891
892 if (replication_state == NULL && free_state == NULL)
893 ereport(ERROR,
894 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
895 errmsg("could not find free replication state slot for replication origin with OID %u",
896 node),
897 errhint("Increase max_replication_slots and try again.")));
898
899 if (replication_state == NULL)
900 {
901 /* initialize new slot */
902 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
903 replication_state = free_state;
904 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
905 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
906 replication_state->roident = node;
907 }
908
909 Assert(replication_state->roident != InvalidRepOriginId);
910
911 /*
912 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
913 * and the standby gets the message. Primarily this will be called during
914 * WAL replay (of commit records) where no WAL logging is necessary.
915 */
916 if (wal_log)
917 {
918 xl_replorigin_set xlrec;
919
920 xlrec.remote_lsn = remote_commit;
921 xlrec.node_id = node;
922 xlrec.force = go_backward;
923
924 XLogBeginInsert();
925 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
926
927 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
928 }
929
930 /*
931 * Due to - harmless - race conditions during a checkpoint we could see
932 * values here that are older than the ones we already have in memory.
933 * Don't overwrite those.
934 */
935 if (go_backward || replication_state->remote_lsn < remote_commit)
936 replication_state->remote_lsn = remote_commit;
937 if (local_commit != InvalidXLogRecPtr &&
938 (go_backward || replication_state->local_lsn < local_commit))
939 replication_state->local_lsn = local_commit;
940 LWLockRelease(&replication_state->lock);
941
942 /*
943 * Release *after* changing the LSNs, slot isn't acquired and thus could
944 * otherwise be dropped anytime.
945 */
946 LWLockRelease(ReplicationOriginLock);
947 }
948
949
950 XLogRecPtr
replorigin_get_progress(RepOriginId node,bool flush)951 replorigin_get_progress(RepOriginId node, bool flush)
952 {
953 int i;
954 XLogRecPtr local_lsn = InvalidXLogRecPtr;
955 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
956
957 /* prevent slots from being concurrently dropped */
958 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
959
960 for (i = 0; i < max_replication_slots; i++)
961 {
962 ReplicationState *state;
963
964 state = &replication_states[i];
965
966 if (state->roident == node)
967 {
968 LWLockAcquire(&state->lock, LW_SHARED);
969
970 remote_lsn = state->remote_lsn;
971 local_lsn = state->local_lsn;
972
973 LWLockRelease(&state->lock);
974
975 break;
976 }
977 }
978
979 LWLockRelease(ReplicationOriginLock);
980
981 if (flush && local_lsn != InvalidXLogRecPtr)
982 XLogFlush(local_lsn);
983
984 return remote_lsn;
985 }
986
987 /*
988 * Tear down a (possibly) configured session replication origin during process
989 * exit.
990 */
991 static void
ReplicationOriginExitCleanup(int code,Datum arg)992 ReplicationOriginExitCleanup(int code, Datum arg)
993 {
994 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
995
996 if (session_replication_state != NULL &&
997 session_replication_state->acquired_by == MyProcPid)
998 {
999 session_replication_state->acquired_by = 0;
1000 session_replication_state = NULL;
1001 }
1002
1003 LWLockRelease(ReplicationOriginLock);
1004 }
1005
1006 /*
1007 * Setup a replication origin in the shared memory struct if it doesn't
1008 * already exists and cache access to the specific ReplicationSlot so the
1009 * array doesn't have to be searched when calling
1010 * replorigin_session_advance().
1011 *
1012 * Obviously only one such cached origin can exist per process and the current
1013 * cached value can only be set again after the previous value is torn down
1014 * with replorigin_session_reset().
1015 */
1016 void
replorigin_session_setup(RepOriginId node)1017 replorigin_session_setup(RepOriginId node)
1018 {
1019 static bool registered_cleanup;
1020 int i;
1021 int free_slot = -1;
1022
1023 if (!registered_cleanup)
1024 {
1025 on_shmem_exit(ReplicationOriginExitCleanup, 0);
1026 registered_cleanup = true;
1027 }
1028
1029 Assert(max_replication_slots > 0);
1030
1031 if (session_replication_state != NULL)
1032 ereport(ERROR,
1033 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1034 errmsg("cannot setup replication origin when one is already setup")));
1035
1036 /* Lock exclusively, as we may have to create a new table entry. */
1037 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1038
1039 /*
1040 * Search for either an existing slot for the origin, or a free one we can
1041 * use.
1042 */
1043 for (i = 0; i < max_replication_slots; i++)
1044 {
1045 ReplicationState *curstate = &replication_states[i];
1046
1047 /* remember where to insert if necessary */
1048 if (curstate->roident == InvalidRepOriginId &&
1049 free_slot == -1)
1050 {
1051 free_slot = i;
1052 continue;
1053 }
1054
1055 /* not our slot */
1056 if (curstate->roident != node)
1057 continue;
1058
1059 else if (curstate->acquired_by != 0)
1060 {
1061 ereport(ERROR,
1062 (errcode(ERRCODE_OBJECT_IN_USE),
1063 errmsg("replication origin with OID %d is already active for PID %d",
1064 curstate->roident, curstate->acquired_by)));
1065 }
1066
1067 /* ok, found slot */
1068 session_replication_state = curstate;
1069 }
1070
1071
1072 if (session_replication_state == NULL && free_slot == -1)
1073 ereport(ERROR,
1074 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1075 errmsg("could not find free replication state slot for replication origin with OID %u",
1076 node),
1077 errhint("Increase max_replication_slots and try again.")));
1078 else if (session_replication_state == NULL)
1079 {
1080 /* initialize new slot */
1081 session_replication_state = &replication_states[free_slot];
1082 Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1083 Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1084 session_replication_state->roident = node;
1085 }
1086
1087
1088 Assert(session_replication_state->roident != InvalidRepOriginId);
1089
1090 session_replication_state->acquired_by = MyProcPid;
1091
1092 LWLockRelease(ReplicationOriginLock);
1093 }
1094
1095 /*
1096 * Reset replay state previously setup in this session.
1097 *
1098 * This function may only be called if an origin was setup with
1099 * replorigin_session_setup().
1100 */
1101 void
replorigin_session_reset(void)1102 replorigin_session_reset(void)
1103 {
1104 Assert(max_replication_slots != 0);
1105
1106 if (session_replication_state == NULL)
1107 ereport(ERROR,
1108 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1109 errmsg("no replication origin is configured")));
1110
1111 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1112
1113 session_replication_state->acquired_by = 0;
1114 session_replication_state = NULL;
1115
1116 LWLockRelease(ReplicationOriginLock);
1117 }
1118
1119 /*
1120 * Do the same work replorigin_advance() does, just on the session's
1121 * configured origin.
1122 *
1123 * This is noticeably cheaper than using replorigin_advance().
1124 */
1125 void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)1126 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1127 {
1128 Assert(session_replication_state != NULL);
1129 Assert(session_replication_state->roident != InvalidRepOriginId);
1130
1131 LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1132 if (session_replication_state->local_lsn < local_commit)
1133 session_replication_state->local_lsn = local_commit;
1134 if (session_replication_state->remote_lsn < remote_commit)
1135 session_replication_state->remote_lsn = remote_commit;
1136 LWLockRelease(&session_replication_state->lock);
1137 }
1138
1139 /*
1140 * Ask the machinery about the point up to which we successfully replayed
1141 * changes from an already setup replication origin.
1142 */
1143 XLogRecPtr
replorigin_session_get_progress(bool flush)1144 replorigin_session_get_progress(bool flush)
1145 {
1146 XLogRecPtr remote_lsn;
1147 XLogRecPtr local_lsn;
1148
1149 Assert(session_replication_state != NULL);
1150
1151 LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1152 remote_lsn = session_replication_state->remote_lsn;
1153 local_lsn = session_replication_state->local_lsn;
1154 LWLockRelease(&session_replication_state->lock);
1155
1156 if (flush && local_lsn != InvalidXLogRecPtr)
1157 XLogFlush(local_lsn);
1158
1159 return remote_lsn;
1160 }
1161
1162
1163
1164 /* ---------------------------------------------------------------------------
1165 * SQL functions for working with replication origin.
1166 *
1167 * These mostly should be fairly short wrappers around more generic functions.
1168 * ---------------------------------------------------------------------------
1169 */
1170
1171 /*
1172 * Create replication origin for the passed in name, and return the assigned
1173 * oid.
1174 */
1175 Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)1176 pg_replication_origin_create(PG_FUNCTION_ARGS)
1177 {
1178 char *name;
1179 RepOriginId roident;
1180
1181 replorigin_check_prerequisites(false, false);
1182
1183 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1184 roident = replorigin_create(name);
1185
1186 pfree(name);
1187
1188 PG_RETURN_OID(roident);
1189 }
1190
1191 /*
1192 * Drop replication origin.
1193 */
1194 Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)1195 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1196 {
1197 char *name;
1198 RepOriginId roident;
1199
1200 replorigin_check_prerequisites(false, false);
1201
1202 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1203
1204 roident = replorigin_by_name(name, false);
1205 Assert(OidIsValid(roident));
1206
1207 replorigin_drop(roident);
1208
1209 pfree(name);
1210
1211 PG_RETURN_VOID();
1212 }
1213
1214 /*
1215 * Return oid of a replication origin.
1216 */
1217 Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)1218 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1219 {
1220 char *name;
1221 RepOriginId roident;
1222
1223 replorigin_check_prerequisites(false, false);
1224
1225 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1226 roident = replorigin_by_name(name, true);
1227
1228 pfree(name);
1229
1230 if (OidIsValid(roident))
1231 PG_RETURN_OID(roident);
1232 PG_RETURN_NULL();
1233 }
1234
1235 /*
1236 * Setup a replication origin for this session.
1237 */
1238 Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)1239 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1240 {
1241 char *name;
1242 RepOriginId origin;
1243
1244 replorigin_check_prerequisites(true, false);
1245
1246 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1247 origin = replorigin_by_name(name, false);
1248 replorigin_session_setup(origin);
1249
1250 replorigin_session_origin = origin;
1251
1252 pfree(name);
1253
1254 PG_RETURN_VOID();
1255 }
1256
1257 /*
1258 * Reset previously setup origin in this session
1259 */
1260 Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)1261 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1262 {
1263 replorigin_check_prerequisites(true, false);
1264
1265 replorigin_session_reset();
1266
1267 replorigin_session_origin = InvalidRepOriginId;
1268 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1269 replorigin_session_origin_timestamp = 0;
1270
1271 PG_RETURN_VOID();
1272 }
1273
1274 /*
1275 * Has a replication origin been setup for this session.
1276 */
1277 Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)1278 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1279 {
1280 replorigin_check_prerequisites(false, false);
1281
1282 PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1283 }
1284
1285
1286 /*
1287 * Return the replication progress for origin setup in the current session.
1288 *
1289 * If 'flush' is set to true it is ensured that the returned value corresponds
1290 * to a local transaction that has been flushed. this is useful if asynchronous
1291 * commits are used when replaying replicated transactions.
1292 */
1293 Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)1294 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1295 {
1296 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1297 bool flush = PG_GETARG_BOOL(0);
1298
1299 replorigin_check_prerequisites(true, false);
1300
1301 if (session_replication_state == NULL)
1302 ereport(ERROR,
1303 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1304 errmsg("no replication origin is configured")));
1305
1306 remote_lsn = replorigin_session_get_progress(flush);
1307
1308 if (remote_lsn == InvalidXLogRecPtr)
1309 PG_RETURN_NULL();
1310
1311 PG_RETURN_LSN(remote_lsn);
1312 }
1313
1314 Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)1315 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1316 {
1317 XLogRecPtr location = PG_GETARG_LSN(0);
1318
1319 replorigin_check_prerequisites(true, false);
1320
1321 if (session_replication_state == NULL)
1322 ereport(ERROR,
1323 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1324 errmsg("no replication origin is configured")));
1325
1326 replorigin_session_origin_lsn = location;
1327 replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1328
1329 PG_RETURN_VOID();
1330 }
1331
1332 Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)1333 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1334 {
1335 replorigin_check_prerequisites(true, false);
1336
1337 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1338 replorigin_session_origin_timestamp = 0;
1339
1340 PG_RETURN_VOID();
1341 }
1342
1343
1344 Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)1345 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1346 {
1347 text *name = PG_GETARG_TEXT_P(0);
1348 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1349 RepOriginId node;
1350
1351 replorigin_check_prerequisites(true, false);
1352
1353 /* lock to prevent the replication origin from vanishing */
1354 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1355
1356 node = replorigin_by_name(text_to_cstring(name), false);
1357
1358 /*
1359 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1360 * xact hasn't committed yet. This is why this function should be used to
1361 * set up the initial replication state, but not for replay.
1362 */
1363 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1364 true /* go backward */ , true /* wal log */ );
1365
1366 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1367
1368 PG_RETURN_VOID();
1369 }
1370
1371
1372 /*
1373 * Return the replication progress for an individual replication origin.
1374 *
1375 * If 'flush' is set to true it is ensured that the returned value corresponds
1376 * to a local transaction that has been flushed. this is useful if asynchronous
1377 * commits are used when replaying replicated transactions.
1378 */
1379 Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)1380 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1381 {
1382 char *name;
1383 bool flush;
1384 RepOriginId roident;
1385 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1386
1387 replorigin_check_prerequisites(true, true);
1388
1389 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1390 flush = PG_GETARG_BOOL(1);
1391
1392 roident = replorigin_by_name(name, false);
1393 Assert(OidIsValid(roident));
1394
1395 remote_lsn = replorigin_get_progress(roident, flush);
1396
1397 if (remote_lsn == InvalidXLogRecPtr)
1398 PG_RETURN_NULL();
1399
1400 PG_RETURN_LSN(remote_lsn);
1401 }
1402
1403
1404 Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)1405 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1406 {
1407 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1408 TupleDesc tupdesc;
1409 Tuplestorestate *tupstore;
1410 MemoryContext per_query_ctx;
1411 MemoryContext oldcontext;
1412 int i;
1413 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1414
1415 /* we want to return 0 rows if slot is set to zero */
1416 replorigin_check_prerequisites(false, true);
1417
1418 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1419 ereport(ERROR,
1420 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1421 errmsg("set-valued function called in context that cannot accept a set")));
1422 if (!(rsinfo->allowedModes & SFRM_Materialize))
1423 ereport(ERROR,
1424 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1425 errmsg("materialize mode required, but it is not allowed in this context")));
1426 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1427 elog(ERROR, "return type must be a row type");
1428
1429 if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1430 elog(ERROR, "wrong function definition");
1431
1432 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1433 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1434
1435 tupstore = tuplestore_begin_heap(true, false, work_mem);
1436 rsinfo->returnMode = SFRM_Materialize;
1437 rsinfo->setResult = tupstore;
1438 rsinfo->setDesc = tupdesc;
1439
1440 MemoryContextSwitchTo(oldcontext);
1441
1442
1443 /* prevent slots from being concurrently dropped */
1444 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1445
1446 /*
1447 * Iterate through all possible replication_states, display if they are
1448 * filled. Note that we do not take any locks, so slightly corrupted/out
1449 * of date values are a possibility.
1450 */
1451 for (i = 0; i < max_replication_slots; i++)
1452 {
1453 ReplicationState *state;
1454 Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1455 bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1456 char *roname;
1457
1458 state = &replication_states[i];
1459
1460 /* unused slot, nothing to display */
1461 if (state->roident == InvalidRepOriginId)
1462 continue;
1463
1464 memset(values, 0, sizeof(values));
1465 memset(nulls, 1, sizeof(nulls));
1466
1467 values[0] = ObjectIdGetDatum(state->roident);
1468 nulls[0] = false;
1469
1470 /*
1471 * We're not preventing the origin to be dropped concurrently, so
1472 * silently accept that it might be gone.
1473 */
1474 if (replorigin_by_oid(state->roident, true,
1475 &roname))
1476 {
1477 values[1] = CStringGetTextDatum(roname);
1478 nulls[1] = false;
1479 }
1480
1481 LWLockAcquire(&state->lock, LW_SHARED);
1482
1483 values[2] = LSNGetDatum(state->remote_lsn);
1484 nulls[2] = false;
1485
1486 values[3] = LSNGetDatum(state->local_lsn);
1487 nulls[3] = false;
1488
1489 LWLockRelease(&state->lock);
1490
1491 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1492 }
1493
1494 tuplestore_donestoring(tupstore);
1495
1496 LWLockRelease(ReplicationOriginLock);
1497
1498 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1499
1500 return (Datum) 0;
1501 }
1502