1 /*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
27 * efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "replication/slot.h"
48 #include "storage/fd.h"
49 #include "storage/proc.h"
50 #include "storage/procarray.h"
51 #include "utils/builtins.h"
52
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the per-slot "state" file kept in each slot's
 * directory under $PGDATA/pg_replslot.  The fields preceding 'slotdata'
 * must stay compatible across on-disk format versions so old files can at
 * least be identified and their version determined.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* expected to be SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC of the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version, cf. SLOT_VERSION */
	uint32		length;			/* presumably the size of the
								 * version-dependent part that follows —
								 * confirm against SaveSlotToPath() */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
75
/*
 * Size of the version-independent part of the on-disk structure (everything
 * up to, but not including, 'slotdata').
 */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)

/*
 * Size of the part of the slot not covered by the checksum.
 *
 * NOTE(review): the "SnapBuild" prefix of the next two macros looks like a
 * copy-and-paste leftover from snapbuild.c; they describe the replication
 * slot on-disk format.  Not renamed here because they are referenced by the
 * save/restore routines elsewhere in this file.
 */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/*
 * Size of the part covered by the checksum.  The expansion is parenthesized
 * so the subtraction cannot bind unexpectedly when the macro is used inside
 * a larger arithmetic expression.
 */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent (parenthesized, as above) */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC	0x1051CA1	/* format identifier */
#define SLOT_VERSION	2		/* version for new files */
91
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* drop helpers; see the function definitions below for the contract */
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions (defined later in this file) */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
109
110 /*
111 * Report shared-memory space needed by ReplicationSlotShmemInit.
112 */
113 Size
ReplicationSlotsShmemSize(void)114 ReplicationSlotsShmemSize(void)
115 {
116 Size size = 0;
117
118 if (max_replication_slots == 0)
119 return size;
120
121 size = offsetof(ReplicationSlotCtlData, replication_slots);
122 size = add_size(size,
123 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
124
125 return size;
126 }
127
128 /*
129 * Allocate and initialize shared memory for replication slots.
130 */
131 void
ReplicationSlotsShmemInit(void)132 ReplicationSlotsShmemInit(void)
133 {
134 bool found;
135
136 if (max_replication_slots == 0)
137 return;
138
139 ReplicationSlotCtl = (ReplicationSlotCtlData *)
140 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
141 &found);
142
143 LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144 "replication_slot_io");
145
146 if (!found)
147 {
148 int i;
149
150 /* First time through, so initialize */
151 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152
153 for (i = 0; i < max_replication_slots; i++)
154 {
155 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156
157 /* everything else is zeroed by the memset above */
158 SpinLockInit(&slot->mutex);
159 LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160 ConditionVariableInit(&slot->active_cv);
161 }
162 }
163 }
164
165 /*
166 * Check whether the passed slot name is valid and report errors at elevel.
167 *
168 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
169 * the name to be used as a directory name on every supported OS.
170 *
171 * Returns whether the directory name is valid or not if elevel < ERROR.
172 */
173 bool
ReplicationSlotValidateName(const char * name,int elevel)174 ReplicationSlotValidateName(const char *name, int elevel)
175 {
176 const char *cp;
177
178 if (strlen(name) == 0)
179 {
180 ereport(elevel,
181 (errcode(ERRCODE_INVALID_NAME),
182 errmsg("replication slot name \"%s\" is too short",
183 name)));
184 return false;
185 }
186
187 if (strlen(name) >= NAMEDATALEN)
188 {
189 ereport(elevel,
190 (errcode(ERRCODE_NAME_TOO_LONG),
191 errmsg("replication slot name \"%s\" is too long",
192 name)));
193 return false;
194 }
195
196 for (cp = name; *cp; cp++)
197 {
198 if (!((*cp >= 'a' && *cp <= 'z')
199 || (*cp >= '0' && *cp <= '9')
200 || (*cp == '_')))
201 {
202 ereport(elevel,
203 (errcode(ERRCODE_INVALID_NAME),
204 errmsg("replication slot name \"%s\" contains invalid character",
205 name),
206 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
207 return false;
208 }
209 }
210 return true;
211 }
212
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * persistency: RS_PERSISTENT, RS_EPHEMERAL or RS_TEMPORARY.
 *
 * On success the new slot is acquired by this backend (MyReplicationSlot
 * points at it).  All failures are reported at ERROR, so there is no error
 * return.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* reject invalid names before touching any shared state */
	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;			/* remember the first free slot we saw */
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag.  We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
325
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * If the slot is currently active in another backend: when nowait is true
 * we error out immediately; otherwise we sleep on the slot's condition
 * variable until it is released and then retry the whole search.
 *
 * Errors out if the named slot does not exist.  On success the slot is
 * ours (MyReplicationSlot points at it).
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *slot;
	int			active_pid;
	int			i;

retry:
	Assert(MyReplicationSlot == NULL);

	/*
	 * Search for the named slot and mark it active if we find it.  If the
	 * slot is already active, we exit the loop with active_pid set to the PID
	 * of the backend that owns it.
	 */
	active_pid = 0;
	slot = NULL;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			/*
			 * This is the slot we want; check if it's active under some other
			 * process.  In single user mode, we don't need this check.
			 */
			if (IsUnderPostmaster)
			{
				/*
				 * Get ready to sleep on it in case it is active.  (We may end
				 * up not sleeping, but we don't want to do this while holding
				 * the spinlock.)
				 */
				ConditionVariablePrepareToSleep(&s->active_cv);

				SpinLockAcquire(&s->mutex);

				/* claim the slot iff nobody else owns it */
				active_pid = s->active_pid;
				if (active_pid == 0)
					active_pid = s->active_pid = MyProcPid;

				SpinLockRelease(&s->mutex);
			}
			else
				active_pid = MyProcPid;
			slot = s;

			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));

	/*
	 * If we found the slot but it's already active in another backend, we
	 * either error out or retry after a short wait, as caller specified.
	 */
	if (active_pid != MyProcPid)
	{
		if (nowait)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							name, active_pid)));

		/* Wait here until we get signaled, and then restart */
		ConditionVariableSleep(&slot->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);
		ConditionVariableCancelSleep();
		goto retry;
	}
	else
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
416
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * Ephemeral slots are dropped outright instead of being released.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 *
		 * NOTE(review): 'slot' still points into the shared array afterwards
		 * and is read again below; the slot memory itself is never freed,
		 * only marked unused — confirm the later reads are benign.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	MyReplicationSlot = NULL;

	/*
	 * Clear the logical-decoding vacuum flag on our PGXACT entry; it might
	 * not have been set when we've been a plain (physical) slot, but
	 * clearing it unconditionally is harmless.
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
474
/*
 * Cleanup all temporary slots created in current session.
 *
 * Drops every slot in the array still marked active under our PID; the
 * assertion below documents that only RS_TEMPORARY slots can be in that
 * state here.  Must be called with no slot acquired.
 */
