1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  *	   Support functions for replication slots
5  *
6  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xlog_internal.h"
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 #include "replication/decode.h"
20 #include "replication/slot.h"
21 #include "replication/logical.h"
22 #include "replication/logicalfuncs.h"
23 #include "utils/builtins.h"
24 #include "utils/inval.h"
25 #include "utils/pg_lsn.h"
26 #include "utils/resowner.h"
27 
28 static void
29 check_permissions(void)
30 {
31 	if (!superuser() && !has_rolreplication(GetUserId()))
32 		ereport(ERROR,
33 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34 				 (errmsg("must be superuser or replication role to use replication slots"))));
35 }
36 
37 /*
38  * Helper function for creating a new physical replication slot with
39  * given arguments. Note that this function doesn't release the created
40  * slot.
41  *
42  * If restart_lsn is a valid value, we use it without WAL reservation
43  * routine. So the caller must guarantee that WAL is available.
44  */
45 static void
46 create_physical_replication_slot(char *name, bool immediately_reserve,
47 								 bool temporary, XLogRecPtr restart_lsn)
48 {
49 	Assert(!MyReplicationSlot);
50 
51 	/* acquire replication slot, this will check for conflicting names */
52 	ReplicationSlotCreate(name, false,
53 						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
54 
55 	if (immediately_reserve)
56 	{
57 		/* Reserve WAL as the user asked for it */
58 		if (XLogRecPtrIsInvalid(restart_lsn))
59 			ReplicationSlotReserveWal();
60 		else
61 			MyReplicationSlot->data.restart_lsn = restart_lsn;
62 
63 		/* Write this slot to disk */
64 		ReplicationSlotMarkDirty();
65 		ReplicationSlotSave();
66 	}
67 }
68 
69 /*
70  * SQL function for creating a new physical (streaming replication)
71  * replication slot.
72  */
73 Datum
74 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
75 {
76 	Name		name = PG_GETARG_NAME(0);
77 	bool		immediately_reserve = PG_GETARG_BOOL(1);
78 	bool		temporary = PG_GETARG_BOOL(2);
79 	Datum		values[2];
80 	bool		nulls[2];
81 	TupleDesc	tupdesc;
82 	HeapTuple	tuple;
83 	Datum		result;
84 
85 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86 		elog(ERROR, "return type must be a row type");
87 
88 	check_permissions();
89 
90 	CheckSlotRequirements();
91 
92 	create_physical_replication_slot(NameStr(*name),
93 									 immediately_reserve,
94 									 temporary,
95 									 InvalidXLogRecPtr);
96 
97 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
WalWriterMain(void)98 	nulls[0] = false;
99 
100 	if (immediately_reserve)
101 	{
102 		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
103 		nulls[1] = false;
104 	}
105 	else
106 		nulls[1] = true;
107 
108 	tuple = heap_form_tuple(tupdesc, values, nulls);
109 	result = HeapTupleGetDatum(tuple);
110 
111 	ReplicationSlotRelease();
112 
113 	PG_RETURN_DATUM(result);
114 }
115 
116 
117 /*
118  * Helper function for creating a new logical replication slot with
119  * given arguments. Note that this function doesn't release the created
120  * slot.
121  *
122  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123  * caller's responsibility to ensure it's set to something sensible.
124  */
125 static void
126 create_logical_replication_slot(char *name, char *plugin,
127 								bool temporary, XLogRecPtr restart_lsn,
128 								bool find_startpoint)
129 {
130 	LogicalDecodingContext *ctx = NULL;
131 
132 	Assert(!MyReplicationSlot);
133 
134 	/*
135 	 * Acquire a logical decoding slot, this will check for conflicting names.
136 	 * Initially create persistent slot as ephemeral - that allows us to
137 	 * nicely handle errors during initialization because it'll get dropped if
138 	 * this transaction fails. We'll make it persistent at the end. Temporary
139 	 * slots can be created as temporary from beginning as they get dropped on
140 	 * error as well.
141 	 */
142 	ReplicationSlotCreate(name, true,
143 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
144 
145 	/*
146 	 * Create logical decoding context to find start point or, if we don't
147 	 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
148 	 *
149 	 * Note: when !find_startpoint this is still important, because it's at
150 	 * this point that the output plugin is validated.
151 	 */
152 	ctx = CreateInitDecodingContext(plugin, NIL,
153 									false,	/* just catalogs is OK */
154 									restart_lsn,
155 									logical_read_local_xlog_page, NULL, NULL,
156 									NULL);
157 
158 	/*
159 	 * If caller needs us to determine the decoding start point, do so now.
160 	 * This might take a while.
161 	 */
162 	if (find_startpoint)
163 		DecodingContextFindStartpoint(ctx);
164 
165 	/* don't need the decoding context anymore */
166 	FreeDecodingContext(ctx);
167 }
168 
169 /*
170  * SQL function for creating a new logical replication slot.
171  */
172 Datum
173 pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
174 {
175 	Name		name = PG_GETARG_NAME(0);
176 	Name		plugin = PG_GETARG_NAME(1);
177 	bool		temporary = PG_GETARG_BOOL(2);
178 	Datum		result;
179 	TupleDesc	tupdesc;
180 	HeapTuple	tuple;
181 	Datum		values[2];
182 	bool		nulls[2];
183 
184 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
185 		elog(ERROR, "return type must be a row type");
186 
187 	check_permissions();
188 
189 	CheckLogicalDecodingRequirements();
190 
191 	create_logical_replication_slot(NameStr(*name),
192 									NameStr(*plugin),
193 									temporary,
194 									InvalidXLogRecPtr,
195 									true);
196 
197 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
198 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
199 
200 	memset(nulls, 0, sizeof(nulls));
201 
202 	tuple = heap_form_tuple(tupdesc, values, nulls);
203 	result = HeapTupleGetDatum(tuple);
204 
205 	/* ok, slot is now fully created, mark it as persistent if needed */
206 	if (!temporary)
207 		ReplicationSlotPersist();
208 	ReplicationSlotRelease();
209 
210 	PG_RETURN_DATUM(result);
211 }
212 
213 
214 /*
215  * SQL function for dropping a replication slot.
216  */
217 Datum
218 pg_drop_replication_slot(PG_FUNCTION_ARGS)
219 {
220 	Name		name = PG_GETARG_NAME(0);
221 
222 	check_permissions();
223 
224 	CheckSlotRequirements();
225 
226 	ReplicationSlotDrop(NameStr(*name), true);
227 
228 	PG_RETURN_VOID();
229 }
230 
231 /*
232  * pg_get_replication_slots - SQL SRF showing active replication slots.
233  */
234 Datum
235 pg_get_replication_slots(PG_FUNCTION_ARGS)
236 {
237 #define PG_GET_REPLICATION_SLOTS_COLS 11
238 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
239 	TupleDesc	tupdesc;
240 	Tuplestorestate *tupstore;
241 	MemoryContext per_query_ctx;
242 	MemoryContext oldcontext;
243 	int			slotno;
244 
245 	/* check to see if caller supports us returning a tuplestore */
246 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
247 		ereport(ERROR,
248 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
249 				 errmsg("set-valued function called in context that cannot accept a set")));
250 	if (!(rsinfo->allowedModes & SFRM_Materialize))
251 		ereport(ERROR,
252 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
253 				 errmsg("materialize mode required, but it is not " \
254 						"allowed in this context")));
255 
256 	/* Build a tuple descriptor for our result type */
257 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
258 		elog(ERROR, "return type must be a row type");
259 
260 	/*
261 	 * We don't require any special permission to see this function's data
262 	 * because nothing should be sensitive. The most critical being the slot
263 	 * name, which shouldn't contain anything particularly sensitive.
264 	 */
265 
266 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
267 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
268 
269 	tupstore = tuplestore_begin_heap(true, false, work_mem);
270 	rsinfo->returnMode = SFRM_Materialize;
271 	rsinfo->setResult = tupstore;
272 	rsinfo->setDesc = tupdesc;
273 
274 	MemoryContextSwitchTo(oldcontext);
275 
276 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
277 	for (slotno = 0; slotno < max_replication_slots; slotno++)
278 	{
279 		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
280 		ReplicationSlot slot_contents;
281 		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
282 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
283 		int			i;
284 
285 		if (!slot->in_use)
286 			continue;
287 
288 		/* Copy slot contents while holding spinlock, then examine at leisure */
289 		SpinLockAcquire(&slot->mutex);
290 		slot_contents = *slot;
291 		SpinLockRelease(&slot->mutex);
292 
293 		memset(values, 0, sizeof(values));
294 		memset(nulls, 0, sizeof(nulls));
295 
296 		i = 0;
297 		values[i++] = NameGetDatum(&slot_contents.data.name);
wal_quickdie(SIGNAL_ARGS)298 
299 		if (slot_contents.data.database == InvalidOid)
300 			nulls[i++] = true;
301 		else
302 			values[i++] = NameGetDatum(&slot_contents.data.plugin);
303 
304 		if (slot_contents.data.database == InvalidOid)
305 			values[i++] = CStringGetTextDatum("physical");
306 		else
307 			values[i++] = CStringGetTextDatum("logical");
308 
309 		if (slot_contents.data.database == InvalidOid)
310 			nulls[i++] = true;
311 		else
312 			values[i++] = ObjectIdGetDatum(slot_contents.data.database);
313 
314 		values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
315 		values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
316 
317 		if (slot_contents.active_pid != 0)
318 			values[i++] = Int32GetDatum(slot_contents.active_pid);
WalSigHupHandler(SIGNAL_ARGS)319 		else
320 			nulls[i++] = true;
321 
322 		if (slot_contents.data.xmin != InvalidTransactionId)
323 			values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
324 		else
325 			nulls[i++] = true;
326 
327 		if (slot_contents.data.catalog_xmin != InvalidTransactionId)
328 			values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
329 		else
330 			nulls[i++] = true;
WalShutdownHandler(SIGNAL_ARGS)331 
332 		if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
333 			values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
334 		else
335 			nulls[i++] = true;
336 
337 		if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
338 			values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
339 		else
340 			nulls[i++] = true;
341 
342 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
walwriter_sigusr1_handler(SIGNAL_ARGS)343 
344 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
345 	}
346 
347 	LWLockRelease(ReplicationSlotControlLock);
348 
349 	tuplestore_donestoring(tupstore);
350 
351 	return (Datum) 0;
352 }
353 
354 /*
355  * Helper function for advancing our physical replication slot forward.
356  *
357  * The LSN position to move to is compared simply to the slot's restart_lsn,
358  * knowing that any position older than that would be removed by successive
359  * checkpoints.
360  */
361 static XLogRecPtr
362 pg_physical_replication_slot_advance(XLogRecPtr moveto)
363 {
364 	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
365 	XLogRecPtr	retlsn = startlsn;
366 
367 	if (startlsn < moveto)
368 	{
369 		SpinLockAcquire(&MyReplicationSlot->mutex);
370 		MyReplicationSlot->data.restart_lsn = moveto;
371 		SpinLockRelease(&MyReplicationSlot->mutex);
372 		retlsn = moveto;
373 
374 		/*
375 		 * Dirty the slot so as it is written out at the next checkpoint.
376 		 * Note that the LSN position advanced may still be lost in the
377 		 * event of a crash, but this makes the data consistent after a
378 		 * clean shutdown.
379 		 */
380 		ReplicationSlotMarkDirty();
381 	}
382 
383 	return retlsn;
384 }
385 
/*
 * Helper function for advancing our logical replication slot forward.
 *
 * The slot's restart_lsn is used as start point for reading records, while
 * confirmed_flush is used as base point for the decoding context.
 *
 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
 * because we need to digest WAL to advance restart_lsn allowing to recycle
 * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
 * mode, no changes are generated anyway.
 *
 * moveto is the target LSN.  The return value is the slot's confirmed_flush
 * as read after the decoding loop (and after any confirmation done here).
 *
 * On error, system caches are invalidated and the error is re-thrown.
 */
