1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  *	   Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster.  Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups).  The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot.  The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include <unistd.h>
40 #include <sys/stat.h>
41 
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "replication/slot.h"
47 #include "storage/fd.h"
48 #include "storage/proc.h"
49 #include "storage/procarray.h"
50 
51 /*
52  * Replication slot on-disk data structure.
53  */
54 typedef struct ReplicationSlotOnDisk
55 {
56 	/* first part of this struct needs to be version independent */
57 
58 	/* data not covered by checksum */
59 	uint32		magic;
60 	pg_crc32c	checksum;
61 
62 	/* data covered by checksum */
63 	uint32		version;
64 	uint32		length;
65 
66 	/*
67 	 * The actual data in the slot that follows can differ based on the above
68 	 * 'version'.
69 	 */
70 
71 	ReplicationSlotPersistentData slotdata;
72 } ReplicationSlotOnDisk;
73 
74 /* size of version independent data */
75 #define ReplicationSlotOnDiskConstantSize \
76 	offsetof(ReplicationSlotOnDisk, slotdata)
77 /* size of the part of the slot not covered by the checksum */
78 #define SnapBuildOnDiskNotChecksummedSize \
79 	offsetof(ReplicationSlotOnDisk, version)
80 /* size of the part covered by the checksum */
81 #define SnapBuildOnDiskChecksummedSize \
82 	sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
83 /* size of the slot data that is version dependent */
84 #define ReplicationSlotOnDiskV2Size \
85 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
86 
87 #define SLOT_MAGIC		0x1051CA1		/* format identifier */
88 #define SLOT_VERSION	2		/* version for new files */
89 
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* LWLock tranche covering every slot's io_in_progress_lock */
static LWLockTranche ReplSlotIOLWLockTranche;