void
ReplicationSlotCleanup(void)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if (s->active_pid == MyProcPid)
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			/* wake up anyone waiting for this slot */
			ConditionVariableBroadcast(&s->active_cv);

			/*
			 * The slot set may have changed while the control lock was
			 * released, so rescan from the beginning.
			 */
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
512
/*
 * Permanently drop replication slot identified by the passed in name.
 *
 * If nowait is true, error out immediately when the slot is active in
 * another backend instead of waiting for it to be released (see
 * ReplicationSlotAcquire).
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
	Assert(MyReplicationSlot == NULL);

	/* acquire first, so nobody else can use the slot while we drop it */
	ReplicationSlotAcquire(name, nowait);

	ReplicationSlotDropAcquired();
}
525
526 /*
527 * Permanently drop the currently acquired replication slot.
528 */
529 static void
ReplicationSlotDropAcquired(void)530 ReplicationSlotDropAcquired(void)
531 {
532 ReplicationSlot *slot = MyReplicationSlot;
533
534 Assert(MyReplicationSlot != NULL);
535
536 /* slot isn't acquired anymore */
537 MyReplicationSlot = NULL;
538
539 ReplicationSlotDropPtr(slot);
540 }
541
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * The on-disk directory is renamed to "<name>.tmp" first, fsynced, and only
 * then removed; StartupReplicationSlots() presumably cleans up leftover
 * ".tmp" directories after a crash — confirm against that function.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/* persistent slots must not vanish silently: fail hard for them */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
