1 /*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2016, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
27 * efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "replication/slot.h"
47 #include "storage/fd.h"
48 #include "storage/proc.h"
49 #include "storage/procarray.h"
50
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the "state" file inside each slot's directory.
 * The magic/checksum/version/length header is version independent; the
 * trailing slotdata payload may differ between 'version' values.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* SLOT_MAGIC, identifies the file format */
	pg_crc32c	checksum;		/* CRC of the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* format version, currently SLOT_VERSION */
	uint32		length;			/* length of the version-dependent part */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
73
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)

/*
 * size of the part of the slot not covered by the checksum
 *
 * NOTE(review): the "SnapBuild" prefix on the two checksum macros appears to
 * be a copy-paste from snapbuild.c; they describe ReplicationSlotOnDisk.  The
 * names are kept because other parts of this file reference them.
 */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)

/*
 * size of the part covered by the checksum
 *
 * Parenthesize the expansion: it is a subtraction, and without the outer
 * parentheses embedding the macro in a larger expression could associate
 * incorrectly.
 */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)

/* size of the slot data that is version dependent (parenthesized, as above) */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	2			/* version for new files */
89
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* LWLock tranche covering each slot's io_in_progress_lock */
static LWLockTranche ReplSlotIOLWLockTranche;

/* forward declaration: drop logic shared by the Drop and Release paths */
static void ReplicationSlotDropAcquired(void);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
107
108 /*
109 * Report shared-memory space needed by ReplicationSlotShmemInit.
110 */
111 Size
ReplicationSlotsShmemSize(void)112 ReplicationSlotsShmemSize(void)
113 {
114 Size size = 0;
115
116 if (max_replication_slots == 0)
117 return size;
118
119 size = offsetof(ReplicationSlotCtlData, replication_slots);
120 size = add_size(size,
121 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
122
123 return size;
124 }
125
/*
 * Allocate and initialize shared memory for replication slots.
 *
 * Run during shared-memory creation; the allocation size must match
 * ReplicationSlotsShmemSize().
 */
void
ReplicationSlotsShmemInit(void)
{
	bool		found;

	/* Nothing to set up if slots are disabled. */
	if (max_replication_slots == 0)
		return;

	ReplicationSlotCtl = (ReplicationSlotCtlData *)
		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
						&found);

	/*
	 * Register the LWLock tranche for the per-slot I/O locks.  The base
	 * address points at the io_in_progress_lock member of the first slot;
	 * the stride (one whole ReplicationSlot) steps from one slot's lock to
	 * the next.  This must happen in every backend, not just the first one,
	 * which is why it is done before the !found check.
	 */
	ReplSlotIOLWLockTranche.name = "replication_slot_io";
	ReplSlotIOLWLockTranche.array_base =
		((char *) ReplicationSlotCtl) + offsetof(ReplicationSlotCtlData, replication_slots) + offsetof(ReplicationSlot, io_in_progress_lock);
	ReplSlotIOLWLockTranche.array_stride = sizeof(ReplicationSlot);
	LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
						  &ReplSlotIOLWLockTranche);

	if (!found)
	{
		int			i;

		/* First time through, so initialize */
		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());

		for (i = 0; i < max_replication_slots; i++)
		{
			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];

			/* everything else is zeroed by the memset above */
			SpinLockInit(&slot->mutex);
			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
		}
	}
}
165
166 /*
167 * Check whether the passed slot name is valid and report errors at elevel.
168 *
169 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
170 * the name to be used as a directory name on every supported OS.
171 *
172 * Returns whether the directory name is valid or not if elevel < ERROR.
173 */
174 bool
ReplicationSlotValidateName(const char * name,int elevel)175 ReplicationSlotValidateName(const char *name, int elevel)
176 {
177 const char *cp;
178
179 if (strlen(name) == 0)
180 {
181 ereport(elevel,
182 (errcode(ERRCODE_INVALID_NAME),
183 errmsg("replication slot name \"%s\" is too short",
184 name)));
185 return false;
186 }
187
188 if (strlen(name) >= NAMEDATALEN)
189 {
190 ereport(elevel,
191 (errcode(ERRCODE_NAME_TOO_LONG),
192 errmsg("replication slot name \"%s\" is too long",
193 name)));
194 return false;
195 }
196
197 for (cp = name; *cp; cp++)
198 {
199 if (!((*cp >= 'a' && *cp <= 'z')
200 || (*cp >= '0' && *cp <= '9')
201 || (*cp == '_')))
202 {
203 ereport(elevel,
204 (errcode(ERRCODE_INVALID_NAME),
205 errmsg("replication slot name \"%s\" contains invalid character",
206 name),
207 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
208 return false;
209 }
210 }
211 return true;
212 }
213
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 * be used for that pass true, otherwise false.
 * persistency: RS_PERSISTENT or RS_EPHEMERAL initial persistency level.
 *
 * On success, MyReplicationSlot points at the new, active slot.  Errors out
 * if the name is invalid or taken, or if no free slot is available.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	/* The caller must not already have a slot acquired. */
	Assert(MyReplicationSlot == NULL);

	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;			/* remember the first free slot */
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag.  We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and in_active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
323
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * Errors out if the slot does not exist or is already active in another
 * backend; on success MyReplicationSlot points at the acquired slot.
 */
void
ReplicationSlotAcquire(const char *name)
{
	ReplicationSlot *slot = NULL;
	int			i;
	int			active_pid = 0;

	/* The caller must not already have a slot acquired. */
	Assert(MyReplicationSlot == NULL);

	ReplicationSlotValidateName(name, ERROR);

	/* Search for the named slot and mark it active if we find it. */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			/*
			 * Claim under the per-slot spinlock: read the current holder and
			 * install ourselves only if nobody holds it.  This makes the
			 * check-and-claim atomic against concurrent acquirers.
			 */
			SpinLockAcquire(&s->mutex);
			active_pid = s->active_pid;
			if (active_pid == 0)
				s->active_pid = MyProcPid;
			SpinLockRelease(&s->mutex);
			slot = s;
			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot or it was already active, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));
	if (active_pid != 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						name, active_pid)));

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
371
/*
 * Release a replication slot, this or another backend can ReAcquire it
 * later.  Resources this slot requires will be preserved.
 *
 * An RS_EPHEMERAL slot is dropped instead of released, since it is not
 * supposed to survive the session that created it.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot.  There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}
	else
	{
		/* Mark slot inactive.  We're not freeing it, just disconnecting. */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
	}


	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		/* recompute the global xmin horizon now that ours is gone */
		ReplicationSlotsComputeRequiredXmin(false);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
