1 /*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
 *	* A facility to store and persist replication progress in an efficient
 *	  and durable manner.
17 *
 * A replication origin consists of a descriptive, user defined, external
 * name and a short, thus space efficient, internal 2 byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory and long descriptors would be inefficient.  For now only 2 bytes
 * are used for the internal id of a replication origin as it seems unlikely
 * that there soon will be more than 65k nodes in one replication setup; and
 * using only two bytes allows us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
 * * To create and drop replication origins an exclusive lock on
 *	 pg_replication_origin is required for the duration. That allows us to
 *	 safely and conflict free assign new origins using a dirty snapshot.
50 *
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
56 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot that slot's lwlock has to be held. That's
 *	 primarily because we do not assume 8 byte writes (the LSN) are atomic on
 *	 all our platforms, but it also simplifies memory ordering concerns
 *	 between the remote and local lsn. We use a lwlock instead of a spinlock
 *	 so it's less harmful to hold the lock over a WAL write
 *	 (c.f. replorigin_advance).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68 #include "postgres.h"
69
70 #include <unistd.h>
71 #include <sys/stat.h>
72
73 #include "funcapi.h"
74 #include "miscadmin.h"
75
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80
81 #include "catalog/indexing.h"
82 #include "nodes/execnodes.h"
83
84 #include "replication/origin.h"
85 #include "replication/logical.h"
86 #include "pgstat.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
90 #include "storage/condition_variable.h"
91 #include "storage/copydir.h"
92
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99
100 /*
101 * Replay progress of a single remote node.
102 */
103 typedef struct ReplicationState
104 {
105 /*
106 * Local identifier for the remote node.
107 */
108 RepOriginId roident;
109
110 /*
111 * Location of the latest commit from the remote side.
112 */
113 XLogRecPtr remote_lsn;
114
115 /*
116 * Remember the local lsn of the commit record so we can XLogFlush() to it
117 * during a checkpoint so we know the commit record actually is safe on
118 * disk.
119 */
120 XLogRecPtr local_lsn;
121
122 /*
123 * PID of backend that's acquired slot, or 0 if none.
124 */
125 int acquired_by;
126
127 /*
128 * Condition variable that's signalled when acquired_by changes.
129 */
130 ConditionVariable origin_cv;
131
132 /*
133 * Lock protecting remote_lsn and local_lsn.
134 */
135 LWLock lock;
136 } ReplicationState;
137
138 /*
139 * On disk version of ReplicationState.
140 */
141 typedef struct ReplicationStateOnDisk
142 {
143 RepOriginId roident;
144 XLogRecPtr remote_lsn;
145 } ReplicationStateOnDisk;
146
147
148 typedef struct ReplicationStateCtl
149 {
150 /* Tranche to use for per-origin LWLocks */
151 int tranche_id;
152 /* Array of length max_replication_slots */
153 ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
154 } ReplicationStateCtl;
155
156 /* external variables */
157 RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
158 XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
159 TimestampTz replorigin_session_origin_timestamp = 0;
160
161 /*
162 * Base address into a shared memory array of replication states of size
163 * max_replication_slots.
164 *
165 * XXX: Should we use a separate variable to size this rather than
166 * max_replication_slots?
167 */
168 static ReplicationState *replication_states;
169
170 /*
171 * Actual shared memory block (replication_states[] is now part of this).
172 */
173 static ReplicationStateCtl *replication_states_ctl;
174
175 /*
176 * Backend-local, cached element from ReplicationState for use in a backend
177 * replaying remote commits, so we don't have to search ReplicationState for
178 * the backends current RepOriginId.
179 */
180 static ReplicationState *session_replication_state = NULL;
181
182 /* Magic for on disk files. */
183 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
184
185 static void
replorigin_check_prerequisites(bool check_slots,bool recoveryOK)186 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
187 {
188 if (!superuser())
189 ereport(ERROR,
190 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
191 errmsg("only superusers can query or manipulate replication origins")));
192
193 if (check_slots && max_replication_slots == 0)
194 ereport(ERROR,
195 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
196 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
197
198 if (!recoveryOK && RecoveryInProgress())
199 ereport(ERROR,
200 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
201 errmsg("cannot manipulate replication origins during recovery")));
202
203 }
204
205
206 /* ---------------------------------------------------------------------------
207 * Functions for working with replication origins themselves.
208 * ---------------------------------------------------------------------------
209 */
210
211 /*
212 * Check for a persistent replication origin identified by name.
213 *
214 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
215 */
216 RepOriginId
replorigin_by_name(char * roname,bool missing_ok)217 replorigin_by_name(char *roname, bool missing_ok)
218 {
219 Form_pg_replication_origin ident;
220 Oid roident = InvalidOid;
221 HeapTuple tuple;
222 Datum roname_d;
223
224 roname_d = CStringGetTextDatum(roname);
225
226 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
227 if (HeapTupleIsValid(tuple))
228 {
229 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
230 roident = ident->roident;
231 ReleaseSysCache(tuple);
232 }
233 else if (!missing_ok)
234 ereport(ERROR,
235 (errcode(ERRCODE_UNDEFINED_OBJECT),
236 errmsg("replication origin \"%s\" does not exist",
237 roname)));
238
239 return roident;
240 }
241
242 /*
243 * Create a replication origin.
244 *
245 * Needs to be called in a transaction.
246 */
247 RepOriginId
replorigin_create(char * roname)248 replorigin_create(char *roname)
249 {
250 Oid roident;
251 HeapTuple tuple = NULL;
252 Relation rel;
253 Datum roname_d;
254 SnapshotData SnapshotDirty;
255 SysScanDesc scan;
256 ScanKeyData key;
257
258 roname_d = CStringGetTextDatum(roname);
259
260 Assert(IsTransactionState());
261
262 /*
263 * We need the numeric replication origin to be 16bit wide, so we cannot
264 * rely on the normal oid allocation. Instead we simply scan
265 * pg_replication_origin for the first unused id. That's not particularly
266 * efficient, but this should be a fairly infrequent operation - we can
267 * easily spend a bit more code on this when it turns out it needs to be
268 * faster.
269 *
270 * We handle concurrency by taking an exclusive lock (allowing reads!)
271 * over the table for the duration of the search. Because we use a "dirty
272 * snapshot" we can read rows that other in-progress sessions have
273 * written, even though they would be invisible with normal snapshots. Due
274 * to the exclusive lock there's no danger that new rows can appear while
275 * we're checking.
276 */
277 InitDirtySnapshot(SnapshotDirty);
278
279 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
280
281 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
282 {
283 bool nulls[Natts_pg_replication_origin];
284 Datum values[Natts_pg_replication_origin];
285 bool collides;
286
287 CHECK_FOR_INTERRUPTS();
288
289 ScanKeyInit(&key,
290 Anum_pg_replication_origin_roident,
291 BTEqualStrategyNumber, F_OIDEQ,
292 ObjectIdGetDatum(roident));
293
294 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
295 true /* indexOK */ ,
296 &SnapshotDirty,
297 1, &key);
298
299 collides = HeapTupleIsValid(systable_getnext(scan));
300
301 systable_endscan(scan);
302
303 if (!collides)
304 {
305 /*
306 * Ok, found an unused roident, insert the new row and do a CCI,
307 * so our callers can look it up if they want to.
308 */
309 memset(&nulls, 0, sizeof(nulls));
310
311 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
312 values[Anum_pg_replication_origin_roname - 1] = roname_d;
313
314 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
315 CatalogTupleInsert(rel, tuple);
316 CommandCounterIncrement();
317 break;
318 }
319 }
320
321 /* now release lock again, */
322 heap_close(rel, ExclusiveLock);
323
324 if (tuple == NULL)
325 ereport(ERROR,
326 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
327 errmsg("could not find free replication origin OID")));
328
329 heap_freetuple(tuple);
330 return roident;
331 }
332
333
334 /*
335 * Drop replication origin.
336 *
337 * Needs to be called in a transaction.
338 */
339 void
replorigin_drop(RepOriginId roident,bool nowait)340 replorigin_drop(RepOriginId roident, bool nowait)
341 {
342 HeapTuple tuple;
343 Relation rel;
344 int i;
345
346 Assert(IsTransactionState());
347
348 /*
349 * To interlock against concurrent drops, we hold ExclusiveLock on
350 * pg_replication_origin throughout this funcion.
351 */
352 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
353
354 /*
355 * First, clean up the slot state info, if there is any matching slot.
356 */
357 restart:
358 tuple = NULL;
359 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
360
361 for (i = 0; i < max_replication_slots; i++)
362 {
363 ReplicationState *state = &replication_states[i];
364
365 if (state->roident == roident)
366 {
367 /* found our slot, is it busy? */
368 if (state->acquired_by != 0)
369 {
370 ConditionVariable *cv;
371
372 if (nowait)
373 ereport(ERROR,
374 (errcode(ERRCODE_OBJECT_IN_USE),
375 errmsg("could not drop replication origin with OID %d, in use by PID %d",
376 state->roident,
377 state->acquired_by)));
378
379 /*
380 * We must wait and then retry. Since we don't know which CV
381 * to wait on until here, we can't readily use
382 * ConditionVariablePrepareToSleep (calling it here would be
383 * wrong, since we could miss the signal if we did so); just
384 * use ConditionVariableSleep directly.
385 */
386 cv = &state->origin_cv;
387
388 LWLockRelease(ReplicationOriginLock);
389
390 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
391 goto restart;
392 }
393
394 /* first make a WAL log entry */
395 {
396 xl_replorigin_drop xlrec;
397
398 xlrec.node_id = roident;
399 XLogBeginInsert();
400 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
401 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
402 }
403
404 /* then clear the in-memory slot */
405 state->roident = InvalidRepOriginId;
406 state->remote_lsn = InvalidXLogRecPtr;
407 state->local_lsn = InvalidXLogRecPtr;
408 break;
409 }
410 }
411 LWLockRelease(ReplicationOriginLock);
412 ConditionVariableCancelSleep();
413
414 /*
415 * Now, we can delete the catalog entry.
416 */
417 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
418 if (!HeapTupleIsValid(tuple))
419 elog(ERROR, "cache lookup failed for replication origin with oid %u",
420 roident);
421
422 CatalogTupleDelete(rel, &tuple->t_self);
423 ReleaseSysCache(tuple);
424
425 CommandCounterIncrement();
426
427 /* now release lock again */
428 heap_close(rel, ExclusiveLock);
429 }
430
431
432 /*
433 * Lookup replication origin via it's oid and return the name.
434 *
435 * The external name is palloc'd in the calling context.
436 *
437 * Returns true if the origin is known, false otherwise.
438 */
439 bool
replorigin_by_oid(RepOriginId roident,bool missing_ok,char ** roname)440 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
441 {
442 HeapTuple tuple;
443 Form_pg_replication_origin ric;
444
445 Assert(OidIsValid((Oid) roident));
446 Assert(roident != InvalidRepOriginId);
447 Assert(roident != DoNotReplicateId);
448
449 tuple = SearchSysCache1(REPLORIGIDENT,
450 ObjectIdGetDatum((Oid) roident));
451
452 if (HeapTupleIsValid(tuple))
453 {
454 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
455 *roname = text_to_cstring(&ric->roname);
456 ReleaseSysCache(tuple);
457
458 return true;
459 }
460 else
461 {
462 *roname = NULL;
463
464 if (!missing_ok)
465 ereport(ERROR,
466 (errcode(ERRCODE_UNDEFINED_OBJECT),
467 errmsg("replication origin with OID %u does not exist",
468 roident)));
469
470 return false;
471 }
472 }
473
474
475 /* ---------------------------------------------------------------------------
476 * Functions for handling replication progress.
477 * ---------------------------------------------------------------------------
478 */
479
480 Size
ReplicationOriginShmemSize(void)481 ReplicationOriginShmemSize(void)
482 {
483 Size size = 0;
484
485 /*
486 * XXX: max_replication_slots is arguably the wrong thing to use, as here
487 * we keep the replay state of *remote* transactions. But for now it seems
488 * sufficient to reuse it, rather than introduce a separate GUC.
489 */
490 if (max_replication_slots == 0)
491 return size;
492
493 size = add_size(size, offsetof(ReplicationStateCtl, states));
494
495 size = add_size(size,
496 mul_size(max_replication_slots, sizeof(ReplicationState)));
497 return size;
498 }
499
500 void
ReplicationOriginShmemInit(void)501 ReplicationOriginShmemInit(void)
502 {
503 bool found;
504
505 if (max_replication_slots == 0)
506 return;
507
508 replication_states_ctl = (ReplicationStateCtl *)
509 ShmemInitStruct("ReplicationOriginState",
510 ReplicationOriginShmemSize(),
511 &found);
512 replication_states = replication_states_ctl->states;
513
514 if (!found)
515 {
516 int i;
517
518 MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
519
520 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
521
522 for (i = 0; i < max_replication_slots; i++)
523 {
524 LWLockInitialize(&replication_states[i].lock,
525 replication_states_ctl->tranche_id);
526 ConditionVariableInit(&replication_states[i].origin_cv);
527 }
528 }
529
530 LWLockRegisterTranche(replication_states_ctl->tranche_id,
531 "replication_origin");
532 }
533
534 /* ---------------------------------------------------------------------------
535 * Perform a checkpoint of each replication origin's progress with respect to
536 * the replayed remote_lsn. Make sure that all transactions we refer to in the
537 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
538 * if the transactions were originally committed asynchronously.
539 *
540 * We store checkpoints in the following format:
541 * +-------+------------------------+------------------+-----+--------+
542 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
543 * +-------+------------------------+------------------+-----+--------+
544 *
545 * So its just the magic, followed by the statically sized
546 * ReplicationStateOnDisk structs. Note that the maximum number of
547 * ReplicationState is determined by max_replication_slots.
548 * ---------------------------------------------------------------------------
549 */
550 void
CheckPointReplicationOrigin(void)551 CheckPointReplicationOrigin(void)
552 {
553 const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
554 const char *path = "pg_logical/replorigin_checkpoint";
555 int tmpfd;
556 int i;
557 uint32 magic = REPLICATION_STATE_MAGIC;
558 pg_crc32c crc;
559
560 if (max_replication_slots == 0)
561 return;
562
563 INIT_CRC32C(crc);
564
565 /* make sure no old temp file is remaining */
566 if (unlink(tmppath) < 0 && errno != ENOENT)
567 ereport(PANIC,
568 (errcode_for_file_access(),
569 errmsg("could not remove file \"%s\": %m",
570 tmppath)));
571
572 /*
573 * no other backend can perform this at the same time, we're protected by
574 * CheckpointLock.
575 */
576 tmpfd = OpenTransientFile((char *) tmppath,
577 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
578 S_IRUSR | S_IWUSR);
579 if (tmpfd < 0)
580 ereport(PANIC,
581 (errcode_for_file_access(),
582 errmsg("could not create file \"%s\": %m",
583 tmppath)));
584
585 /* write magic */
586 errno = 0;
587 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
588 {
589 int save_errno = errno;
590
591 CloseTransientFile(tmpfd);
592
593 /* if write didn't set errno, assume problem is no disk space */
594 errno = save_errno ? save_errno : ENOSPC;
595 ereport(PANIC,
596 (errcode_for_file_access(),
597 errmsg("could not write to file \"%s\": %m",
598 tmppath)));
599 }
600 COMP_CRC32C(crc, &magic, sizeof(magic));
601
602 /* prevent concurrent creations/drops */
603 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
604
605 /* write actual data */
606 for (i = 0; i < max_replication_slots; i++)
607 {
608 ReplicationStateOnDisk disk_state;
609 ReplicationState *curstate = &replication_states[i];
610 XLogRecPtr local_lsn;
611
612 if (curstate->roident == InvalidRepOriginId)
613 continue;
614
615 /* zero, to avoid uninitialized padding bytes */
616 memset(&disk_state, 0, sizeof(disk_state));
617
618 LWLockAcquire(&curstate->lock, LW_SHARED);
619
620 disk_state.roident = curstate->roident;
621
622 disk_state.remote_lsn = curstate->remote_lsn;
623 local_lsn = curstate->local_lsn;
624
625 LWLockRelease(&curstate->lock);
626
627 /* make sure we only write out a commit that's persistent */
628 XLogFlush(local_lsn);
629
630 errno = 0;
631 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
632 sizeof(disk_state))
633 {
634 int save_errno = errno;
635
636 CloseTransientFile(tmpfd);
637
638 /* if write didn't set errno, assume problem is no disk space */
639 errno = save_errno ? save_errno : ENOSPC;
640 ereport(PANIC,
641 (errcode_for_file_access(),
642 errmsg("could not write to file \"%s\": %m",
643 tmppath)));
644 }
645
646 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
647 }
648
649 LWLockRelease(ReplicationOriginLock);
650
651 /* write out the CRC */
652 FIN_CRC32C(crc);
653 errno = 0;
654 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
655 {
656 int save_errno = errno;
657
658 CloseTransientFile(tmpfd);
659
660 /* if write didn't set errno, assume problem is no disk space */
661 errno = save_errno ? save_errno : ENOSPC;
662 ereport(PANIC,
663 (errcode_for_file_access(),
664 errmsg("could not write to file \"%s\": %m",
665 tmppath)));
666 }
667
668 CloseTransientFile(tmpfd);
669
670 /* fsync, rename to permanent file, fsync file and directory */
671 durable_rename(tmppath, path, PANIC);
672 }
673
674 /*
675 * Recover replication replay status from checkpoint data saved earlier by
676 * CheckPointReplicationOrigin.
677 *
678 * This only needs to be called at startup and *not* during every checkpoint
679 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
680 * state thereafter can be recovered by looking at commit records.
681 */
682 void
StartupReplicationOrigin(void)683 StartupReplicationOrigin(void)
684 {
685 const char *path = "pg_logical/replorigin_checkpoint";
686 int fd;
687 int readBytes;
688 uint32 magic = REPLICATION_STATE_MAGIC;
689 int last_state = 0;
690 pg_crc32c file_crc;
691 pg_crc32c crc;
692
693 /* don't want to overwrite already existing state */
694 #ifdef USE_ASSERT_CHECKING
695 static bool already_started = false;
696
697 Assert(!already_started);
698 already_started = true;
699 #endif
700
701 if (max_replication_slots == 0)
702 return;
703
704 INIT_CRC32C(crc);
705
706 elog(DEBUG2, "starting up replication origin progress state");
707
708 fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
709
710 /*
711 * might have had max_replication_slots == 0 last run, or we just brought
712 * up a standby.
713 */
714 if (fd < 0 && errno == ENOENT)
715 return;
716 else if (fd < 0)
717 ereport(PANIC,
718 (errcode_for_file_access(),
719 errmsg("could not open file \"%s\": %m",
720 path)));
721
722 /* verify magic, that is written even if nothing was active */
723 readBytes = read(fd, &magic, sizeof(magic));
724 if (readBytes != sizeof(magic))
725 ereport(PANIC,
726 (errmsg("could not read file \"%s\": %m",
727 path)));
728 COMP_CRC32C(crc, &magic, sizeof(magic));
729
730 if (magic != REPLICATION_STATE_MAGIC)
731 ereport(PANIC,
732 (errmsg("replication checkpoint has wrong magic %u instead of %u",
733 magic, REPLICATION_STATE_MAGIC)));
734
735 /* we can skip locking here, no other access is possible */
736
737 /* recover individual states, until there are no more to be found */
738 while (true)
739 {
740 ReplicationStateOnDisk disk_state;
741
742 readBytes = read(fd, &disk_state, sizeof(disk_state));
743
744 /* no further data */
745 if (readBytes == sizeof(crc))
746 {
747 /* not pretty, but simple ... */
748 file_crc = *(pg_crc32c *) &disk_state;
749 break;
750 }
751
752 if (readBytes < 0)
753 {
754 ereport(PANIC,
755 (errcode_for_file_access(),
756 errmsg("could not read file \"%s\": %m",
757 path)));
758 }
759
760 if (readBytes != sizeof(disk_state))
761 {
762 ereport(PANIC,
763 (errcode_for_file_access(),
764 errmsg("could not read file \"%s\": read %d of %zu",
765 path, readBytes, sizeof(disk_state))));
766 }
767
768 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
769
770 if (last_state == max_replication_slots)
771 ereport(PANIC,
772 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
773 errmsg("could not find free replication state, increase max_replication_slots")));
774
775 /* copy data to shared memory */
776 replication_states[last_state].roident = disk_state.roident;
777 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
778 last_state++;
779
780 elog(LOG, "recovered replication state of node %u to %X/%X",
781 disk_state.roident,
782 (uint32) (disk_state.remote_lsn >> 32),
783 (uint32) disk_state.remote_lsn);
784 }
785
786 /* now check checksum */
787 FIN_CRC32C(crc);
788 if (file_crc != crc)
789 ereport(PANIC,
790 (errcode(ERRCODE_DATA_CORRUPTED),
791 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
792 crc, file_crc)));
793
794 CloseTransientFile(fd);
795 }
796
797 void
replorigin_redo(XLogReaderState * record)798 replorigin_redo(XLogReaderState *record)
799 {
800 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
801
802 switch (info)
803 {
804 case XLOG_REPLORIGIN_SET:
805 {
806 xl_replorigin_set *xlrec =
807 (xl_replorigin_set *) XLogRecGetData(record);
808
809 replorigin_advance(xlrec->node_id,
810 xlrec->remote_lsn, record->EndRecPtr,
811 xlrec->force /* backward */ ,
812 false /* WAL log */ );
813 break;
814 }
815 case XLOG_REPLORIGIN_DROP:
816 {
817 xl_replorigin_drop *xlrec;
818 int i;
819
820 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
821
822 for (i = 0; i < max_replication_slots; i++)
823 {
824 ReplicationState *state = &replication_states[i];
825
826 /* found our slot */
827 if (state->roident == xlrec->node_id)
828 {
829 /* reset entry */
830 state->roident = InvalidRepOriginId;
831 state->remote_lsn = InvalidXLogRecPtr;
832 state->local_lsn = InvalidXLogRecPtr;
833 break;
834 }
835 }
836 break;
837 }
838 default:
839 elog(PANIC, "replorigin_redo: unknown op code %u", info);
840 }
841 }
842
843
844 /*
845 * Tell the replication origin progress machinery that a commit from 'node'
846 * that originated at the LSN remote_commit on the remote node was replayed
847 * successfully and that we don't need to do so again. In combination with
848 * setting up replorigin_session_origin_lsn and replorigin_session_origin
849 * that ensures we won't loose knowledge about that after a crash if the
850 * transaction had a persistent effect (think of asynchronous commits).
851 *
852 * local_commit needs to be a local LSN of the commit so that we can make sure
853 * upon a checkpoint that enough WAL has been persisted to disk.
854 *
855 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
856 * unless running in recovery.
857 */
858 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)859 replorigin_advance(RepOriginId node,
860 XLogRecPtr remote_commit, XLogRecPtr local_commit,
861 bool go_backward, bool wal_log)
862 {
863 int i;
864 ReplicationState *replication_state = NULL;
865 ReplicationState *free_state = NULL;
866
867 Assert(node != InvalidRepOriginId);
868
869 /* we don't track DoNotReplicateId */
870 if (node == DoNotReplicateId)
871 return;
872
873 /*
874 * XXX: For the case where this is called by WAL replay, it'd be more
875 * efficient to restore into a backend local hashtable and only dump into
876 * shmem after recovery is finished. Let's wait with implementing that
877 * till it's shown to be a measurable expense
878 */
879
880 /* Lock exclusively, as we may have to create a new table entry. */
881 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
882
883 /*
884 * Search for either an existing slot for the origin, or a free one we can
885 * use.
886 */
887 for (i = 0; i < max_replication_slots; i++)
888 {
889 ReplicationState *curstate = &replication_states[i];
890
891 /* remember where to insert if necessary */
892 if (curstate->roident == InvalidRepOriginId &&
893 free_state == NULL)
894 {
895 free_state = curstate;
896 continue;
897 }
898
899 /* not our slot */
900 if (curstate->roident != node)
901 {
902 continue;
903 }
904
905 /* ok, found slot */
906 replication_state = curstate;
907
908 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
909
910 /* Make sure it's not used by somebody else */
911 if (replication_state->acquired_by != 0)
912 {
913 ereport(ERROR,
914 (errcode(ERRCODE_OBJECT_IN_USE),
915 errmsg("replication origin with OID %d is already active for PID %d",
916 replication_state->roident,
917 replication_state->acquired_by)));
918 }
919
920 break;
921 }
922
923 if (replication_state == NULL && free_state == NULL)
924 ereport(ERROR,
925 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
926 errmsg("could not find free replication state slot for replication origin with OID %u",
927 node),
928 errhint("Increase max_replication_slots and try again.")));
929
930 if (replication_state == NULL)
931 {
932 /* initialize new slot */
933 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
934 replication_state = free_state;
935 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
936 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
937 replication_state->roident = node;
938 }
939
940 Assert(replication_state->roident != InvalidRepOriginId);
941
942 /*
943 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
944 * and the standby gets the message. Primarily this will be called during
945 * WAL replay (of commit records) where no WAL logging is necessary.
946 */
947 if (wal_log)
948 {
949 xl_replorigin_set xlrec;
950
951 xlrec.remote_lsn = remote_commit;
952 xlrec.node_id = node;
953 xlrec.force = go_backward;
954
955 XLogBeginInsert();
956 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
957
958 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
959 }
960
961 /*
962 * Due to - harmless - race conditions during a checkpoint we could see
963 * values here that are older than the ones we already have in memory.
964 * Don't overwrite those.
965 */
966 if (go_backward || replication_state->remote_lsn < remote_commit)
967 replication_state->remote_lsn = remote_commit;
968 if (local_commit != InvalidXLogRecPtr &&
969 (go_backward || replication_state->local_lsn < local_commit))
970 replication_state->local_lsn = local_commit;
971 LWLockRelease(&replication_state->lock);
972
973 /*
974 * Release *after* changing the LSNs, slot isn't acquired and thus could
975 * otherwise be dropped anytime.
976 */
977 LWLockRelease(ReplicationOriginLock);
978 }
979
980
981 XLogRecPtr
replorigin_get_progress(RepOriginId node,bool flush)982 replorigin_get_progress(RepOriginId node, bool flush)
983 {
984 int i;
985 XLogRecPtr local_lsn = InvalidXLogRecPtr;
986 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
987
988 /* prevent slots from being concurrently dropped */
989 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
990
991 for (i = 0; i < max_replication_slots; i++)
992 {
993 ReplicationState *state;
994
995 state = &replication_states[i];
996
997 if (state->roident == node)
998 {
999 LWLockAcquire(&state->lock, LW_SHARED);
1000
1001 remote_lsn = state->remote_lsn;
1002 local_lsn = state->local_lsn;
1003
1004 LWLockRelease(&state->lock);
1005
1006 break;
1007 }
1008 }
1009
1010 LWLockRelease(ReplicationOriginLock);
1011
1012 if (flush && local_lsn != InvalidXLogRecPtr)
1013 XLogFlush(local_lsn);
1014
1015 return remote_lsn;
1016 }
1017
1018 /*
1019 * Tear down a (possibly) configured session replication origin during process
1020 * exit.
1021 */
1022 static void
ReplicationOriginExitCleanup(int code,Datum arg)1023 ReplicationOriginExitCleanup(int code, Datum arg)
1024 {
1025 ConditionVariable *cv = NULL;
1026
1027 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1028
1029 if (session_replication_state != NULL &&
1030 session_replication_state->acquired_by == MyProcPid)
1031 {
1032 cv = &session_replication_state->origin_cv;
1033
1034 session_replication_state->acquired_by = 0;
1035 session_replication_state = NULL;
1036 }
1037
1038 LWLockRelease(ReplicationOriginLock);
1039
1040 if (cv)
1041 ConditionVariableBroadcast(cv);
1042 }
1043
1044 /*
1045 * Setup a replication origin in the shared memory struct if it doesn't
1046 * already exists and cache access to the specific ReplicationSlot so the
1047 * array doesn't have to be searched when calling
1048 * replorigin_session_advance().
1049 *
1050 * Obviously only one such cached origin can exist per process and the current
1051 * cached value can only be set again after the previous value is torn down
1052 * with replorigin_session_reset().
1053 */
1054 void
replorigin_session_setup(RepOriginId node)1055 replorigin_session_setup(RepOriginId node)
1056 {
1057 static bool registered_cleanup;
1058 int i;
1059 int free_slot = -1;
1060
1061 if (!registered_cleanup)
1062 {
1063 on_shmem_exit(ReplicationOriginExitCleanup, 0);
1064 registered_cleanup = true;
1065 }
1066
1067 Assert(max_replication_slots > 0);
1068
1069 if (session_replication_state != NULL)
1070 ereport(ERROR,
1071 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1072 errmsg("cannot setup replication origin when one is already setup")));
1073
1074 /* Lock exclusively, as we may have to create a new table entry. */
1075 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1076
1077 /*
1078 * Search for either an existing slot for the origin, or a free one we can
1079 * use.
1080 */
1081 for (i = 0; i < max_replication_slots; i++)
1082 {
1083 ReplicationState *curstate = &replication_states[i];
1084
1085 /* remember where to insert if necessary */
1086 if (curstate->roident == InvalidRepOriginId &&
1087 free_slot == -1)
1088 {
1089 free_slot = i;
1090 continue;
1091 }
1092
1093 /* not our slot */
1094 if (curstate->roident != node)
1095 continue;
1096
1097 else if (curstate->acquired_by != 0)
1098 {
1099 ereport(ERROR,
1100 (errcode(ERRCODE_OBJECT_IN_USE),
1101 errmsg("replication origin with OID %d is already active for PID %d",
1102 curstate->roident, curstate->acquired_by)));
1103 }
1104
1105 /* ok, found slot */
1106 session_replication_state = curstate;
1107 }
1108
1109
1110 if (session_replication_state == NULL && free_slot == -1)
1111 ereport(ERROR,
1112 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1113 errmsg("could not find free replication state slot for replication origin with OID %u",
1114 node),
1115 errhint("Increase max_replication_slots and try again.")));
1116 else if (session_replication_state == NULL)
1117 {
1118 /* initialize new slot */
1119 session_replication_state = &replication_states[free_slot];
1120 Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1121 Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1122 session_replication_state->roident = node;
1123 }
1124
1125
1126 Assert(session_replication_state->roident != InvalidRepOriginId);
1127
1128 session_replication_state->acquired_by = MyProcPid;
1129
1130 LWLockRelease(ReplicationOriginLock);
1131
1132 /* probably this one is pointless */
1133 ConditionVariableBroadcast(&session_replication_state->origin_cv);
1134 }
1135
1136 /*
1137 * Reset replay state previously setup in this session.
1138 *
1139 * This function may only be called if an origin was setup with
1140 * replorigin_session_setup().
1141 */
1142 void
replorigin_session_reset(void)1143 replorigin_session_reset(void)
1144 {
1145 ConditionVariable *cv;
1146
1147 Assert(max_replication_slots != 0);
1148
1149 if (session_replication_state == NULL)
1150 ereport(ERROR,
1151 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1152 errmsg("no replication origin is configured")));
1153
1154 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1155
1156 session_replication_state->acquired_by = 0;
1157 cv = &session_replication_state->origin_cv;
1158 session_replication_state = NULL;
1159
1160 LWLockRelease(ReplicationOriginLock);
1161
1162 ConditionVariableBroadcast(cv);
1163 }
1164
1165 /*
1166 * Do the same work replorigin_advance() does, just on the session's
1167 * configured origin.
1168 *
1169 * This is noticeably cheaper than using replorigin_advance().
1170 */
1171 void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)1172 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1173 {
1174 Assert(session_replication_state != NULL);
1175 Assert(session_replication_state->roident != InvalidRepOriginId);
1176
1177 LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1178 if (session_replication_state->local_lsn < local_commit)
1179 session_replication_state->local_lsn = local_commit;
1180 if (session_replication_state->remote_lsn < remote_commit)
1181 session_replication_state->remote_lsn = remote_commit;
1182 LWLockRelease(&session_replication_state->lock);
1183 }
1184
1185 /*
1186 * Ask the machinery about the point up to which we successfully replayed
1187 * changes from an already setup replication origin.
1188 */
1189 XLogRecPtr
replorigin_session_get_progress(bool flush)1190 replorigin_session_get_progress(bool flush)
1191 {
1192 XLogRecPtr remote_lsn;
1193 XLogRecPtr local_lsn;
1194
1195 Assert(session_replication_state != NULL);
1196
1197 LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1198 remote_lsn = session_replication_state->remote_lsn;
1199 local_lsn = session_replication_state->local_lsn;
1200 LWLockRelease(&session_replication_state->lock);
1201
1202 if (flush && local_lsn != InvalidXLogRecPtr)
1203 XLogFlush(local_lsn);
1204
1205 return remote_lsn;
1206 }
1207
1208
1209
/* ---------------------------------------------------------------------------
 * SQL functions for working with replication origins.
 *
 * These should mostly be fairly short wrappers around more generic functions.
 * ---------------------------------------------------------------------------
 */
