1 /*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
27 * efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "replication/slot.h"
48 #include "storage/fd.h"
49 #include "storage/proc.h"
50 #include "storage/procarray.h"
51 #include "utils/builtins.h"
52
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the state file kept in each slot's directory under
 * pg_replslot.  The first part of the struct (up to 'slotdata') must stay
 * version independent, so that any server version can read the header and
 * decide whether it understands the rest.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier, SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC-32C over the checksummed part */

	/* data covered by checksum */
	uint32		version;		/* layout version of the data below */
	uint32		length;			/* NOTE(review): presumably the size of the
								 * version-dependent part (cf.
								 * ReplicationSlotOnDiskV2Size); the
								 * reader/writer is outside this chunk */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
75
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)

/*
 * Size of the part of the slot not covered by the checksum.
 *
 * NOTE(review): the "SnapBuild" prefix on the next two macros looks
 * copy-pasted from snapbuild.c; the names are kept unchanged because they
 * are referenced elsewhere in this file.
 *
 * The expansions below contain a binary '-', so they are parenthesized to
 * avoid operator-precedence surprises at use sites (the original expanded
 * unparenthesized).
 */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC	0x1051CA1	/* format identifier */
#define SLOT_VERSION 2			/* version for new files */
91
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* forward declarations for the drop machinery below */
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
109
110 /*
111 * Report shared-memory space needed by ReplicationSlotShmemInit.
112 */
113 Size
ReplicationSlotsShmemSize(void)114 ReplicationSlotsShmemSize(void)
115 {
116 Size size = 0;
117
118 if (max_replication_slots == 0)
119 return size;
120
121 size = offsetof(ReplicationSlotCtlData, replication_slots);
122 size = add_size(size,
123 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
124
125 return size;
126 }
127
128 /*
129 * Allocate and initialize shared memory for replication slots.
130 */
131 void
ReplicationSlotsShmemInit(void)132 ReplicationSlotsShmemInit(void)
133 {
134 bool found;
135
136 if (max_replication_slots == 0)
137 return;
138
139 ReplicationSlotCtl = (ReplicationSlotCtlData *)
140 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
141 &found);
142
143 LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144 "replication_slot_io");
145
146 if (!found)
147 {
148 int i;
149
150 /* First time through, so initialize */
151 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152
153 for (i = 0; i < max_replication_slots; i++)
154 {
155 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156
157 /* everything else is zeroed by the memset above */
158 SpinLockInit(&slot->mutex);
159 LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160 ConditionVariableInit(&slot->active_cv);
161 }
162 }
163 }
164
165 /*
166 * Check whether the passed slot name is valid and report errors at elevel.
167 *
168 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
169 * the name to be used as a directory name on every supported OS.
170 *
171 * Returns whether the directory name is valid or not if elevel < ERROR.
172 */
173 bool
ReplicationSlotValidateName(const char * name,int elevel)174 ReplicationSlotValidateName(const char *name, int elevel)
175 {
176 const char *cp;
177
178 if (strlen(name) == 0)
179 {
180 ereport(elevel,
181 (errcode(ERRCODE_INVALID_NAME),
182 errmsg("replication slot name \"%s\" is too short",
183 name)));
184 return false;
185 }
186
187 if (strlen(name) >= NAMEDATALEN)
188 {
189 ereport(elevel,
190 (errcode(ERRCODE_NAME_TOO_LONG),
191 errmsg("replication slot name \"%s\" is too long",
192 name)));
193 return false;
194 }
195
196 for (cp = name; *cp; cp++)
197 {
198 if (!((*cp >= 'a' && *cp <= 'z')
199 || (*cp >= '0' && *cp <= '9')
200 || (*cp == '_')))
201 {
202 ereport(elevel,
203 (errcode(ERRCODE_INVALID_NAME),
204 errmsg("replication slot name \"%s\" contains invalid character",
205 name),
206 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
207 return false;
208 }
209 }
210 return true;
211 }
212
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * persistency: RS_PERSISTENT, RS_EPHEMERAL or RS_TEMPORARY.
 *
 * On success the new slot is owned by this backend (MyReplicationSlot is
 * set).  Failures -- invalid name, duplicate name, all slots in use -- are
 * reported at ERROR level.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* errors out at ERROR level if the name is not acceptable */
	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot. We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
325
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * If the slot is already active in another backend: with nowait = true we
 * error out immediately, otherwise we sleep on the slot's condition
 * variable until it is released and retry from the top.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *slot;
	int			active_pid;
	int			i;

retry:
	Assert(MyReplicationSlot == NULL);

	/*
	 * Search for the named slot and mark it active if we find it. If the
	 * slot is already active, we exit the loop with active_pid set to the PID
	 * of the backend that owns it.
	 */
	active_pid = 0;
	slot = NULL;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			/*
			 * This is the slot we want; check if it's active under some other
			 * process. In single user mode, we don't need this check.
			 */
			if (IsUnderPostmaster)
			{
				/*
				 * Get ready to sleep on it in case it is active. (We may end
				 * up not sleeping, but we don't want to do this while holding
				 * the spinlock.)
				 */
				ConditionVariablePrepareToSleep(&s->active_cv);

				SpinLockAcquire(&s->mutex);

				/* claim the slot if nobody owns it; otherwise remember who */
				active_pid = s->active_pid;
				if (active_pid == 0)
					active_pid = s->active_pid = MyProcPid;

				SpinLockRelease(&s->mutex);
			}
			else
				active_pid = MyProcPid;
			slot = s;

			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));

	/*
	 * If we found the slot but it's already active in another backend, we
	 * either error out or retry after a short wait, as caller specified.
	 */
	if (active_pid != MyProcPid)
	{
		if (nowait)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							name, active_pid)));

		/*
		 * Wait here until we get signaled, and then restart.  (The wait is
		 * reported under the slot-drop wait event.)
		 */
		ConditionVariableSleep(&slot->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);
		ConditionVariableCancelSleep();
		goto retry;
	}
	else
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
416
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * Ephemeral slots are dropped outright here.  Persistent slots are marked
 * inactive (active_pid = 0).  Temporary slots keep their active_pid, so
 * they stay owned by this backend until ReplicationSlotCleanup() drops
 * them.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		/* the global minimum may have advanced now that we dropped ours */
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
474
/*
 * Cleanup all temporary slots created in current session.
 *
 * Any slot whose active_pid is ours must be RS_TEMPORARY here (persistent
 * and ephemeral slots were handled by ReplicationSlotRelease), and gets
 * dropped.  Since dropping requires releasing the control lock, we restart
 * the scan from the top after each drop.
 */