423
424 /*
425 * Permanently drop replication slot identified by the passed in name.
426 */
427 void
ReplicationSlotDrop(const char * name)428 ReplicationSlotDrop(const char *name)
429 {
430 Assert(MyReplicationSlot == NULL);
431
432 ReplicationSlotAcquire(name);
433
434 ReplicationSlotDropAcquired();
435 }
436
/*
 * Permanently drop the currently acquired replication slot which will be
 * released by the point this function returns.
 *
 * Removal is made crash-safe by first renaming the slot directory to a
 * ".tmp" name (so it is no longer recognized as a valid slot) and fsyncing,
 * then clearing the in-memory state, and only then removing the directory.
 */
static void
ReplicationSlotDropAcquired(void)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(MyReplicationSlot != NULL);

	/* slot isn't acquired anymore */
	MyReplicationSlot = NULL;

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral slot,
	 * we better never fail hard as the caller won't expect the slot to
	 * survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/* ephemeral slots only warn, so error-path callers can proceed */
		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active flag here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
535
536 /*
537 * Serialize the currently acquired slot's state from memory to disk, thereby
538 * guaranteeing the current state will survive a crash.
539 */
540 void
ReplicationSlotSave(void)541 ReplicationSlotSave(void)
542 {
543 char path[MAXPGPATH];
544
545 Assert(MyReplicationSlot != NULL);
546
547 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
548 SaveSlotToPath(MyReplicationSlot, path, ERROR);
549 }
550
551 /*
552 * Signal that it would be useful if the currently acquired slot would be
553 * flushed out to disk.
554 *
555 * Note that the actual flush to disk can be delayed for a long time, if
556 * required for correctness explicitly do a ReplicationSlotSave().
557 */
558 void
ReplicationSlotMarkDirty(void)559 ReplicationSlotMarkDirty(void)
560 {
561 ReplicationSlot *slot = MyReplicationSlot;
562
563 Assert(MyReplicationSlot != NULL);
564
565 SpinLockAcquire(&slot->mutex);
566 MyReplicationSlot->just_dirtied = true;
567 MyReplicationSlot->dirty = true;
568 SpinLockRelease(&slot->mutex);
569 }
570
571 /*
572 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
573 * guaranteeing it will be there after an eventual crash.
574 */
575 void
ReplicationSlotPersist(void)576 ReplicationSlotPersist(void)
577 {
578 ReplicationSlot *slot = MyReplicationSlot;
579
580 Assert(slot != NULL);
581 Assert(slot->data.persistency != RS_PERSISTENT);
582
583 SpinLockAcquire(&slot->mutex);
584 slot->data.persistency = RS_PERSISTENT;
585 SpinLockRelease(&slot->mutex);
586
587 ReplicationSlotMarkDirty();
588 ReplicationSlotSave();
589 }
590
/*
 * Compute the oldest xmin across all slots and store it in the ProcArray.
 *
 * Both the data xmin and the catalog xmin are aggregated separately and
 * handed to ProcArraySetReplicationSlotXmin().
 *
 * If already_locked is true, ProcArrayLock has already been acquired
 * exclusively.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);

	/* shared mode suffices: we only read slots, never flip in_use */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;

		if (!s->in_use)
			continue;

		/* copy both values under the spinlock for a consistent read */
		SpinLockAcquire(&s->mutex);
		effective_xmin = s->effective_xmin;
		effective_catalog_xmin = s->effective_catalog_xmin;
		SpinLockRelease(&s->mutex);

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	LWLockRelease(ReplicationSlotControlLock);

	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
}
639
640 /*
641 * Compute the oldest restart LSN across all slots and inform xlog module.
642 */
643 void
ReplicationSlotsComputeRequiredLSN(void)644 ReplicationSlotsComputeRequiredLSN(void)
645 {
646 int i;
647 XLogRecPtr min_required = InvalidXLogRecPtr;
648
649 Assert(ReplicationSlotCtl != NULL);
650
651 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
652 for (i = 0; i < max_replication_slots; i++)
653 {
654 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
655 XLogRecPtr restart_lsn;
656
657 if (!s->in_use)
658 continue;
659
660 SpinLockAcquire(&s->mutex);
661 restart_lsn = s->data.restart_lsn;
662 SpinLockRelease(&s->mutex);
663
664 if (restart_lsn != InvalidXLogRecPtr &&
665 (min_required == InvalidXLogRecPtr ||
666 restart_lsn < min_required))
667 min_required = restart_lsn;
668 }
669 LWLockRelease(ReplicationSlotControlLock);
670
671 XLogSetReplicationSlotMinimumLSN(min_required);
672 }
673
674 /*
675 * Compute the oldest WAL LSN required by *logical* decoding slots..
676 *
677 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
678 * slots exist.
679 *
680 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
681 * ignores physical replication slots.
682 *
683 * The results aren't required frequently, so we don't maintain a precomputed
684 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
685 */
686 XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)687 ReplicationSlotsComputeLogicalRestartLSN(void)
688 {
689 XLogRecPtr result = InvalidXLogRecPtr;
690 int i;
691
692 if (max_replication_slots <= 0)
693 return InvalidXLogRecPtr;
694
695 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
696
697 for (i = 0; i < max_replication_slots; i++)
698 {
699 ReplicationSlot *s;
700 XLogRecPtr restart_lsn;
701
702 s = &ReplicationSlotCtl->replication_slots[i];
703
704 /* cannot change while ReplicationSlotCtlLock is held */
705 if (!s->in_use)
706 continue;
707
708 /* we're only interested in logical slots */
709 if (!SlotIsLogical(s))
710 continue;
711
712 /* read once, it's ok if it increases while we're checking */
713 SpinLockAcquire(&s->mutex);
714 restart_lsn = s->data.restart_lsn;
715 SpinLockRelease(&s->mutex);
716
717 if (result == InvalidXLogRecPtr ||
718 restart_lsn < result)
719 result = restart_lsn;
720 }
721
722 LWLockRelease(ReplicationSlotControlLock);
723
724 return result;
725 }
726
727 /*
728 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
729 * passed database oid.
730 *
731 * Returns true if there are any slots referencing the database. *nslots will
732 * be set to the absolute number of slots in the database, *nactive to ones
733 * currently active.
734 */
735 bool
ReplicationSlotsCountDBSlots(Oid dboid,int * nslots,int * nactive)736 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
737 {
738 int i;
739
740 *nslots = *nactive = 0;
741
742 if (max_replication_slots <= 0)
743 return false;
744
745 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
746 for (i = 0; i < max_replication_slots; i++)
747 {
748 ReplicationSlot *s;
749
750 s = &ReplicationSlotCtl->replication_slots[i];
751
752 /* cannot change while ReplicationSlotCtlLock is held */
753 if (!s->in_use)
754 continue;
755
756 /* only logical slots are database specific, skip */
757 if (!SlotIsLogical(s))
758 continue;
759
760 /* not our database, skip */
761 if (s->data.database != dboid)
762 continue;
763
764 /* count slots with spinlock held */
765 SpinLockAcquire(&s->mutex);
766 (*nslots)++;
767 if (s->active_pid != 0)
768 (*nactive)++;
769 SpinLockRelease(&s->mutex);
770 }
771 LWLockRelease(ReplicationSlotControlLock);
772
773 if (*nslots > 0)
774 return true;
775 return false;
776 }
777
778
779 /*
780 * Check whether the server's configuration supports using replication
781 * slots.
782 */
783 void
CheckSlotRequirements(void)784 CheckSlotRequirements(void)
785 {
786 /*
787 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
788 * needs the same check.
789 */
790
791 if (max_replication_slots == 0)
792 ereport(ERROR,
793 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
794 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
795
796 if (wal_level < WAL_LEVEL_REPLICA)
797 ereport(ERROR,
798 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
799 errmsg("replication slots can only be used if wal_level >= replica")));
800 }
801
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay at.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			slot->data.restart_lsn = GetXLogInsertRecPtr();

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			/* physical slot, or still in recovery: anchor at the redo LSN */
			slot->data.restart_lsn = GetRedoRecPtr();
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}
871
872 /*
873 * Flush all replication slots to disk.
874 *
875 * This needn't actually be part of a checkpoint, but it's a convenient
876 * location.
877 */
878 void
CheckPointReplicationSlots(void)879 CheckPointReplicationSlots(void)
880 {
881 int i;
882
883 elog(DEBUG1, "performing replication slot checkpoint");
884
885 /*
886 * Prevent any slot from being created/dropped while we're active. As we
887 * explicitly do *not* want to block iterating over replication_slots or
888 * acquiring a slot we cannot take the control lock - but that's OK,
889 * because holding ReplicationSlotAllocationLock is strictly stronger, and
890 * enough to guarantee that nobody can change the in_use bits on us.
891 */
892 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
893
894 for (i = 0; i < max_replication_slots; i++)
895 {
896 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
897 char path[MAXPGPATH];
898
899 if (!s->in_use)
900 continue;
901
902 /* save the slot to disk, locking is handled in SaveSlotToPath() */
903 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
904 SaveSlotToPath(s, path, LOG);
905 }
906 LWLockRelease(ReplicationSlotAllocationLock);
907 }
908
/*
 * Load all replication slots from disk into memory at server startup. This
 * needs to be run before we start crash recovery.
 *
 * Leftover ".tmp" directories (from a crash during slot creation or drop)
 * are removed here rather than restored.
 */
