1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  *	   Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster.  Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups).  The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot.  The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include <unistd.h>
40 #include <sys/stat.h>
41 
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "replication/slot.h"
48 #include "storage/fd.h"
49 #include "storage/proc.h"
50 #include "storage/procarray.h"
51 #include "utils/builtins.h"
52 
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the "state" file inside each slot's directory.
 * The leading fields (magic, checksum) must stay version independent so
 * that a future version can still identify and validate old files.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* must be SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC of the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* format version, see SLOT_VERSION */
	uint32		length;			/* length of version-dependent data */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
/* NOTE(review): the "SnapBuild" prefix on the next two macros appears to be
 * a historical copy-paste from snapbuild.c; they operate on
 * ReplicationSlotOnDisk.  Renaming would require touching all users. */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
	sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	2		/* version for new files */
91 
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* forward declarations for slot drop helpers */
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
109 
110 /*
111  * Report shared-memory space needed by ReplicationSlotShmemInit.
112  */
113 Size
ReplicationSlotsShmemSize(void)114 ReplicationSlotsShmemSize(void)
115 {
116 	Size		size = 0;
117 
118 	if (max_replication_slots == 0)
119 		return size;
120 
121 	size = offsetof(ReplicationSlotCtlData, replication_slots);
122 	size = add_size(size,
123 					mul_size(max_replication_slots, sizeof(ReplicationSlot)));
124 
125 	return size;
126 }
127 
128 /*
129  * Allocate and initialize shared memory for replication slots.
130  */
131 void
ReplicationSlotsShmemInit(void)132 ReplicationSlotsShmemInit(void)
133 {
134 	bool		found;
135 
136 	if (max_replication_slots == 0)
137 		return;
138 
139 	ReplicationSlotCtl = (ReplicationSlotCtlData *)
140 		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
141 						&found);
142 
143 	LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144 						  "replication_slot_io");
145 
146 	if (!found)
147 	{
148 		int			i;
149 
150 		/* First time through, so initialize */
151 		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152 
153 		for (i = 0; i < max_replication_slots; i++)
154 		{
155 			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156 
157 			/* everything else is zeroed by the memset above */
158 			SpinLockInit(&slot->mutex);
159 			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160 			ConditionVariableInit(&slot->active_cv);
161 		}
162 	}
163 }
164 
165 /*
166  * Check whether the passed slot name is valid and report errors at elevel.
167  *
168  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
169  * the name to be used as a directory name on every supported OS.
170  *
171  * Returns whether the directory name is valid or not if elevel < ERROR.
172  */
173 bool
ReplicationSlotValidateName(const char * name,int elevel)174 ReplicationSlotValidateName(const char *name, int elevel)
175 {
176 	const char *cp;
177 
178 	if (strlen(name) == 0)
179 	{
180 		ereport(elevel,
181 				(errcode(ERRCODE_INVALID_NAME),
182 				 errmsg("replication slot name \"%s\" is too short",
183 						name)));
184 		return false;
185 	}
186 
187 	if (strlen(name) >= NAMEDATALEN)
188 	{
189 		ereport(elevel,
190 				(errcode(ERRCODE_NAME_TOO_LONG),
191 				 errmsg("replication slot name \"%s\" is too long",
192 						name)));
193 		return false;
194 	}
195 
196 	for (cp = name; *cp; cp++)
197 	{
198 		if (!((*cp >= 'a' && *cp <= 'z')
199 			  || (*cp >= '0' && *cp <= '9')
200 			  || (*cp == '_')))
201 		{
202 			ereport(elevel,
203 					(errcode(ERRCODE_INVALID_NAME),
204 					 errmsg("replication slot name \"%s\" contains invalid character",
205 							name),
206 					 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
207 			return false;
208 		}
209 	}
210 	return true;
211 }
212 
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 *
 * On success the new slot is left acquired: MyReplicationSlot points at it
 * and its active_pid is this backend's PID.  Errors out (without any slot
 * acquired) if the name is invalid or taken, or no free slot entry exists.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* reports at ERROR (and doesn't return) if the name is unusable */
	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
325 
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * If the slot is active in another backend: with nowait=true we raise an
 * error; otherwise we sleep on the slot's condition variable until it is
 * released and then retry the whole search from scratch.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *slot;
	int			active_pid;
	int			i;

retry:
	Assert(MyReplicationSlot == NULL);

	/*
	 * Search for the named slot and mark it active if we find it.  If the
	 * slot is already active, we exit the loop with active_pid set to the PID
	 * of the backend that owns it.
	 */
	active_pid = 0;
	slot = NULL;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			/*
			 * This is the slot we want; check if it's active under some other
			 * process.  In single user mode, we don't need this check.
			 */
			if (IsUnderPostmaster)
			{
				/*
				 * Get ready to sleep on it in case it is active.  (We may end
				 * up not sleeping, but we don't want to do this while holding
				 * the spinlock.)
				 */
				ConditionVariablePrepareToSleep(&s->active_cv);

				SpinLockAcquire(&s->mutex);

				/* claim the slot for ourselves iff nobody owns it */
				active_pid = s->active_pid;
				if (active_pid == 0)
					active_pid = s->active_pid = MyProcPid;

				SpinLockRelease(&s->mutex);
			}
			else
				active_pid = MyProcPid;
			slot = s;

			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));

	/*
	 * If we found the slot but it's already active in another backend, we
	 * either error out or retry after a short wait, as caller specified.
	 */
	if (active_pid != MyProcPid)
	{
		if (nowait)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							name, active_pid)));

		/* Wait here until we get signaled, and then restart */
		ConditionVariableSleep(&slot->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);
		ConditionVariableCancelSleep();
		goto retry;
	}
	else
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
416 
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * An RS_EPHEMERAL slot is dropped outright instead of being released.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	/* from here on this backend no longer owns any slot */
	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