640
641 /*
642 * Serialize the currently acquired slot's state from memory to disk, thereby
643 * guaranteeing the current state will survive a crash.
644 */
645 void
ReplicationSlotSave(void)646 ReplicationSlotSave(void)
647 {
648 char path[MAXPGPATH];
649
650 Assert(MyReplicationSlot != NULL);
651
652 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
653 SaveSlotToPath(MyReplicationSlot, path, ERROR);
654 }
655
656 /*
657 * Signal that it would be useful if the currently acquired slot would be
658 * flushed out to disk.
659 *
660 * Note that the actual flush to disk can be delayed for a long time, if
661 * required for correctness explicitly do a ReplicationSlotSave().
662 */
663 void
ReplicationSlotMarkDirty(void)664 ReplicationSlotMarkDirty(void)
665 {
666 ReplicationSlot *slot = MyReplicationSlot;
667
668 Assert(MyReplicationSlot != NULL);
669
670 SpinLockAcquire(&slot->mutex);
671 MyReplicationSlot->just_dirtied = true;
672 MyReplicationSlot->dirty = true;
673 SpinLockRelease(&slot->mutex);
674 }
675
676 /*
677 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
678 * guaranteeing it will be there after an eventual crash.
679 */
680 void
ReplicationSlotPersist(void)681 ReplicationSlotPersist(void)
682 {
683 ReplicationSlot *slot = MyReplicationSlot;
684
685 Assert(slot != NULL);
686 Assert(slot->data.persistency != RS_PERSISTENT);
687
688 SpinLockAcquire(&slot->mutex);
689 slot->data.persistency = RS_PERSISTENT;
690 SpinLockRelease(&slot->mutex);
691
692 ReplicationSlotMarkDirty();
693 ReplicationSlotSave();
694 }
695
696 /*
697 * Compute the oldest xmin across all slots and store it in the ProcArray.
698 *
699 * If already_locked is true, ProcArrayLock has already been acquired
700 * exclusively.
701 */
702 void
ReplicationSlotsComputeRequiredXmin(bool already_locked)703 ReplicationSlotsComputeRequiredXmin(bool already_locked)
704 {
705 int i;
706 TransactionId agg_xmin = InvalidTransactionId;
707 TransactionId agg_catalog_xmin = InvalidTransactionId;
708
709 Assert(ReplicationSlotCtl != NULL);
710
711 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
712
713 for (i = 0; i < max_replication_slots; i++)
714 {
715 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
716 TransactionId effective_xmin;
717 TransactionId effective_catalog_xmin;
718
719 if (!s->in_use)
720 continue;
721
722 SpinLockAcquire(&s->mutex);
723 effective_xmin = s->effective_xmin;
724 effective_catalog_xmin = s->effective_catalog_xmin;
725 SpinLockRelease(&s->mutex);
726
727 /* check the data xmin */
728 if (TransactionIdIsValid(effective_xmin) &&
729 (!TransactionIdIsValid(agg_xmin) ||
730 TransactionIdPrecedes(effective_xmin, agg_xmin)))
731 agg_xmin = effective_xmin;
732
733 /* check the catalog xmin */
734 if (TransactionIdIsValid(effective_catalog_xmin) &&
735 (!TransactionIdIsValid(agg_catalog_xmin) ||
736 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
737 agg_catalog_xmin = effective_catalog_xmin;
738 }
739
740 LWLockRelease(ReplicationSlotControlLock);
741
742 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
743 }
744
745 /*
746 * Compute the oldest restart LSN across all slots and inform xlog module.
747 */
748 void
ReplicationSlotsComputeRequiredLSN(void)749 ReplicationSlotsComputeRequiredLSN(void)
750 {
751 int i;
752 XLogRecPtr min_required = InvalidXLogRecPtr;
753
754 Assert(ReplicationSlotCtl != NULL);
755
756 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
757 for (i = 0; i < max_replication_slots; i++)
758 {
759 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
760 XLogRecPtr restart_lsn;
761
762 if (!s->in_use)
763 continue;
764
765 SpinLockAcquire(&s->mutex);
766 restart_lsn = s->data.restart_lsn;
767 SpinLockRelease(&s->mutex);
768
769 if (restart_lsn != InvalidXLogRecPtr &&
770 (min_required == InvalidXLogRecPtr ||
771 restart_lsn < min_required))
772 min_required = restart_lsn;
773 }
774 LWLockRelease(ReplicationSlotControlLock);
775
776 XLogSetReplicationSlotMinimumLSN(min_required);
777 }
778
779 /*
780 * Compute the oldest WAL LSN required by *logical* decoding slots..
781 *
782 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
783 * slots exist.
784 *
785 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
786 * ignores physical replication slots.
787 *
788 * The results aren't required frequently, so we don't maintain a precomputed
789 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
790 */
791 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)792 ReplicationSlotsComputeLogicalRestartLSN(void)
793 {
794 XLogRecPtr result = InvalidXLogRecPtr;
795 int i;
796
797 if (max_replication_slots <= 0)
798 return InvalidXLogRecPtr;
799
800 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
801
802 for (i = 0; i < max_replication_slots; i++)
803 {
804 ReplicationSlot *s;
805 XLogRecPtr restart_lsn;
806
807 s = &ReplicationSlotCtl->replication_slots[i];
808
809 /* cannot change while ReplicationSlotCtlLock is held */
810 if (!s->in_use)
811 continue;
812
813 /* we're only interested in logical slots */
814 if (!SlotIsLogical(s))
815 continue;
816
817 /* read once, it's ok if it increases while we're checking */
818 SpinLockAcquire(&s->mutex);
819 restart_lsn = s->data.restart_lsn;
820 SpinLockRelease(&s->mutex);
821
822 if (result == InvalidXLogRecPtr ||
823 restart_lsn < result)
824 result = restart_lsn;
825 }
826
827 LWLockRelease(ReplicationSlotControlLock);
828
829 return result;
830 }
831
832 /*
833 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
834 * passed database oid.
835 *
836 * Returns true if there are any slots referencing the database. *nslots will
837 * be set to the absolute number of slots in the database, *nactive to ones
838 * currently active.
839 */
840 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)841 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
842 {
843 int i;
844
845 *nslots = *nactive = 0;
846
847 if (max_replication_slots <= 0)
848 return false;
849
850 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
851 for (i = 0; i < max_replication_slots; i++)
852 {
853 ReplicationSlot *s;
854
855 s = &ReplicationSlotCtl->replication_slots[i];
856
857 /* cannot change while ReplicationSlotCtlLock is held */
858 if (!s->in_use)
859 continue;
860
861 /* only logical slots are database specific, skip */
862 if (!SlotIsLogical(s))
863 continue;
864
865 /* not our database, skip */
866 if (s->data.database != dboid)
867 continue;
868
869 /* count slots with spinlock held */
870 SpinLockAcquire(&s->mutex);
871 (*nslots)++;
872 if (s->active_pid != 0)
873 (*nactive)++;
874 SpinLockRelease(&s->mutex);
875 }
876 LWLockRelease(ReplicationSlotControlLock);
877
878 if (*nslots > 0)
879 return true;
880 return false;
881 }
882
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim it for ourselves, exactly as SlotAcquire would */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
966
967
968 /*
969 * Check whether the server's configuration supports using replication
970 * slots.
971 */
972 void
CheckSlotRequirements(void)973 CheckSlotRequirements(void)
974 {
975 /*
976 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
977 * needs the same check.
978 */
979
980 if (max_replication_slots == 0)
981 ereport(ERROR,
982 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
983 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
984
985 if (wal_level < WAL_LEVEL_REPLICA)
986 ereport(ERROR,
987 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
988 errmsg("replication slots can only be used if wal_level >= replica")));
989 }
990
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.  Loops until the chosen restart_lsn is
 * known to still be covered by retained WAL segments.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay at.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			slot->data.restart_lsn = GetXLogInsertRecPtr();

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			/* physical slot, or logical slot on a standby */
			slot->data.restart_lsn = GetRedoRecPtr();
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}
1060
1061 /*
1062 * Flush all replication slots to disk.
1063 *
1064 * This needn't actually be part of a checkpoint, but it's a convenient
1065 * location.
1066 */
1067 void
CheckPointReplicationSlots(void)1068 CheckPointReplicationSlots(void)
1069 {
1070 int i;
1071
1072 elog(DEBUG1, "performing replication slot checkpoint");
1073
1074 /*
1075 * Prevent any slot from being created/dropped while we're active. As we
1076 * explicitly do *not* want to block iterating over replication_slots or
1077 * acquiring a slot we cannot take the control lock - but that's OK,
1078 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1079 * enough to guarantee that nobody can change the in_use bits on us.
1080 */
1081 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1082
1083 for (i = 0; i < max_replication_slots; i++)
1084 {
1085 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1086 char path[MAXPGPATH];
1087
1088 if (!s->in_use)
1089 continue;
1090
1091 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1092 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1093 SaveSlotToPath(s, path, LOG);
1094 }
1095 LWLockRelease(ReplicationSlotAllocationLock);
1096 }
1097
1098 /*
1099 * Load all replication slots from disk into memory at server startup. This
1100 * needs to be run before we start crash recovery.
1101 */
1102 void
StartupReplicationSlots(void)1103 StartupReplicationSlots(void)
1104 {
1105 DIR *replication_dir;
1106 struct dirent *replication_de;
1107
1108 elog(DEBUG1, "starting up replication slots");
1109
1110 /* restore all slots by iterating over all on-disk entries */
1111 replication_dir = AllocateDir("pg_replslot");
1112 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1113 {
1114 struct stat statbuf;
1115 char path[MAXPGPATH + 12];
1116
1117 if (strcmp(replication_de->d_name, ".") == 0 ||
1118 strcmp(replication_de->d_name, "..") == 0)
1119 continue;
1120
1121 snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1122
1123 /* we're only creating directories here, skip if it's not our's */
1124 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1125 continue;
1126
1127 /* we crashed while a slot was being setup or deleted, clean up */
1128 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1129 {
1130 if (!rmtree(path, true))
1131 {
1132 ereport(WARNING,
1133 (errcode_for_file_access(),
1134 errmsg("could not remove directory \"%s\"", path)));
1135 continue;
1136 }
1137 fsync_fname("pg_replslot", true);
1138 continue;
1139 }
1140
1141 /* looks like a slot in a normal state, restore */
1142 RestoreSlotFromDisk(replication_de->d_name);
1143 }
1144 FreeDir(replication_dir);
1145
1146 /* currently no slots exist, we're done. */
1147 if (max_replication_slots <= 0)
1148 return;
1149
1150 /* Now that we have recovered all the data, compute replication xmin */
1151 ReplicationSlotsComputeRequiredXmin(false);
1152 ReplicationSlotsComputeRequiredLSN();
1153 }
1154
1155 /* ----
1156 * Manipulation of on-disk state of replication slots
1157 *
1158 * NB: none of the routines below should take any notice whether a slot is the
1159 * current one or not, that's all handled a layer above.
1160 * ----
1161 */
/*
 * Create the on-disk state for a new replication slot: build the slot's
 * directory under a temporary name, write and fsync the initial state file
 * into it, then durably rename it into place.  Errors out (ERROR) on any
 * failure, leaving at most a *.tmp directory behind for later cleanup.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it. If the rmtree() fails, we'll error
	 * out at the mkdir() below, so we don't bother checking success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (mkdir(tmppath, S_IRWXU) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	/* persist both the renamed directory and its parent's new entry */
	fsync_fname(path, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();
}
1218
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Serializes the slot's persistent data into <dir>/state, writing via
 * <dir>/state.tmp and renaming into place.  If the slot isn't dirty this is
 * a no-op.  On I/O failure the problem is reported at "elevel"; when elevel
 * is below ERROR the function returns after releasing the per-slot I/O lock.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* serialize concurrent writers of this slot's state file */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	/*
	 * NOTE(review): O_EXCL means a state.tmp left behind by an earlier
	 * failed attempt (the error paths below don't unlink it) would make
	 * this open fail with EEXIST until the slot directory is cleaned up —
	 * confirm that's the intended recovery behavior.
	 */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
						   S_IRUSR | S_IWUSR);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning. In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case. Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	/* fill in the on-disk header; checksum is computed below */
	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* snapshot the persistent data under the mutex */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/*
	 * Checksum everything after the not-checksummed header prefix.  (The
	 * SnapBuild* names notwithstanding, these size macros describe the
	 * ReplicationSlotOnDisk layout — presumably defined earlier in this
	 * file; verify.)
	 */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* NOTE(review): close result is ignored here — confirm that's intended */
	CloseTransientFile(fd);

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/* Check CreateSlot() for the reasoning of using a crit. section. */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
1361
/*
 * Load a single slot from disk into memory.
 *
 * Runs only at startup (no locking needed).  Validates the state file's
 * magic, version, length and CRC; any corruption PANICs, since continuing
 * without a slot's resource reservations would be unsafe.  Non-persistent
 * (ephemeral) slots found on disk are deleted instead of restored.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + 12];
	char		path[MAXPGPATH + 22];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "pg_replslot/%s", name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
	{
		/* preserve errno across CloseTransientFile() for %m */
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	}
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes,
						(uint32) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes, cp.length)));
	}

	CloseTransientFile(fd);

	/*
	 * Now verify the CRC, mirroring the computation in SaveSlotToPath().
	 * (Despite the SnapBuild* naming, these size macros describe the
	 * ReplicationSlotOnDisk layout — presumably defined earlier in this
	 * file; verify.)
	 */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errcode_for_file_access(),
					 errmsg("could not remove directory \"%s\"", slotdir)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be logical or higher.")));
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be replica or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* find a free in-memory slot entry */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		restored = true;
		break;
	}

	if (!restored)
		ereport(PANIC,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase max_replication_slots and try again.")));
}
1571