/*-------------------------------------------------------------------------
 *
 * slot.c
 *	   Replication slot management.
 *
 *
 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  src/backend/replication/slot.c
 *
 * NOTES
 *
 * Replication slots are used to keep state about replication streams
 * originating from this cluster.  Their primary purpose is to prevent the
 * premature removal of WAL or of old tuple versions in a manner that would
 * interfere with replication; they are also useful for monitoring purposes.
 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
 * on standbys (to support cascading setups).  The requirement that slots be
 * usable on standbys precludes storing them in the system catalogs.
 *
 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
 * directory. Inside that directory the state file will contain the slot's
 * own data. Additional data can be stored alongside that file if required.
 * While the server is running, the state data is also cached in memory for
 * efficiency.
 *
 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
 * to iterate over the slots, and in exclusive mode to change the in_use flag
 * of a slot.  The remaining data in each slot is protected by its mutex.
 *
 *-------------------------------------------------------------------------
 */
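
/*
 * Illustration only (editor's sketch, not part of the original file): under
 * the locking rules above, a reader scanning the slot array follows the
 * pattern the Compute* routines below use:
 *
 *		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 *		for (i = 0; i < max_replication_slots; i++)
 *		{
 *			ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 *
 *			if (!s->in_use)
 *				continue;
 *			SpinLockAcquire(&s->mutex);
 *			... copy out the fields protected by the mutex ...
 *			SpinLockRelease(&s->mutex);
 *		}
 *		LWLockRelease(ReplicationSlotControlLock);
 */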

#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/transam.h"
#include "access/xlog_internal.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"

/*
 * Replication slot on-disk data structure.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32c	checksum;

	/* data covered by checksum */
	uint32		version;
	uint32		length;

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
	sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	2		/* version for new files */

/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);

/*
 * Report shared-memory space needed by ReplicationSlotsShmemInit.
 */
Size
ReplicationSlotsShmemSize(void)
{
	Size		size = 0;

	if (max_replication_slots == 0)
		return size;

	size = offsetof(ReplicationSlotCtlData, replication_slots);
	size = add_size(size,
					mul_size(max_replication_slots, sizeof(ReplicationSlot)));

	return size;
}
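
/*
 * A sketch of how this is consumed (assumption: the shared-memory bootstrap
 * code, which lives outside this file, does roughly this):
 *
 *		size = add_size(size, ReplicationSlotsShmemSize());
 *		...
 *		ReplicationSlotsShmemInit();
 */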

/*
 * Allocate and initialize shared memory for replication slots.
 */
void
ReplicationSlotsShmemInit(void)
{
	bool		found;

	if (max_replication_slots == 0)
		return;

	ReplicationSlotCtl = (ReplicationSlotCtlData *)
		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
						&found);

	LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
						  "replication_slot_io");

	if (!found)
	{
		int			i;

		/* First time through, so initialize */
		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());

		for (i = 0; i < max_replication_slots; i++)
		{
			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];

			/* everything else is zeroed by the memset above */
			SpinLockInit(&slot->mutex);
			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
			ConditionVariableInit(&slot->active_cv);
		}
	}
}

/*
 * Check whether the passed slot name is valid and report errors at elevel.
 *
 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
 * the name to be used as a directory name on every supported OS.
 *
 * Returns whether the name is valid; if elevel < ERROR, an invalid name
 * results in a false return instead of an error being thrown.
 */
bool
ReplicationSlotValidateName(const char *name, int elevel)
{
	const char *cp;

	if (strlen(name) == 0)
	{
		ereport(elevel,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("replication slot name \"%s\" is too short",
						name)));
		return false;
	}

	if (strlen(name) >= NAMEDATALEN)
	{
		ereport(elevel,
				(errcode(ERRCODE_NAME_TOO_LONG),
				 errmsg("replication slot name \"%s\" is too long",
						name)));
		return false;
	}

	for (cp = name; *cp; cp++)
	{
		if (!((*cp >= 'a' && *cp <= 'z')
			  || (*cp >= '0' && *cp <= '9')
			  || (*cp == '_')))
		{
			ereport(elevel,
					(errcode(ERRCODE_INVALID_NAME),
					 errmsg("replication slot name \"%s\" contains invalid character",
							name),
					 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
			return false;
		}
	}
	return true;
}
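
/*
 * Illustration only: a caller that wants a soft failure instead of an ERROR
 * could do (hypothetical usage, not from this file):
 *
 *		if (!ReplicationSlotValidateName(name, WARNING))
 *			return false;
 *
 * "my_slot_1" passes the checks above; "MySlot", "my-slot" and "" do not.
 */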

/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db-specific; if the slot is going to
 *	   be used for that, pass true, otherwise false.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
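
/*
 * Illustration only (hypothetical caller): creating a slot, reserving WAL
 * and making the slot crash-safe chains the functions in this file like so:
 *
 *		ReplicationSlotCreate("my_slot", false, RS_EPHEMERAL);
 *		ReplicationSlotReserveWal();
 *		ReplicationSlotPersist();		-- marks dirty and saves to disk
 *		...
 *		ReplicationSlotRelease();
 */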

/*
 * Find a previously created slot and mark it as used by this backend.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *slot;
	int			active_pid;
	int			i;

retry:
	Assert(MyReplicationSlot == NULL);

	/*
	 * Search for the named slot and mark it active if we find it.  If the
	 * slot is already active, we exit the loop with active_pid set to the PID
	 * of the backend that owns it.
	 */
	active_pid = 0;
	slot = NULL;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			/*
			 * This is the slot we want; check if it's active under some other
			 * process.  In single user mode, we don't need this check.
			 */
			if (IsUnderPostmaster)
			{
				/*
				 * Get ready to sleep on it in case it is active.  (We may end
				 * up not sleeping, but we don't want to do this while holding
				 * the spinlock.)
				 */
				ConditionVariablePrepareToSleep(&s->active_cv);

				SpinLockAcquire(&s->mutex);

				active_pid = s->active_pid;
				if (active_pid == 0)
					active_pid = s->active_pid = MyProcPid;

				SpinLockRelease(&s->mutex);
			}
			else
				active_pid = MyProcPid;
			slot = s;

			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));

	/*
	 * If we found the slot but it's already active in another backend, we
	 * either error out or retry after a short wait, as caller specified.
	 */
	if (active_pid != MyProcPid)
	{
		if (nowait)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							name, active_pid)));

		/* Wait here until we get signaled, and then restart */
		ConditionVariableSleep(&slot->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);
		ConditionVariableCancelSleep();
		goto retry;
	}
	else
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
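
/*
 * Illustration only (hypothetical caller): re-using an existing slot is the
 * usual acquire/work/release pattern:
 *
 *		ReplicationSlotAcquire("my_slot", true);	-- nowait: ERROR if busy
 *		... stream or advance using MyReplicationSlot ...
 *		ReplicationSlotRelease();
 */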

/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}

/*
 * Cleanup all temporary slots created in current session.
 */
void
ReplicationSlotCleanup(void)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if (s->active_pid == MyProcPid)
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}

/*
 * Permanently drop replication slot identified by the passed in name.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
	Assert(MyReplicationSlot == NULL);

	ReplicationSlotAcquire(name, nowait);

	ReplicationSlotDropAcquired();
}

/*
 * Permanently drop the currently acquired replication slot.
 */
static void
ReplicationSlotDropAcquired(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(MyReplicationSlot != NULL);

	/* slot isn't acquired anymore */
	MyReplicationSlot = NULL;

	ReplicationSlotDropPtr(slot);
}

/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}

/*
 * Serialize the currently acquired slot's state from memory to disk, thereby
 * guaranteeing the current state will survive a crash.
 */
void
ReplicationSlotSave(void)
{
	char		path[MAXPGPATH];

	Assert(MyReplicationSlot != NULL);

	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
	SaveSlotToPath(MyReplicationSlot, path, ERROR);
}

/*
 * Signal that it would be useful if the currently acquired slot would be
 * flushed out to disk.
 *
 * Note that the actual flush to disk can be delayed for a long time; if it
 * is required for correctness, explicitly do a ReplicationSlotSave().
 */
void
ReplicationSlotMarkDirty(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(MyReplicationSlot != NULL);

	SpinLockAcquire(&slot->mutex);
	MyReplicationSlot->just_dirtied = true;
	MyReplicationSlot->dirty = true;
	SpinLockRelease(&slot->mutex);
}

/*
 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
 * guaranteeing it will be there after an eventual crash.
 */
void
ReplicationSlotPersist(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.persistency != RS_PERSISTENT);

	SpinLockAcquire(&slot->mutex);
	slot->data.persistency = RS_PERSISTENT;
	SpinLockRelease(&slot->mutex);

	ReplicationSlotMarkDirty();
	ReplicationSlotSave();
}
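
/*
 * Illustration only: the dirty/save protocol above means a caller that must
 * not lose an update does, as a sketch:
 *
 *		SpinLockAcquire(&slot->mutex);
 *		slot->data.restart_lsn = new_lsn;		-- new_lsn is hypothetical
 *		SpinLockRelease(&slot->mutex);
 *		ReplicationSlotMarkDirty();
 *		ReplicationSlotSave();		-- flushes, since dirty is set
 */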

/*
 * Compute the oldest xmin across all slots and store it in the ProcArray.
 *
 * If already_locked is true, ProcArrayLock has already been acquired
 * exclusively.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		effective_xmin = s->effective_xmin;
		effective_catalog_xmin = s->effective_catalog_xmin;
		SpinLockRelease(&s->mutex);

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	LWLockRelease(ReplicationSlotControlLock);

	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
}

/*
 * Compute the oldest restart LSN across all slots and inform xlog module.
 */
void
ReplicationSlotsComputeRequiredLSN(void)
{
	int			i;
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		restart_lsn = s->data.restart_lsn;
		SpinLockRelease(&s->mutex);

		if (restart_lsn != InvalidXLogRecPtr &&
			(min_required == InvalidXLogRecPtr ||
			 restart_lsn < min_required))
			min_required = restart_lsn;
	}
	LWLockRelease(ReplicationSlotControlLock);

	XLogSetReplicationSlotMinimumLSN(min_required);
}

/*
 * Compute the oldest WAL LSN required by *logical* decoding slots.
 *
 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
 * slots exist.
 *
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
 * ignores physical replication slots.
 *
 * The results aren't required frequently, so we don't maintain a precomputed
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 */
XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)
{
	XLogRecPtr	result = InvalidXLogRecPtr;
	int			i;

	if (max_replication_slots <= 0)
		return InvalidXLogRecPtr;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		XLogRecPtr	restart_lsn;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotControlLock is held */
		if (!s->in_use)
			continue;

		/* we're only interested in logical slots */
		if (!SlotIsLogical(s))
			continue;

		/* read once, it's ok if it increases while we're checking */
		SpinLockAcquire(&s->mutex);
		restart_lsn = s->data.restart_lsn;
		SpinLockRelease(&s->mutex);

		if (result == InvalidXLogRecPtr ||
			restart_lsn < result)
			result = restart_lsn;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return result;
}

/*
 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
 * passed database oid.
 *
 * Returns true if there are any slots referencing the database. *nslots will
 * be set to the absolute number of slots in the database, *nactive to ones
 * currently active.
 */
bool
ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
{
	int			i;

	*nslots = *nactive = 0;

	if (max_replication_slots <= 0)
		return false;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotControlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* count slots with spinlock held */
		SpinLockAcquire(&s->mutex);
		(*nslots)++;
		if (s->active_pid != 0)
			(*nactive)++;
		SpinLockRelease(&s->mutex);
	}
	LWLockRelease(ReplicationSlotControlLock);

	if (*nslots > 0)
		return true;
	return false;
}

/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotControlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}


/*
 * Check whether the server's configuration supports using replication
 * slots.
 */
void
CheckSlotRequirements(void)
{
	/*
	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
	 * needs the same check.
	 */

	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 (errmsg("replication slots can only be used if max_replication_slots > 0"))));

	if (wal_level < WAL_LEVEL_REPLICA)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if wal_level >= replica")));
}
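
/*
 * Illustration only: the configuration these checks correspond to would be
 * set in postgresql.conf, e.g. (values are just an example):
 *
 *		max_replication_slots = 10
 *		wal_level = replica			-- or 'logical' for logical slots
 */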

/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			restart_lsn = GetXLogInsertRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			restart_lsn = GetRedoRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}

/*
 * Flush all replication slots to disk.
 *
 * This needn't actually be part of a checkpoint, but it's a convenient
 * location.
 */
void
CheckPointReplicationSlots(void)
{
	int			i;

	elog(DEBUG1, "performing replication slot checkpoint");

	/*
	 * Prevent any slot from being created/dropped while we're active. As we
	 * explicitly do *not* want to block iterating over replication_slots or
	 * acquiring a slot we cannot take the control lock - but that's OK,
	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
	 * enough to guarantee that nobody can change the in_use bits on us.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		char		path[MAXPGPATH];

		if (!s->in_use)
			continue;

		/* save the slot to disk, locking is handled in SaveSlotToPath() */
		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
		SaveSlotToPath(s, path, LOG);
	}
	LWLockRelease(ReplicationSlotAllocationLock);
}

/*
 * Load all replication slots from disk into memory at server startup. This
 * needs to be run before we start crash recovery.
 */
void
StartupReplicationSlots(void)
{
	DIR		   *replication_dir;
	struct dirent *replication_de;

	elog(DEBUG1, "starting up replication slots");

	/* restore all slots by iterating over all on-disk entries */
	replication_dir = AllocateDir("pg_replslot");
	while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
	{
		struct stat statbuf;
		char		path[MAXPGPATH + 12];

		if (strcmp(replication_de->d_name, ".") == 0 ||
			strcmp(replication_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);

		/* we're only creating directories here, skip if it's not ours */
		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
			continue;

		/* we crashed while a slot was being setup or deleted, clean up */
		if (pg_str_endswith(replication_de->d_name, ".tmp"))
		{
			if (!rmtree(path, true))
			{
				ereport(WARNING,
						(errmsg("could not remove directory \"%s\"",
								path)));
				continue;
			}
			fsync_fname("pg_replslot", true);
			continue;
		}

		/* looks like a slot in a normal state, restore */
		RestoreSlotFromDisk(replication_de->d_name);
	}
	FreeDir(replication_dir);

	/* currently no slots exist, we're done. */
	if (max_replication_slots <= 0)
		return;

	/* Now that we have recovered all the data, compute replication xmin */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();
}

/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice whether a slot is the
 * current one or not, that's all handled a layer above.
 * ----
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it.  If the rmtree() fails, we'll error
	 * out at the MakePGDirectory() below, so we don't bother checking
	 * success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (MakePGDirectory(tmppath) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();
}
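
/*
 * Illustration only: for a slot named "my_slot" the resulting on-disk
 * layout (relative to $PGDATA) is:
 *
 *		pg_replslot/my_slot/			-- the slot's directory
 *		pg_replslot/my_slot/state		-- ReplicationSlotOnDisk contents
 *
 * "pg_replslot/my_slot.tmp" exists only transiently while the slot is being
 * created or dropped.
 */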

/*
 * Shared functionality between saving and creating a replication slot.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd))
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
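
/*
 * Summarizing the crash-safety ordering implemented above (editor's note):
 * write state.tmp, pg_fsync() it, rename() it to state, then fsync the
 * file, the slot directory and pg_replslot.  A crash before the rename
 * leaves only state.tmp behind, which RestoreSlotFromDisk() unlinks.
 */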

/*
 * Load a single slot from disk into memory.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + 12];
	char		path[MAXPGPATH + 22];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "pg_replslot/%s", name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd))
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be logical or higher.")));
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be replica or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase max_replication_slots and try again.")));
}