474 
/*
 * Cleanup all temporary slots created in current session.
 *
 * Any slot whose active_pid is ours must be RS_TEMPORARY here (persistent
 * and ephemeral slots are released/dropped through ReplicationSlotRelease).
 */
void
ReplicationSlotCleanup(void)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if (s->active_pid == MyProcPid)
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			/* wake anyone waiting for this slot to become free */
			ConditionVariableBroadcast(&s->active_cv);

			/*
			 * We dropped the control lock above, so the set of slots may
			 * have changed; rescan from the beginning.
			 */
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
512 
513 /*
514  * Permanently drop replication slot identified by the passed in name.
515  */
516 void
ReplicationSlotDrop(const char * name,bool nowait)517 ReplicationSlotDrop(const char *name, bool nowait)
518 {
519 	Assert(MyReplicationSlot == NULL);
520 
521 	ReplicationSlotAcquire(name, nowait);
522 
523 	ReplicationSlotDropAcquired();
524 }
525 
526 /*
527  * Permanently drop the currently acquired replication slot.
528  */
529 static void
ReplicationSlotDropAcquired(void)530 ReplicationSlotDropAcquired(void)
531 {
532 	ReplicationSlot *slot = MyReplicationSlot;
533 
534 	Assert(MyReplicationSlot != NULL);
535 
536 	/* slot isn't acquired anymore */
537 	MyReplicationSlot = NULL;
538 
539 	ReplicationSlotDropPtr(slot);
540 }
541 
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * Removal is done by renaming the slot's directory to <name>.tmp and then
 * deleting it; a crash in between leaves only the .tmp residue, which
 * startup can clean.  For non-persistent slots rename failure is soft
 * (WARNING) since the caller may be in error handling already.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/* only a persistent slot justifies a hard failure here */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