1216
1217 /*
1218 * Create replication origin for the passed in name, and return the assigned
1219 * oid.
1220 */
1221 Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)1222 pg_replication_origin_create(PG_FUNCTION_ARGS)
1223 {
1224 char *name;
1225 RepOriginId roident;
1226
1227 replorigin_check_prerequisites(false, false);
1228
1229 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1230 roident = replorigin_create(name);
1231
1232 pfree(name);
1233
1234 PG_RETURN_OID(roident);
1235 }
1236
1237 /*
1238 * Drop replication origin.
1239 */
1240 Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)1241 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1242 {
1243 char *name;
1244 RepOriginId roident;
1245
1246 replorigin_check_prerequisites(false, false);
1247
1248 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1249
1250 roident = replorigin_by_name(name, false);
1251 Assert(OidIsValid(roident));
1252
1253 replorigin_drop(roident, true);
1254
1255 pfree(name);
1256
1257 PG_RETURN_VOID();
1258 }
1259
1260 /*
1261 * Return oid of a replication origin.
1262 */
1263 Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)1264 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1265 {
1266 char *name;
1267 RepOriginId roident;
1268
1269 replorigin_check_prerequisites(false, false);
1270
1271 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1272 roident = replorigin_by_name(name, true);
1273
1274 pfree(name);
1275
1276 if (OidIsValid(roident))
1277 PG_RETURN_OID(roident);
1278 PG_RETURN_NULL();
1279 }
1280
1281 /*
1282 * Setup a replication origin for this session.
1283 */
1284 Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)1285 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1286 {
1287 char *name;
1288 RepOriginId origin;
1289
1290 replorigin_check_prerequisites(true, false);
1291
1292 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1293 origin = replorigin_by_name(name, false);
1294 replorigin_session_setup(origin);
1295
1296 replorigin_session_origin = origin;
1297
1298 pfree(name);
1299
1300 PG_RETURN_VOID();
1301 }
1302
1303 /*
1304 * Reset previously setup origin in this session
1305 */
1306 Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)1307 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1308 {
1309 replorigin_check_prerequisites(true, false);
1310
1311 replorigin_session_reset();
1312
1313 replorigin_session_origin = InvalidRepOriginId;
1314 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1315 replorigin_session_origin_timestamp = 0;
1316
1317 PG_RETURN_VOID();
1318 }
1319
1320 /*
1321 * Has a replication origin been setup for this session.
1322 */
1323 Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)1324 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1325 {
1326 replorigin_check_prerequisites(false, false);
1327
1328 PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1329 }
1330
1331
1332 /*
1333 * Return the replication progress for origin setup in the current session.
1334 *
1335 * If 'flush' is set to true it is ensured that the returned value corresponds
1336 * to a local transaction that has been flushed. This is useful if asynchronous
1337 * commits are used when replaying replicated transactions.
1338 */
1339 Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)1340 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1341 {
1342 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1343 bool flush = PG_GETARG_BOOL(0);
1344
1345 replorigin_check_prerequisites(true, false);
1346
1347 if (session_replication_state == NULL)
1348 ereport(ERROR,
1349 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1350 errmsg("no replication origin is configured")));
1351
1352 remote_lsn = replorigin_session_get_progress(flush);
1353
1354 if (remote_lsn == InvalidXLogRecPtr)
1355 PG_RETURN_NULL();
1356
1357 PG_RETURN_LSN(remote_lsn);
1358 }
1359
1360 Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)1361 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1362 {
1363 XLogRecPtr location = PG_GETARG_LSN(0);
1364
1365 replorigin_check_prerequisites(true, false);
1366
1367 if (session_replication_state == NULL)
1368 ereport(ERROR,
1369 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1370 errmsg("no replication origin is configured")));
1371
1372 replorigin_session_origin_lsn = location;
1373 replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1374
1375 PG_RETURN_VOID();
1376 }
1377
1378 Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)1379 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1380 {
1381 replorigin_check_prerequisites(true, false);
1382
1383 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1384 replorigin_session_origin_timestamp = 0;
1385
1386 PG_RETURN_VOID();
1387 }
1388
1389
1390 Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)1391 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1392 {
1393 text *name = PG_GETARG_TEXT_PP(0);
1394 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1395 RepOriginId node;
1396
1397 replorigin_check_prerequisites(true, false);
1398
1399 /* lock to prevent the replication origin from vanishing */
1400 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1401
1402 node = replorigin_by_name(text_to_cstring(name), false);
1403
1404 /*
1405 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1406 * xact hasn't committed yet. This is why this function should be used to
1407 * set up the initial replication state, but not for replay.
1408 */
1409 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1410 true /* go backward */ , true /* WAL log */ );
1411
1412 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1413
1414 PG_RETURN_VOID();
1415 }
1416
1417
1418 /*
1419 * Return the replication progress for an individual replication origin.
1420 *
1421 * If 'flush' is set to true it is ensured that the returned value corresponds
1422 * to a local transaction that has been flushed. This is useful if asynchronous
1423 * commits are used when replaying replicated transactions.
1424 */
1425 Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)1426 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1427 {
1428 char *name;
1429 bool flush;
1430 RepOriginId roident;
1431 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1432
1433 replorigin_check_prerequisites(true, true);
1434
1435 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1436 flush = PG_GETARG_BOOL(1);
1437
1438 roident = replorigin_by_name(name, false);
1439 Assert(OidIsValid(roident));
1440
1441 remote_lsn = replorigin_get_progress(roident, flush);
1442
1443 if (remote_lsn == InvalidXLogRecPtr)
1444 PG_RETURN_NULL();
1445
1446 PG_RETURN_LSN(remote_lsn);
1447 }
1448
1449
1450 Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)1451 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1452 {
1453 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1454 TupleDesc tupdesc;
1455 Tuplestorestate *tupstore;
1456 MemoryContext per_query_ctx;
1457 MemoryContext oldcontext;
1458 int i;
1459 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1460
1461 /* we want to return 0 rows if slot is set to zero */
1462 replorigin_check_prerequisites(false, true);
1463
1464 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1465 ereport(ERROR,
1466 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1467 errmsg("set-valued function called in context that cannot accept a set")));
1468 if (!(rsinfo->allowedModes & SFRM_Materialize))
1469 ereport(ERROR,
1470 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1471 errmsg("materialize mode required, but it is not allowed in this context")));
1472 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1473 elog(ERROR, "return type must be a row type");
1474
1475 if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1476 elog(ERROR, "wrong function definition");
1477
1478 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1479 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1480
1481 tupstore = tuplestore_begin_heap(true, false, work_mem);
1482 rsinfo->returnMode = SFRM_Materialize;
1483 rsinfo->setResult = tupstore;
1484 rsinfo->setDesc = tupdesc;
1485
1486 MemoryContextSwitchTo(oldcontext);
1487
1488
1489 /* prevent slots from being concurrently dropped */
1490 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1491
1492 /*
1493 * Iterate through all possible replication_states, display if they are
1494 * filled. Note that we do not take any locks, so slightly corrupted/out
1495 * of date values are a possibility.
1496 */
1497 for (i = 0; i < max_replication_slots; i++)
1498 {
1499 ReplicationState *state;
1500 Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1501 bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1502 char *roname;
1503
1504 state = &replication_states[i];
1505
1506 /* unused slot, nothing to display */
1507 if (state->roident == InvalidRepOriginId)
1508 continue;
1509
1510 memset(values, 0, sizeof(values));
1511 memset(nulls, 1, sizeof(nulls));
1512
1513 values[0] = ObjectIdGetDatum(state->roident);
1514 nulls[0] = false;
1515
1516 /*
1517 * We're not preventing the origin to be dropped concurrently, so
1518 * silently accept that it might be gone.
1519 */
1520 if (replorigin_by_oid(state->roident, true,
1521 &roname))
1522 {
1523 values[1] = CStringGetTextDatum(roname);
1524 nulls[1] = false;
1525 }
1526
1527 LWLockAcquire(&state->lock, LW_SHARED);
1528
1529 values[2] = LSNGetDatum(state->remote_lsn);
1530 nulls[2] = false;
1531
1532 values[3] = LSNGetDatum(state->local_lsn);
1533 nulls[3] = false;
1534
1535 LWLockRelease(&state->lock);
1536
1537 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1538 }
1539
1540 tuplestore_donestoring(tupstore);
1541
1542 LWLockRelease(ReplicationOriginLock);
1543
1544 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1545
1546 return (Datum) 0;
1547 }
1548