/* forward declaration; used by both ReplicationSlotRelease() and Drop() */
static void ReplicationSlotDropAcquired(void);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
107 
108 /*
109  * Report shared-memory space needed by ReplicationSlotShmemInit.
110  */
111 Size
ReplicationSlotsShmemSize(void)112 ReplicationSlotsShmemSize(void)
113 {
114 	Size		size = 0;
115 
116 	if (max_replication_slots == 0)
117 		return size;
118 
119 	size = offsetof(ReplicationSlotCtlData, replication_slots);
120 	size = add_size(size,
121 					mul_size(max_replication_slots, sizeof(ReplicationSlot)));
122 
123 	return size;
124 }
125 
126 /*
127  * Allocate and initialize shared memory for replication slots.
128  */
129 void
ReplicationSlotsShmemInit(void)130 ReplicationSlotsShmemInit(void)
131 {
132 	bool		found;
133 
134 	if (max_replication_slots == 0)
135 		return;
136 
137 	ReplicationSlotCtl = (ReplicationSlotCtlData *)
138 		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
139 						&found);
140 
141 	ReplSlotIOLWLockTranche.name = "replication_slot_io";
142 	ReplSlotIOLWLockTranche.array_base =
143 		((char *) ReplicationSlotCtl) + offsetof(ReplicationSlotCtlData, replication_slots) +offsetof(ReplicationSlot, io_in_progress_lock);
144 	ReplSlotIOLWLockTranche.array_stride = sizeof(ReplicationSlot);
145 	LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
146 						  &ReplSlotIOLWLockTranche);
147 
148 	if (!found)
149 	{
150 		int			i;
151 
152 		/* First time through, so initialize */
153 		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
154 
155 		for (i = 0; i < max_replication_slots; i++)
156 		{
157 			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
158 
159 			/* everything else is zeroed by the memset above */
160 			SpinLockInit(&slot->mutex);
161 			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
162 		}
163 	}
164 }
165 
166 /*
167  * Check whether the passed slot name is valid and report errors at elevel.
168  *
169  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
170  * the name to be used as a directory name on every supported OS.
171  *
172  * Returns whether the directory name is valid or not if elevel < ERROR.
173  */
174 bool
ReplicationSlotValidateName(const char * name,int elevel)175 ReplicationSlotValidateName(const char *name, int elevel)
176 {
177 	const char *cp;
178 
179 	if (strlen(name) == 0)
180 	{
181 		ereport(elevel,
182 				(errcode(ERRCODE_INVALID_NAME),
183 				 errmsg("replication slot name \"%s\" is too short",
184 						name)));
185 		return false;
186 	}
187 
188 	if (strlen(name) >= NAMEDATALEN)
189 	{
190 		ereport(elevel,
191 				(errcode(ERRCODE_NAME_TOO_LONG),
192 				 errmsg("replication slot name \"%s\" is too long",
193 						name)));
194 		return false;
195 	}
196 
197 	for (cp = name; *cp; cp++)
198 	{
199 		if (!((*cp >= 'a' && *cp <= 'z')
200 			  || (*cp >= '0' && *cp <= '9')
201 			  || (*cp == '_')))
202 		{
203 			ereport(elevel,
204 					(errcode(ERRCODE_INVALID_NAME),
205 			errmsg("replication slot name \"%s\" contains invalid character",
206 				   name),
207 					 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
208 			return false;
209 		}
210 	}
211 	return true;
212 }
213 
214 /*
215  * Create a new replication slot and mark it as used by this backend.
216  *
217  * name: Name of the slot
218  * db_specific: logical decoding is db specific; if the slot is going to
219  *	   be used for that pass true, otherwise false.
220  */
221 void
ReplicationSlotCreate(const char * name,bool db_specific,ReplicationSlotPersistency persistency)222 ReplicationSlotCreate(const char *name, bool db_specific,
223 					  ReplicationSlotPersistency persistency)
224 {
225 	ReplicationSlot *slot = NULL;
226 	int			i;
227 
228 	Assert(MyReplicationSlot == NULL);
229 
230 	ReplicationSlotValidateName(name, ERROR);
231 
232 	/*
233 	 * If some other backend ran this code concurrently with us, we'd likely
234 	 * both allocate the same slot, and that would be bad.  We'd also be at
235 	 * risk of missing a name collision.  Also, we don't want to try to create
236 	 * a new slot while somebody's busy cleaning up an old one, because we
237 	 * might both be monkeying with the same directory.
238 	 */
239 	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
240 
241 	/*
242 	 * Check for name collision, and identify an allocatable slot.  We need to
243 	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
244 	 * else can change the in_use flags while we're looking at them.
245 	 */
246 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
247 	for (i = 0; i < max_replication_slots; i++)
248 	{
249 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
250 
251 		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
252 			ereport(ERROR,
253 					(errcode(ERRCODE_DUPLICATE_OBJECT),
254 					 errmsg("replication slot \"%s\" already exists", name)));
255 		if (!s->in_use && slot == NULL)
256 			slot = s;
257 	}
258 	LWLockRelease(ReplicationSlotControlLock);
259 
260 	/* If all slots are in use, we're out of luck. */
261 	if (slot == NULL)
262 		ereport(ERROR,
263 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
264 				 errmsg("all replication slots are in use"),
265 				 errhint("Free one or increase max_replication_slots.")));
266 
267 	/*
268 	 * Since this slot is not in use, nobody should be looking at any part of
269 	 * it other than the in_use field unless they're trying to allocate it.
270 	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
271 	 * be doing that.  So it's safe to initialize the slot.
272 	 */
273 	Assert(!slot->in_use);
274 	Assert(slot->active_pid == 0);
275 
276 	/* first initialize persistent data */
277 	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
278 	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
279 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
280 	slot->data.persistency = persistency;
281 
282 	/* and then data only present in shared memory */
283 	slot->just_dirtied = false;
284 	slot->dirty = false;
285 	slot->effective_xmin = InvalidTransactionId;
286 	slot->effective_catalog_xmin = InvalidTransactionId;
287 	slot->candidate_catalog_xmin = InvalidTransactionId;
288 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
289 	slot->candidate_restart_valid = InvalidXLogRecPtr;
290 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
291 
292 	/*
293 	 * Create the slot on disk.  We haven't actually marked the slot allocated
294 	 * yet, so no special cleanup is required if this errors out.
295 	 */
296 	CreateSlotOnDisk(slot);
297 
298 	/*
299 	 * We need to briefly prevent any other backend from iterating over the
300 	 * slots while we flip the in_use flag. We also need to set the active
301 	 * flag while holding the ControlLock as otherwise a concurrent
302 	 * SlotAcquire() could acquire the slot as well.
303 	 */
304 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
305 
306 	slot->in_use = true;
307 
308 	/* We can now mark the slot active, and that makes it our slot. */
309 	SpinLockAcquire(&slot->mutex);
310 	Assert(slot->active_pid == 0);
311 	slot->active_pid = MyProcPid;
312 	SpinLockRelease(&slot->mutex);
313 	MyReplicationSlot = slot;
314 
315 	LWLockRelease(ReplicationSlotControlLock);
316 
317 	/*
318 	 * Now that the slot has been marked as in_use and in_active, it's safe to
319 	 * let somebody else try to allocate a slot.
320 	 */
321 	LWLockRelease(ReplicationSlotAllocationLock);
322 }
323 
324 /*
325  * Find a previously created slot and mark it as used by this backend.
326  */
327 void
ReplicationSlotAcquire(const char * name)328 ReplicationSlotAcquire(const char *name)
329 {
330 	ReplicationSlot *slot = NULL;
331 	int			i;
332 	int			active_pid = 0;
333 
334 	Assert(MyReplicationSlot == NULL);
335 
336 	ReplicationSlotValidateName(name, ERROR);
337 
338 	/* Search for the named slot and mark it active if we find it. */
339 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
340 	for (i = 0; i < max_replication_slots; i++)
341 	{
342 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
343 
344 		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
345 		{
346 			SpinLockAcquire(&s->mutex);
347 			active_pid = s->active_pid;
348 			if (active_pid == 0)
349 				s->active_pid = MyProcPid;
350 			SpinLockRelease(&s->mutex);
351 			slot = s;
352 			break;
353 		}
354 	}
355 	LWLockRelease(ReplicationSlotControlLock);
356 
357 	/* If we did not find the slot or it was already active, error out. */
358 	if (slot == NULL)
359 		ereport(ERROR,
360 				(errcode(ERRCODE_UNDEFINED_OBJECT),
361 				 errmsg("replication slot \"%s\" does not exist", name)));
362 	if (active_pid != 0)
363 		ereport(ERROR,
364 				(errcode(ERRCODE_OBJECT_IN_USE),
365 				 errmsg("replication slot \"%s\" is active for PID %d",
366 						name, active_pid)));
367 
368 	/* We made this slot active, so it's ours now. */
369 	MyReplicationSlot = slot;
370 }
371 
372 /*
373  * Release a replication slot, this or another backend can ReAcquire it
374  * later. Resources this slot requires will be preserved.
375  */
376 void
ReplicationSlotRelease(void)377 ReplicationSlotRelease(void)
378 {
379 	ReplicationSlot *slot = MyReplicationSlot;
380 
381 	Assert(slot != NULL && slot->active_pid != 0);
382 
383 	if (slot->data.persistency == RS_EPHEMERAL)
384 	{
385 		/*
386 		 * Delete the slot. There is no !PANIC case where this is allowed to
387 		 * fail, all that may happen is an incomplete cleanup of the on-disk
388 		 * data.
389 		 */
390 		ReplicationSlotDropAcquired();
391 	}
392 	else
393 	{
394 		/* Mark slot inactive.  We're not freeing it, just disconnecting. */
395 		SpinLockAcquire(&slot->mutex);
396 		slot->active_pid = 0;
397 		SpinLockRelease(&slot->mutex);
398 	}
399 
400 
401 	/*
402 	 * If slot needed to temporarily restrain both data and catalog xmin to
403 	 * create the catalog snapshot, remove that temporary constraint.
404 	 * Snapshots can only be exported while the initial snapshot is still
405 	 * acquired.
406 	 */
407 	if (!TransactionIdIsValid(slot->data.xmin) &&
408 		TransactionIdIsValid(slot->effective_xmin))
409 	{
410 		SpinLockAcquire(&slot->mutex);
411 		slot->effective_xmin = InvalidTransactionId;
412 		SpinLockRelease(&slot->mutex);
413 		ReplicationSlotsComputeRequiredXmin(false);
414 	}
415 
416 	MyReplicationSlot = NULL;
417 
418 	/* might not have been set when we've been a plain slot */
419 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
420 	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
421 	LWLockRelease(ProcArrayLock);
422 }
423 
424 /*
425  * Permanently drop replication slot identified by the passed in name.
426  */
427 void
ReplicationSlotDrop(const char * name)428 ReplicationSlotDrop(const char *name)
429 {
430 	Assert(MyReplicationSlot == NULL);
431 
432 	ReplicationSlotAcquire(name);
433 
434 	ReplicationSlotDropAcquired();
435 }
436 
437 /*
438  * Permanently drop the currently acquired replication slot which will be
439  * released by the point this function returns.
440  */
441 static void
ReplicationSlotDropAcquired(void)442 ReplicationSlotDropAcquired(void)
443 {
444 	char		path[MAXPGPATH];
445 	char		tmppath[MAXPGPATH];
446 	ReplicationSlot *slot = MyReplicationSlot;
447 
448 	Assert(MyReplicationSlot != NULL);
449 
450 	/* slot isn't acquired anymore */
451 	MyReplicationSlot = NULL;
452 
453 	/*
454 	 * If some other backend ran this code concurrently with us, we might try
455 	 * to delete a slot with a certain name while someone else was trying to
456 	 * create a slot with the same name.
457 	 */
458 	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
459 
460 	/* Generate pathnames. */
461 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
462 	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
463 
464 	/*
465 	 * Rename the slot directory on disk, so that we'll no longer recognize
466 	 * this as a valid slot.  Note that if this fails, we've got to mark the
467 	 * slot inactive before bailing out.  If we're dropping an ephemeral slot,
468 	 * we better never fail hard as the caller won't expect the slot to
469 	 * survive and this might get called during error handling.
470 	 */
471 	if (rename(path, tmppath) == 0)
472 	{
473 		/*
474 		 * We need to fsync() the directory we just renamed and its parent to
475 		 * make sure that our changes are on disk in a crash-safe fashion.  If
476 		 * fsync() fails, we can't be sure whether the changes are on disk or
477 		 * not.  For now, we handle that by panicking;
478 		 * StartupReplicationSlots() will try to straighten it out after
479 		 * restart.
480 		 */
481 		START_CRIT_SECTION();
482 		fsync_fname(tmppath, true);
483 		fsync_fname("pg_replslot", true);
484 		END_CRIT_SECTION();
485 	}
486 	else
487 	{
488 		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
489 
490 		SpinLockAcquire(&slot->mutex);
491 		slot->active_pid = 0;
492 		SpinLockRelease(&slot->mutex);
493 
494 		ereport(fail_softly ? WARNING : ERROR,
495 				(errcode_for_file_access(),
496 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
497 						path, tmppath)));
498 	}
499 
500 	/*
501 	 * The slot is definitely gone.  Lock out concurrent scans of the array
502 	 * long enough to kill it.  It's OK to clear the active flag here without
503 	 * grabbing the mutex because nobody else can be scanning the array here,
504 	 * and nobody can be attached to this slot and thus access it without
505 	 * scanning the array.
506 	 */
507 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
508 	slot->active_pid = 0;
509 	slot->in_use = false;
510 	LWLockRelease(ReplicationSlotControlLock);
511 
512 	/*
513 	 * Slot is dead and doesn't prevent resource removal anymore, recompute
514 	 * limits.
515 	 */
516 	ReplicationSlotsComputeRequiredXmin(false);
517 	ReplicationSlotsComputeRequiredLSN();
518 
519 	/*
520 	 * If removing the directory fails, the worst thing that will happen is
521 	 * that the user won't be able to create a new slot with the same name
522 	 * until the next server restart.  We warn about it, but that's all.
523 	 */
524 	if (!rmtree(tmppath, true))
525 		ereport(WARNING,
526 				(errcode_for_file_access(),
527 				 errmsg("could not remove directory \"%s\"", tmppath)));
528 
529 	/*
530 	 * We release this at the very end, so that nobody starts trying to create
531 	 * a slot while we're still cleaning up the detritus of the old one.
532 	 */
533 	LWLockRelease(ReplicationSlotAllocationLock);
534 }
535 
536 /*
537  * Serialize the currently acquired slot's state from memory to disk, thereby
538  * guaranteeing the current state will survive a crash.
539  */
540 void
ReplicationSlotSave(void)541 ReplicationSlotSave(void)
542 {
543 	char		path[MAXPGPATH];
544 
545 	Assert(MyReplicationSlot != NULL);
546 
547 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
548 	SaveSlotToPath(MyReplicationSlot, path, ERROR);
549 }
550 
551 /*
552  * Signal that it would be useful if the currently acquired slot would be
553  * flushed out to disk.
554  *
555  * Note that the actual flush to disk can be delayed for a long time, if
556  * required for correctness explicitly do a ReplicationSlotSave().
557  */
558 void
ReplicationSlotMarkDirty(void)559 ReplicationSlotMarkDirty(void)
560 {
561 	ReplicationSlot *slot = MyReplicationSlot;
562 
563 	Assert(MyReplicationSlot != NULL);
564 
565 	SpinLockAcquire(&slot->mutex);
566 	MyReplicationSlot->just_dirtied = true;
567 	MyReplicationSlot->dirty = true;
568 	SpinLockRelease(&slot->mutex);
569 }
570 
571 /*
572  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
573  * guaranteeing it will be there after an eventual crash.
574  */
575 void
ReplicationSlotPersist(void)576 ReplicationSlotPersist(void)
577 {
578 	ReplicationSlot *slot = MyReplicationSlot;
579 
580 	Assert(slot != NULL);
581 	Assert(slot->data.persistency != RS_PERSISTENT);
582 
583 	SpinLockAcquire(&slot->mutex);
584 	slot->data.persistency = RS_PERSISTENT;
585 	SpinLockRelease(&slot->mutex);
586 
587 	ReplicationSlotMarkDirty();
588 	ReplicationSlotSave();
589 }
590 
591 /*
592  * Compute the oldest xmin across all slots and store it in the ProcArray.
593  *
594  * If already_locked is true, ProcArrayLock has already been acquired
595  * exclusively.
596  */
597 void
ReplicationSlotsComputeRequiredXmin(bool already_locked)598 ReplicationSlotsComputeRequiredXmin(bool already_locked)
599 {
600 	int			i;
601 	TransactionId agg_xmin = InvalidTransactionId;
602 	TransactionId agg_catalog_xmin = InvalidTransactionId;
603 
604 	Assert(ReplicationSlotCtl != NULL);
605 
606 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
607 
608 	for (i = 0; i < max_replication_slots; i++)
609 	{
610 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
611 		TransactionId effective_xmin;
612 		TransactionId effective_catalog_xmin;
613 
614 		if (!s->in_use)
615 			continue;
616 
617 		SpinLockAcquire(&s->mutex);
618 		effective_xmin = s->effective_xmin;
619 		effective_catalog_xmin = s->effective_catalog_xmin;
620 		SpinLockRelease(&s->mutex);
621 
622 		/* check the data xmin */
623 		if (TransactionIdIsValid(effective_xmin) &&
624 			(!TransactionIdIsValid(agg_xmin) ||
625 			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
626 			agg_xmin = effective_xmin;
627 
628 		/* check the catalog xmin */
629 		if (TransactionIdIsValid(effective_catalog_xmin) &&
630 			(!TransactionIdIsValid(agg_catalog_xmin) ||
631 			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
632 			agg_catalog_xmin = effective_catalog_xmin;
633 	}
634 
635 	LWLockRelease(ReplicationSlotControlLock);
636 
637 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
638 }
639 
640 /*
641  * Compute the oldest restart LSN across all slots and inform xlog module.
642  */
643 void
ReplicationSlotsComputeRequiredLSN(void)644 ReplicationSlotsComputeRequiredLSN(void)
645 {
646 	int			i;
647 	XLogRecPtr	min_required = InvalidXLogRecPtr;
648 
649 	Assert(ReplicationSlotCtl != NULL);
650 
651 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
652 	for (i = 0; i < max_replication_slots; i++)
653 	{
654 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
655 		XLogRecPtr	restart_lsn;
656 
657 		if (!s->in_use)
658 			continue;
659 
660 		SpinLockAcquire(&s->mutex);
661 		restart_lsn = s->data.restart_lsn;
662 		SpinLockRelease(&s->mutex);
663 
664 		if (restart_lsn != InvalidXLogRecPtr &&
665 			(min_required == InvalidXLogRecPtr ||
666 			 restart_lsn < min_required))
667 			min_required = restart_lsn;
668 	}
669 	LWLockRelease(ReplicationSlotControlLock);
670 
671 	XLogSetReplicationSlotMinimumLSN(min_required);
672 }
673 
674 /*
675  * Compute the oldest WAL LSN required by *logical* decoding slots..
676  *
677  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
678  * slots exist.
679  *
680  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
681  * ignores physical replication slots.
682  *
683  * The results aren't required frequently, so we don't maintain a precomputed
684  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
685  */
686 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)687 ReplicationSlotsComputeLogicalRestartLSN(void)
688 {
689 	XLogRecPtr	result = InvalidXLogRecPtr;
690 	int			i;
691 
692 	if (max_replication_slots <= 0)
693 		return InvalidXLogRecPtr;
694 
695 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
696 
697 	for (i = 0; i < max_replication_slots; i++)
698 	{
699 		ReplicationSlot *s;
700 		XLogRecPtr	restart_lsn;
701 
702 		s = &ReplicationSlotCtl->replication_slots[i];
703 
704 		/* cannot change while ReplicationSlotCtlLock is held */
705 		if (!s->in_use)
706 			continue;
707 
708 		/* we're only interested in logical slots */
709 		if (!SlotIsLogical(s))
710 			continue;
711 
712 		/* read once, it's ok if it increases while we're checking */
713 		SpinLockAcquire(&s->mutex);
714 		restart_lsn = s->data.restart_lsn;
715 		SpinLockRelease(&s->mutex);
716 
717 		if (result == InvalidXLogRecPtr ||
718 			restart_lsn < result)
719 			result = restart_lsn;
720 	}
721 
722 	LWLockRelease(ReplicationSlotControlLock);
723 
724 	return result;
725 }
726 
727 /*
728  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
729  * passed database oid.
730  *
731  * Returns true if there are any slots referencing the database. *nslots will
732  * be set to the absolute number of slots in the database, *nactive to ones
733  * currently active.
734  */
735 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)736 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
737 {
738 	int			i;
739 
740 	*nslots = *nactive = 0;
741 
742 	if (max_replication_slots <= 0)
743 		return false;
744 
745 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
746 	for (i = 0; i < max_replication_slots; i++)
747 	{
748 		ReplicationSlot *s;
749 
750 		s = &ReplicationSlotCtl->replication_slots[i];
751 
752 		/* cannot change while ReplicationSlotCtlLock is held */
753 		if (!s->in_use)
754 			continue;
755 
756 		/* only logical slots are database specific, skip */
757 		if (!SlotIsLogical(s))
758 			continue;
759 
760 		/* not our database, skip */
761 		if (s->data.database != dboid)
762 			continue;
763 
764 		/* count slots with spinlock held */
765 		SpinLockAcquire(&s->mutex);
766 		(*nslots)++;
767 		if (s->active_pid != 0)
768 			(*nactive)++;
769 		SpinLockRelease(&s->mutex);
770 	}
771 	LWLockRelease(ReplicationSlotControlLock);
772 
773 	if (*nslots > 0)
774 		return true;
775 	return false;
776 }
777 
778 
779 /*
780  * Check whether the server's configuration supports using replication
781  * slots.
782  */
783 void
CheckSlotRequirements(void)784 CheckSlotRequirements(void)
785 {
786 	/*
787 	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
788 	 * needs the same check.
789 	 */
790 
791 	if (max_replication_slots == 0)
792 		ereport(ERROR,
793 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
794 				 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
795 
796 	if (wal_level < WAL_LEVEL_REPLICA)
797 		ereport(ERROR,
798 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
799 				 errmsg("replication slots can only be used if wal_level >= replica")));
800 }
801 
802 /*
803  * Reserve WAL for the currently active slot.
804  *
805  * Compute and set restart_lsn in a manner that's appropriate for the type of
806  * the slot and concurrency safe.
807  */
808 void
ReplicationSlotReserveWal(void)809 ReplicationSlotReserveWal(void)
810 {
811 	ReplicationSlot *slot = MyReplicationSlot;
812 
813 	Assert(slot != NULL);
814 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
815 
816 	/*
817 	 * The replication slot mechanism is used to prevent removal of required
818 	 * WAL. As there is no interlock between this routine and checkpoints, WAL
819 	 * segments could concurrently be removed when a now stale return value of
820 	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
821 	 * this happens we'll just retry.
822 	 */
823 	while (true)
824 	{
825 		XLogSegNo	segno;
826 
827 		/*
828 		 * For logical slots log a standby snapshot and start logical decoding
829 		 * at exactly that position. That allows the slot to start up more
830 		 * quickly.
831 		 *
832 		 * That's not needed (or indeed helpful) for physical slots as they'll
833 		 * start replay at the last logged checkpoint anyway. Instead return
834 		 * the location of the last redo LSN. While that slightly increases
835 		 * the chance that we have to retry, it's where a base backup has to
836 		 * start replay at.
837 		 */
838 		if (!RecoveryInProgress() && SlotIsLogical(slot))
839 		{
840 			XLogRecPtr	flushptr;
841 
842 			/* start at current insert position */
843 			slot->data.restart_lsn = GetXLogInsertRecPtr();
844 
845 			/* make sure we have enough information to start */
846 			flushptr = LogStandbySnapshot();
847 
848 			/* and make sure it's fsynced to disk */
849 			XLogFlush(flushptr);
850 		}
851 		else
852 		{
853 			slot->data.restart_lsn = GetRedoRecPtr();
854 		}
855 
856 		/* prevent WAL removal as fast as possible */
857 		ReplicationSlotsComputeRequiredLSN();
858 
859 		/*
860 		 * If all required WAL is still there, great, otherwise retry. The
861 		 * slot should prevent further removal of WAL, unless there's a
862 		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
863 		 * the new restart_lsn above, so normally we should never need to loop
864 		 * more than twice.
865 		 */
866 		XLByteToSeg(slot->data.restart_lsn, segno);
867 		if (XLogGetLastRemovedSegno() < segno)
868 			break;
869 	}
870 }
871 
872 /*
873  * Flush all replication slots to disk.
874  *
875  * This needn't actually be part of a checkpoint, but it's a convenient
876  * location.
877  */
878 void
CheckPointReplicationSlots(void)879 CheckPointReplicationSlots(void)
880 {
881 	int			i;
882 
883 	elog(DEBUG1, "performing replication slot checkpoint");
884 
885 	/*
886 	 * Prevent any slot from being created/dropped while we're active. As we
887 	 * explicitly do *not* want to block iterating over replication_slots or
888 	 * acquiring a slot we cannot take the control lock - but that's OK,
889 	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
890 	 * enough to guarantee that nobody can change the in_use bits on us.
891 	 */
892 	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
893 
894 	for (i = 0; i < max_replication_slots; i++)
895 	{
896 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
897 		char		path[MAXPGPATH];
898 
899 		if (!s->in_use)
900 			continue;
901 
902 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
903 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
904 		SaveSlotToPath(s, path, LOG);
905 	}
906 	LWLockRelease(ReplicationSlotAllocationLock);
907 }
908 
909 /*
910  * Load all replication slots from disk into memory at server startup. This
911  * needs to be run before we start crash recovery.
912  */
913 void
StartupReplicationSlots(void)914 StartupReplicationSlots(void)
915 {
916 	DIR		   *replication_dir;
917 	struct dirent *replication_de;
918 
919 	elog(DEBUG1, "starting up replication slots");
920 
921 	/* restore all slots by iterating over all on-disk entries */
922 	replication_dir = AllocateDir("pg_replslot");
923 	while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
924 	{
925 		struct stat statbuf;
926 		char		path[MAXPGPATH + 12];
927 
928 		if (strcmp(replication_de->d_name, ".") == 0 ||
929 			strcmp(replication_de->d_name, "..") == 0)
930 			continue;
931 
932 		snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
933 
934 		/* we're only creating directories here, skip if it's not our's */
935 		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
936 			continue;
937 
938 		/* we crashed while a slot was being setup or deleted, clean up */
939 		if (pg_str_endswith(replication_de->d_name, ".tmp"))
940 		{
941 			if (!rmtree(path, true))
942 			{
943 				ereport(WARNING,
944 						(errcode_for_file_access(),
945 						 errmsg("could not remove directory \"%s\"", path)));
946 				continue;
947 			}
948 			fsync_fname("pg_replslot", true);
949 			continue;
950 		}
951 
952 		/* looks like a slot in a normal state, restore */
953 		RestoreSlotFromDisk(replication_de->d_name);
954 	}
955 	FreeDir(replication_dir);
956 
957 	/* currently no slots exist, we're done. */
958 	if (max_replication_slots <= 0)
959 		return;
960 
961 	/* Now that we have recovered all the data, compute replication xmin */
962 	ReplicationSlotsComputeRequiredXmin(false);
963 	ReplicationSlotsComputeRequiredLSN();
964 }
965 
966 /* ----
967  * Manipulation of on-disk state of replication slots
968  *
969  * NB: none of the routines below should take any notice whether a slot is the
970  * current one or not, that's all handled a layer above.
971  * ----
972  */
/*
 * Create the on-disk representation of a newly allocated slot: build the
 * slot's directory under a ".tmp" name, write and fsync the state file into
 * it via SaveSlotToPath(), then rename() the directory into place and fsync
 * so the slot is durable across an OS crash.  All failures are reported at
 * ERROR.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it.  If the rmtree() fails, we'll error
	 * out at the mkdir() below, so we don't bother checking success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (mkdir(tmppath, S_IRWXU) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	/* make the rename durable: fsync the new path and its parent directory */
	fsync_fname(path, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();
}
1029 
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Serializes the slot's persistent data into <dir>/state, going through a
 * "state.tmp" file plus rename() so that a torn write can never replace a
 * valid state file.  Does nothing if the slot is not dirty.  Failures are
 * reported at the caller-supplied elevel; on every early-return error path
 * the slot's io_in_progress_lock is released and errno is preserved across
 * the release so %m reports the original failure.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;	/* a later dirtying must trigger a re-save */
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* only one process may write this slot's state file at a time */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	/* O_EXCL: any leftover state.tmp was cleaned up before we got here */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
						   S_IRUSR | S_IWUSR);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	/* fill in the on-disk header */
	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* take a consistent snapshot of the persistent part of the slot */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/*
	 * Checksum everything after the not-checksummed header prefix.
	 *
	 * NOTE(review): despite the SnapBuild* names, these macros are expected
	 * to be defined (earlier in this file) in terms of ReplicationSlotOnDisk,
	 * not snapbuild state -- confirm against the macro definitions near the
	 * top of the file.
	 */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}

	/* fsync the temporary file */
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}

	CloseTransientFile(fd);

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/* Check CreateSlot() for the reasoning of using a crit. section. */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
1166 
/*
 * Load a single slot from disk into memory.
 *
 * Reads pg_replslot/<name>/state, validates its magic, version, length and
 * CRC, then copies the persistent data into a free entry of the shared slot
 * array.  Non-persistent (ephemeral) slots found on disk are removed instead
 * of restored.  I/O and validation failures are treated as PANIC, since we
 * cannot safely continue startup with a slot in an unknown state.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + 12];
	char		path[MAXPGPATH + 22];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "pg_replslot/%s", name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	if (pg_fsync(fd) != 0)
	{
		/* preserve errno across CloseTransientFile() for %m */
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	}

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes,
						(uint32) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode_for_file_access(),
			errmsg("replication slot file \"%s\" has unsupported version %u",
				   path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode_for_file_access(),
			   errmsg("replication slot file \"%s\" has corrupted length %u",
					  path, cp.length)));

	/* Now that we know the size, read the entire file */
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	if (readBytes != cp.length)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes, cp.length)));
	}

	CloseTransientFile(fd);

	/*
	 * Now verify the CRC over everything after the not-checksummed prefix.
	 *
	 * NOTE(review): despite the SnapBuild* names, these macros are expected
	 * to be defined (earlier in this file) in terms of ReplicationSlotOnDisk
	 * -- confirm against the macro definitions near the top of the file.
	 */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errcode_for_file_access(),
					 errmsg("could not remove directory \"%s\"", slotdir)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be logical or higher.")));
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be replica or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* claim the first free entry in the shared slot array */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		restored = true;
		break;
	}

	if (!restored)
		ereport(PANIC,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase max_replication_slots and try again.")));
}
1370