640 
641 /*
642  * Serialize the currently acquired slot's state from memory to disk, thereby
643  * guaranteeing the current state will survive a crash.
644  */
645 void
ReplicationSlotSave(void)646 ReplicationSlotSave(void)
647 {
648 	char		path[MAXPGPATH];
649 
650 	Assert(MyReplicationSlot != NULL);
651 
652 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
653 	SaveSlotToPath(MyReplicationSlot, path, ERROR);
654 }
655 
656 /*
657  * Signal that it would be useful if the currently acquired slot would be
658  * flushed out to disk.
659  *
660  * Note that the actual flush to disk can be delayed for a long time, if
661  * required for correctness explicitly do a ReplicationSlotSave().
662  */
663 void
ReplicationSlotMarkDirty(void)664 ReplicationSlotMarkDirty(void)
665 {
666 	ReplicationSlot *slot = MyReplicationSlot;
667 
668 	Assert(MyReplicationSlot != NULL);
669 
670 	SpinLockAcquire(&slot->mutex);
671 	MyReplicationSlot->just_dirtied = true;
672 	MyReplicationSlot->dirty = true;
673 	SpinLockRelease(&slot->mutex);
674 }
675 
676 /*
677  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
678  * guaranteeing it will be there after an eventual crash.
679  */
680 void
ReplicationSlotPersist(void)681 ReplicationSlotPersist(void)
682 {
683 	ReplicationSlot *slot = MyReplicationSlot;
684 
685 	Assert(slot != NULL);
686 	Assert(slot->data.persistency != RS_PERSISTENT);
687 
688 	SpinLockAcquire(&slot->mutex);
689 	slot->data.persistency = RS_PERSISTENT;
690 	SpinLockRelease(&slot->mutex);
691 
692 	ReplicationSlotMarkDirty();
693 	ReplicationSlotSave();
694 }
695 
/*
 * Compute the oldest xmin across all slots and store it in the ProcArray.
 *
 * Both the data xmin and the catalog xmin are aggregated (each is the
 * oldest valid value over all in-use slots, or invalid if none).
 *
 * If already_locked is true, ProcArrayLock has already been acquired
 * exclusively.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;

		if (!s->in_use)
			continue;

		/* read both values atomically w.r.t. concurrent updaters */
		SpinLockAcquire(&s->mutex);
		effective_xmin = s->effective_xmin;
		effective_catalog_xmin = s->effective_catalog_xmin;
		SpinLockRelease(&s->mutex);

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	LWLockRelease(ReplicationSlotControlLock);

	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
}
744 
745 /*
746  * Compute the oldest restart LSN across all slots and inform xlog module.
747  */
748 void
ReplicationSlotsComputeRequiredLSN(void)749 ReplicationSlotsComputeRequiredLSN(void)
750 {
751 	int			i;
752 	XLogRecPtr	min_required = InvalidXLogRecPtr;
753 
754 	Assert(ReplicationSlotCtl != NULL);
755 
756 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
757 	for (i = 0; i < max_replication_slots; i++)
758 	{
759 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
760 		XLogRecPtr	restart_lsn;
761 
762 		if (!s->in_use)
763 			continue;
764 
765 		SpinLockAcquire(&s->mutex);
766 		restart_lsn = s->data.restart_lsn;
767 		SpinLockRelease(&s->mutex);
768 
769 		if (restart_lsn != InvalidXLogRecPtr &&
770 			(min_required == InvalidXLogRecPtr ||
771 			 restart_lsn < min_required))
772 			min_required = restart_lsn;
773 	}
774 	LWLockRelease(ReplicationSlotControlLock);
775 
776 	XLogSetReplicationSlotMinimumLSN(min_required);
777 }
778 
779 /*
780  * Compute the oldest WAL LSN required by *logical* decoding slots..
781  *
782  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
783  * slots exist.
784  *
785  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
786  * ignores physical replication slots.
787  *
788  * The results aren't required frequently, so we don't maintain a precomputed
789  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
790  */
791 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)792 ReplicationSlotsComputeLogicalRestartLSN(void)
793 {
794 	XLogRecPtr	result = InvalidXLogRecPtr;
795 	int			i;
796 
797 	if (max_replication_slots <= 0)
798 		return InvalidXLogRecPtr;
799 
800 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
801 
802 	for (i = 0; i < max_replication_slots; i++)
803 	{
804 		ReplicationSlot *s;
805 		XLogRecPtr	restart_lsn;
806 
807 		s = &ReplicationSlotCtl->replication_slots[i];
808 
809 		/* cannot change while ReplicationSlotCtlLock is held */
810 		if (!s->in_use)
811 			continue;
812 
813 		/* we're only interested in logical slots */
814 		if (!SlotIsLogical(s))
815 			continue;
816 
817 		/* read once, it's ok if it increases while we're checking */
818 		SpinLockAcquire(&s->mutex);
819 		restart_lsn = s->data.restart_lsn;
820 		SpinLockRelease(&s->mutex);
821 
822 		if (result == InvalidXLogRecPtr ||
823 			restart_lsn < result)
824 			result = restart_lsn;
825 	}
826 
827 	LWLockRelease(ReplicationSlotControlLock);
828 
829 	return result;
830 }
831 
832 /*
833  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
834  * passed database oid.
835  *
836  * Returns true if there are any slots referencing the database. *nslots will
837  * be set to the absolute number of slots in the database, *nactive to ones
838  * currently active.
839  */
840 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)841 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
842 {
843 	int			i;
844 
845 	*nslots = *nactive = 0;
846 
847 	if (max_replication_slots <= 0)
848 		return false;
849 
850 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
851 	for (i = 0; i < max_replication_slots; i++)
852 	{
853 		ReplicationSlot *s;
854 
855 		s = &ReplicationSlotCtl->replication_slots[i];
856 
857 		/* cannot change while ReplicationSlotCtlLock is held */
858 		if (!s->in_use)
859 			continue;
860 
861 		/* only logical slots are database specific, skip */
862 		if (!SlotIsLogical(s))
863 			continue;
864 
865 		/* not our database, skip */
866 		if (s->data.database != dboid)
867 			continue;
868 
869 		/* count slots with spinlock held */
870 		SpinLockAcquire(&s->mutex);
871 		(*nslots)++;
872 		if (s->active_pid != 0)
873 			(*nactive)++;
874 		SpinLockRelease(&s->mutex);
875 	}
876 	LWLockRelease(ReplicationSlotControlLock);
877 
878 	if (*nslots > 0)
879 		return true;
880 	return false;
881 }
882 
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* acquire slot, so ReplicationSlotDropAcquired can be reused  */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim it as ours, just like ReplicationSlotAcquire would */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
966 
967 
968 /*
969  * Check whether the server's configuration supports using replication
970  * slots.
971  */
972 void
CheckSlotRequirements(void)973 CheckSlotRequirements(void)
974 {
975 	/*
976 	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
977 	 * needs the same check.
978 	 */
979 
980 	if (max_replication_slots == 0)
981 		ereport(ERROR,
982 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
983 				 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
984 
985 	if (wal_level < WAL_LEVEL_REPLICA)
986 		ereport(ERROR,
987 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
988 				 errmsg("replication slots can only be used if wal_level >= replica")));
989 }
990 
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	/* caller must have acquired a slot that has no WAL reserved yet */
	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay at.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			restart_lsn = GetXLogInsertRecPtr();
			/* publish under the mutex so concurrent readers see a consistent value */
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			/* physical slot, or logical slot while in recovery */
			restart_lsn = GetRedoRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}
