1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  *	   Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster.  Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups).  The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot.  The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include <unistd.h>
40 #include <sys/stat.h>
41 
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "replication/slot.h"
48 #include "storage/fd.h"
49 #include "storage/proc.h"
50 #include "storage/procarray.h"
51 #include "utils/builtins.h"
52 
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the state file kept in each slot's directory under
 * pg_replslot/ (see the file header comment).  The leading fields up to and
 * including 'length' must remain version independent, so that any server
 * version can at least identify the file; the data that follows may vary
 * with 'version'.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier; see SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC of the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* layout version; see SLOT_VERSION */
	uint32		length;

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
75 
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)

/*
 * Size of the part covered by the checksum.
 *
 * The replacement list is parenthesized so the subtraction stays a single
 * term when the macro is used inside a larger expression; the previous,
 * unparenthesized form would misparse in e.g. "x - SnapBuild...Size".
 * (The "SnapBuild" prefix on these two names looks like a copy-paste
 * leftover; the names are kept unchanged to avoid breaking other users.)
 */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent (parenthesized likewise) */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	2		/* version for new files */
91 
/* Control array for replication slot management (allocated in shared memory
 * by ReplicationSlotsShmemInit) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* forward declarations for slot lookup / acquire / drop helpers below */
static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
										  const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
112 
113 /*
114  * Report shared-memory space needed by ReplicationSlotsShmemInit.
115  */
116 Size
ReplicationSlotsShmemSize(void)117 ReplicationSlotsShmemSize(void)
118 {
119 	Size		size = 0;
120 
121 	if (max_replication_slots == 0)
122 		return size;
123 
124 	size = offsetof(ReplicationSlotCtlData, replication_slots);
125 	size = add_size(size,
126 					mul_size(max_replication_slots, sizeof(ReplicationSlot)));
127 
128 	return size;
129 }
130 
131 /*
132  * Allocate and initialize shared memory for replication slots.
133  */
134 void
ReplicationSlotsShmemInit(void)135 ReplicationSlotsShmemInit(void)
136 {
137 	bool		found;
138 
139 	if (max_replication_slots == 0)
140 		return;
141 
142 	ReplicationSlotCtl = (ReplicationSlotCtlData *)
143 		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
144 						&found);
145 
146 	if (!found)
147 	{
148 		int			i;
149 
150 		/* First time through, so initialize */
151 		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152 
153 		for (i = 0; i < max_replication_slots; i++)
154 		{
155 			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156 
157 			/* everything else is zeroed by the memset above */
158 			SpinLockInit(&slot->mutex);
159 			LWLockInitialize(&slot->io_in_progress_lock,
160 							 LWTRANCHE_REPLICATION_SLOT_IO);
161 			ConditionVariableInit(&slot->active_cv);
162 		}
163 	}
164 }
165 
166 /*
167  * Check whether the passed slot name is valid and report errors at elevel.
168  *
169  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
170  * the name to be used as a directory name on every supported OS.
171  *
172  * Returns whether the directory name is valid or not if elevel < ERROR.
173  */
174 bool
ReplicationSlotValidateName(const char * name,int elevel)175 ReplicationSlotValidateName(const char *name, int elevel)
176 {
177 	const char *cp;
178 
179 	if (strlen(name) == 0)
180 	{
181 		ereport(elevel,
182 				(errcode(ERRCODE_INVALID_NAME),
183 				 errmsg("replication slot name \"%s\" is too short",
184 						name)));
185 		return false;
186 	}
187 
188 	if (strlen(name) >= NAMEDATALEN)
189 	{
190 		ereport(elevel,
191 				(errcode(ERRCODE_NAME_TOO_LONG),
192 				 errmsg("replication slot name \"%s\" is too long",
193 						name)));
194 		return false;
195 	}
196 
197 	for (cp = name; *cp; cp++)
198 	{
199 		if (!((*cp >= 'a' && *cp <= 'z')
200 			  || (*cp >= '0' && *cp <= '9')
201 			  || (*cp == '_')))
202 		{
203 			ereport(elevel,
204 					(errcode(ERRCODE_INVALID_NAME),
205 					 errmsg("replication slot name \"%s\" contains invalid character",
206 							name),
207 					 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
208 			return false;
209 		}
210 	}
211 	return true;
212 }
213 
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * persistency: initial persistency level of the new slot.
 *
 * On success the slot is acquired by this backend: MyReplicationSlot points
 * at it and its active_pid is our PID.  Errors out (ERROR) on an invalid or
 * duplicate name, or when all slots are already in use.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* errors out at level ERROR if the name is not usable */
	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
326 
327 /*
328  * Search for the named replication slot.
329  *
330  * Return the replication slot if found, otherwise NULL.
331  *
332  * The caller must hold ReplicationSlotControlLock in shared mode.
333  */
334 static ReplicationSlot *
SearchNamedReplicationSlot(const char * name)335 SearchNamedReplicationSlot(const char *name)
336 {
337 	int			i;
338 	ReplicationSlot	*slot = NULL;
339 
340 	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
341 								LW_SHARED));
342 
343 	for (i = 0; i < max_replication_slots; i++)
344 	{
345 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
346 
347 		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
348 		{
349 			slot = s;
350 			break;
351 		}
352 	}
353 
354 	return slot;
355 }
356 
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * The return value is only useful if behavior is SAB_Inquire, in which
 * it's zero if we successfully acquired the slot, -1 if the slot no longer
 * exists, or the PID of the owning process otherwise.  If behavior is
 * SAB_Error, then trying to acquire an owned slot is an error.
 * If SAB_Block, we sleep until the slot is released by the owning process.
 */
