1 /*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2020, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
27 * efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "replication/slot.h"
48 #include "storage/fd.h"
49 #include "storage/proc.h"
50 #include "storage/procarray.h"
51 #include "utils/builtins.h"
52
53 /*
54 * Replication slot on-disk data structure.
55 */
56 typedef struct ReplicationSlotOnDisk
57 {
58 /* first part of this struct needs to be version independent */
59
60 /* data not covered by checksum */
61 uint32 magic;
62 pg_crc32c checksum;
63
64 /* data covered by checksum */
65 uint32 version;
66 uint32 length;
67
68 /*
69 * The actual data in the slot that follows can differ based on the above
70 * 'version'.
71 */
72
73 ReplicationSlotPersistentData slotdata;
74 } ReplicationSlotOnDisk;
75
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
    offsetof(ReplicationSlotOnDisk, slotdata)
/*
 * Size of the part of the slot not covered by the checksum.
 *
 * NB: the "SnapBuild" prefix on the next two macros is historical (they were
 * copied from snapbuild.c); they describe ReplicationSlotOnDisk.  The names
 * are kept because other parts of this file reference them.
 *
 * The expansions are parenthesized so the macros stay correct when embedded
 * in larger arithmetic expressions.
 */
#define SnapBuildOnDiskNotChecksummedSize \
    offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
    (sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
    (sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC      0x1051CA1   /* format identifier */
#define SLOT_VERSION    2           /* version for new files */
91
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int         max_replication_slots = 0;  /* the maximum number of replication
                                         * slots */

/* forward declarations for slot lookup/acquire/drop helpers defined below */
static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
static int  ReplicationSlotAcquireInternal(ReplicationSlot *slot,
                                           const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
112
113 /*
114 * Report shared-memory space needed by ReplicationSlotsShmemInit.
115 */
116 Size
ReplicationSlotsShmemSize(void)117 ReplicationSlotsShmemSize(void)
118 {
119 Size size = 0;
120
121 if (max_replication_slots == 0)
122 return size;
123
124 size = offsetof(ReplicationSlotCtlData, replication_slots);
125 size = add_size(size,
126 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
127
128 return size;
129 }
130
131 /*
132 * Allocate and initialize shared memory for replication slots.
133 */
134 void
ReplicationSlotsShmemInit(void)135 ReplicationSlotsShmemInit(void)
136 {
137 bool found;
138
139 if (max_replication_slots == 0)
140 return;
141
142 ReplicationSlotCtl = (ReplicationSlotCtlData *)
143 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
144 &found);
145
146 if (!found)
147 {
148 int i;
149
150 /* First time through, so initialize */
151 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152
153 for (i = 0; i < max_replication_slots; i++)
154 {
155 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156
157 /* everything else is zeroed by the memset above */
158 SpinLockInit(&slot->mutex);
159 LWLockInitialize(&slot->io_in_progress_lock,
160 LWTRANCHE_REPLICATION_SLOT_IO);
161 ConditionVariableInit(&slot->active_cv);
162 }
163 }
164 }
165
166 /*
167 * Check whether the passed slot name is valid and report errors at elevel.
168 *
169 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
170 * the name to be used as a directory name on every supported OS.
171 *
172 * Returns whether the directory name is valid or not if elevel < ERROR.
173 */
174 bool
ReplicationSlotValidateName(const char * name,int elevel)175 ReplicationSlotValidateName(const char *name, int elevel)
176 {
177 const char *cp;
178
179 if (strlen(name) == 0)
180 {
181 ereport(elevel,
182 (errcode(ERRCODE_INVALID_NAME),
183 errmsg("replication slot name \"%s\" is too short",
184 name)));
185 return false;
186 }
187
188 if (strlen(name) >= NAMEDATALEN)
189 {
190 ereport(elevel,
191 (errcode(ERRCODE_NAME_TOO_LONG),
192 errmsg("replication slot name \"%s\" is too long",
193 name)));
194 return false;
195 }
196
197 for (cp = name; *cp; cp++)
198 {
199 if (!((*cp >= 'a' && *cp <= 'z')
200 || (*cp >= '0' && *cp <= '9')
201 || (*cp == '_')))
202 {
203 ereport(elevel,
204 (errcode(ERRCODE_INVALID_NAME),
205 errmsg("replication slot name \"%s\" contains invalid character",
206 name),
207 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
208 return false;
209 }
210 }
211 return true;
212 }
213
214 /*
215 * Create a new replication slot and mark it as used by this backend.
216 *
217 * name: Name of the slot
218 * db_specific: logical decoding is db specific; if the slot is going to
219 * be used for that pass true, otherwise false.
220 */
221 void
ReplicationSlotCreate(const char * name,bool db_specific,ReplicationSlotPersistency persistency)222 ReplicationSlotCreate(const char *name, bool db_specific,
223 ReplicationSlotPersistency persistency)
224 {
225 ReplicationSlot *slot = NULL;
226 int i;
227
228 Assert(MyReplicationSlot == NULL);
229
230 ReplicationSlotValidateName(name, ERROR);
231
232 /*
233 * If some other backend ran this code concurrently with us, we'd likely
234 * both allocate the same slot, and that would be bad. We'd also be at
235 * risk of missing a name collision. Also, we don't want to try to create
236 * a new slot while somebody's busy cleaning up an old one, because we
237 * might both be monkeying with the same directory.
238 */
239 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
240
241 /*
242 * Check for name collision, and identify an allocatable slot. We need to
243 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
244 * else can change the in_use flags while we're looking at them.
245 */
246 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
247 for (i = 0; i < max_replication_slots; i++)
248 {
249 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
250
251 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
252 ereport(ERROR,
253 (errcode(ERRCODE_DUPLICATE_OBJECT),
254 errmsg("replication slot \"%s\" already exists", name)));
255 if (!s->in_use && slot == NULL)
256 slot = s;
257 }
258 LWLockRelease(ReplicationSlotControlLock);
259
260 /* If all slots are in use, we're out of luck. */
261 if (slot == NULL)
262 ereport(ERROR,
263 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
264 errmsg("all replication slots are in use"),
265 errhint("Free one or increase max_replication_slots.")));
266
267 /*
268 * Since this slot is not in use, nobody should be looking at any part of
269 * it other than the in_use field unless they're trying to allocate it.
270 * And since we hold ReplicationSlotAllocationLock, nobody except us can
271 * be doing that. So it's safe to initialize the slot.
272 */
273 Assert(!slot->in_use);
274 Assert(slot->active_pid == 0);
275
276 /* first initialize persistent data */
277 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
278 StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
279 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
280 slot->data.persistency = persistency;
281
282 /* and then data only present in shared memory */
283 slot->just_dirtied = false;
284 slot->dirty = false;
285 slot->effective_xmin = InvalidTransactionId;
286 slot->effective_catalog_xmin = InvalidTransactionId;
287 slot->candidate_catalog_xmin = InvalidTransactionId;
288 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
289 slot->candidate_restart_valid = InvalidXLogRecPtr;
290 slot->candidate_restart_lsn = InvalidXLogRecPtr;
291
292 /*
293 * Create the slot on disk. We haven't actually marked the slot allocated
294 * yet, so no special cleanup is required if this errors out.
295 */
296 CreateSlotOnDisk(slot);
297
298 /*
299 * We need to briefly prevent any other backend from iterating over the
300 * slots while we flip the in_use flag. We also need to set the active
301 * flag while holding the ControlLock as otherwise a concurrent
302 * ReplicationSlotAcquire() could acquire the slot as well.
303 */
304 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
305
306 slot->in_use = true;
307
308 /* We can now mark the slot active, and that makes it our slot. */
309 SpinLockAcquire(&slot->mutex);
310 Assert(slot->active_pid == 0);
311 slot->active_pid = MyProcPid;
312 SpinLockRelease(&slot->mutex);
313 MyReplicationSlot = slot;
314
315 LWLockRelease(ReplicationSlotControlLock);
316
317 /*
318 * Now that the slot has been marked as in_use and active, it's safe to
319 * let somebody else try to allocate a slot.
320 */
321 LWLockRelease(ReplicationSlotAllocationLock);
322
323 /* Let everybody know we've modified this slot */
324 ConditionVariableBroadcast(&slot->active_cv);
325 }
326
327 /*
328 * Search for the named replication slot.
329 *
330 * Return the replication slot if found, otherwise NULL.
331 *
332 * The caller must hold ReplicationSlotControlLock in shared mode.
333 */
334 static ReplicationSlot *
SearchNamedReplicationSlot(const char * name)335 SearchNamedReplicationSlot(const char *name)
336 {
337 int i;
338 ReplicationSlot *slot = NULL;
339
340 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
341 LW_SHARED));
342
343 for (i = 0; i < max_replication_slots; i++)
344 {
345 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
346
347 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
348 {
349 slot = s;
350 break;
351 }
352 }
353
354 return slot;
355 }
356
357 /*
358 * Find a previously created slot and mark it as used by this process.
359 *
360 * The return value is only useful if behavior is SAB_Inquire, in which
361 * it's zero if we successfully acquired the slot, -1 if the slot no longer
362 * exists, or the PID of the owning process otherwise. If behavior is
363 * SAB_Error, then trying to acquire an owned slot is an error.
364 * If SAB_Block, we sleep until the slot is released by the owning process.
365 */
366 int
ReplicationSlotAcquire(const char * name,SlotAcquireBehavior behavior)367 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
368 {
369 return ReplicationSlotAcquireInternal(NULL, name, behavior);
370 }
371
372 /*
373 * Mark the specified slot as used by this process.
374 *
375 * Only one of slot and name can be specified.
376 * If slot == NULL, search for the slot with the given name.
377 *
378 * See comments about the return value in ReplicationSlotAcquire().
379 */
380 static int
ReplicationSlotAcquireInternal(ReplicationSlot * slot,const char * name,SlotAcquireBehavior behavior)381 ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
382 SlotAcquireBehavior behavior)
383 {
384 ReplicationSlot *s;
385 int active_pid;
386
387 AssertArg((slot == NULL) ^ (name == NULL));
388
389 retry:
390 Assert(MyReplicationSlot == NULL);
391
392 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
393
394 /*
395 * Search for the slot with the specified name if the slot to acquire is
396 * not given. If the slot is not found, we either return -1 or error out.
397 */
398 s = slot ? slot : SearchNamedReplicationSlot(name);
399 if (s == NULL || !s->in_use)
400 {
401 LWLockRelease(ReplicationSlotControlLock);
402
403 if (behavior == SAB_Inquire)
404 return -1;
405 ereport(ERROR,
406 (errcode(ERRCODE_UNDEFINED_OBJECT),
407 errmsg("replication slot \"%s\" does not exist",
408 name ? name : NameStr(slot->data.name))));
409 }
410
411 /*
412 * This is the slot we want; check if it's active under some other
413 * process. In single user mode, we don't need this check.
414 */
415 if (IsUnderPostmaster)
416 {
417 /*
418 * Get ready to sleep on the slot in case it is active if SAB_Block.
419 * (We may end up not sleeping, but we don't want to do this while
420 * holding the spinlock.)
421 */
422 if (behavior == SAB_Block)
423 ConditionVariablePrepareToSleep(&s->active_cv);
424
425 SpinLockAcquire(&s->mutex);
426 if (s->active_pid == 0)
427 s->active_pid = MyProcPid;
428 active_pid = s->active_pid;
429 SpinLockRelease(&s->mutex);
430 }
431 else
432 active_pid = MyProcPid;
433 LWLockRelease(ReplicationSlotControlLock);
434
435 /*
436 * If we found the slot but it's already active in another process, we
437 * either error out, return the PID of the owning process, or retry
438 * after a short wait, as caller specified.
439 */
440 if (active_pid != MyProcPid)
441 {
442 if (behavior == SAB_Error)
443 ereport(ERROR,
444 (errcode(ERRCODE_OBJECT_IN_USE),
445 errmsg("replication slot \"%s\" is active for PID %d",
446 NameStr(s->data.name), active_pid)));
447 else if (behavior == SAB_Inquire)
448 return active_pid;
449
450 /* Wait here until we get signaled, and then restart */
451 ConditionVariableSleep(&s->active_cv,
452 WAIT_EVENT_REPLICATION_SLOT_DROP);
453 ConditionVariableCancelSleep();
454 goto retry;
455 }
456 else if (behavior == SAB_Block)
457 ConditionVariableCancelSleep(); /* no sleep needed after all */
458
459 /* Let everybody know we've modified this slot */
460 ConditionVariableBroadcast(&s->active_cv);
461
462 /* We made this slot active, so it's ours now. */
463 MyReplicationSlot = s;
464
465 /* success */
466 return 0;
467 }
468
469 /*
470 * Release the replication slot that this backend considers to own.
471 *
472 * This or another backend can re-acquire the slot later.
473 * Resources this slot requires will be preserved.
474 */
475 void
ReplicationSlotRelease(void)476 ReplicationSlotRelease(void)
477 {
478 ReplicationSlot *slot = MyReplicationSlot;
479
480 Assert(slot != NULL && slot->active_pid != 0);
481
482 if (slot->data.persistency == RS_EPHEMERAL)
483 {
484 /*
485 * Delete the slot. There is no !PANIC case where this is allowed to
486 * fail, all that may happen is an incomplete cleanup of the on-disk
487 * data.
488 */
489 ReplicationSlotDropAcquired();
490 }
491
492 /*
493 * If slot needed to temporarily restrain both data and catalog xmin to
494 * create the catalog snapshot, remove that temporary constraint.
495 * Snapshots can only be exported while the initial snapshot is still
496 * acquired.
497 */
498 if (!TransactionIdIsValid(slot->data.xmin) &&
499 TransactionIdIsValid(slot->effective_xmin))
500 {
501 SpinLockAcquire(&slot->mutex);
502 slot->effective_xmin = InvalidTransactionId;
503 SpinLockRelease(&slot->mutex);
504 ReplicationSlotsComputeRequiredXmin(false);
505 }
506
507 if (slot->data.persistency == RS_PERSISTENT)
508 {
509 /*
510 * Mark persistent slot inactive. We're not freeing it, just
511 * disconnecting, but wake up others that may be waiting for it.
512 */
513 SpinLockAcquire(&slot->mutex);
514 slot->active_pid = 0;
515 SpinLockRelease(&slot->mutex);
516 ConditionVariableBroadcast(&slot->active_cv);
517 }
518
519 MyReplicationSlot = NULL;
520
521 /* might not have been set when we've been a plain slot */
522 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
523 MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
524 LWLockRelease(ProcArrayLock);
525 }
526
527 /*
528 * Cleanup all temporary slots created in current session.
529 */
530 void
ReplicationSlotCleanup(void)531 ReplicationSlotCleanup(void)
532 {
533 int i;
534
535 Assert(MyReplicationSlot == NULL);
536
537 restart:
538 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
539 for (i = 0; i < max_replication_slots; i++)
540 {
541 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
542
543 if (!s->in_use)
544 continue;
545
546 SpinLockAcquire(&s->mutex);
547 if (s->active_pid == MyProcPid)
548 {
549 Assert(s->data.persistency == RS_TEMPORARY);
550 SpinLockRelease(&s->mutex);
551 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
552
553 ReplicationSlotDropPtr(s);
554
555 ConditionVariableBroadcast(&s->active_cv);
556 goto restart;
557 }
558 else
559 SpinLockRelease(&s->mutex);
560 }
561
562 LWLockRelease(ReplicationSlotControlLock);
563 }
564
565 /*
566 * Permanently drop replication slot identified by the passed in name.
567 */
568 void
ReplicationSlotDrop(const char * name,bool nowait)569 ReplicationSlotDrop(const char *name, bool nowait)
570 {
571 Assert(MyReplicationSlot == NULL);
572
573 (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
574
575 ReplicationSlotDropAcquired();
576 }
577
578 /*
579 * Permanently drop the currently acquired replication slot.
580 */
581 static void
ReplicationSlotDropAcquired(void)582 ReplicationSlotDropAcquired(void)
583 {
584 ReplicationSlot *slot = MyReplicationSlot;
585
586 Assert(MyReplicationSlot != NULL);
587
588 /* slot isn't acquired anymore */
589 MyReplicationSlot = NULL;
590
591 ReplicationSlotDropPtr(slot);
592 }
593
594 /*
595 * Permanently drop the replication slot which will be released by the point
596 * this function returns.
597 */
598 static void
ReplicationSlotDropPtr(ReplicationSlot * slot)599 ReplicationSlotDropPtr(ReplicationSlot *slot)
600 {
601 char path[MAXPGPATH];
602 char tmppath[MAXPGPATH];
603
604 /*
605 * If some other backend ran this code concurrently with us, we might try
606 * to delete a slot with a certain name while someone else was trying to
607 * create a slot with the same name.
608 */
609 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
610
611 /* Generate pathnames. */
612 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
613 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
614
615 /*
616 * Rename the slot directory on disk, so that we'll no longer recognize
617 * this as a valid slot. Note that if this fails, we've got to mark the
618 * slot inactive before bailing out. If we're dropping an ephemeral or a
619 * temporary slot, we better never fail hard as the caller won't expect
620 * the slot to survive and this might get called during error handling.
621 */
622 if (rename(path, tmppath) == 0)
623 {
624 /*
625 * We need to fsync() the directory we just renamed and its parent to
626 * make sure that our changes are on disk in a crash-safe fashion. If
627 * fsync() fails, we can't be sure whether the changes are on disk or
628 * not. For now, we handle that by panicking;
629 * StartupReplicationSlots() will try to straighten it out after
630 * restart.
631 */
632 START_CRIT_SECTION();
633 fsync_fname(tmppath, true);
634 fsync_fname("pg_replslot", true);
635 END_CRIT_SECTION();
636 }
637 else
638 {
639 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
640
641 SpinLockAcquire(&slot->mutex);
642 slot->active_pid = 0;
643 SpinLockRelease(&slot->mutex);
644
645 /* wake up anyone waiting on this slot */
646 ConditionVariableBroadcast(&slot->active_cv);
647
648 ereport(fail_softly ? WARNING : ERROR,
649 (errcode_for_file_access(),
650 errmsg("could not rename file \"%s\" to \"%s\": %m",
651 path, tmppath)));
652 }
653
654 /*
655 * The slot is definitely gone. Lock out concurrent scans of the array
656 * long enough to kill it. It's OK to clear the active PID here without
657 * grabbing the mutex because nobody else can be scanning the array here,
658 * and nobody can be attached to this slot and thus access it without
659 * scanning the array.
660 *
661 * Also wake up processes waiting for it.
662 */
663 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
664 slot->active_pid = 0;
665 slot->in_use = false;
666 LWLockRelease(ReplicationSlotControlLock);
667 ConditionVariableBroadcast(&slot->active_cv);
668
669 /*
670 * Slot is dead and doesn't prevent resource removal anymore, recompute
671 * limits.
672 */
673 ReplicationSlotsComputeRequiredXmin(false);
674 ReplicationSlotsComputeRequiredLSN();
675
676 /*
677 * If removing the directory fails, the worst thing that will happen is
678 * that the user won't be able to create a new slot with the same name
679 * until the next server restart. We warn about it, but that's all.
680 */
681 if (!rmtree(tmppath, true))
682 ereport(WARNING,
683 (errmsg("could not remove directory \"%s\"", tmppath)));
684
685 /*
686 * We release this at the very end, so that nobody starts trying to create
687 * a slot while we're still cleaning up the detritus of the old one.
688 */
689 LWLockRelease(ReplicationSlotAllocationLock);
690 }
691
692 /*
693 * Serialize the currently acquired slot's state from memory to disk, thereby
694 * guaranteeing the current state will survive a crash.
695 */
696 void
ReplicationSlotSave(void)697 ReplicationSlotSave(void)
698 {
699 char path[MAXPGPATH];
700
701 Assert(MyReplicationSlot != NULL);
702
703 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
704 SaveSlotToPath(MyReplicationSlot, path, ERROR);
705 }
706
707 /*
708 * Signal that it would be useful if the currently acquired slot would be
709 * flushed out to disk.
710 *
711 * Note that the actual flush to disk can be delayed for a long time, if
712 * required for correctness explicitly do a ReplicationSlotSave().
713 */
714 void
ReplicationSlotMarkDirty(void)715 ReplicationSlotMarkDirty(void)
716 {
717 ReplicationSlot *slot = MyReplicationSlot;
718
719 Assert(MyReplicationSlot != NULL);
720
721 SpinLockAcquire(&slot->mutex);
722 MyReplicationSlot->just_dirtied = true;
723 MyReplicationSlot->dirty = true;
724 SpinLockRelease(&slot->mutex);
725 }
726
727 /*
728 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
729 * guaranteeing it will be there after an eventual crash.
730 */
731 void
ReplicationSlotPersist(void)732 ReplicationSlotPersist(void)
733 {
734 ReplicationSlot *slot = MyReplicationSlot;
735
736 Assert(slot != NULL);
737 Assert(slot->data.persistency != RS_PERSISTENT);
738
739 SpinLockAcquire(&slot->mutex);
740 slot->data.persistency = RS_PERSISTENT;
741 SpinLockRelease(&slot->mutex);
742
743 ReplicationSlotMarkDirty();
744 ReplicationSlotSave();
745 }
746
747 /*
748 * Compute the oldest xmin across all slots and store it in the ProcArray.
749 *
750 * If already_locked is true, ProcArrayLock has already been acquired
751 * exclusively.
752 */
753 void
ReplicationSlotsComputeRequiredXmin(bool already_locked)754 ReplicationSlotsComputeRequiredXmin(bool already_locked)
755 {
756 int i;
757 TransactionId agg_xmin = InvalidTransactionId;
758 TransactionId agg_catalog_xmin = InvalidTransactionId;
759
760 Assert(ReplicationSlotCtl != NULL);
761
762 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
763
764 for (i = 0; i < max_replication_slots; i++)
765 {
766 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
767 TransactionId effective_xmin;
768 TransactionId effective_catalog_xmin;
769
770 if (!s->in_use)
771 continue;
772
773 SpinLockAcquire(&s->mutex);
774 effective_xmin = s->effective_xmin;
775 effective_catalog_xmin = s->effective_catalog_xmin;
776 SpinLockRelease(&s->mutex);
777
778 /* check the data xmin */
779 if (TransactionIdIsValid(effective_xmin) &&
780 (!TransactionIdIsValid(agg_xmin) ||
781 TransactionIdPrecedes(effective_xmin, agg_xmin)))
782 agg_xmin = effective_xmin;
783
784 /* check the catalog xmin */
785 if (TransactionIdIsValid(effective_catalog_xmin) &&
786 (!TransactionIdIsValid(agg_catalog_xmin) ||
787 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
788 agg_catalog_xmin = effective_catalog_xmin;
789 }
790
791 LWLockRelease(ReplicationSlotControlLock);
792
793 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
794 }
795
796 /*
797 * Compute the oldest restart LSN across all slots and inform xlog module.
798 *
799 * Note: while max_slot_wal_keep_size is theoretically relevant for this
800 * purpose, we don't try to account for that, because this module doesn't
801 * know what to compare against.
802 */
803 void
ReplicationSlotsComputeRequiredLSN(void)804 ReplicationSlotsComputeRequiredLSN(void)
805 {
806 int i;
807 XLogRecPtr min_required = InvalidXLogRecPtr;
808
809 Assert(ReplicationSlotCtl != NULL);
810
811 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
812 for (i = 0; i < max_replication_slots; i++)
813 {
814 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
815 XLogRecPtr restart_lsn;
816
817 if (!s->in_use)
818 continue;
819
820 SpinLockAcquire(&s->mutex);
821 restart_lsn = s->data.restart_lsn;
822 SpinLockRelease(&s->mutex);
823
824 if (restart_lsn != InvalidXLogRecPtr &&
825 (min_required == InvalidXLogRecPtr ||
826 restart_lsn < min_required))
827 min_required = restart_lsn;
828 }
829 LWLockRelease(ReplicationSlotControlLock);
830
831 XLogSetReplicationSlotMinimumLSN(min_required);
832 }
833
834 /*
835 * Compute the oldest WAL LSN required by *logical* decoding slots..
836 *
837 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
838 * slots exist.
839 *
840 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
841 * ignores physical replication slots.
842 *
843 * The results aren't required frequently, so we don't maintain a precomputed
844 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
845 */
846 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)847 ReplicationSlotsComputeLogicalRestartLSN(void)
848 {
849 XLogRecPtr result = InvalidXLogRecPtr;
850 int i;
851
852 if (max_replication_slots <= 0)
853 return InvalidXLogRecPtr;
854
855 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
856
857 for (i = 0; i < max_replication_slots; i++)
858 {
859 ReplicationSlot *s;
860 XLogRecPtr restart_lsn;
861
862 s = &ReplicationSlotCtl->replication_slots[i];
863
864 /* cannot change while ReplicationSlotCtlLock is held */
865 if (!s->in_use)
866 continue;
867
868 /* we're only interested in logical slots */
869 if (!SlotIsLogical(s))
870 continue;
871
872 /* read once, it's ok if it increases while we're checking */
873 SpinLockAcquire(&s->mutex);
874 restart_lsn = s->data.restart_lsn;
875 SpinLockRelease(&s->mutex);
876
877 if (restart_lsn == InvalidXLogRecPtr)
878 continue;
879
880 if (result == InvalidXLogRecPtr ||
881 restart_lsn < result)
882 result = restart_lsn;
883 }
884
885 LWLockRelease(ReplicationSlotControlLock);
886
887 return result;
888 }
889
890 /*
891 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
892 * passed database oid.
893 *
894 * Returns true if there are any slots referencing the database. *nslots will
895 * be set to the absolute number of slots in the database, *nactive to ones
896 * currently active.
897 */
898 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)899 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
900 {
901 int i;
902
903 *nslots = *nactive = 0;
904
905 if (max_replication_slots <= 0)
906 return false;
907
908 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
909 for (i = 0; i < max_replication_slots; i++)
910 {
911 ReplicationSlot *s;
912
913 s = &ReplicationSlotCtl->replication_slots[i];
914
915 /* cannot change while ReplicationSlotCtlLock is held */
916 if (!s->in_use)
917 continue;
918
919 /* only logical slots are database specific, skip */
920 if (!SlotIsLogical(s))
921 continue;
922
923 /* not our database, skip */
924 if (s->data.database != dboid)
925 continue;
926
927 /* count slots with spinlock held */
928 SpinLockAcquire(&s->mutex);
929 (*nslots)++;
930 if (s->active_pid != 0)
931 (*nactive)++;
932 SpinLockRelease(&s->mutex);
933 }
934 LWLockRelease(ReplicationSlotControlLock);
935
936 if (*nslots > 0)
937 return true;
938 return false;
939 }
940
941 /*
942 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
943 * passed database oid. The caller should hold an exclusive lock on the
944 * pg_database oid for the database to prevent creation of new slots on the db
945 * or replay from existing slots.
946 *
947 * Another session that concurrently acquires an existing slot on the target DB
948 * (most likely to drop it) may cause this function to ERROR. If that happens
949 * it may have dropped some but not all slots.
950 *
951 * This routine isn't as efficient as it could be - but we don't drop
952 * databases often, especially databases with lots of slots.
953 */
954 void
ReplicationSlotsDropDBSlots(Oid dboid)955 ReplicationSlotsDropDBSlots(Oid dboid)
956 {
957 int i;
958
959 if (max_replication_slots <= 0)
960 return;
961
962 restart:
963 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
964 for (i = 0; i < max_replication_slots; i++)
965 {
966 ReplicationSlot *s;
967 char *slotname;
968 int active_pid;
969
970 s = &ReplicationSlotCtl->replication_slots[i];
971
972 /* cannot change while ReplicationSlotCtlLock is held */
973 if (!s->in_use)
974 continue;
975
976 /* only logical slots are database specific, skip */
977 if (!SlotIsLogical(s))
978 continue;
979
980 /* not our database, skip */
981 if (s->data.database != dboid)
982 continue;
983
984 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
985 SpinLockAcquire(&s->mutex);
986 /* can't change while ReplicationSlotControlLock is held */
987 slotname = NameStr(s->data.name);
988 active_pid = s->active_pid;
989 if (active_pid == 0)
990 {
991 MyReplicationSlot = s;
992 s->active_pid = MyProcPid;
993 }
994 SpinLockRelease(&s->mutex);
995
996 /*
997 * Even though we hold an exclusive lock on the database object a
998 * logical slot for that DB can still be active, e.g. if it's
999 * concurrently being dropped by a backend connected to another DB.
1000 *
1001 * That's fairly unlikely in practice, so we'll just bail out.
1002 */
1003 if (active_pid)
1004 ereport(ERROR,
1005 (errcode(ERRCODE_OBJECT_IN_USE),
1006 errmsg("replication slot \"%s\" is active for PID %d",
1007 slotname, active_pid)));
1008
1009 /*
1010 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1011 * holding ReplicationSlotControlLock over filesystem operations,
1012 * release ReplicationSlotControlLock and use
1013 * ReplicationSlotDropAcquired.
1014 *
1015 * As that means the set of slots could change, restart scan from the
1016 * beginning each time we release the lock.
1017 */
1018 LWLockRelease(ReplicationSlotControlLock);
1019 ReplicationSlotDropAcquired();
1020 goto restart;
1021 }
1022 LWLockRelease(ReplicationSlotControlLock);
1023 }
1024
1025
1026 /*
1027 * Check whether the server's configuration supports using replication
1028 * slots.
1029 */
1030 void
CheckSlotRequirements(void)1031 CheckSlotRequirements(void)
1032 {
1033 /*
1034 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1035 * needs the same check.
1036 */
1037
1038 if (max_replication_slots == 0)
1039 ereport(ERROR,
1040 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1041 errmsg("replication slots can only be used if max_replication_slots > 0")));
1042
1043 if (wal_level < WAL_LEVEL_REPLICA)
1044 ereport(ERROR,
1045 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1046 errmsg("replication slots can only be used if wal_level >= replica")));
1047 }
1048
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.  On return, restart_lsn points at WAL that
 * is guaranteed not to have been removed yet.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay at.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			restart_lsn = GetXLogInsertRecPtr();
			/* publish under the mutex so concurrent readers see it */
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			/* physical slot, or created during recovery: use last redo LSN */
			restart_lsn = GetRedoRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}