void
ReplicationSlotCleanup(void)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if (s->active_pid == MyProcPid)
		{
			/* only temporary slots can still be owned at this point */
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			/* wake up anyone waiting on this slot, then rescan from scratch */
			ConditionVariableBroadcast(&s->active_cv);
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
512
513 /*
514 * Permanently drop replication slot identified by the passed in name.
515 */
516 void
ReplicationSlotDrop(const char * name,bool nowait)517 ReplicationSlotDrop(const char *name, bool nowait)
518 {
519 Assert(MyReplicationSlot == NULL);
520
521 ReplicationSlotAcquire(name, nowait);
522
523 ReplicationSlotDropAcquired();
524 }
525
526 /*
527 * Permanently drop the currently acquired replication slot.
528 */
529 static void
ReplicationSlotDropAcquired(void)530 ReplicationSlotDropAcquired(void)
531 {
532 ReplicationSlot *slot = MyReplicationSlot;
533
534 Assert(MyReplicationSlot != NULL);
535
536 /* slot isn't acquired anymore */
537 MyReplicationSlot = NULL;
538
539 ReplicationSlotDropPtr(slot);
540 }
541
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * The drop is made crash-safe by renaming the slot's directory to a ".tmp"
 * name (after which restart no longer recognizes it as a slot) before
 * removing it; StartupReplicationSlots() presumably cleans up leftover
 * ".tmp" directories after a crash -- it is outside this chunk.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot. Note that if this fails, we've got to mark the
	 * slot inactive before bailing out. If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion. If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not. For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/* only a persistent slot's caller can tolerate a hard failure */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone. Lock out concurrent scans of the array
	 * long enough to kill it. It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart. We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
639
640 /*
641 * Serialize the currently acquired slot's state from memory to disk, thereby
642 * guaranteeing the current state will survive a crash.
643 */
644 void
ReplicationSlotSave(void)645 ReplicationSlotSave(void)
646 {
647 char path[MAXPGPATH];
648
649 Assert(MyReplicationSlot != NULL);
650
651 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
652 SaveSlotToPath(MyReplicationSlot, path, ERROR);
653 }
654
655 /*
656 * Signal that it would be useful if the currently acquired slot would be
657 * flushed out to disk.
658 *
659 * Note that the actual flush to disk can be delayed for a long time, if
660 * required for correctness explicitly do a ReplicationSlotSave().
661 */
662 void
ReplicationSlotMarkDirty(void)663 ReplicationSlotMarkDirty(void)
664 {
665 ReplicationSlot *slot = MyReplicationSlot;
666
667 Assert(MyReplicationSlot != NULL);
668
669 SpinLockAcquire(&slot->mutex);
670 MyReplicationSlot->just_dirtied = true;
671 MyReplicationSlot->dirty = true;
672 SpinLockRelease(&slot->mutex);
673 }
674
675 /*
676 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
677 * guaranteeing it will be there after an eventual crash.
678 */
679 void
ReplicationSlotPersist(void)680 ReplicationSlotPersist(void)
681 {
682 ReplicationSlot *slot = MyReplicationSlot;
683
684 Assert(slot != NULL);
685 Assert(slot->data.persistency != RS_PERSISTENT);
686
687 SpinLockAcquire(&slot->mutex);
688 slot->data.persistency = RS_PERSISTENT;
689 SpinLockRelease(&slot->mutex);
690
691 ReplicationSlotMarkDirty();
692 ReplicationSlotSave();
693 }
694
695 /*
696 * Compute the oldest xmin across all slots and store it in the ProcArray.
697 *
698 * If already_locked is true, ProcArrayLock has already been acquired
699 * exclusively.
700 */
701 void
ReplicationSlotsComputeRequiredXmin(bool already_locked)702 ReplicationSlotsComputeRequiredXmin(bool already_locked)
703 {
704 int i;
705 TransactionId agg_xmin = InvalidTransactionId;
706 TransactionId agg_catalog_xmin = InvalidTransactionId;
707
708 Assert(ReplicationSlotCtl != NULL);
709
710 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
711
712 for (i = 0; i < max_replication_slots; i++)
713 {
714 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
715 TransactionId effective_xmin;
716 TransactionId effective_catalog_xmin;
717
718 if (!s->in_use)
719 continue;
720
721 SpinLockAcquire(&s->mutex);
722 effective_xmin = s->effective_xmin;
723 effective_catalog_xmin = s->effective_catalog_xmin;
724 SpinLockRelease(&s->mutex);
725
726 /* check the data xmin */
727 if (TransactionIdIsValid(effective_xmin) &&
728 (!TransactionIdIsValid(agg_xmin) ||
729 TransactionIdPrecedes(effective_xmin, agg_xmin)))
730 agg_xmin = effective_xmin;
731
732 /* check the catalog xmin */
733 if (TransactionIdIsValid(effective_catalog_xmin) &&
734 (!TransactionIdIsValid(agg_catalog_xmin) ||
735 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
736 agg_catalog_xmin = effective_catalog_xmin;
737 }
738
739 LWLockRelease(ReplicationSlotControlLock);
740
741 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
742 }
743
744 /*
745 * Compute the oldest restart LSN across all slots and inform xlog module.
746 */
747 void
ReplicationSlotsComputeRequiredLSN(void)748 ReplicationSlotsComputeRequiredLSN(void)
749 {
750 int i;
751 XLogRecPtr min_required = InvalidXLogRecPtr;
752
753 Assert(ReplicationSlotCtl != NULL);
754
755 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
756 for (i = 0; i < max_replication_slots; i++)
757 {
758 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
759 XLogRecPtr restart_lsn;
760
761 if (!s->in_use)
762 continue;
763
764 SpinLockAcquire(&s->mutex);
765 restart_lsn = s->data.restart_lsn;
766 SpinLockRelease(&s->mutex);
767
768 if (restart_lsn != InvalidXLogRecPtr &&
769 (min_required == InvalidXLogRecPtr ||
770 restart_lsn < min_required))
771 min_required = restart_lsn;
772 }
773 LWLockRelease(ReplicationSlotControlLock);
774
775 XLogSetReplicationSlotMinimumLSN(min_required);
776 }
777
778 /*
779 * Compute the oldest WAL LSN required by *logical* decoding slots..
780 *
781 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
782 * slots exist.
783 *
784 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
785 * ignores physical replication slots.
786 *
787 * The results aren't required frequently, so we don't maintain a precomputed
788 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
789 */
790 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)791 ReplicationSlotsComputeLogicalRestartLSN(void)
792 {
793 XLogRecPtr result = InvalidXLogRecPtr;
794 int i;
795
796 if (max_replication_slots <= 0)
797 return InvalidXLogRecPtr;
798
799 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
800
801 for (i = 0; i < max_replication_slots; i++)
802 {
803 ReplicationSlot *s;
804 XLogRecPtr restart_lsn;
805
806 s = &ReplicationSlotCtl->replication_slots[i];
807
808 /* cannot change while ReplicationSlotCtlLock is held */
809 if (!s->in_use)
810 continue;
811
812 /* we're only interested in logical slots */
813 if (!SlotIsLogical(s))
814 continue;
815
816 /* read once, it's ok if it increases while we're checking */
817 SpinLockAcquire(&s->mutex);
818 restart_lsn = s->data.restart_lsn;
819 SpinLockRelease(&s->mutex);
820
821 if (result == InvalidXLogRecPtr ||
822 restart_lsn < result)
823 result = restart_lsn;
824 }
825
826 LWLockRelease(ReplicationSlotControlLock);
827
828 return result;
829 }
830
831 /*
832 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
833 * passed database oid.
834 *
835 * Returns true if there are any slots referencing the database. *nslots will
836 * be set to the absolute number of slots in the database, *nactive to ones
837 * currently active.
838 */
839 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)840 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
841 {
842 int i;
843
844 *nslots = *nactive = 0;
845
846 if (max_replication_slots <= 0)
847 return false;
848
849 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
850 for (i = 0; i < max_replication_slots; i++)
851 {
852 ReplicationSlot *s;
853
854 s = &ReplicationSlotCtl->replication_slots[i];
855
856 /* cannot change while ReplicationSlotCtlLock is held */
857 if (!s->in_use)
858 continue;
859
860 /* only logical slots are database specific, skip */
861 if (!SlotIsLogical(s))
862 continue;
863
864 /* not our database, skip */
865 if (s->data.database != dboid)
866 continue;
867
868 /* count slots with spinlock held */
869 SpinLockAcquire(&s->mutex);
870 (*nslots)++;
871 if (s->active_pid != 0)
872 (*nactive)++;
873 SpinLockRelease(&s->mutex);
874 }
875 LWLockRelease(ReplicationSlotControlLock);
876
877 if (*nslots > 0)
878 return true;
879 return false;
880 }
881
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim it as ours, exactly as ReplicationSlotAcquire would */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
965
966
967 /*
968 * Check whether the server's configuration supports using replication
969 * slots.
970 */
971 void
CheckSlotRequirements(void)972 CheckSlotRequirements(void)
973 {
974 /*
975 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
976 * needs the same check.
977 */
978
979 if (max_replication_slots == 0)
980 ereport(ERROR,
981 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
982 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
983
984 if (wal_level < WAL_LEVEL_REPLICA)
985 ereport(ERROR,
986 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
987 errmsg("replication slots can only be used if wal_level >= replica")));
988 }
989
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay at.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			restart_lsn = GetXLogInsertRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			/* physical slot, or logical slot during recovery: use redo LSN */
			restart_lsn = GetRedoRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}