int
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
	/* look the slot up by name; all real work happens in the helper */
	return ReplicationSlotAcquireInternal(NULL, name, behavior);
}
371 
/*
 * Mark the specified slot as used by this process.
 *
 * Only one of slot and name can be specified.
 * If slot == NULL, search for the slot with the given name.
 *
 * See comments about the return value in ReplicationSlotAcquire().
 */
static int
ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
							   SlotAcquireBehavior behavior)
{
	ReplicationSlot *s;
	int			active_pid;

	/* exactly one of the two lookup keys must be provided */
	AssertArg((slot == NULL) ^ (name == NULL));

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/*
	 * Search for the slot with the specified name if the slot to acquire is
	 * not given. If the slot is not found, we either return -1 or error out.
	 */
	s = slot ? slot : SearchNamedReplicationSlot(name);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		if (behavior == SAB_Inquire)
			return -1;
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name ? name : NameStr(slot->data.name))));
	}

	/*
	 * This is the slot we want; check if it's active under some other
	 * process.  In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active if SAB_Block.
		 * (We may end up not sleeping, but we don't want to do this while
		 * holding the spinlock.)
		 */
		if (behavior == SAB_Block)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/* claim the slot atomically if it is currently unowned */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		SpinLockRelease(&s->mutex);
	}
	else
		active_pid = MyProcPid;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * either error out, return the PID of the owning process, or retry
	 * after a short wait, as caller specified.
	 */
	if (active_pid != MyProcPid)
	{
		if (behavior == SAB_Error)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							NameStr(s->data.name), active_pid)));
		else if (behavior == SAB_Inquire)
			return active_pid;

		/* Wait here until we get signaled, and then restart */
		ConditionVariableSleep(&s->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);
		ConditionVariableCancelSleep();
		goto retry;
	}
	else if (behavior == SAB_Block)
		ConditionVariableCancelSleep();	/* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/* success */
	return 0;
}
468 
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * An RS_EPHEMERAL slot, by contrast, is dropped outright here, since it is
 * only meant to live for the duration of this acquisition.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		/* the global xmin horizon may be able to advance now */
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
526 
/*
 * Cleanup all temporary slots created in current session.
 *
 * Any slot whose active_pid is our PID must be one of our temporary slots
 * (asserted below); each such slot is dropped in turn.
 */