1125
/*
 * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
 * and mark it invalid, if necessary and possible.
 *
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
							   bool *invalidated)
{
	int			last_signaled_pid = 0;
	bool		released_lock = false;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/* slot may have been dropped while we were signalling/sleeping */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated. We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		restart_lsn = s->data.restart_lsn;

		/*
		 * If the slot is already invalid or is fresh enough, we don't need to
		 * do anything.
		 */
		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/* copy the name out while the spinlock protects the slot contents */
		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately. Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated_at = restart_lsn;
			s->data.restart_lsn = InvalidXLogRecPtr;

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it. (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ereport(LOG,
						(errmsg("terminating process %d to release replication slot \"%s\"",
								active_pid, NameStr(slotname))));

				(void) kill(active_pid, SIGTERM);
				last_signaled_pid = active_pid;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the slot
			 * next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ereport(LOG,
					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
							NameStr(slotname),
							(uint32) (restart_lsn >> 32),
							(uint32) restart_lsn)));

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
1283
1284 /*
1285 * Mark any slot that points to an LSN older than the given segment
1286 * as invalid; it requires WAL that's about to be removed.
1287 *
1288 * Returns true when any slot have got invalidated.
1289 *
1290 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1291 */
1292 bool
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)1293 InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
1294 {
1295 XLogRecPtr oldestLSN;
1296 bool invalidated = false;
1297
1298 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1299
1300 restart:
1301 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1302 for (int i = 0; i < max_replication_slots; i++)
1303 {
1304 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1305
1306 if (!s->in_use)
1307 continue;
1308
1309 if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
1310 {
1311 /* if the lock was released, start from scratch */
1312 goto restart;
1313 }
1314 }
1315 LWLockRelease(ReplicationSlotControlLock);
1316
1317 /*
1318 * If any slots have been invalidated, recalculate the resource limits.
1319 */
1320 if (invalidated)
1321 {
1322 ReplicationSlotsComputeRequiredXmin(false);
1323 ReplicationSlotsComputeRequiredLSN();
1324 }
1325
1326 return invalidated;
1327 }
1328
1329 /*
1330 * Flush all replication slots to disk.
1331 *
1332 * This needn't actually be part of a checkpoint, but it's a convenient
1333 * location.
1334 */
1335 void
CheckPointReplicationSlots(void)1336 CheckPointReplicationSlots(void)
1337 {
1338 int i;
1339
1340 elog(DEBUG1, "performing replication slot checkpoint");
1341
1342 /*
1343 * Prevent any slot from being created/dropped while we're active. As we
1344 * explicitly do *not* want to block iterating over replication_slots or
1345 * acquiring a slot we cannot take the control lock - but that's OK,
1346 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1347 * enough to guarantee that nobody can change the in_use bits on us.
1348 */
1349 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1350
1351 for (i = 0; i < max_replication_slots; i++)
1352 {
1353 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1354 char path[MAXPGPATH];
1355
1356 if (!s->in_use)
1357 continue;
1358
1359 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1360 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1361 SaveSlotToPath(s, path, LOG);
1362 }
1363 LWLockRelease(ReplicationSlotAllocationLock);
1364 }
1365
1366 /*
1367 * Load all replication slots from disk into memory at server startup. This
1368 * needs to be run before we start crash recovery.
1369 */
1370 void
StartupReplicationSlots(void)1371 StartupReplicationSlots(void)
1372 {
1373 DIR *replication_dir;
1374 struct dirent *replication_de;
1375
1376 elog(DEBUG1, "starting up replication slots");
1377
1378 /* restore all slots by iterating over all on-disk entries */
1379 replication_dir = AllocateDir("pg_replslot");
1380 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1381 {
1382 struct stat statbuf;
1383 char path[MAXPGPATH + 12];
1384
1385 if (strcmp(replication_de->d_name, ".") == 0 ||
1386 strcmp(replication_de->d_name, "..") == 0)
1387 continue;
1388
1389 snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1390
1391 /* we're only creating directories here, skip if it's not our's */
1392 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1393 continue;
1394
1395 /* we crashed while a slot was being setup or deleted, clean up */
1396 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1397 {
1398 if (!rmtree(path, true))
1399 {
1400 ereport(WARNING,
1401 (errmsg("could not remove directory \"%s\"",
1402 path)));
1403 continue;
1404 }
1405 fsync_fname("pg_replslot", true);
1406 continue;
1407 }
1408
1409 /* looks like a slot in a normal state, restore */
1410 RestoreSlotFromDisk(replication_de->d_name);
1411 }
1412 FreeDir(replication_dir);
1413
1414 /* currently no slots exist, we're done. */
1415 if (max_replication_slots <= 0)
1416 return;
1417
1418 /* Now that we have recovered all the data, compute replication xmin */
1419 ReplicationSlotsComputeRequiredXmin(false);
1420 ReplicationSlotsComputeRequiredLSN();
1421 }
1422
1423 /* ----
1424 * Manipulation of on-disk state of replication slots
1425 *
1426 * NB: none of the routines below should take any notice whether a slot is the
1427 * current one or not, that's all handled a layer above.
1428 * ----
1429 */
1430 static void
CreateSlotOnDisk(ReplicationSlot * slot)1431 CreateSlotOnDisk(ReplicationSlot *slot)
1432 {
1433 char tmppath[MAXPGPATH];
1434 char path[MAXPGPATH];
1435 struct stat st;
1436
1437 /*
1438 * No need to take out the io_in_progress_lock, nobody else can see this
1439 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1440 * takes out the lock, if we'd take the lock here, we'd deadlock.
1441 */
1442
1443 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1444 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1445
1446 /*
1447 * It's just barely possible that some previous effort to create or drop a
1448 * slot with this name left a temp directory lying around. If that seems
1449 * to be the case, try to remove it. If the rmtree() fails, we'll error
1450 * out at the MakePGDirectory() below, so we don't bother checking
1451 * success.
1452 */
1453 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1454 rmtree(tmppath, true);
1455
1456 /* Create and fsync the temporary slot directory. */
1457 if (MakePGDirectory(tmppath) < 0)
1458 ereport(ERROR,
1459 (errcode_for_file_access(),
1460 errmsg("could not create directory \"%s\": %m",
1461 tmppath)));
1462 fsync_fname(tmppath, true);
1463
1464 /* Write the actual state file. */
1465 slot->dirty = true; /* signal that we really need to write */
1466 SaveSlotToPath(slot, tmppath, ERROR);
1467
1468 /* Rename the directory into place. */
1469 if (rename(tmppath, path) != 0)
1470 ereport(ERROR,
1471 (errcode_for_file_access(),
1472 errmsg("could not rename file \"%s\" to \"%s\": %m",
1473 tmppath, path)));
1474
1475 /*
1476 * If we'd now fail - really unlikely - we wouldn't know whether this slot
1477 * would persist after an OS crash or not - so, force a restart. The
1478 * restart would try to fsync this again till it works.
1479 */
1480 START_CRIT_SECTION();
1481
1482 fsync_fname(path, true);
1483 fsync_fname("pg_replslot", true);
1484
1485 END_CRIT_SECTION();
1486 }
1487
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Serializes slot->data into "dir"/state via a temporary file followed by a
 * rename.  I/O failures are reported at "elevel"; when that is below ERROR
 * the function returns after reporting, leaving the slot still marked dirty
 * so a later call will retry the write.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* serialize against concurrent writers of this slot's state file */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	/* fill in the version-independent header */
	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data out under the spinlock for a consistent view */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/*
	 * Checksum everything after the not-checksummed header prefix.
	 * NOTE(review): the macro names say "SnapBuild" but are used for slot
	 * state files here — presumably a naming leftover; confirm against their
	 * definitions earlier in this file.
	 */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
1641
/*
 * Load a single slot from disk into memory.
 *
 * Runs only during startup (from StartupReplicationSlots), before any
 * concurrent slot access is possible; I/O and corruption problems are
 * therefore reported at PANIC/FATAL rather than handled.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + 12];
	char		path[MAXPGPATH + 22];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "pg_replslot/%s", name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/*
	 * Now verify the CRC, computed over the same region SaveSlotToPath()
	 * checksummed when writing the file.
	 */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be logical or higher.")));
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
						NameStr(cp.slotdata.name)),
				 errhint("Change wal_level to be replica or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* find a free in-memory slot entry to restore into */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase max_replication_slots and try again.")));
}
1851