1066
1067 /*
1068 * Flush all replication slots to disk.
1069 *
1070 * This needn't actually be part of a checkpoint, but it's a convenient
1071 * location.
1072 */
1073 void
CheckPointReplicationSlots(void)1074 CheckPointReplicationSlots(void)
1075 {
1076 int i;
1077
1078 elog(DEBUG1, "performing replication slot checkpoint");
1079
1080 /*
1081 * Prevent any slot from being created/dropped while we're active. As we
1082 * explicitly do *not* want to block iterating over replication_slots or
1083 * acquiring a slot we cannot take the control lock - but that's OK,
1084 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1085 * enough to guarantee that nobody can change the in_use bits on us.
1086 */
1087 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1088
1089 for (i = 0; i < max_replication_slots; i++)
1090 {
1091 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1092 char path[MAXPGPATH];
1093
1094 if (!s->in_use)
1095 continue;
1096
1097 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1098 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1099 SaveSlotToPath(s, path, LOG);
1100 }
1101 LWLockRelease(ReplicationSlotAllocationLock);
1102 }
1103
1104 /*
1105 * Load all replication slots from disk into memory at server startup. This
1106 * needs to be run before we start crash recovery.
1107 */
1108 void
StartupReplicationSlots(void)1109 StartupReplicationSlots(void)
1110 {
1111 DIR *replication_dir;
1112 struct dirent *replication_de;
1113
1114 elog(DEBUG1, "starting up replication slots");
1115
1116 /* restore all slots by iterating over all on-disk entries */
1117 replication_dir = AllocateDir("pg_replslot");
1118 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1119 {
1120 struct stat statbuf;
1121 char path[MAXPGPATH + 12];
1122
1123 if (strcmp(replication_de->d_name, ".") == 0 ||
1124 strcmp(replication_de->d_name, "..") == 0)
1125 continue;
1126
1127 snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1128
1129 /* we're only creating directories here, skip if it's not our's */
1130 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1131 continue;
1132
1133 /* we crashed while a slot was being setup or deleted, clean up */
1134 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1135 {
1136 if (!rmtree(path, true))
1137 {
1138 ereport(WARNING,
1139 (errmsg("could not remove directory \"%s\"",
1140 path)));
1141 continue;
1142 }
1143 fsync_fname("pg_replslot", true);
1144 continue;
1145 }
1146
1147 /* looks like a slot in a normal state, restore */
1148 RestoreSlotFromDisk(replication_de->d_name);
1149 }
1150 FreeDir(replication_dir);
1151
1152 /* currently no slots exist, we're done. */
1153 if (max_replication_slots <= 0)
1154 return;
1155
1156 /* Now that we have recovered all the data, compute replication xmin */
1157 ReplicationSlotsComputeRequiredXmin(false);
1158 ReplicationSlotsComputeRequiredLSN();
1159 }
1160
1161 /* ----
1162 * Manipulation of on-disk state of replication slots
1163 *
1164 * NB: none of the routines below should take any notice whether a slot is the
1165 * current one or not, that's all handled a layer above.
1166 * ----
1167 */
1168 static void
CreateSlotOnDisk(ReplicationSlot * slot)1169 CreateSlotOnDisk(ReplicationSlot *slot)
1170 {
1171 char tmppath[MAXPGPATH];
1172 char path[MAXPGPATH];
1173 struct stat st;
1174
1175 /*
1176 * No need to take out the io_in_progress_lock, nobody else can see this
1177 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1178 * takes out the lock, if we'd take the lock here, we'd deadlock.
1179 */
1180
1181 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1182 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1183
1184 /*
1185 * It's just barely possible that some previous effort to create or drop a
1186 * slot with this name left a temp directory lying around. If that seems
1187 * to be the case, try to remove it. If the rmtree() fails, we'll error
1188 * out at the MakePGDirectory() below, so we don't bother checking
1189 * success.
1190 */
1191 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1192 rmtree(tmppath, true);
1193
1194 /* Create and fsync the temporary slot directory. */
1195 if (MakePGDirectory(tmppath) < 0)
1196 ereport(ERROR,
1197 (errcode_for_file_access(),
1198 errmsg("could not create directory \"%s\": %m",
1199 tmppath)));
1200 fsync_fname(tmppath, true);
1201
1202 /* Write the actual state file. */
1203 slot->dirty = true; /* signal that we really need to write */
1204 SaveSlotToPath(slot, tmppath, ERROR);
1205
1206 /* Rename the directory into place. */
1207 if (rename(tmppath, path) != 0)
1208 ereport(ERROR,
1209 (errcode_for_file_access(),
1210 errmsg("could not rename file \"%s\" to \"%s\": %m",
1211 tmppath, path)));
1212
1213 /*
1214 * If we'd now fail - really unlikely - we wouldn't know whether this slot
1215 * would persist after an OS crash or not - so, force a restart. The
1216 * restart would try to fsync this again till it works.
1217 */
1218 START_CRIT_SECTION();
1219
1220 fsync_fname(path, true);
1221 fsync_fname("pg_replslot", true);
1222
1223 END_CRIT_SECTION();
1224 }
1225
1226 /*
1227 * Shared functionality between saving and creating a replication slot.
1228 */
1229 static void
SaveSlotToPath(ReplicationSlot * slot,const char * dir,int elevel)1230 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1231 {
1232 char tmppath[MAXPGPATH];
1233 char path[MAXPGPATH];
1234 int fd;
1235 ReplicationSlotOnDisk cp;
1236 bool was_dirty;
1237
1238 /* first check whether there's something to write out */
1239 SpinLockAcquire(&slot->mutex);
1240 was_dirty = slot->dirty;
1241 slot->just_dirtied = false;
1242 SpinLockRelease(&slot->mutex);
1243
1244 /* and don't do anything if there's nothing to write */
1245 if (!was_dirty)
1246 return;
1247
1248 LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1249
1250 /* silence valgrind :( */
1251 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1252
1253 sprintf(tmppath, "%s/state.tmp", dir);
1254 sprintf(path, "%s/state", dir);
1255
1256 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1257 if (fd < 0)
1258 {
1259 /*
1260 * If not an ERROR, then release the lock before returning. In case
1261 * of an ERROR, the error recovery path automatically releases the
1262 * lock, but no harm in explicitly releasing even in that case. Note
1263 * that LWLockRelease() could affect errno.
1264 */
1265 int save_errno = errno;
1266
1267 LWLockRelease(&slot->io_in_progress_lock);
1268 errno = save_errno;
1269 ereport(elevel,
1270 (errcode_for_file_access(),
1271 errmsg("could not create file \"%s\": %m",
1272 tmppath)));
1273 return;
1274 }
1275
1276 cp.magic = SLOT_MAGIC;
1277 INIT_CRC32C(cp.checksum);
1278 cp.version = SLOT_VERSION;
1279 cp.length = ReplicationSlotOnDiskV2Size;
1280
1281 SpinLockAcquire(&slot->mutex);
1282
1283 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1284
1285 SpinLockRelease(&slot->mutex);
1286
1287 COMP_CRC32C(cp.checksum,
1288 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1289 SnapBuildOnDiskChecksummedSize);
1290 FIN_CRC32C(cp.checksum);
1291
1292 errno = 0;
1293 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1294 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1295 {
1296 int save_errno = errno;
1297
1298 pgstat_report_wait_end();
1299 CloseTransientFile(fd);
1300 LWLockRelease(&slot->io_in_progress_lock);
1301
1302 /* if write didn't set errno, assume problem is no disk space */
1303 errno = save_errno ? save_errno : ENOSPC;
1304 ereport(elevel,
1305 (errcode_for_file_access(),
1306 errmsg("could not write to file \"%s\": %m",
1307 tmppath)));
1308 return;
1309 }
1310 pgstat_report_wait_end();
1311
1312 /* fsync the temporary file */
1313 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1314 if (pg_fsync(fd) != 0)
1315 {
1316 int save_errno = errno;
1317
1318 pgstat_report_wait_end();
1319 CloseTransientFile(fd);
1320 LWLockRelease(&slot->io_in_progress_lock);
1321 errno = save_errno;
1322 ereport(elevel,
1323 (errcode_for_file_access(),
1324 errmsg("could not fsync file \"%s\": %m",
1325 tmppath)));
1326 return;
1327 }
1328 pgstat_report_wait_end();
1329
1330 if (CloseTransientFile(fd))
1331 {
1332 int save_errno = errno;
1333
1334 LWLockRelease(&slot->io_in_progress_lock);
1335 errno = save_errno;
1336 ereport(elevel,
1337 (errcode_for_file_access(),
1338 errmsg("could not close file \"%s\": %m",
1339 tmppath)));
1340 return;
1341 }
1342
1343 /* rename to permanent file, fsync file and directory */
1344 if (rename(tmppath, path) != 0)
1345 {
1346 int save_errno = errno;
1347
1348 LWLockRelease(&slot->io_in_progress_lock);
1349 errno = save_errno;
1350 ereport(elevel,
1351 (errcode_for_file_access(),
1352 errmsg("could not rename file \"%s\" to \"%s\": %m",
1353 tmppath, path)));
1354 return;
1355 }
1356
1357 /*
1358 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
1359 */
1360 START_CRIT_SECTION();
1361
1362 fsync_fname(path, false);
1363 fsync_fname(dir, true);
1364 fsync_fname("pg_replslot", true);
1365
1366 END_CRIT_SECTION();
1367
1368 /*
1369 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1370 * already.
1371 */
1372 SpinLockAcquire(&slot->mutex);
1373 if (!slot->just_dirtied)
1374 slot->dirty = false;
1375 SpinLockRelease(&slot->mutex);
1376
1377 LWLockRelease(&slot->io_in_progress_lock);
1378 }
1379
1380 /*
1381 * Load a single slot from disk into memory.
1382 */
1383 static void
RestoreSlotFromDisk(const char * name)1384 RestoreSlotFromDisk(const char *name)
1385 {
1386 ReplicationSlotOnDisk cp;
1387 int i;
1388 char slotdir[MAXPGPATH + 12];
1389 char path[MAXPGPATH + 22];
1390 int fd;
1391 bool restored = false;
1392 int readBytes;
1393 pg_crc32c checksum;
1394
1395 /* no need to lock here, no concurrent access allowed yet */
1396
1397 /* delete temp file if it exists */
1398 sprintf(slotdir, "pg_replslot/%s", name);
1399 sprintf(path, "%s/state.tmp", slotdir);
1400 if (unlink(path) < 0 && errno != ENOENT)
1401 ereport(PANIC,
1402 (errcode_for_file_access(),
1403 errmsg("could not remove file \"%s\": %m", path)));
1404
1405 sprintf(path, "%s/state", slotdir);
1406
1407 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1408
1409 /* on some operating systems fsyncing a file requires O_RDWR */
1410 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1411
1412 /*
1413 * We do not need to handle this as we are rename()ing the directory into
1414 * place only after we fsync()ed the state file.
1415 */
1416 if (fd < 0)
1417 ereport(PANIC,
1418 (errcode_for_file_access(),
1419 errmsg("could not open file \"%s\": %m", path)));
1420
1421 /*
1422 * Sync state file before we're reading from it. We might have crashed
1423 * while it wasn't synced yet and we shouldn't continue on that basis.
1424 */
1425 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1426 if (pg_fsync(fd) != 0)
1427 ereport(PANIC,
1428 (errcode_for_file_access(),
1429 errmsg("could not fsync file \"%s\": %m",
1430 path)));
1431 pgstat_report_wait_end();
1432
1433 /* Also sync the parent directory */
1434 START_CRIT_SECTION();
1435 fsync_fname(slotdir, true);
1436 END_CRIT_SECTION();
1437
1438 /* read part of statefile that's guaranteed to be version independent */
1439 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1440 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1441 pgstat_report_wait_end();
1442 if (readBytes != ReplicationSlotOnDiskConstantSize)
1443 {
1444 if (readBytes < 0)
1445 ereport(PANIC,
1446 (errcode_for_file_access(),
1447 errmsg("could not read file \"%s\": %m", path)));
1448 else
1449 ereport(PANIC,
1450 (errcode(ERRCODE_DATA_CORRUPTED),
1451 errmsg("could not read file \"%s\": read %d of %zu",
1452 path, readBytes,
1453 (Size) ReplicationSlotOnDiskConstantSize)));
1454 }
1455
1456 /* verify magic */
1457 if (cp.magic != SLOT_MAGIC)
1458 ereport(PANIC,
1459 (errcode(ERRCODE_DATA_CORRUPTED),
1460 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1461 path, cp.magic, SLOT_MAGIC)));
1462
1463 /* verify version */
1464 if (cp.version != SLOT_VERSION)
1465 ereport(PANIC,
1466 (errcode(ERRCODE_DATA_CORRUPTED),
1467 errmsg("replication slot file \"%s\" has unsupported version %u",
1468 path, cp.version)));
1469
1470 /* boundary check on length */
1471 if (cp.length != ReplicationSlotOnDiskV2Size)
1472 ereport(PANIC,
1473 (errcode(ERRCODE_DATA_CORRUPTED),
1474 errmsg("replication slot file \"%s\" has corrupted length %u",
1475 path, cp.length)));
1476
1477 /* Now that we know the size, read the entire file */
1478 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1479 readBytes = read(fd,
1480 (char *) &cp + ReplicationSlotOnDiskConstantSize,
1481 cp.length);
1482 pgstat_report_wait_end();
1483 if (readBytes != cp.length)
1484 {
1485 if (readBytes < 0)
1486 ereport(PANIC,
1487 (errcode_for_file_access(),
1488 errmsg("could not read file \"%s\": %m", path)));
1489 else
1490 ereport(PANIC,
1491 (errcode(ERRCODE_DATA_CORRUPTED),
1492 errmsg("could not read file \"%s\": read %d of %zu",
1493 path, readBytes, (Size) cp.length)));
1494 }
1495
1496 if (CloseTransientFile(fd))
1497 ereport(PANIC,
1498 (errcode_for_file_access(),
1499 errmsg("could not close file \"%s\": %m", path)));
1500
1501 /* now verify the CRC */
1502 INIT_CRC32C(checksum);
1503 COMP_CRC32C(checksum,
1504 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1505 SnapBuildOnDiskChecksummedSize);
1506 FIN_CRC32C(checksum);
1507
1508 if (!EQ_CRC32C(checksum, cp.checksum))
1509 ereport(PANIC,
1510 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1511 path, checksum, cp.checksum)));
1512
1513 /*
1514 * If we crashed with an ephemeral slot active, don't restore but delete
1515 * it.
1516 */
1517 if (cp.slotdata.persistency != RS_PERSISTENT)
1518 {
1519 if (!rmtree(slotdir, true))
1520 {
1521 ereport(WARNING,
1522 (errmsg("could not remove directory \"%s\"",
1523 slotdir)));
1524 }
1525 fsync_fname("pg_replslot", true);
1526 return;
1527 }
1528
1529 /*
1530 * Verify that requirements for the specific slot type are met. That's
1531 * important because if these aren't met we're not guaranteed to retain
1532 * all the necessary resources for the slot.
1533 *
1534 * NB: We have to do so *after* the above checks for ephemeral slots,
1535 * because otherwise a slot that shouldn't exist anymore could prevent
1536 * restarts.
1537 *
1538 * NB: Changing the requirements here also requires adapting
1539 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
1540 */
1541 if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
1542 ereport(FATAL,
1543 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1544 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
1545 NameStr(cp.slotdata.name)),
1546 errhint("Change wal_level to be logical or higher.")));
1547 else if (wal_level < WAL_LEVEL_REPLICA)
1548 ereport(FATAL,
1549 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1550 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
1551 NameStr(cp.slotdata.name)),
1552 errhint("Change wal_level to be replica or higher.")));
1553
1554 /* nothing can be active yet, don't lock anything */
1555 for (i = 0; i < max_replication_slots; i++)
1556 {
1557 ReplicationSlot *slot;
1558
1559 slot = &ReplicationSlotCtl->replication_slots[i];
1560
1561 if (slot->in_use)
1562 continue;
1563
1564 /* restore the entire set of persistent data */
1565 memcpy(&slot->data, &cp.slotdata,
1566 sizeof(ReplicationSlotPersistentData));
1567
1568 /* initialize in memory state */
1569 slot->effective_xmin = cp.slotdata.xmin;
1570 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1571
1572 slot->candidate_catalog_xmin = InvalidTransactionId;
1573 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1574 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1575 slot->candidate_restart_valid = InvalidXLogRecPtr;
1576
1577 slot->in_use = true;
1578 slot->active_pid = 0;
1579
1580 restored = true;
1581 break;
1582 }
1583
1584 if (!restored)
1585 ereport(FATAL,
1586 (errmsg("too many replication slots active before shutdown"),
1587 errhint("Increase max_replication_slots and try again.")));
1588 }
1589