void
ReplicationSlotCleanup(void)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if (s->active_pid == MyProcPid)
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			/* wake up anyone waiting on this slot */
			ConditionVariableBroadcast(&s->active_cv);
			/* the slot set may have changed while unlocked; rescan */
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
564 
565 /*
566  * Permanently drop replication slot identified by the passed in name.
567  */
568 void
ReplicationSlotDrop(const char * name,bool nowait)569 ReplicationSlotDrop(const char *name, bool nowait)
570 {
571 	Assert(MyReplicationSlot == NULL);
572 
573 	(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
574 
575 	ReplicationSlotDropAcquired();
576 }
577 
578 /*
579  * Permanently drop the currently acquired replication slot.
580  */
581 static void
ReplicationSlotDropAcquired(void)582 ReplicationSlotDropAcquired(void)
583 {
584 	ReplicationSlot *slot = MyReplicationSlot;
585 
586 	Assert(MyReplicationSlot != NULL);
587 
588 	/* slot isn't acquired anymore */
589 	MyReplicationSlot = NULL;
590 
591 	ReplicationSlotDropPtr(slot);
592 }
593 
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * The on-disk directory is renamed to "<name>.tmp" first (so a crash leaves
 * no recognizable slot), the shared-memory entry is then marked free, and
 * finally the renamed directory is removed.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/* only a persistent slot's loss justifies a hard error here */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
691 
692 /*
693  * Serialize the currently acquired slot's state from memory to disk, thereby
694  * guaranteeing the current state will survive a crash.
695  */
696 void
ReplicationSlotSave(void)697 ReplicationSlotSave(void)
698 {
699 	char		path[MAXPGPATH];
700 
701 	Assert(MyReplicationSlot != NULL);
702 
703 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
704 	SaveSlotToPath(MyReplicationSlot, path, ERROR);
705 }
706 
707 /*
708  * Signal that it would be useful if the currently acquired slot would be
709  * flushed out to disk.
710  *
711  * Note that the actual flush to disk can be delayed for a long time, if
712  * required for correctness explicitly do a ReplicationSlotSave().
713  */
714 void
ReplicationSlotMarkDirty(void)715 ReplicationSlotMarkDirty(void)
716 {
717 	ReplicationSlot *slot = MyReplicationSlot;
718 
719 	Assert(MyReplicationSlot != NULL);
720 
721 	SpinLockAcquire(&slot->mutex);
722 	MyReplicationSlot->just_dirtied = true;
723 	MyReplicationSlot->dirty = true;
724 	SpinLockRelease(&slot->mutex);
725 }
726 
727 /*
728  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
729  * guaranteeing it will be there after an eventual crash.
730  */
731 void
ReplicationSlotPersist(void)732 ReplicationSlotPersist(void)
733 {
734 	ReplicationSlot *slot = MyReplicationSlot;
735 
736 	Assert(slot != NULL);
737 	Assert(slot->data.persistency != RS_PERSISTENT);
738 
739 	SpinLockAcquire(&slot->mutex);
740 	slot->data.persistency = RS_PERSISTENT;
741 	SpinLockRelease(&slot->mutex);
742 
743 	ReplicationSlotMarkDirty();
744 	ReplicationSlotSave();
745 }
746 
747 /*
748  * Compute the oldest xmin across all slots and store it in the ProcArray.
749  *
750  * If already_locked is true, ProcArrayLock has already been acquired
751  * exclusively.
752  */
753 void
ReplicationSlotsComputeRequiredXmin(bool already_locked)754 ReplicationSlotsComputeRequiredXmin(bool already_locked)
755 {
756 	int			i;
757 	TransactionId agg_xmin = InvalidTransactionId;
758 	TransactionId agg_catalog_xmin = InvalidTransactionId;
759 
760 	Assert(ReplicationSlotCtl != NULL);
761 
762 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
763 
764 	for (i = 0; i < max_replication_slots; i++)
765 	{
766 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
767 		TransactionId effective_xmin;
768 		TransactionId effective_catalog_xmin;
769 
770 		if (!s->in_use)
771 			continue;
772 
773 		SpinLockAcquire(&s->mutex);
774 		effective_xmin = s->effective_xmin;
775 		effective_catalog_xmin = s->effective_catalog_xmin;
776 		SpinLockRelease(&s->mutex);
777 
778 		/* check the data xmin */
779 		if (TransactionIdIsValid(effective_xmin) &&
780 			(!TransactionIdIsValid(agg_xmin) ||
781 			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
782 			agg_xmin = effective_xmin;
783 
784 		/* check the catalog xmin */
785 		if (TransactionIdIsValid(effective_catalog_xmin) &&
786 			(!TransactionIdIsValid(agg_catalog_xmin) ||
787 			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
788 			agg_catalog_xmin = effective_catalog_xmin;
789 	}
790 
791 	LWLockRelease(ReplicationSlotControlLock);
792 
793 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
794 }
795 
796 /*
797  * Compute the oldest restart LSN across all slots and inform xlog module.
798  *
799  * Note: while max_slot_wal_keep_size is theoretically relevant for this
800  * purpose, we don't try to account for that, because this module doesn't
801  * know what to compare against.
802  */
803 void
ReplicationSlotsComputeRequiredLSN(void)804 ReplicationSlotsComputeRequiredLSN(void)
805 {
806 	int			i;
807 	XLogRecPtr	min_required = InvalidXLogRecPtr;
808 
809 	Assert(ReplicationSlotCtl != NULL);
810 
811 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
812 	for (i = 0; i < max_replication_slots; i++)
813 	{
814 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
815 		XLogRecPtr	restart_lsn;
816 
817 		if (!s->in_use)
818 			continue;
819 
820 		SpinLockAcquire(&s->mutex);
821 		restart_lsn = s->data.restart_lsn;
822 		SpinLockRelease(&s->mutex);
823 
824 		if (restart_lsn != InvalidXLogRecPtr &&
825 			(min_required == InvalidXLogRecPtr ||
826 			 restart_lsn < min_required))
827 			min_required = restart_lsn;
828 	}
829 	LWLockRelease(ReplicationSlotControlLock);
830 
831 	XLogSetReplicationSlotMinimumLSN(min_required);
832 }
833 
834 /*
835  * Compute the oldest WAL LSN required by *logical* decoding slots..
836  *
837  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
838  * slots exist.
839  *
840  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
841  * ignores physical replication slots.
842  *
843  * The results aren't required frequently, so we don't maintain a precomputed
844  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
845  */
846 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)847 ReplicationSlotsComputeLogicalRestartLSN(void)
848 {
849 	XLogRecPtr	result = InvalidXLogRecPtr;
850 	int			i;
851 
852 	if (max_replication_slots <= 0)
853 		return InvalidXLogRecPtr;
854 
855 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
856 
857 	for (i = 0; i < max_replication_slots; i++)
858 	{
859 		ReplicationSlot *s;
860 		XLogRecPtr	restart_lsn;
861 
862 		s = &ReplicationSlotCtl->replication_slots[i];
863 
864 		/* cannot change while ReplicationSlotCtlLock is held */
865 		if (!s->in_use)
866 			continue;
867 
868 		/* we're only interested in logical slots */
869 		if (!SlotIsLogical(s))
870 			continue;
871 
872 		/* read once, it's ok if it increases while we're checking */
873 		SpinLockAcquire(&s->mutex);
874 		restart_lsn = s->data.restart_lsn;
875 		SpinLockRelease(&s->mutex);
876 
877 		if (restart_lsn == InvalidXLogRecPtr)
878 			continue;
879 
880 		if (result == InvalidXLogRecPtr ||
881 			restart_lsn < result)
882 			result = restart_lsn;
883 	}
884 
885 	LWLockRelease(ReplicationSlotControlLock);
886 
887 	return result;
888 }
889 
890 /*
891  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
892  * passed database oid.
893  *
894  * Returns true if there are any slots referencing the database. *nslots will
895  * be set to the absolute number of slots in the database, *nactive to ones
896  * currently active.
897  */
898 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)899 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
900 {
901 	int			i;
902 
903 	*nslots = *nactive = 0;
904 
905 	if (max_replication_slots <= 0)
906 		return false;
907 
908 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
909 	for (i = 0; i < max_replication_slots; i++)
910 	{
911 		ReplicationSlot *s;
912 
913 		s = &ReplicationSlotCtl->replication_slots[i];
914 
915 		/* cannot change while ReplicationSlotCtlLock is held */
916 		if (!s->in_use)
917 			continue;
918 
919 		/* only logical slots are database specific, skip */
920 		if (!SlotIsLogical(s))
921 			continue;
922 
923 		/* not our database, skip */
924 		if (s->data.database != dboid)
925 			continue;
926 
927 		/* count slots with spinlock held */
928 		SpinLockAcquire(&s->mutex);
929 		(*nslots)++;
930 		if (s->active_pid != 0)
931 			(*nactive)++;
932 		SpinLockRelease(&s->mutex);
933 	}
934 	LWLockRelease(ReplicationSlotControlLock);
935 
936 	if (*nslots > 0)
937 		return true;
938 	return false;
939 }
940 
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* acquire slot, so ReplicationSlotDropAcquired can be reused  */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim the unowned slot for ourselves */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
1024 
1025 
1026 /*
1027  * Check whether the server's configuration supports using replication
1028  * slots.
1029  */
1030 void
CheckSlotRequirements(void)1031 CheckSlotRequirements(void)
1032 {
1033 	/*
1034 	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1035 	 * needs the same check.
1036 	 */
1037 
1038 	if (max_replication_slots == 0)
1039 		ereport(ERROR,
1040 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1041 				 errmsg("replication slots can only be used if max_replication_slots > 0")));
1042 
1043 	if (wal_level < WAL_LEVEL_REPLICA)
1044 		ereport(ERROR,
1045 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1046 				 errmsg("replication slots can only be used if wal_level >= replica")));
1047 }
1048 
1049 /*
1050  * Reserve WAL for the currently active slot.
1051  *
1052  * Compute and set restart_lsn in a manner that's appropriate for the type of
1053  * the slot and concurrency safe.
1054  */
1055 void
ReplicationSlotReserveWal(void)1056 ReplicationSlotReserveWal(void)
1057 {
1058 	ReplicationSlot *slot = MyReplicationSlot;
1059 
1060 	Assert(slot != NULL);
1061 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1062 
1063 	/*
1064 	 * The replication slot mechanism is used to prevent removal of required
1065 	 * WAL. As there is no interlock between this routine and checkpoints, WAL
1066 	 * segments could concurrently be removed when a now stale return value of
1067 	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1068 	 * this happens we'll just retry.
1069 	 */
1070 	while (true)
1071 	{
1072 		XLogSegNo	segno;
1073 		XLogRecPtr	restart_lsn;
1074 
1075 		/*
1076 		 * For logical slots log a standby snapshot and start logical decoding
1077 		 * at exactly that position. That allows the slot to start up more
1078 		 * quickly.
1079 		 *
1080 		 * That's not needed (or indeed helpful) for physical slots as they'll
1081 		 * start replay at the last logged checkpoint anyway. Instead return
1082 		 * the location of the last redo LSN. While that slightly increases
1083 		 * the chance that we have to retry, it's where a base backup has to
1084 		 * start replay at.
1085 		 */
1086 		if (!RecoveryInProgress() && SlotIsLogical(slot))
1087 		{
1088 			XLogRecPtr	flushptr;
1089 
1090 			/* start at current insert position */
1091 			restart_lsn = GetXLogInsertRecPtr();
1092 			SpinLockAcquire(&slot->mutex);
1093 			slot->data.restart_lsn = restart_lsn;
1094 			SpinLockRelease(&slot->mutex);
1095 
1096 			/* make sure we have enough information to start */
1097 			flushptr = LogStandbySnapshot();
1098 
1099 			/* and make sure it's fsynced to disk */
1100 			XLogFlush(flushptr);
1101 		}
1102 		else
1103 		{
1104 			restart_lsn = GetRedoRecPtr();
1105 			SpinLockAcquire(&slot->mutex);
1106 			slot->data.restart_lsn = restart_lsn;
1107 			SpinLockRelease(&slot->mutex);
1108 		}
1109 
1110 		/* prevent WAL removal as fast as possible */
1111 		ReplicationSlotsComputeRequiredLSN();
1112 
1113 		/*
1114 		 * If all required WAL is still there, great, otherwise retry. The
1115 		 * slot should prevent further removal of WAL, unless there's a
1116 		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1117 		 * the new restart_lsn above, so normally we should never need to loop
1118 		 * more than twice.
1119 		 */
1120 		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1121 		if (XLogGetLastRemovedSegno() < segno)
1122 			break;
1123 	}
1124 }
1125 
1126 /*
1127  * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1128  * and mark it invalid, if necessary and possible.
1129  *
1130  * Returns whether ReplicationSlotControlLock was released in the interim (and
1131  * in that case we're not holding the lock at return, otherwise we are).
1132  *
1133  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1134  *
1135  * This is inherently racy, because we release the LWLock
1136  * for syscalls, so caller must restart if we return true.
1137  */
1138 static bool
InvalidatePossiblyObsoleteSlot(ReplicationSlot * s,XLogRecPtr oldestLSN,bool * invalidated)1139 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
1140 							   bool *invalidated)
1141 {
1142 	int			last_signaled_pid = 0;
1143 	bool		released_lock = false;
1144 
1145 	for (;;)
1146 	{
1147 		XLogRecPtr	restart_lsn;
1148 		NameData	slotname;
1149 		int			active_pid = 0;
1150 
1151 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1152 
1153 		if (!s->in_use)
1154 		{
1155 			if (released_lock)
1156 				LWLockRelease(ReplicationSlotControlLock);
1157 			break;
1158 		}
1159 
1160 		/*
1161 		 * Check if the slot needs to be invalidated. If it needs to be
1162 		 * invalidated, and is not currently acquired, acquire it and mark it
1163 		 * as having been invalidated.  We do this with the spinlock held to
1164 		 * avoid race conditions -- for example the restart_lsn could move
1165 		 * forward, or the slot could be dropped.
1166 		 */
1167 		SpinLockAcquire(&s->mutex);
1168 
1169 		restart_lsn = s->data.restart_lsn;
1170 
1171 		/*
1172 		 * If the slot is already invalid or is fresh enough, we don't need to
1173 		 * do anything.
1174 		 */
1175 		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
1176 		{
1177 			SpinLockRelease(&s->mutex);
1178 			if (released_lock)
1179 				LWLockRelease(ReplicationSlotControlLock);
1180 			break;
1181 		}
1182 
1183 		slotname = s->data.name;
1184 		active_pid = s->active_pid;
1185 
1186 		/*
1187 		 * If the slot can be acquired, do so and mark it invalidated
1188 		 * immediately.  Otherwise we'll signal the owning process, below, and
1189 		 * retry.
1190 		 */
1191 		if (active_pid == 0)
1192 		{
1193 			MyReplicationSlot = s;
1194 			s->active_pid = MyProcPid;
1195 			s->data.invalidated_at = restart_lsn;
1196 			s->data.restart_lsn = InvalidXLogRecPtr;
1197 
1198 			/* Let caller know */
1199 			*invalidated = true;
1200 		}
1201 
1202 		SpinLockRelease(&s->mutex);
1203 
1204 		if (active_pid != 0)
1205 		{
1206 			/*
1207 			 * Prepare the sleep on the slot's condition variable before
1208 			 * releasing the lock, to close a possible race condition if the
1209 			 * slot is released before the sleep below.
1210 			 */
1211 			ConditionVariablePrepareToSleep(&s->active_cv);
1212 
1213 			LWLockRelease(ReplicationSlotControlLock);
1214 			released_lock = true;
1215 
1216 			/*
1217 			 * Signal to terminate the process that owns the slot, if we
1218 			 * haven't already signalled it.  (Avoidance of repeated
1219 			 * signalling is the only reason for there to be a loop in this
1220 			 * routine; otherwise we could rely on caller's restart loop.)
1221 			 *
1222 			 * There is the race condition that other process may own the slot
1223 			 * after its current owner process is terminated and before this
1224 			 * process owns it. To handle that, we signal only if the PID of
1225 			 * the owning process has changed from the previous time. (This
1226 			 * logic assumes that the same PID is not reused very quickly.)
1227 			 */
1228 			if (last_signaled_pid != active_pid)
1229 			{
1230 				ereport(LOG,
1231 						(errmsg("terminating process %d to release replication slot \"%s\"",
1232 								active_pid, NameStr(slotname))));
1233 
1234 				(void) kill(active_pid, SIGTERM);
1235 				last_signaled_pid = active_pid;
1236 			}
1237 
1238 			/* Wait until the slot is released. */
1239 			ConditionVariableSleep(&s->active_cv,
1240 								   WAIT_EVENT_REPLICATION_SLOT_DROP);
1241 
1242 			/*
1243 			 * Re-acquire lock and start over; we expect to invalidate the slot
1244 			 * next time (unless another process acquires the slot in the
1245 			 * meantime).
1246 			 */
1247 			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1248 			continue;
1249 		}
1250 		else
1251 		{
1252 			/*
1253 			 * We hold the slot now and have already invalidated it; flush it
1254 			 * to ensure that state persists.
1255 			 *
1256 			 * Don't want to hold ReplicationSlotControlLock across file
1257 			 * system operations, so release it now but be sure to tell caller
1258 			 * to restart from scratch.
1259 			 */
1260 			LWLockRelease(ReplicationSlotControlLock);
1261 			released_lock = true;
1262 
1263 			/* Make sure the invalidated state persists across server restart */
1264 			ReplicationSlotMarkDirty();
1265 			ReplicationSlotSave();
1266 			ReplicationSlotRelease();
1267 
1268 			ereport(LOG,
1269 					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
1270 							NameStr(slotname),
1271 							(uint32) (restart_lsn >> 32),
1272 							(uint32) restart_lsn)));
1273 
1274 			/* done with this slot for now */
1275 			break;
1276 		}
1277 	}
1278 
1279 	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1280 
1281 	return released_lock;
1282 }
1283 
1284 /*
1285  * Mark any slot that points to an LSN older than the given segment
1286  * as invalid; it requires WAL that's about to be removed.
1287  *
1288  * Returns true when any slot have got invalidated.
1289  *
1290  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1291  */
1292 bool
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)1293 InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
1294 {
1295 	XLogRecPtr	oldestLSN;
1296 	bool		invalidated = false;
1297 
1298 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1299 
1300 restart:
1301 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1302 	for (int i = 0; i < max_replication_slots; i++)
1303 	{
1304 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1305 
1306 		if (!s->in_use)
1307 			continue;
1308 
1309 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
1310 		{
1311 			/* if the lock was released, start from scratch */
1312 			goto restart;
1313 		}
1314 	}
1315 	LWLockRelease(ReplicationSlotControlLock);
1316 
1317 	/*
1318 	 * If any slots have been invalidated, recalculate the resource limits.
1319 	 */
1320 	if (invalidated)
1321 	{
1322 		ReplicationSlotsComputeRequiredXmin(false);
1323 		ReplicationSlotsComputeRequiredLSN();
1324 	}
1325 
1326 	return invalidated;
1327 }
1328 
1329 /*
1330  * Flush all replication slots to disk.
1331  *
1332  * This needn't actually be part of a checkpoint, but it's a convenient
1333  * location.
1334  */
1335 void
CheckPointReplicationSlots(void)1336 CheckPointReplicationSlots(void)
1337 {
1338 	int			i;
1339 
1340 	elog(DEBUG1, "performing replication slot checkpoint");
1341 
1342 	/*
1343 	 * Prevent any slot from being created/dropped while we're active. As we
1344 	 * explicitly do *not* want to block iterating over replication_slots or
1345 	 * acquiring a slot we cannot take the control lock - but that's OK,
1346 	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1347 	 * enough to guarantee that nobody can change the in_use bits on us.
1348 	 */
1349 	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1350 
1351 	for (i = 0; i < max_replication_slots; i++)
1352 	{
1353 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1354 		char		path[MAXPGPATH];
1355 
1356 		if (!s->in_use)
1357 			continue;
1358 
1359 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
1360 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1361 		SaveSlotToPath(s, path, LOG);
1362 	}
1363 	LWLockRelease(ReplicationSlotAllocationLock);
1364 }
1365 
1366 /*
1367  * Load all replication slots from disk into memory at server startup. This
1368  * needs to be run before we start crash recovery.
1369  */
1370 void
StartupReplicationSlots(void)1371 StartupReplicationSlots(void)
1372 {
1373 	DIR		   *replication_dir;
1374 	struct dirent *replication_de;
1375 
1376 	elog(DEBUG1, "starting up replication slots");
1377 
1378 	/* restore all slots by iterating over all on-disk entries */
1379 	replication_dir = AllocateDir("pg_replslot");
1380 	while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1381 	{
1382 		struct stat statbuf;
1383 		char		path[MAXPGPATH + 12];
1384 
1385 		if (strcmp(replication_de->d_name, ".") == 0 ||
1386 			strcmp(replication_de->d_name, "..") == 0)
1387 			continue;
1388 
1389 		snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1390 
1391 		/* we're only creating directories here, skip if it's not our's */
1392 		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1393 			continue;
1394 
1395 		/* we crashed while a slot was being setup or deleted, clean up */
1396 		if (pg_str_endswith(replication_de->d_name, ".tmp"))
1397 		{
1398 			if (!rmtree(path, true))
1399 			{
1400 				ereport(WARNING,
1401 						(errmsg("could not remove directory \"%s\"",
1402 								path)));
1403 				continue;
1404 			}
1405 			fsync_fname("pg_replslot", true);
1406 			continue;
1407 		}
1408 
1409 		/* looks like a slot in a normal state, restore */
1410 		RestoreSlotFromDisk(replication_de->d_name);
1411 	}
1412 	FreeDir(replication_dir);
1413 
1414 	/* currently no slots exist, we're done. */
1415 	if (max_replication_slots <= 0)
1416 		return;
1417 
1418 	/* Now that we have recovered all the data, compute replication xmin */
1419 	ReplicationSlotsComputeRequiredXmin(false);
1420 	ReplicationSlotsComputeRequiredLSN();
1421 }
1422 
1423 /* ----
1424  * Manipulation of on-disk state of replication slots
1425  *
1426  * NB: none of the routines below should take any notice whether a slot is the
1427  * current one or not, that's all handled a layer above.
1428  * ----
1429  */
1430 static void
CreateSlotOnDisk(ReplicationSlot * slot)1431 CreateSlotOnDisk(ReplicationSlot *slot)
1432 {
1433 	char		tmppath[MAXPGPATH];
1434 	char		path[MAXPGPATH];
1435 	struct stat st;
1436 
1437 	/*
1438 	 * No need to take out the io_in_progress_lock, nobody else can see this
1439 	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1440 	 * takes out the lock, if we'd take the lock here, we'd deadlock.
1441 	 */
1442 
1443 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1444 	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1445 
1446 	/*
1447 	 * It's just barely possible that some previous effort to create or drop a
1448 	 * slot with this name left a temp directory lying around. If that seems
1449 	 * to be the case, try to remove it.  If the rmtree() fails, we'll error
1450 	 * out at the MakePGDirectory() below, so we don't bother checking
1451 	 * success.
1452 	 */
1453 	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1454 		rmtree(tmppath, true);
1455 
1456 	/* Create and fsync the temporary slot directory. */
1457 	if (MakePGDirectory(tmppath) < 0)
1458 		ereport(ERROR,
1459 				(errcode_for_file_access(),
1460 				 errmsg("could not create directory \"%s\": %m",
1461 						tmppath)));
1462 	fsync_fname(tmppath, true);
1463 
1464 	/* Write the actual state file. */
1465 	slot->dirty = true;			/* signal that we really need to write */
1466 	SaveSlotToPath(slot, tmppath, ERROR);
1467 
1468 	/* Rename the directory into place. */
1469 	if (rename(tmppath, path) != 0)
1470 		ereport(ERROR,
1471 				(errcode_for_file_access(),
1472 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
1473 						tmppath, path)));
1474 
1475 	/*
1476 	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
1477 	 * would persist after an OS crash or not - so, force a restart. The
1478 	 * restart would try to fsync this again till it works.
1479 	 */
1480 	START_CRIT_SECTION();
1481 
1482 	fsync_fname(path, true);
1483 	fsync_fname("pg_replslot", true);
1484 
1485 	END_CRIT_SECTION();
1486 }
1487 
1488 /*
1489  * Shared functionality between saving and creating a replication slot.
1490  */
1491 static void
SaveSlotToPath(ReplicationSlot * slot,const char * dir,int elevel)1492 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1493 {
1494 	char		tmppath[MAXPGPATH];
1495 	char		path[MAXPGPATH];
1496 	int			fd;
1497 	ReplicationSlotOnDisk cp;
1498 	bool		was_dirty;
1499 
1500 	/* first check whether there's something to write out */
1501 	SpinLockAcquire(&slot->mutex);
1502 	was_dirty = slot->dirty;
1503 	slot->just_dirtied = false;
1504 	SpinLockRelease(&slot->mutex);
1505 
1506 	/* and don't do anything if there's nothing to write */
1507 	if (!was_dirty)
1508 		return;
1509 
1510 	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1511 
1512 	/* silence valgrind :( */
1513 	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1514 
1515 	sprintf(tmppath, "%s/state.tmp", dir);
1516 	sprintf(path, "%s/state", dir);
1517 
1518 	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1519 	if (fd < 0)
1520 	{
1521 		/*
1522 		 * If not an ERROR, then release the lock before returning.  In case
1523 		 * of an ERROR, the error recovery path automatically releases the
1524 		 * lock, but no harm in explicitly releasing even in that case.  Note
1525 		 * that LWLockRelease() could affect errno.
1526 		 */
1527 		int			save_errno = errno;
1528 
1529 		LWLockRelease(&slot->io_in_progress_lock);
1530 		errno = save_errno;
1531 		ereport(elevel,
1532 				(errcode_for_file_access(),
1533 				 errmsg("could not create file \"%s\": %m",
1534 						tmppath)));
1535 		return;
1536 	}
1537 
1538 	cp.magic = SLOT_MAGIC;
1539 	INIT_CRC32C(cp.checksum);
1540 	cp.version = SLOT_VERSION;
1541 	cp.length = ReplicationSlotOnDiskV2Size;
1542 
1543 	SpinLockAcquire(&slot->mutex);
1544 
1545 	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1546 
1547 	SpinLockRelease(&slot->mutex);
1548 
1549 	COMP_CRC32C(cp.checksum,
1550 				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1551 				SnapBuildOnDiskChecksummedSize);
1552 	FIN_CRC32C(cp.checksum);
1553 
1554 	errno = 0;
1555 	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1556 	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1557 	{
1558 		int			save_errno = errno;
1559 
1560 		pgstat_report_wait_end();
1561 		CloseTransientFile(fd);
1562 		LWLockRelease(&slot->io_in_progress_lock);
1563 
1564 		/* if write didn't set errno, assume problem is no disk space */
1565 		errno = save_errno ? save_errno : ENOSPC;
1566 		ereport(elevel,
1567 				(errcode_for_file_access(),
1568 				 errmsg("could not write to file \"%s\": %m",
1569 						tmppath)));
1570 		return;
1571 	}
1572 	pgstat_report_wait_end();
1573 
1574 	/* fsync the temporary file */
1575 	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1576 	if (pg_fsync(fd) != 0)
1577 	{
1578 		int			save_errno = errno;
1579 
1580 		pgstat_report_wait_end();
1581 		CloseTransientFile(fd);
1582 		LWLockRelease(&slot->io_in_progress_lock);
1583 		errno = save_errno;
1584 		ereport(elevel,
1585 				(errcode_for_file_access(),
1586 				 errmsg("could not fsync file \"%s\": %m",
1587 						tmppath)));
1588 		return;
1589 	}
1590 	pgstat_report_wait_end();
1591 
1592 	if (CloseTransientFile(fd) != 0)
1593 	{
1594 		int			save_errno = errno;
1595 
1596 		LWLockRelease(&slot->io_in_progress_lock);
1597 		errno = save_errno;
1598 		ereport(elevel,
1599 				(errcode_for_file_access(),
1600 				 errmsg("could not close file \"%s\": %m",
1601 						tmppath)));
1602 		return;
1603 	}
1604 
1605 	/* rename to permanent file, fsync file and directory */
1606 	if (rename(tmppath, path) != 0)
1607 	{
1608 		int			save_errno = errno;
1609 
1610 		LWLockRelease(&slot->io_in_progress_lock);
1611 		errno = save_errno;
1612 		ereport(elevel,
1613 				(errcode_for_file_access(),
1614 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
1615 						tmppath, path)));
1616 		return;
1617 	}
1618 
1619 	/*
1620 	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
1621 	 */
1622 	START_CRIT_SECTION();
1623 
1624 	fsync_fname(path, false);
1625 	fsync_fname(dir, true);
1626 	fsync_fname("pg_replslot", true);
1627 
1628 	END_CRIT_SECTION();
1629 
1630 	/*
1631 	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1632 	 * already.
1633 	 */
1634 	SpinLockAcquire(&slot->mutex);
1635 	if (!slot->just_dirtied)
1636 		slot->dirty = false;
1637 	SpinLockRelease(&slot->mutex);
1638 
1639 	LWLockRelease(&slot->io_in_progress_lock);
1640 }
1641 
1642 /*
1643  * Load a single slot from disk into memory.
1644  */
1645 static void
RestoreSlotFromDisk(const char * name)1646 RestoreSlotFromDisk(const char *name)
1647 {
1648 	ReplicationSlotOnDisk cp;
1649 	int			i;
1650 	char		slotdir[MAXPGPATH + 12];
1651 	char		path[MAXPGPATH + 22];
1652 	int			fd;
1653 	bool		restored = false;
1654 	int			readBytes;
1655 	pg_crc32c	checksum;
1656 
1657 	/* no need to lock here, no concurrent access allowed yet */
1658 
1659 	/* delete temp file if it exists */
1660 	sprintf(slotdir, "pg_replslot/%s", name);
1661 	sprintf(path, "%s/state.tmp", slotdir);
1662 	if (unlink(path) < 0 && errno != ENOENT)
1663 		ereport(PANIC,
1664 				(errcode_for_file_access(),
1665 				 errmsg("could not remove file \"%s\": %m", path)));
1666 
1667 	sprintf(path, "%s/state", slotdir);
1668 
1669 	elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1670 
1671 	/* on some operating systems fsyncing a file requires O_RDWR */
1672 	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1673 
1674 	/*
1675 	 * We do not need to handle this as we are rename()ing the directory into
1676 	 * place only after we fsync()ed the state file.
1677 	 */
1678 	if (fd < 0)
1679 		ereport(PANIC,
1680 				(errcode_for_file_access(),
1681 				 errmsg("could not open file \"%s\": %m", path)));
1682 
1683 	/*
1684 	 * Sync state file before we're reading from it. We might have crashed
1685 	 * while it wasn't synced yet and we shouldn't continue on that basis.
1686 	 */
1687 	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1688 	if (pg_fsync(fd) != 0)
1689 		ereport(PANIC,
1690 				(errcode_for_file_access(),
1691 				 errmsg("could not fsync file \"%s\": %m",
1692 						path)));
1693 	pgstat_report_wait_end();
1694 
1695 	/* Also sync the parent directory */
1696 	START_CRIT_SECTION();
1697 	fsync_fname(slotdir, true);
1698 	END_CRIT_SECTION();
1699 
1700 	/* read part of statefile that's guaranteed to be version independent */
1701 	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1702 	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1703 	pgstat_report_wait_end();
1704 	if (readBytes != ReplicationSlotOnDiskConstantSize)
1705 	{
1706 		if (readBytes < 0)
1707 			ereport(PANIC,
1708 					(errcode_for_file_access(),
1709 					 errmsg("could not read file \"%s\": %m", path)));
1710 		else
1711 			ereport(PANIC,
1712 					(errcode(ERRCODE_DATA_CORRUPTED),
1713 					 errmsg("could not read file \"%s\": read %d of %zu",
1714 							path, readBytes,
1715 							(Size) ReplicationSlotOnDiskConstantSize)));
1716 	}
1717 
1718 	/* verify magic */
1719 	if (cp.magic != SLOT_MAGIC)
1720 		ereport(PANIC,
1721 				(errcode(ERRCODE_DATA_CORRUPTED),
1722 				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1723 						path, cp.magic, SLOT_MAGIC)));
1724 
1725 	/* verify version */
1726 	if (cp.version != SLOT_VERSION)
1727 		ereport(PANIC,
1728 				(errcode(ERRCODE_DATA_CORRUPTED),
1729 				 errmsg("replication slot file \"%s\" has unsupported version %u",
1730 						path, cp.version)));
1731 
1732 	/* boundary check on length */
1733 	if (cp.length != ReplicationSlotOnDiskV2Size)
1734 		ereport(PANIC,
1735 				(errcode(ERRCODE_DATA_CORRUPTED),
1736 				 errmsg("replication slot file \"%s\" has corrupted length %u",
1737 						path, cp.length)));
1738 
1739 	/* Now that we know the size, read the entire file */
1740 	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1741 	readBytes = read(fd,
1742 					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
1743 					 cp.length);
1744 	pgstat_report_wait_end();
1745 	if (readBytes != cp.length)
1746 	{
1747 		if (readBytes < 0)
1748 			ereport(PANIC,
1749 					(errcode_for_file_access(),
1750 					 errmsg("could not read file \"%s\": %m", path)));
1751 		else
1752 			ereport(PANIC,
1753 					(errcode(ERRCODE_DATA_CORRUPTED),
1754 					 errmsg("could not read file \"%s\": read %d of %zu",
1755 							path, readBytes, (Size) cp.length)));
1756 	}
1757 
1758 	if (CloseTransientFile(fd) != 0)
1759 		ereport(PANIC,
1760 				(errcode_for_file_access(),
1761 				 errmsg("could not close file \"%s\": %m", path)));
1762 
1763 	/* now verify the CRC */
1764 	INIT_CRC32C(checksum);
1765 	COMP_CRC32C(checksum,
1766 				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1767 				SnapBuildOnDiskChecksummedSize);
1768 	FIN_CRC32C(checksum);
1769 
1770 	if (!EQ_CRC32C(checksum, cp.checksum))
1771 		ereport(PANIC,
1772 				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1773 						path, checksum, cp.checksum)));
1774 
1775 	/*
1776 	 * If we crashed with an ephemeral slot active, don't restore but delete
1777 	 * it.
1778 	 */
1779 	if (cp.slotdata.persistency != RS_PERSISTENT)
1780 	{
1781 		if (!rmtree(slotdir, true))
1782 		{
1783 			ereport(WARNING,
1784 					(errmsg("could not remove directory \"%s\"",
1785 							slotdir)));
1786 		}
1787 		fsync_fname("pg_replslot", true);
1788 		return;
1789 	}
1790 
1791 	/*
1792 	 * Verify that requirements for the specific slot type are met. That's
1793 	 * important because if these aren't met we're not guaranteed to retain
1794 	 * all the necessary resources for the slot.
1795 	 *
1796 	 * NB: We have to do so *after* the above checks for ephemeral slots,
1797 	 * because otherwise a slot that shouldn't exist anymore could prevent
1798 	 * restarts.
1799 	 *
1800 	 * NB: Changing the requirements here also requires adapting
1801 	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
1802 	 */
1803 	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
1804 		ereport(FATAL,
1805 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1806 				 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
1807 						NameStr(cp.slotdata.name)),
1808 				 errhint("Change wal_level to be logical or higher.")));
1809 	else if (wal_level < WAL_LEVEL_REPLICA)
1810 		ereport(FATAL,
1811 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1812 				 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
1813 						NameStr(cp.slotdata.name)),
1814 				 errhint("Change wal_level to be replica or higher.")));
1815 
1816 	/* nothing can be active yet, don't lock anything */
1817 	for (i = 0; i < max_replication_slots; i++)
1818 	{
1819 		ReplicationSlot *slot;
1820 
1821 		slot = &ReplicationSlotCtl->replication_slots[i];
1822 
1823 		if (slot->in_use)
1824 			continue;
1825 
1826 		/* restore the entire set of persistent data */
1827 		memcpy(&slot->data, &cp.slotdata,
1828 			   sizeof(ReplicationSlotPersistentData));
1829 
1830 		/* initialize in memory state */
1831 		slot->effective_xmin = cp.slotdata.xmin;
1832 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1833 
1834 		slot->candidate_catalog_xmin = InvalidTransactionId;
1835 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1836 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
1837 		slot->candidate_restart_valid = InvalidXLogRecPtr;
1838 
1839 		slot->in_use = true;
1840 		slot->active_pid = 0;
1841 
1842 		restored = true;
1843 		break;
1844 	}
1845 
1846 	if (!restored)
1847 		ereport(FATAL,
1848 				(errmsg("too many replication slots active before shutdown"),
1849 				 errhint("Increase max_replication_slots and try again.")));
1850 }
1851