void
StartupReplicationSlots(void)
{
	DIR		   *replication_dir;
	struct dirent *replication_de;

	elog(DEBUG1, "starting up replication slots");

	/* restore all slots by iterating over all on-disk entries */
	replication_dir = AllocateDir("pg_replslot");
	while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
	{
		struct stat statbuf;
		char		path[MAXPGPATH + 12];

		if (strcmp(replication_de->d_name, ".") == 0 ||
			strcmp(replication_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);

		/* we're only creating directories here, skip if it's not our's */
		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
			continue;

		/* we crashed while a slot was being setup or deleted, clean up */
		if (pg_str_endswith(replication_de->d_name, ".tmp"))
		{
			if (!rmtree(path, true))
			{
				ereport(WARNING,
						(errcode_for_file_access(),
						 errmsg("could not remove directory \"%s\"", path)));
				continue;
			}
			/* make the removal durable before moving on */
			fsync_fname("pg_replslot", true);
			continue;
		}

		/* looks like a slot in a normal state, restore */
		RestoreSlotFromDisk(replication_de->d_name);
	}
	FreeDir(replication_dir);

	/* currently no slots exist, we're done. */
	if (max_replication_slots <= 0)
		return;

	/* Now that we have recovered all the data, compute replication xmin */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();
}
965
/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice whether a slot is the
 * current one or not, that's all handled a layer above.
 * ----
 */

/*
 * Create the on-disk directory and state file for a new slot.
 *
 * The directory is built under a ".tmp" name, populated and fsynced, and
 * only then renamed into place, so a crash can never leave a half-written
 * slot that looks valid.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it. If the rmtree() fails, we'll error
	 * out at the mkdir() below, so we don't bother checking success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (mkdir(tmppath, S_IRWXU) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();
}
1029
1030 /*
1031 * Shared functionality between saving and creating a replication slot.
1032 */
1033 static void
SaveSlotToPath(ReplicationSlot * slot,const char * dir,int elevel)1034 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1035 {
1036 char tmppath[MAXPGPATH];
1037 char path[MAXPGPATH];
1038 int fd;
1039 ReplicationSlotOnDisk cp;
1040 bool was_dirty;
1041
1042 /* first check whether there's something to write out */
1043 SpinLockAcquire(&slot->mutex);
1044 was_dirty = slot->dirty;
1045 slot->just_dirtied = false;
1046 SpinLockRelease(&slot->mutex);
1047
1048 /* and don't do anything if there's nothing to write */
1049 if (!was_dirty)
1050 return;
1051
1052 LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1053
1054 /* silence valgrind :( */
1055 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1056
1057 sprintf(tmppath, "%s/state.tmp", dir);
1058 sprintf(path, "%s/state", dir);
1059
1060 fd = OpenTransientFile(tmppath,
1061 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1062 S_IRUSR | S_IWUSR);
1063 if (fd < 0)
1064 {
1065 /*
1066 * If not an ERROR, then release the lock before returning. In case
1067 * of an ERROR, the error recovery path automatically releases the
1068 * lock, but no harm in explicitly releasing even in that case. Note
1069 * that LWLockRelease() could affect errno.
1070 */
1071 int save_errno = errno;
1072
1073 LWLockRelease(&slot->io_in_progress_lock);
1074 errno = save_errno;
1075 ereport(elevel,
1076 (errcode_for_file_access(),
1077 errmsg("could not create file \"%s\": %m",
1078 tmppath)));
1079 return;
1080 }
1081
1082 cp.magic = SLOT_MAGIC;
1083 INIT_CRC32C(cp.checksum);
1084 cp.version = SLOT_VERSION;
1085 cp.length = ReplicationSlotOnDiskV2Size;
1086
1087 SpinLockAcquire(&slot->mutex);
1088
1089 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1090
1091 SpinLockRelease(&slot->mutex);
1092
1093 COMP_CRC32C(cp.checksum,
1094 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1095 SnapBuildOnDiskChecksummedSize);
1096 FIN_CRC32C(cp.checksum);
1097
1098 errno = 0;
1099 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1100 {
1101 int save_errno = errno;
1102
1103 CloseTransientFile(fd);
1104 LWLockRelease(&slot->io_in_progress_lock);
1105
1106 /* if write didn't set errno, assume problem is no disk space */
1107 errno = save_errno ? save_errno : ENOSPC;
1108 ereport(elevel,
1109 (errcode_for_file_access(),
1110 errmsg("could not write to file \"%s\": %m",
1111 tmppath)));
1112 return;
1113 }
1114
1115 /* fsync the temporary file */
1116 if (pg_fsync(fd) != 0)
1117 {
1118 int save_errno = errno;
1119
1120 CloseTransientFile(fd);
1121 LWLockRelease(&slot->io_in_progress_lock);
1122 errno = save_errno;
1123 ereport(elevel,
1124 (errcode_for_file_access(),
1125 errmsg("could not fsync file \"%s\": %m",
1126 tmppath)));
1127 return;
1128 }
1129
1130 CloseTransientFile(fd);
1131
1132 /* rename to permanent file, fsync file and directory */
1133 if (rename(tmppath, path) != 0)
1134 {
1135 int save_errno = errno;
1136
1137 LWLockRelease(&slot->io_in_progress_lock);
1138 errno = save_errno;
1139 ereport(elevel,
1140 (errcode_for_file_access(),
1141 errmsg("could not rename file \"%s\" to \"%s\": %m",
1142 tmppath, path)));
1143 return;
1144 }
1145
1146 /* Check CreateSlot() for the reasoning of using a crit. section. */
1147 START_CRIT_SECTION();
1148
1149 fsync_fname(path, false);
1150 fsync_fname(dir, true);
1151 fsync_fname("pg_replslot", true);
1152
1153 END_CRIT_SECTION();
1154
1155 /*
1156 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1157 * already.
1158 */
1159 SpinLockAcquire(&slot->mutex);
1160 if (!slot->just_dirtied)
1161 slot->dirty = false;
1162 SpinLockRelease(&slot->mutex);
1163
1164 LWLockRelease(&slot->io_in_progress_lock);
1165 }
1166
1167 /*
1168 * Load a single slot from disk into memory.
1169 */
1170 static void
RestoreSlotFromDisk(const char * name)1171 RestoreSlotFromDisk(const char *name)
1172 {
1173 ReplicationSlotOnDisk cp;
1174 int i;
1175 char slotdir[MAXPGPATH + 12];
1176 char path[MAXPGPATH + 22];
1177 int fd;
1178 bool restored = false;
1179 int readBytes;
1180 pg_crc32c checksum;
1181
1182 /* no need to lock here, no concurrent access allowed yet */
1183
1184 /* delete temp file if it exists */
1185 sprintf(slotdir, "pg_replslot/%s", name);
1186 sprintf(path, "%s/state.tmp", slotdir);
1187 if (unlink(path) < 0 && errno != ENOENT)
1188 ereport(PANIC,
1189 (errcode_for_file_access(),
1190 errmsg("could not remove file \"%s\": %m", path)));
1191
1192 sprintf(path, "%s/state", slotdir);
1193
1194 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1195
1196 fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1197
1198 /*
1199 * We do not need to handle this as we are rename()ing the directory into
1200 * place only after we fsync()ed the state file.
1201 */
1202 if (fd < 0)
1203 ereport(PANIC,
1204 (errcode_for_file_access(),
1205 errmsg("could not open file \"%s\": %m", path)));
1206
1207 /*
1208 * Sync state file before we're reading from it. We might have crashed
1209 * while it wasn't synced yet and we shouldn't continue on that basis.
1210 */
1211 if (pg_fsync(fd) != 0)
1212 {
1213 int save_errno = errno;
1214
1215 CloseTransientFile(fd);
1216 errno = save_errno;
1217 ereport(PANIC,
1218 (errcode_for_file_access(),
1219 errmsg("could not fsync file \"%s\": %m",
1220 path)));
1221 }
1222
1223 /* Also sync the parent directory */
1224 START_CRIT_SECTION();
1225 fsync_fname(slotdir, true);
1226 END_CRIT_SECTION();
1227
1228 /* read part of statefile that's guaranteed to be version independent */
1229 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1230 if (readBytes != ReplicationSlotOnDiskConstantSize)
1231 {
1232 int saved_errno = errno;
1233
1234 CloseTransientFile(fd);
1235 errno = saved_errno;
1236 ereport(PANIC,
1237 (errcode_for_file_access(),
1238 errmsg("could not read file \"%s\", read %d of %u: %m",
1239 path, readBytes,
1240 (uint32) ReplicationSlotOnDiskConstantSize)));
1241 }
1242
1243 /* verify magic */
1244 if (cp.magic != SLOT_MAGIC)
1245 ereport(PANIC,
1246 (errcode_for_file_access(),
1247 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1248 path, cp.magic, SLOT_MAGIC)));
1249
1250 /* verify version */
1251 if (cp.version != SLOT_VERSION)
1252 ereport(PANIC,
1253 (errcode_for_file_access(),
1254 errmsg("replication slot file \"%s\" has unsupported version %u",
1255 path, cp.version)));
1256
1257 /* boundary check on length */
1258 if (cp.length != ReplicationSlotOnDiskV2Size)
1259 ereport(PANIC,
1260 (errcode_for_file_access(),
1261 errmsg("replication slot file \"%s\" has corrupted length %u",
1262 path, cp.length)));
1263
1264 /* Now that we know the size, read the entire file */
1265 readBytes = read(fd,
1266 (char *) &cp + ReplicationSlotOnDiskConstantSize,
1267 cp.length);
1268 if (readBytes != cp.length)
1269 {
1270 int saved_errno = errno;
1271
1272 CloseTransientFile(fd);
1273 errno = saved_errno;
1274 ereport(PANIC,
1275 (errcode_for_file_access(),
1276 errmsg("could not read file \"%s\", read %d of %u: %m",
1277 path, readBytes, cp.length)));
1278 }
1279
1280 CloseTransientFile(fd);
1281
1282 /* now verify the CRC */
1283 INIT_CRC32C(checksum);
1284 COMP_CRC32C(checksum,
1285 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1286 SnapBuildOnDiskChecksummedSize);
1287 FIN_CRC32C(checksum);
1288
1289 if (!EQ_CRC32C(checksum, cp.checksum))
1290 ereport(PANIC,
1291 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1292 path, checksum, cp.checksum)));
1293
1294 /*
1295 * If we crashed with an ephemeral slot active, don't restore but delete
1296 * it.
1297 */
1298 if (cp.slotdata.persistency != RS_PERSISTENT)
1299 {
1300 if (!rmtree(slotdir, true))
1301 {
1302 ereport(WARNING,
1303 (errcode_for_file_access(),
1304 errmsg("could not remove directory \"%s\"", slotdir)));
1305 }
1306 fsync_fname("pg_replslot", true);
1307 return;
1308 }
1309
1310 /*
1311 * Verify that requirements for the specific slot type are met. That's
1312 * important because if these aren't met we're not guaranteed to retain
1313 * all the necessary resources for the slot.
1314 *
1315 * NB: We have to do so *after* the above checks for ephemeral slots,
1316 * because otherwise a slot that shouldn't exist anymore could prevent
1317 * restarts.
1318 *
1319 * NB: Changing the requirements here also requires adapting
1320 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
1321 */
1322 if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
1323 ereport(FATAL,
1324 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1325 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
1326 NameStr(cp.slotdata.name)),
1327 errhint("Change wal_level to be logical or higher.")));
1328 else if (wal_level < WAL_LEVEL_REPLICA)
1329 ereport(FATAL,
1330 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1331 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
1332 NameStr(cp.slotdata.name)),
1333 errhint("Change wal_level to be replica or higher.")));
1334
1335 /* nothing can be active yet, don't lock anything */
1336 for (i = 0; i < max_replication_slots; i++)
1337 {
1338 ReplicationSlot *slot;
1339
1340 slot = &ReplicationSlotCtl->replication_slots[i];
1341
1342 if (slot->in_use)
1343 continue;
1344
1345 /* restore the entire set of persistent data */
1346 memcpy(&slot->data, &cp.slotdata,
1347 sizeof(ReplicationSlotPersistentData));
1348
1349 /* initialize in memory state */
1350 slot->effective_xmin = cp.slotdata.xmin;
1351 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1352
1353 slot->candidate_catalog_xmin = InvalidTransactionId;
1354 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1355 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1356 slot->candidate_restart_valid = InvalidXLogRecPtr;
1357
1358 slot->in_use = true;
1359 slot->active_pid = 0;
1360
1361 restored = true;
1362 break;
1363 }
1364
1365 if (!restored)
1366 ereport(PANIC,
1367 (errmsg("too many replication slots active before shutdown"),
1368 errhint("Increase max_replication_slots and try again.")));
1369 }
1370