static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
	LogicalDecodingContext *ctx;
	/* Remembered so it can be restored once decoding is done */
	ResourceOwner old_resowner = CurrentResourceOwner;
	XLogRecPtr	startlsn;
	XLogRecPtr	retlsn;

	PG_TRY();
	{
		/*
		 * Create our decoding context in fast_forward mode, passing start_lsn
		 * as InvalidXLogRecPtr, so that we start processing from my slot's
		 * confirmed_flush.
		 */
		ctx = CreateDecodingContext(InvalidXLogRecPtr,
									NIL,
									true,	/* fast_forward */
									logical_read_local_xlog_page,
									NULL, NULL, NULL);

		/*
		 * Start reading at the slot's restart_lsn, which we know to point to
		 * a valid record.
		 */
		startlsn = MyReplicationSlot->data.restart_lsn;

		/* Initialize our return value in case we don't do anything */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* invalidate non-timetravel entries */
		InvalidateSystemCaches();

		/*
		 * Decode at least one record, until we run out of records.
		 *
		 * The first half of the condition covers the initial iteration, when
		 * startlsn is still valid; the second half covers later iterations,
		 * when startlsn has been reset and progress is tracked through the
		 * reader's EndRecPtr.
		 */
		while ((!XLogRecPtrIsInvalid(startlsn) &&
				startlsn < moveto) ||
			   (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
				ctx->reader->EndRecPtr < moveto))
		{
			char	   *errm = NULL;
			XLogRecord *record;

			/*
			 * Read records.  No changes are generated in fast_forward mode,
			 * but snapbuilder/slot statuses are updated properly.
			 */
			record = XLogReadRecord(ctx->reader, startlsn, &errm);
			if (errm)
				elog(ERROR, "%s", errm);

			/* Read sequentially from now on */
			startlsn = InvalidXLogRecPtr;

			/*
			 * Process the record.  Storage-level changes are ignored in
			 * fast_forward mode, but other modules (such as snapbuilder)
			 * might still have critical updates to do.
			 */
			if (record)
				LogicalDecodingProcessRecord(ctx, ctx->reader);

			/* Stop once the requested target has been reached */
			if (moveto <= ctx->reader->EndRecPtr)
				break;

			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * Logical decoding could have clobbered CurrentResourceOwner during
		 * transaction management, so restore the executor's value.  (This is
		 * a kluge, but it's not worth cleaning up right now.)
		 */
		CurrentResourceOwner = old_resowner;

		/* Only confirm a location if the reader actually read something */
		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
		{
			LogicalConfirmReceivedLocation(moveto);

			/*
			 * If only the confirmed_flush LSN has changed the slot won't get
			 * marked as dirty by the above. Callers on the walsender
			 * interface are expected to keep track of their own progress and
			 * don't need it written out. But SQL-interface users cannot
			 * specify their own start positions and it's harder for them to
			 * keep track of their progress, so we should make more of an
			 * effort to save it for them.
			 *
			 * Dirty the slot so it is written out at the next checkpoint.
			 * The LSN position advanced to may still be lost on a crash
			 * but this makes the data consistent after a clean shutdown.
			 */
			ReplicationSlotMarkDirty();
		}

		/* Report the slot's confirmed_flush as it stands now */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* free context, call shutdown callback */
		FreeDecodingContext(ctx);

		InvalidateSystemCaches();
	}
	PG_CATCH();
	{
		/* clear all timetravel entries */
		InvalidateSystemCaches();

		PG_RE_THROW();
	}
	PG_END_TRY();

	return retlsn;
}
510 
511 /*
512  * SQL function for moving the position in a replication slot.
513  */
514 Datum
515 pg_replication_slot_advance(PG_FUNCTION_ARGS)
516 {
517 	Name		slotname = PG_GETARG_NAME(0);
518 	XLogRecPtr	moveto = PG_GETARG_LSN(1);
519 	XLogRecPtr	endlsn;
520 	XLogRecPtr	minlsn;
521 	TupleDesc	tupdesc;
522 	Datum		values[2];
523 	bool		nulls[2];
524 	HeapTuple	tuple;
525 	Datum		result;
526 
527 	Assert(!MyReplicationSlot);
528 
529 	check_permissions();
530 
531 	if (XLogRecPtrIsInvalid(moveto))
532 		ereport(ERROR,
533 				(errmsg("invalid target WAL LSN")));
534 
535 	/* Build a tuple descriptor for our result type */
536 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
537 		elog(ERROR, "return type must be a row type");
538 
539 	/*
540 	 * We can't move slot past what's been flushed/replayed so clamp the
541 	 * target position accordingly.
542 	 */
543 	if (!RecoveryInProgress())
544 		moveto = Min(moveto, GetFlushRecPtr());
545 	else
546 		moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
547 
548 	/* Acquire the slot so we "own" it */
549 	ReplicationSlotAcquire(NameStr(*slotname), true);
550 
551 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
552 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
553 		ereport(ERROR,
554 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
555 				 errmsg("cannot advance replication slot that has not previously reserved WAL")));
556 
557 	/*
558 	 * Check if the slot is not moving backwards.  Physical slots rely simply
559 	 * on restart_lsn as a minimum point, while logical slots have confirmed
560 	 * consumption up to confirmed_flush, meaning that in both cases data
561 	 * older than that is not available anymore.
562 	 */
563 	if (OidIsValid(MyReplicationSlot->data.database))
564 		minlsn = MyReplicationSlot->data.confirmed_flush;
565 	else
566 		minlsn = MyReplicationSlot->data.restart_lsn;
567 
568 	if (moveto < minlsn)
569 		ereport(ERROR,
570 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
571 				 errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
572 						(uint32) (moveto >> 32), (uint32) moveto,
573 						(uint32) (minlsn >> 32), (uint32) minlsn)));
574 
575 	/* Do the actual slot update, depending on the slot type */
576 	if (OidIsValid(MyReplicationSlot->data.database))
577 		endlsn = pg_logical_replication_slot_advance(moveto);
578 	else
579 		endlsn = pg_physical_replication_slot_advance(moveto);
580 
581 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
582 	nulls[0] = false;
583 
584 	/*
585 	 * Recompute the minimum LSN and xmin across all slots to adjust with the
586 	 * advancing potentially done.
587 	 */
588 	ReplicationSlotsComputeRequiredXmin(false);
589 	ReplicationSlotsComputeRequiredLSN();
590 
591 	ReplicationSlotRelease();
592 
593 	/* Return the reached position. */
594 	values[1] = LSNGetDatum(endlsn);
595 	nulls[1] = false;
596 
597 	tuple = heap_form_tuple(tupdesc, values, nulls);
598 	result = HeapTupleGetDatum(tuple);
599 
600 	PG_RETURN_DATUM(result);
601 }
602 
603 /*
604  * Helper function of copying a replication slot.
605  */
606 static Datum
607 copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
608 {
609 	Name		src_name = PG_GETARG_NAME(0);
610 	Name		dst_name = PG_GETARG_NAME(1);
611 	ReplicationSlot *src = NULL;
612 	ReplicationSlot first_slot_contents;
613 	ReplicationSlot second_slot_contents;
614 	XLogRecPtr	src_restart_lsn;
615 	bool		src_islogical;
616 	bool		temporary;
617 	char	   *plugin;
618 	Datum		values[2];
619 	bool		nulls[2];
620 	Datum		result;
621 	TupleDesc	tupdesc;
622 	HeapTuple	tuple;
623 
624 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
625 		elog(ERROR, "return type must be a row type");
626 
627 	check_permissions();
628 
629 	if (logical_slot)
630 		CheckLogicalDecodingRequirements();
631 	else
632 		CheckSlotRequirements();
633 
634 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
635 
636 	/*
637 	 * We need to prevent the source slot's reserved WAL from being removed,
638 	 * but we don't want to lock that slot for very long, and it can advance
639 	 * in the meantime.  So obtain the source slot's data, and create a new
640 	 * slot using its restart_lsn.  Afterwards we lock the source slot again
641 	 * and verify that the data we copied (name, type) has not changed
642 	 * incompatibly.  No inconvenient WAL removal can occur once the new slot
643 	 * is created -- but since WAL removal could have occurred before we
644 	 * managed to create the new slot, we advance the new slot's restart_lsn
645 	 * to the source slot's updated restart_lsn the second time we lock it.
646 	 */
647 	for (int i = 0; i < max_replication_slots; i++)
648 	{
649 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
650 
651 		if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
652 		{
653 			/* Copy the slot contents while holding spinlock */
654 			SpinLockAcquire(&s->mutex);
655 			first_slot_contents = *s;
656 			SpinLockRelease(&s->mutex);
657 			src = s;
658 			break;
659 		}
660 	}
661 
662 	LWLockRelease(ReplicationSlotControlLock);
663 
664 	if (src == NULL)
665 		ereport(ERROR,
666 				(errcode(ERRCODE_UNDEFINED_OBJECT),
667 				 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
668 
669 	src_islogical = SlotIsLogical(&first_slot_contents);
670 	src_restart_lsn = first_slot_contents.data.restart_lsn;
671 	temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
672 	plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
673 
674 	/* Check type of replication slot */
675 	if (src_islogical != logical_slot)
676 		ereport(ERROR,
677 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
678 				 src_islogical ?
679 				 errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
680 						NameStr(*src_name)) :
681 				 errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
682 						NameStr(*src_name))));
683 
684 	/* Copying non-reserved slot doesn't make sense */
685 	if (XLogRecPtrIsInvalid(src_restart_lsn))
686 	{
687 		Assert(!logical_slot);
688 		ereport(ERROR,
689 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
690 				 (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
691 	}
692 
693 	/* Overwrite params from optional arguments */
694 	if (PG_NARGS() >= 3)
695 		temporary = PG_GETARG_BOOL(2);
696 	if (PG_NARGS() >= 4)
697 	{
698 		Assert(logical_slot);
699 		plugin = NameStr(*(PG_GETARG_NAME(3)));
700 	}
701 
702 	/* Create new slot and acquire it */
703 	if (logical_slot)
704 	{
705 		/*
706 		 * We must not try to read WAL, since we haven't reserved it yet --
707 		 * hence pass find_startpoint false.  confirmed_flush will be set
708 		 * below, by copying from the source slot.
709 		 */
710 		create_logical_replication_slot(NameStr(*dst_name),
711 										plugin,
712 										temporary,
713 										src_restart_lsn,
714 										false);
715 	}
716 	else
717 		create_physical_replication_slot(NameStr(*dst_name),
718 										 true,
719 										 temporary,
720 										 src_restart_lsn);
721 
722 	/*
723 	 * Update the destination slot to current values of the source slot;
724 	 * recheck that the source slot is still the one we saw previously.
725 	 */
726 	{
727 		TransactionId copy_effective_xmin;
728 		TransactionId copy_effective_catalog_xmin;
729 		TransactionId copy_xmin;
730 		TransactionId copy_catalog_xmin;
731 		XLogRecPtr	copy_restart_lsn;
732 		XLogRecPtr	copy_confirmed_flush;
733 		bool		copy_islogical;
734 		char	   *copy_name;
735 
736 		/* Copy data of source slot again */
737 		SpinLockAcquire(&src->mutex);
738 		second_slot_contents = *src;
739 		SpinLockRelease(&src->mutex);
740 
741 		copy_effective_xmin = second_slot_contents.effective_xmin;
742 		copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
743 
744 		copy_xmin = second_slot_contents.data.xmin;
745 		copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
746 		copy_restart_lsn = second_slot_contents.data.restart_lsn;
747 		copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
748 
749 		/* for existence check */
750 		copy_name = NameStr(second_slot_contents.data.name);
751 		copy_islogical = SlotIsLogical(&second_slot_contents);
752 
753 		/*
754 		 * Check if the source slot still exists and is valid. We regard it as
755 		 * invalid if the type of replication slot or name has been changed,
756 		 * or the restart_lsn either is invalid or has gone backward. (The
757 		 * restart_lsn could go backwards if the source slot is dropped and
758 		 * copied from an older slot during installation.)
759 		 *
760 		 * Since erroring out will release and drop the destination slot we
761 		 * don't need to release it here.
762 		 */
763 		if (copy_restart_lsn < src_restart_lsn ||
764 			src_islogical != copy_islogical ||
765 			strcmp(copy_name, NameStr(*src_name)) != 0)
766 			ereport(ERROR,
767 					(errmsg("could not copy replication slot \"%s\"",
768 							NameStr(*src_name)),
769 					 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
770 
771 		/* The source slot must have a consistent snapshot */
772 		if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
773 			ereport(ERROR,
774 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
775 					 errmsg("cannot copy unfinished logical replication slot \"%s\"",
776 							NameStr(*src_name)),
777 					 errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
778 
779 		/* Install copied values again */
780 		SpinLockAcquire(&MyReplicationSlot->mutex);
781 		MyReplicationSlot->effective_xmin = copy_effective_xmin;
782 		MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
783 
784 		MyReplicationSlot->data.xmin = copy_xmin;
785 		MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
786 		MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
787 		MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
788 		SpinLockRelease(&MyReplicationSlot->mutex);
789 
790 		ReplicationSlotMarkDirty();
791 		ReplicationSlotsComputeRequiredXmin(false);
792 		ReplicationSlotsComputeRequiredLSN();
793 		ReplicationSlotSave();
794 
795 #ifdef USE_ASSERT_CHECKING
796 		/* Check that the restart_lsn is available */
797 		{
798 			XLogSegNo	segno;
799 
800 			XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
801 			Assert(XLogGetLastRemovedSegno() < segno);
802 		}
803 #endif
804 	}
805 
806 	/* target slot fully created, mark as persistent if needed */
807 	if (logical_slot && !temporary)
808 		ReplicationSlotPersist();
809 
810 	/* All done.  Set up the return values */
811 	values[0] = NameGetDatum(dst_name);
812 	nulls[0] = false;
813 	if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
814 	{
815 		values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
816 		nulls[1] = false;
817 	}
818 	else
819 		nulls[1] = true;
820 
821 	tuple = heap_form_tuple(tupdesc, values, nulls);
822 	result = HeapTupleGetDatum(tuple);
823 
824 	ReplicationSlotRelease();
825 
826 	PG_RETURN_DATUM(result);
827 }
828 
/* The wrappers below are all to appease opr_sanity */

/*
 * SQL-callable wrapper: forwards to copy_replication_slot() with
 * logical_slot = true.
 * NOTE(review): presumably the _a/_b/_c variants back SQL signatures that
 * differ only in their optional arguments -- confirm against the catalog.
 */
Datum
pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
{
	return copy_replication_slot(fcinfo, true);
}
835 
/*
 * SQL-callable wrapper: forwards to copy_replication_slot() with
 * logical_slot = true.
 */
Datum
pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
{
	return copy_replication_slot(fcinfo, true);
}
841 
/*
 * SQL-callable wrapper: forwards to copy_replication_slot() with
 * logical_slot = true.
 */
Datum
pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
{
	return copy_replication_slot(fcinfo, true);
}
847 
/*
 * SQL-callable wrapper: forwards to copy_replication_slot() with
 * logical_slot = false.
 */
Datum
pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
{
	return copy_replication_slot(fcinfo, false);
}
853 
/*
 * SQL-callable wrapper: forwards to copy_replication_slot() with
 * logical_slot = false.
 */
Datum
pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
{
	return copy_replication_slot(fcinfo, false);
}
859