1067 
1068 /*
1069  * Flush all replication slots to disk.
1070  *
1071  * This needn't actually be part of a checkpoint, but it's a convenient
1072  * location.
1073  */
1074 void
CheckPointReplicationSlots(void)1075 CheckPointReplicationSlots(void)
1076 {
1077 	int			i;
1078 
1079 	elog(DEBUG1, "performing replication slot checkpoint");
1080 
1081 	/*
1082 	 * Prevent any slot from being created/dropped while we're active. As we
1083 	 * explicitly do *not* want to block iterating over replication_slots or
1084 	 * acquiring a slot we cannot take the control lock - but that's OK,
1085 	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1086 	 * enough to guarantee that nobody can change the in_use bits on us.
1087 	 */
1088 	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1089 
1090 	for (i = 0; i < max_replication_slots; i++)
1091 	{
1092 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1093 		char		path[MAXPGPATH];
1094 
1095 		if (!s->in_use)
1096 			continue;
1097 
1098 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
1099 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1100 		SaveSlotToPath(s, path, LOG);
1101 	}
1102 	LWLockRelease(ReplicationSlotAllocationLock);
1103 }
1104 
1105 /*
1106  * Load all replication slots from disk into memory at server startup. This
1107  * needs to be run before we start crash recovery.
1108  */
1109 void
StartupReplicationSlots(void)1110 StartupReplicationSlots(void)
1111 {
1112 	DIR		   *replication_dir;
1113 	struct dirent *replication_de;
1114 
1115 	elog(DEBUG1, "starting up replication slots");
1116 
1117 	/* restore all slots by iterating over all on-disk entries */
1118 	replication_dir = AllocateDir("pg_replslot");
1119 	while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1120 	{
1121 		struct stat statbuf;
1122 		char		path[MAXPGPATH + 12];
1123 
1124 		if (strcmp(replication_de->d_name, ".") == 0 ||
1125 			strcmp(replication_de->d_name, "..") == 0)
1126 			continue;
1127 
1128 		snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1129 
1130 		/* we're only creating directories here, skip if it's not our's */
1131 		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1132 			continue;
1133 
1134 		/* we crashed while a slot was being setup or deleted, clean up */
1135 		if (pg_str_endswith(replication_de->d_name, ".tmp"))
1136 		{
1137 			if (!rmtree(path, true))
1138 			{
1139 				ereport(WARNING,
1140 						(errcode_for_file_access(),
1141 						 errmsg("could not remove directory \"%s\"", path)));
1142 				continue;
1143 			}
1144 			fsync_fname("pg_replslot", true);
1145 			continue;
1146 		}
1147 
1148 		/* looks like a slot in a normal state, restore */
1149 		RestoreSlotFromDisk(replication_de->d_name);
1150 	}
1151 	FreeDir(replication_dir);
1152 
1153 	/* currently no slots exist, we're done. */
1154 	if (max_replication_slots <= 0)
1155 		return;
1156 
1157 	/* Now that we have recovered all the data, compute replication xmin */
1158 	ReplicationSlotsComputeRequiredXmin(false);
1159 	ReplicationSlotsComputeRequiredLSN();
1160 }
1161 
1162 /* ----
1163  * Manipulation of on-disk state of replication slots
1164  *
1165  * NB: none of the routines below should take any notice whether a slot is the
1166  * current one or not, that's all handled a layer above.
1167  * ----
1168  */
1169 static void
CreateSlotOnDisk(ReplicationSlot * slot)1170 CreateSlotOnDisk(ReplicationSlot *slot)
1171 {
1172 	char		tmppath[MAXPGPATH];
1173 	char		path[MAXPGPATH];
1174 	struct stat st;
1175 
1176 	/*
1177 	 * No need to take out the io_in_progress_lock, nobody else can see this
1178 	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1179 	 * takes out the lock, if we'd take the lock here, we'd deadlock.
1180 	 */
1181 
1182 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1183 	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1184 
1185 	/*
1186 	 * It's just barely possible that some previous effort to create or drop a
1187 	 * slot with this name left a temp directory lying around. If that seems
1188 	 * to be the case, try to remove it.  If the rmtree() fails, we'll error
1189 	 * out at the MakePGDirectory() below, so we don't bother checking
1190 	 * success.
1191 	 */
1192 	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1193 		rmtree(tmppath, true);
1194 
1195 	/* Create and fsync the temporary slot directory. */
1196 	if (MakePGDirectory(tmppath) < 0)
1197 		ereport(ERROR,
1198 				(errcode_for_file_access(),
1199 				 errmsg("could not create directory \"%s\": %m",
1200 						tmppath)));
1201 	fsync_fname(tmppath, true);
1202 
1203 	/* Write the actual state file. */
1204 	slot->dirty = true;			/* signal that we really need to write */
1205 	SaveSlotToPath(slot, tmppath, ERROR);
1206 
1207 	/* Rename the directory into place. */
1208 	if (rename(tmppath, path) != 0)
1209 		ereport(ERROR,
1210 				(errcode_for_file_access(),
1211 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
1212 						tmppath, path)));
1213 
1214 	/*
1215 	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
1216 	 * would persist after an OS crash or not - so, force a restart. The
1217 	 * restart would try to fsync this again till it works.
1218 	 */
1219 	START_CRIT_SECTION();
1220 
1221 	fsync_fname(path, true);
1222 	fsync_fname("pg_replslot", true);
1223 
1224 	END_CRIT_SECTION();
1225 }
1226 
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Serializes the slot's persistent data into "<dir>/state", writing a
 * temporary file first and rename()ing it into place, so a crash can never
 * leave a half-written state file behind.  Failures are reported at
 * "elevel"; if that is less than ERROR the function returns without having
 * saved the slot (the dirty flag stays set, so a later call retries).
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* serialize concurrent write-outs of the same slot */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data under the mutex, to get a consistent image */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/*
	 * Checksum everything following the non-checksummed header.  NOTE: the
	 * SnapBuildOnDisk* size macros, despite their names, appear to describe
	 * the ReplicationSlotOnDisk layout here (naming presumably borrowed from
	 * snapbuild.c) -- verify against their definitions earlier in this file.
	 */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file before renaming it into place */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	CloseTransientFile(fd);

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/* Check CreateSlot() for the reasoning of using a crit. section. */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
1367 
/*
 * Load a single slot from disk into memory.
 *
 * "name" is the slot's directory name below pg_replslot.  Corruption or I/O
 * failure is promoted to PANIC, since we cannot safely start up while unsure
 * which resources slots pin.  Slots that were not persistent at crash time
 * are deleted rather than restored.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + 12];
	char		path[MAXPGPATH + 22];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "pg_replslot/%s", name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * The state file must exist: the slot directory is rename()d into place
	 * only after the state file inside it has been fsync()ed, so there is no
	 * legitimate way for it to be missing.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	}
	pgstat_report_wait_end();

	/* Also sync the parent directory (fsync failure there forces a restart) */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes,
						(uint32) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length, before trusting it for the read below */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes, cp.length)));
	}

	CloseTransientFile(fd);

	/*
	 * Now verify the CRC.  NOTE: despite their names, the SnapBuildOnDisk*
	 * size macros appear to describe the ReplicationSlotOnDisk layout here,
	 * matching SaveSlotToPath() -- verify against their definitions earlier
	 * in this file.
	 */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			/* best effort: a leftover directory doesn't block startup */
			ereport(WARNING,
					(errcode_for_file_access(),
					 errmsg("could not remove directory \"%s\"", slotdir)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be logical or higher.")));
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be replica or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* find the first free in-memory slot entry */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		/* mark occupied; nobody has the slot acquired yet */
		slot->in_use = true;
		slot->active_pid = 0;

		restored = true;
		break;
	}

	if (!restored)
		ereport(PANIC,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase max_replication_slots and try again.")));
}
1577