1 /*-------------------------------------------------------------------------
2 *
3 * slotfuncs.c
4 * Support functions for replication slots
5 *
6 * Copyright (c) 2012-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/slotfuncs.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "access/htup_details.h"
16 #include "access/xlog_internal.h"
17 #include "access/xlogutils.h"
18 #include "funcapi.h"
19 #include "miscadmin.h"
20 #include "replication/decode.h"
21 #include "replication/logical.h"
22 #include "replication/slot.h"
23 #include "utils/builtins.h"
24 #include "utils/inval.h"
25 #include "utils/pg_lsn.h"
26 #include "utils/resowner.h"
27
28 static void
check_permissions(void)29 check_permissions(void)
30 {
31 if (!superuser() && !has_rolreplication(GetUserId()))
32 ereport(ERROR,
33 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34 errmsg("must be superuser or replication role to use replication slots")));
35 }
36
37 /*
38 * Helper function for creating a new physical replication slot with
39 * given arguments. Note that this function doesn't release the created
40 * slot.
41 *
42 * If restart_lsn is a valid value, we use it without WAL reservation
43 * routine. So the caller must guarantee that WAL is available.
44 */
45 static void
create_physical_replication_slot(char * name,bool immediately_reserve,bool temporary,XLogRecPtr restart_lsn)46 create_physical_replication_slot(char *name, bool immediately_reserve,
47 bool temporary, XLogRecPtr restart_lsn)
48 {
49 Assert(!MyReplicationSlot);
50
51 /* acquire replication slot, this will check for conflicting names */
52 ReplicationSlotCreate(name, false,
53 temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
54
55 if (immediately_reserve)
56 {
57 /* Reserve WAL as the user asked for it */
58 if (XLogRecPtrIsInvalid(restart_lsn))
59 ReplicationSlotReserveWal();
60 else
61 MyReplicationSlot->data.restart_lsn = restart_lsn;
62
63 /* Write this slot to disk */
64 ReplicationSlotMarkDirty();
65 ReplicationSlotSave();
66 }
67 }
68
69 /*
70 * SQL function for creating a new physical (streaming replication)
71 * replication slot.
72 */
73 Datum
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)74 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
75 {
76 Name name = PG_GETARG_NAME(0);
77 bool immediately_reserve = PG_GETARG_BOOL(1);
78 bool temporary = PG_GETARG_BOOL(2);
79 Datum values[2];
80 bool nulls[2];
81 TupleDesc tupdesc;
82 HeapTuple tuple;
83 Datum result;
84
85 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86 elog(ERROR, "return type must be a row type");
87
88 check_permissions();
89
90 CheckSlotRequirements();
91
92 create_physical_replication_slot(NameStr(*name),
93 immediately_reserve,
94 temporary,
95 InvalidXLogRecPtr);
96
97 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98 nulls[0] = false;
99
100 if (immediately_reserve)
101 {
102 values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
103 nulls[1] = false;
104 }
105 else
106 nulls[1] = true;
107
108 tuple = heap_form_tuple(tupdesc, values, nulls);
109 result = HeapTupleGetDatum(tuple);
110
111 ReplicationSlotRelease();
112
113 PG_RETURN_DATUM(result);
114 }
115
116
117 /*
118 * Helper function for creating a new logical replication slot with
119 * given arguments. Note that this function doesn't release the created
120 * slot.
121 *
122 * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123 * caller's responsibility to ensure it's set to something sensible.
124 */
125 static void
create_logical_replication_slot(char * name,char * plugin,bool temporary,bool two_phase,XLogRecPtr restart_lsn,bool find_startpoint)126 create_logical_replication_slot(char *name, char *plugin,
127 bool temporary, bool two_phase,
128 XLogRecPtr restart_lsn,
129 bool find_startpoint)
130 {
131 LogicalDecodingContext *ctx = NULL;
132
133 Assert(!MyReplicationSlot);
134
135 /*
136 * Acquire a logical decoding slot, this will check for conflicting names.
137 * Initially create persistent slot as ephemeral - that allows us to
138 * nicely handle errors during initialization because it'll get dropped if
139 * this transaction fails. We'll make it persistent at the end. Temporary
140 * slots can be created as temporary from beginning as they get dropped on
141 * error as well.
142 */
143 ReplicationSlotCreate(name, true,
144 temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
145
146 /*
147 * Create logical decoding context to find start point or, if we don't
148 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
149 *
150 * Note: when !find_startpoint this is still important, because it's at
151 * this point that the output plugin is validated.
152 */
153 ctx = CreateInitDecodingContext(plugin, NIL,
154 false, /* just catalogs is OK */
155 restart_lsn,
156 XL_ROUTINE(.page_read = read_local_xlog_page,
157 .segment_open = wal_segment_open,
158 .segment_close = wal_segment_close),
159 NULL, NULL, NULL);
160
161 /*
162 * If caller needs us to determine the decoding start point, do so now.
163 * This might take a while.
164 */
165 if (find_startpoint)
166 DecodingContextFindStartpoint(ctx);
167
168 /* don't need the decoding context anymore */
169 FreeDecodingContext(ctx);
170 }
171
172 /*
173 * SQL function for creating a new logical replication slot.
174 */
175 Datum
pg_create_logical_replication_slot(PG_FUNCTION_ARGS)176 pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
177 {
178 Name name = PG_GETARG_NAME(0);
179 Name plugin = PG_GETARG_NAME(1);
180 bool temporary = PG_GETARG_BOOL(2);
181 bool two_phase = PG_GETARG_BOOL(3);
182 Datum result;
183 TupleDesc tupdesc;
184 HeapTuple tuple;
185 Datum values[2];
186 bool nulls[2];
187
188 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
189 elog(ERROR, "return type must be a row type");
190
191 check_permissions();
192
193 CheckLogicalDecodingRequirements();
194
195 create_logical_replication_slot(NameStr(*name),
196 NameStr(*plugin),
197 temporary,
198 two_phase,
199 InvalidXLogRecPtr,
200 true);
201
202 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
203 values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
204
205 memset(nulls, 0, sizeof(nulls));
206
207 tuple = heap_form_tuple(tupdesc, values, nulls);
208 result = HeapTupleGetDatum(tuple);
209
210 /* ok, slot is now fully created, mark it as persistent if needed */
211 if (!temporary)
212 ReplicationSlotPersist();
213 ReplicationSlotRelease();
214
215 PG_RETURN_DATUM(result);
216 }
217
218
219 /*
220 * SQL function for dropping a replication slot.
221 */
222 Datum
pg_drop_replication_slot(PG_FUNCTION_ARGS)223 pg_drop_replication_slot(PG_FUNCTION_ARGS)
224 {
225 Name name = PG_GETARG_NAME(0);
226
227 check_permissions();
228
229 CheckSlotRequirements();
230
231 ReplicationSlotDrop(NameStr(*name), true);
232
233 PG_RETURN_VOID();
234 }
235
236 /*
237 * pg_get_replication_slots - SQL SRF showing active replication slots.
238 */
239 Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)240 pg_get_replication_slots(PG_FUNCTION_ARGS)
241 {
242 #define PG_GET_REPLICATION_SLOTS_COLS 14
243 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
244 TupleDesc tupdesc;
245 Tuplestorestate *tupstore;
246 MemoryContext per_query_ctx;
247 MemoryContext oldcontext;
248 XLogRecPtr currlsn;
249 int slotno;
250
251 /* check to see if caller supports us returning a tuplestore */
252 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
253 ereport(ERROR,
254 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
255 errmsg("set-valued function called in context that cannot accept a set")));
256 if (!(rsinfo->allowedModes & SFRM_Materialize))
257 ereport(ERROR,
258 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
259 errmsg("materialize mode required, but it is not allowed in this context")));
260
261 /* Build a tuple descriptor for our result type */
262 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
263 elog(ERROR, "return type must be a row type");
264
265 /*
266 * We don't require any special permission to see this function's data
267 * because nothing should be sensitive. The most critical being the slot
268 * name, which shouldn't contain anything particularly sensitive.
269 */
270
271 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
272 oldcontext = MemoryContextSwitchTo(per_query_ctx);
273
274 tupstore = tuplestore_begin_heap(true, false, work_mem);
275 rsinfo->returnMode = SFRM_Materialize;
276 rsinfo->setResult = tupstore;
277 rsinfo->setDesc = tupdesc;
278
279 MemoryContextSwitchTo(oldcontext);
280
281 currlsn = GetXLogWriteRecPtr();
282
283 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
284 for (slotno = 0; slotno < max_replication_slots; slotno++)
285 {
286 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
287 ReplicationSlot slot_contents;
288 Datum values[PG_GET_REPLICATION_SLOTS_COLS];
289 bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
290 WALAvailability walstate;
291 int i;
292
293 if (!slot->in_use)
294 continue;
295
296 /* Copy slot contents while holding spinlock, then examine at leisure */
297 SpinLockAcquire(&slot->mutex);
298 slot_contents = *slot;
299 SpinLockRelease(&slot->mutex);
300
301 memset(values, 0, sizeof(values));
302 memset(nulls, 0, sizeof(nulls));
303
304 i = 0;
305 values[i++] = NameGetDatum(&slot_contents.data.name);
306
307 if (slot_contents.data.database == InvalidOid)
308 nulls[i++] = true;
309 else
310 values[i++] = NameGetDatum(&slot_contents.data.plugin);
311
312 if (slot_contents.data.database == InvalidOid)
313 values[i++] = CStringGetTextDatum("physical");
314 else
315 values[i++] = CStringGetTextDatum("logical");
316
317 if (slot_contents.data.database == InvalidOid)
318 nulls[i++] = true;
319 else
320 values[i++] = ObjectIdGetDatum(slot_contents.data.database);
321
322 values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
323 values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
324
325 if (slot_contents.active_pid != 0)
326 values[i++] = Int32GetDatum(slot_contents.active_pid);
327 else
328 nulls[i++] = true;
329
330 if (slot_contents.data.xmin != InvalidTransactionId)
331 values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
332 else
333 nulls[i++] = true;
334
335 if (slot_contents.data.catalog_xmin != InvalidTransactionId)
336 values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
337 else
338 nulls[i++] = true;
339
340 if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
341 values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
342 else
343 nulls[i++] = true;
344
345 if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
346 values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
347 else
348 nulls[i++] = true;
349
350 /*
351 * If invalidated_at is valid and restart_lsn is invalid, we know for
352 * certain that the slot has been invalidated. Otherwise, test
353 * availability from restart_lsn.
354 */
355 if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
356 !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
357 walstate = WALAVAIL_REMOVED;
358 else
359 walstate = GetWALAvailability(slot_contents.data.restart_lsn);
360
361 switch (walstate)
362 {
363 case WALAVAIL_INVALID_LSN:
364 nulls[i++] = true;
365 break;
366
367 case WALAVAIL_RESERVED:
368 values[i++] = CStringGetTextDatum("reserved");
369 break;
370
371 case WALAVAIL_EXTENDED:
372 values[i++] = CStringGetTextDatum("extended");
373 break;
374
375 case WALAVAIL_UNRESERVED:
376 values[i++] = CStringGetTextDatum("unreserved");
377 break;
378
379 case WALAVAIL_REMOVED:
380
381 /*
382 * If we read the restart_lsn long enough ago, maybe that file
383 * has been removed by now. However, the walsender could have
384 * moved forward enough that it jumped to another file after
385 * we looked. If checkpointer signalled the process to
386 * termination, then it's definitely lost; but if a process is
387 * still alive, then "unreserved" seems more appropriate.
388 *
389 * If we do change it, save the state for safe_wal_size below.
390 */
391 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
392 {
393 int pid;
394
395 SpinLockAcquire(&slot->mutex);
396 pid = slot->active_pid;
397 slot_contents.data.restart_lsn = slot->data.restart_lsn;
398 SpinLockRelease(&slot->mutex);
399 if (pid != 0)
400 {
401 values[i++] = CStringGetTextDatum("unreserved");
402 walstate = WALAVAIL_UNRESERVED;
403 break;
404 }
405 }
406 values[i++] = CStringGetTextDatum("lost");
407 break;
408 }
409
410 /*
411 * safe_wal_size is only computed for slots that have not been lost,
412 * and only if there's a configured maximum size.
413 */
414 if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
415 nulls[i++] = true;
416 else
417 {
418 XLogSegNo targetSeg;
419 uint64 slotKeepSegs;
420 uint64 keepSegs;
421 XLogSegNo failSeg;
422 XLogRecPtr failLSN;
423
424 XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
425
426 /* determine how many segments slots can be kept by slots */
427 slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
428 /* ditto for wal_keep_size */
429 keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
430
431 /* if currpos reaches failLSN, we lose our segment */
432 failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
433 XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
434
435 values[i++] = Int64GetDatum(failLSN - currlsn);
436 }
437
438 values[i++] = BoolGetDatum(slot_contents.data.two_phase);
439
440 Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
441
442 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
443 }
444
445 LWLockRelease(ReplicationSlotControlLock);
446
447 tuplestore_donestoring(tupstore);
448
449 return (Datum) 0;
450 }
451
452 /*
453 * Helper function for advancing our physical replication slot forward.
454 *
455 * The LSN position to move to is compared simply to the slot's restart_lsn,
456 * knowing that any position older than that would be removed by successive
457 * checkpoints.
458 */
459 static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr moveto)460 pg_physical_replication_slot_advance(XLogRecPtr moveto)
461 {
462 XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
463 XLogRecPtr retlsn = startlsn;
464
465 Assert(moveto != InvalidXLogRecPtr);
466
467 if (startlsn < moveto)
468 {
469 SpinLockAcquire(&MyReplicationSlot->mutex);
470 MyReplicationSlot->data.restart_lsn = moveto;
471 SpinLockRelease(&MyReplicationSlot->mutex);
472 retlsn = moveto;
473
474 /*
475 * Dirty the slot so as it is written out at the next checkpoint. Note
476 * that the LSN position advanced may still be lost in the event of a
477 * crash, but this makes the data consistent after a clean shutdown.
478 */
479 ReplicationSlotMarkDirty();
480 }
481
482 return retlsn;
483 }
484
485 /*
486 * Helper function for advancing our logical replication slot forward.
487 *
488 * The slot's restart_lsn is used as start point for reading records, while
489 * confirmed_flush is used as base point for the decoding context.
490 *
491 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
492 * because we need to digest WAL to advance restart_lsn allowing to recycle
493 * WAL and removal of old catalog tuples. As decoding is done in fast_forward
494 * mode, no changes are generated anyway.
495 */
496 static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)497 pg_logical_replication_slot_advance(XLogRecPtr moveto)
498 {
499 LogicalDecodingContext *ctx;
500 ResourceOwner old_resowner = CurrentResourceOwner;
501 XLogRecPtr retlsn;
502
503 Assert(moveto != InvalidXLogRecPtr);
504
505 PG_TRY();
506 {
507 /*
508 * Create our decoding context in fast_forward mode, passing start_lsn
509 * as InvalidXLogRecPtr, so that we start processing from my slot's
510 * confirmed_flush.
511 */
512 ctx = CreateDecodingContext(InvalidXLogRecPtr,
513 NIL,
514 true, /* fast_forward */
515 XL_ROUTINE(.page_read = read_local_xlog_page,
516 .segment_open = wal_segment_open,
517 .segment_close = wal_segment_close),
518 NULL, NULL, NULL);
519
520 /*
521 * Start reading at the slot's restart_lsn, which we know to point to
522 * a valid record.
523 */
524 XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
525
526 /* invalidate non-timetravel entries */
527 InvalidateSystemCaches();
528
529 /* Decode at least one record, until we run out of records */
530 while (ctx->reader->EndRecPtr < moveto)
531 {
532 char *errm = NULL;
533 XLogRecord *record;
534
535 /*
536 * Read records. No changes are generated in fast_forward mode,
537 * but snapbuilder/slot statuses are updated properly.
538 */
539 record = XLogReadRecord(ctx->reader, &errm);
540 if (errm)
541 elog(ERROR, "%s", errm);
542
543 /*
544 * Process the record. Storage-level changes are ignored in
545 * fast_forward mode, but other modules (such as snapbuilder)
546 * might still have critical updates to do.
547 */
548 if (record)
549 LogicalDecodingProcessRecord(ctx, ctx->reader);
550
551 /* Stop once the requested target has been reached */
552 if (moveto <= ctx->reader->EndRecPtr)
553 break;
554
555 CHECK_FOR_INTERRUPTS();
556 }
557
558 /*
559 * Logical decoding could have clobbered CurrentResourceOwner during
560 * transaction management, so restore the executor's value. (This is
561 * a kluge, but it's not worth cleaning up right now.)
562 */
563 CurrentResourceOwner = old_resowner;
564
565 if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
566 {
567 LogicalConfirmReceivedLocation(moveto);
568
569 /*
570 * If only the confirmed_flush LSN has changed the slot won't get
571 * marked as dirty by the above. Callers on the walsender
572 * interface are expected to keep track of their own progress and
573 * don't need it written out. But SQL-interface users cannot
574 * specify their own start positions and it's harder for them to
575 * keep track of their progress, so we should make more of an
576 * effort to save it for them.
577 *
578 * Dirty the slot so it is written out at the next checkpoint. The
579 * LSN position advanced to may still be lost on a crash but this
580 * makes the data consistent after a clean shutdown.
581 */
582 ReplicationSlotMarkDirty();
583 }
584
585 retlsn = MyReplicationSlot->data.confirmed_flush;
586
587 /* free context, call shutdown callback */
588 FreeDecodingContext(ctx);
589
590 InvalidateSystemCaches();
591 }
592 PG_CATCH();
593 {
594 /* clear all timetravel entries */
595 InvalidateSystemCaches();
596
597 PG_RE_THROW();
598 }
599 PG_END_TRY();
600
601 return retlsn;
602 }
603
604 /*
605 * SQL function for moving the position in a replication slot.
606 */
607 Datum
pg_replication_slot_advance(PG_FUNCTION_ARGS)608 pg_replication_slot_advance(PG_FUNCTION_ARGS)
609 {
610 Name slotname = PG_GETARG_NAME(0);
611 XLogRecPtr moveto = PG_GETARG_LSN(1);
612 XLogRecPtr endlsn;
613 XLogRecPtr minlsn;
614 TupleDesc tupdesc;
615 Datum values[2];
616 bool nulls[2];
617 HeapTuple tuple;
618 Datum result;
619
620 Assert(!MyReplicationSlot);
621
622 check_permissions();
623
624 if (XLogRecPtrIsInvalid(moveto))
625 ereport(ERROR,
626 (errmsg("invalid target WAL LSN")));
627
628 /* Build a tuple descriptor for our result type */
629 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
630 elog(ERROR, "return type must be a row type");
631
632 /*
633 * We can't move slot past what's been flushed/replayed so clamp the
634 * target position accordingly.
635 */
636 if (!RecoveryInProgress())
637 moveto = Min(moveto, GetFlushRecPtr());
638 else
639 moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
640
641 /* Acquire the slot so we "own" it */
642 ReplicationSlotAcquire(NameStr(*slotname), true);
643
644 /* A slot whose restart_lsn has never been reserved cannot be advanced */
645 if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
646 ereport(ERROR,
647 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
648 errmsg("replication slot \"%s\" cannot be advanced",
649 NameStr(*slotname)),
650 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
651
652 /*
653 * Check if the slot is not moving backwards. Physical slots rely simply
654 * on restart_lsn as a minimum point, while logical slots have confirmed
655 * consumption up to confirmed_flush, meaning that in both cases data
656 * older than that is not available anymore.
657 */
658 if (OidIsValid(MyReplicationSlot->data.database))
659 minlsn = MyReplicationSlot->data.confirmed_flush;
660 else
661 minlsn = MyReplicationSlot->data.restart_lsn;
662
663 if (moveto < minlsn)
664 ereport(ERROR,
665 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
666 errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
667 LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
668
669 /* Do the actual slot update, depending on the slot type */
670 if (OidIsValid(MyReplicationSlot->data.database))
671 endlsn = pg_logical_replication_slot_advance(moveto);
672 else
673 endlsn = pg_physical_replication_slot_advance(moveto);
674
675 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
676 nulls[0] = false;
677
678 /*
679 * Recompute the minimum LSN and xmin across all slots to adjust with the
680 * advancing potentially done.
681 */
682 ReplicationSlotsComputeRequiredXmin(false);
683 ReplicationSlotsComputeRequiredLSN();
684
685 ReplicationSlotRelease();
686
687 /* Return the reached position. */
688 values[1] = LSNGetDatum(endlsn);
689 nulls[1] = false;
690
691 tuple = heap_form_tuple(tupdesc, values, nulls);
692 result = HeapTupleGetDatum(tuple);
693
694 PG_RETURN_DATUM(result);
695 }
696
697 /*
698 * Helper function of copying a replication slot.
699 */
700 static Datum
copy_replication_slot(FunctionCallInfo fcinfo,bool logical_slot)701 copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
702 {
703 Name src_name = PG_GETARG_NAME(0);
704 Name dst_name = PG_GETARG_NAME(1);
705 ReplicationSlot *src = NULL;
706 ReplicationSlot first_slot_contents;
707 ReplicationSlot second_slot_contents;
708 XLogRecPtr src_restart_lsn;
709 bool src_islogical;
710 bool temporary;
711 char *plugin;
712 Datum values[2];
713 bool nulls[2];
714 Datum result;
715 TupleDesc tupdesc;
716 HeapTuple tuple;
717
718 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
719 elog(ERROR, "return type must be a row type");
720
721 check_permissions();
722
723 if (logical_slot)
724 CheckLogicalDecodingRequirements();
725 else
726 CheckSlotRequirements();
727
728 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
729
730 /*
731 * We need to prevent the source slot's reserved WAL from being removed,
732 * but we don't want to lock that slot for very long, and it can advance
733 * in the meantime. So obtain the source slot's data, and create a new
734 * slot using its restart_lsn. Afterwards we lock the source slot again
735 * and verify that the data we copied (name, type) has not changed
736 * incompatibly. No inconvenient WAL removal can occur once the new slot
737 * is created -- but since WAL removal could have occurred before we
738 * managed to create the new slot, we advance the new slot's restart_lsn
739 * to the source slot's updated restart_lsn the second time we lock it.
740 */
741 for (int i = 0; i < max_replication_slots; i++)
742 {
743 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
744
745 if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
746 {
747 /* Copy the slot contents while holding spinlock */
748 SpinLockAcquire(&s->mutex);
749 first_slot_contents = *s;
750 SpinLockRelease(&s->mutex);
751 src = s;
752 break;
753 }
754 }
755
756 LWLockRelease(ReplicationSlotControlLock);
757
758 if (src == NULL)
759 ereport(ERROR,
760 (errcode(ERRCODE_UNDEFINED_OBJECT),
761 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
762
763 src_islogical = SlotIsLogical(&first_slot_contents);
764 src_restart_lsn = first_slot_contents.data.restart_lsn;
765 temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
766 plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
767
768 /* Check type of replication slot */
769 if (src_islogical != logical_slot)
770 ereport(ERROR,
771 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
772 src_islogical ?
773 errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
774 NameStr(*src_name)) :
775 errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
776 NameStr(*src_name))));
777
778 /* Copying non-reserved slot doesn't make sense */
779 if (XLogRecPtrIsInvalid(src_restart_lsn))
780 ereport(ERROR,
781 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
782 errmsg("cannot copy a replication slot that doesn't reserve WAL")));
783
784 /* Overwrite params from optional arguments */
785 if (PG_NARGS() >= 3)
786 temporary = PG_GETARG_BOOL(2);
787 if (PG_NARGS() >= 4)
788 {
789 Assert(logical_slot);
790 plugin = NameStr(*(PG_GETARG_NAME(3)));
791 }
792
793 /* Create new slot and acquire it */
794 if (logical_slot)
795 {
796 /*
797 * We must not try to read WAL, since we haven't reserved it yet --
798 * hence pass find_startpoint false. confirmed_flush will be set
799 * below, by copying from the source slot.
800 */
801 create_logical_replication_slot(NameStr(*dst_name),
802 plugin,
803 temporary,
804 false,
805 src_restart_lsn,
806 false);
807 }
808 else
809 create_physical_replication_slot(NameStr(*dst_name),
810 true,
811 temporary,
812 src_restart_lsn);
813
814 /*
815 * Update the destination slot to current values of the source slot;
816 * recheck that the source slot is still the one we saw previously.
817 */
818 {
819 TransactionId copy_effective_xmin;
820 TransactionId copy_effective_catalog_xmin;
821 TransactionId copy_xmin;
822 TransactionId copy_catalog_xmin;
823 XLogRecPtr copy_restart_lsn;
824 XLogRecPtr copy_confirmed_flush;
825 bool copy_islogical;
826 char *copy_name;
827
828 /* Copy data of source slot again */
829 SpinLockAcquire(&src->mutex);
830 second_slot_contents = *src;
831 SpinLockRelease(&src->mutex);
832
833 copy_effective_xmin = second_slot_contents.effective_xmin;
834 copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
835
836 copy_xmin = second_slot_contents.data.xmin;
837 copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
838 copy_restart_lsn = second_slot_contents.data.restart_lsn;
839 copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
840
841 /* for existence check */
842 copy_name = NameStr(second_slot_contents.data.name);
843 copy_islogical = SlotIsLogical(&second_slot_contents);
844
845 /*
846 * Check if the source slot still exists and is valid. We regard it as
847 * invalid if the type of replication slot or name has been changed,
848 * or the restart_lsn either is invalid or has gone backward. (The
849 * restart_lsn could go backwards if the source slot is dropped and
850 * copied from an older slot during installation.)
851 *
852 * Since erroring out will release and drop the destination slot we
853 * don't need to release it here.
854 */
855 if (copy_restart_lsn < src_restart_lsn ||
856 src_islogical != copy_islogical ||
857 strcmp(copy_name, NameStr(*src_name)) != 0)
858 ereport(ERROR,
859 (errmsg("could not copy replication slot \"%s\"",
860 NameStr(*src_name)),
861 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
862
863 /* The source slot must have a consistent snapshot */
864 if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
865 ereport(ERROR,
866 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
867 errmsg("cannot copy unfinished logical replication slot \"%s\"",
868 NameStr(*src_name)),
869 errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
870
871 /* Install copied values again */
872 SpinLockAcquire(&MyReplicationSlot->mutex);
873 MyReplicationSlot->effective_xmin = copy_effective_xmin;
874 MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
875
876 MyReplicationSlot->data.xmin = copy_xmin;
877 MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
878 MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
879 MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
880 SpinLockRelease(&MyReplicationSlot->mutex);
881
882 ReplicationSlotMarkDirty();
883 ReplicationSlotsComputeRequiredXmin(false);
884 ReplicationSlotsComputeRequiredLSN();
885 ReplicationSlotSave();
886
887 #ifdef USE_ASSERT_CHECKING
888 /* Check that the restart_lsn is available */
889 {
890 XLogSegNo segno;
891
892 XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
893 Assert(XLogGetLastRemovedSegno() < segno);
894 }
895 #endif
896 }
897
898 /* target slot fully created, mark as persistent if needed */
899 if (logical_slot && !temporary)
900 ReplicationSlotPersist();
901
902 /* All done. Set up the return values */
903 values[0] = NameGetDatum(dst_name);
904 nulls[0] = false;
905 if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
906 {
907 values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
908 nulls[1] = false;
909 }
910 else
911 nulls[1] = true;
912
913 tuple = heap_form_tuple(tupdesc, values, nulls);
914 result = HeapTupleGetDatum(tuple);
915
916 ReplicationSlotRelease();
917
918 PG_RETURN_DATUM(result);
919 }
920
921 /* The wrappers below are all to appease opr_sanity */
922 Datum
pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)923 pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
924 {
925 return copy_replication_slot(fcinfo, true);
926 }
927
928 Datum
pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)929 pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
930 {
931 return copy_replication_slot(fcinfo, true);
932 }
933
934 Datum
pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)935 pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
936 {
937 return copy_replication_slot(fcinfo, true);
938 }
939
940 Datum
pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)941 pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
942 {
943 return copy_replication_slot(fcinfo, false);
944 }
945
946 Datum
pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)947 pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
948 {
949 return copy_replication_slot(fcinfo, false);
950 }
951