1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  *	   Support functions for replication slots
5  *
6  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xlog_internal.h"
17 #include "access/xlogutils.h"
18 #include "funcapi.h"
19 #include "miscadmin.h"
20 #include "replication/decode.h"
21 #include "replication/logical.h"
22 #include "replication/slot.h"
23 #include "utils/builtins.h"
24 #include "utils/inval.h"
25 #include "utils/pg_lsn.h"
26 #include "utils/resowner.h"
27 
28 static void
check_permissions(void)29 check_permissions(void)
30 {
31 	if (!superuser() && !has_rolreplication(GetUserId()))
32 		ereport(ERROR,
33 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34 				 errmsg("must be superuser or replication role to use replication slots")));
35 }
36 
37 /*
38  * Helper function for creating a new physical replication slot with
39  * given arguments. Note that this function doesn't release the created
40  * slot.
41  *
42  * If restart_lsn is a valid value, we use it without WAL reservation
43  * routine. So the caller must guarantee that WAL is available.
44  */
45 static void
create_physical_replication_slot(char * name,bool immediately_reserve,bool temporary,XLogRecPtr restart_lsn)46 create_physical_replication_slot(char *name, bool immediately_reserve,
47 								 bool temporary, XLogRecPtr restart_lsn)
48 {
49 	Assert(!MyReplicationSlot);
50 
51 	/* acquire replication slot, this will check for conflicting names */
52 	ReplicationSlotCreate(name, false,
53 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
54 
55 	if (immediately_reserve)
56 	{
57 		/* Reserve WAL as the user asked for it */
58 		if (XLogRecPtrIsInvalid(restart_lsn))
59 			ReplicationSlotReserveWal();
60 		else
61 			MyReplicationSlot->data.restart_lsn = restart_lsn;
62 
63 		/* Write this slot to disk */
64 		ReplicationSlotMarkDirty();
65 		ReplicationSlotSave();
66 	}
67 }
68 
69 /*
70  * SQL function for creating a new physical (streaming replication)
71  * replication slot.
72  */
73 Datum
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)74 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
75 {
76 	Name		name = PG_GETARG_NAME(0);
77 	bool		immediately_reserve = PG_GETARG_BOOL(1);
78 	bool		temporary = PG_GETARG_BOOL(2);
79 	Datum		values[2];
80 	bool		nulls[2];
81 	TupleDesc	tupdesc;
82 	HeapTuple	tuple;
83 	Datum		result;
84 
85 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86 		elog(ERROR, "return type must be a row type");
87 
88 	check_permissions();
89 
90 	CheckSlotRequirements();
91 
92 	create_physical_replication_slot(NameStr(*name),
93 									 immediately_reserve,
94 									 temporary,
95 									 InvalidXLogRecPtr);
96 
97 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98 	nulls[0] = false;
99 
100 	if (immediately_reserve)
101 	{
102 		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
103 		nulls[1] = false;
104 	}
105 	else
106 		nulls[1] = true;
107 
108 	tuple = heap_form_tuple(tupdesc, values, nulls);
109 	result = HeapTupleGetDatum(tuple);
110 
111 	ReplicationSlotRelease();
112 
113 	PG_RETURN_DATUM(result);
114 }
115 
116 
117 /*
118  * Helper function for creating a new logical replication slot with
119  * given arguments. Note that this function doesn't release the created
120  * slot.
121  *
122  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123  * caller's responsibility to ensure it's set to something sensible.
124  */
125 static void
create_logical_replication_slot(char * name,char * plugin,bool temporary,bool two_phase,XLogRecPtr restart_lsn,bool find_startpoint)126 create_logical_replication_slot(char *name, char *plugin,
127 								bool temporary, bool two_phase,
128 								XLogRecPtr restart_lsn,
129 								bool find_startpoint)
130 {
131 	LogicalDecodingContext *ctx = NULL;
132 
133 	Assert(!MyReplicationSlot);
134 
135 	/*
136 	 * Acquire a logical decoding slot, this will check for conflicting names.
137 	 * Initially create persistent slot as ephemeral - that allows us to
138 	 * nicely handle errors during initialization because it'll get dropped if
139 	 * this transaction fails. We'll make it persistent at the end. Temporary
140 	 * slots can be created as temporary from beginning as they get dropped on
141 	 * error as well.
142 	 */
143 	ReplicationSlotCreate(name, true,
144 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
145 
146 	/*
147 	 * Create logical decoding context to find start point or, if we don't
148 	 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
149 	 *
150 	 * Note: when !find_startpoint this is still important, because it's at
151 	 * this point that the output plugin is validated.
152 	 */
153 	ctx = CreateInitDecodingContext(plugin, NIL,
154 									false,	/* just catalogs is OK */
155 									restart_lsn,
156 									XL_ROUTINE(.page_read = read_local_xlog_page,
157 											   .segment_open = wal_segment_open,
158 											   .segment_close = wal_segment_close),
159 									NULL, NULL, NULL);
160 
161 	/*
162 	 * If caller needs us to determine the decoding start point, do so now.
163 	 * This might take a while.
164 	 */
165 	if (find_startpoint)
166 		DecodingContextFindStartpoint(ctx);
167 
168 	/* don't need the decoding context anymore */
169 	FreeDecodingContext(ctx);
170 }
171 
172 /*
173  * SQL function for creating a new logical replication slot.
174  */
175 Datum
pg_create_logical_replication_slot(PG_FUNCTION_ARGS)176 pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
177 {
178 	Name		name = PG_GETARG_NAME(0);
179 	Name		plugin = PG_GETARG_NAME(1);
180 	bool		temporary = PG_GETARG_BOOL(2);
181 	bool		two_phase = PG_GETARG_BOOL(3);
182 	Datum		result;
183 	TupleDesc	tupdesc;
184 	HeapTuple	tuple;
185 	Datum		values[2];
186 	bool		nulls[2];
187 
188 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
189 		elog(ERROR, "return type must be a row type");
190 
191 	check_permissions();
192 
193 	CheckLogicalDecodingRequirements();
194 
195 	create_logical_replication_slot(NameStr(*name),
196 									NameStr(*plugin),
197 									temporary,
198 									two_phase,
199 									InvalidXLogRecPtr,
200 									true);
201 
202 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
203 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
204 
205 	memset(nulls, 0, sizeof(nulls));
206 
207 	tuple = heap_form_tuple(tupdesc, values, nulls);
208 	result = HeapTupleGetDatum(tuple);
209 
210 	/* ok, slot is now fully created, mark it as persistent if needed */
211 	if (!temporary)
212 		ReplicationSlotPersist();
213 	ReplicationSlotRelease();
214 
215 	PG_RETURN_DATUM(result);
216 }
217 
218 
219 /*
220  * SQL function for dropping a replication slot.
221  */
222 Datum
pg_drop_replication_slot(PG_FUNCTION_ARGS)223 pg_drop_replication_slot(PG_FUNCTION_ARGS)
224 {
225 	Name		name = PG_GETARG_NAME(0);
226 
227 	check_permissions();
228 
229 	CheckSlotRequirements();
230 
231 	ReplicationSlotDrop(NameStr(*name), true);
232 
233 	PG_RETURN_VOID();
234 }
235 
/*
 * pg_get_replication_slots - SQL SRF showing active replication slots.
 *
 * The result is returned in materialize mode: one row per in-use slot is
 * appended to a tuplestore that is handed back through the ReturnSetInfo.
 * Each row has PG_GET_REPLICATION_SLOTS_COLS columns, filled in strictly
 * in the order in which the code below advances the column index "i".
 *
 * Errors out if the caller cannot accept a materialized set or if the
 * declared result type is not a row type.
 */
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 14
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	XLogRecPtr	currlsn;
	int			slotno;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	/*
	 * We don't require any special permission to see this function's data
	 * because nothing should be sensitive. The most critical being the slot
	 * name, which shouldn't contain anything particularly sensitive.
	 */

	/* The tuplestore is created in the per-query memory context */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	/* Reference position for the safe_wal_size computation further down */
	currlsn = GetXLogWriteRecPtr();

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
		ReplicationSlot slot_contents;
		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
		WALAvailability walstate;
		int			i;

		if (!slot->in_use)
			continue;

		/* Copy slot contents while holding spinlock, then examine at leisure */
		SpinLockAcquire(&slot->mutex);
		slot_contents = *slot;
		SpinLockRelease(&slot->mutex);

		memset(values, 0, sizeof(values));
		memset(nulls, 0, sizeof(nulls));

		/*
		 * "i" is the index of the next output column.  Every step below
		 * fills exactly one of values[i] / nulls[i] and advances i, so the
		 * order of these statements defines the column order.
		 */
		i = 0;
		values[i++] = NameGetDatum(&slot_contents.data.name);

		/* An invalid database OID is what marks a slot as physical here */
		if (slot_contents.data.database == InvalidOid)
			nulls[i++] = true;
		else
			values[i++] = NameGetDatum(&slot_contents.data.plugin);

		if (slot_contents.data.database == InvalidOid)
			values[i++] = CStringGetTextDatum("physical");
		else
			values[i++] = CStringGetTextDatum("logical");

		if (slot_contents.data.database == InvalidOid)
			nulls[i++] = true;
		else
			values[i++] = ObjectIdGetDatum(slot_contents.data.database);

		values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
		values[i++] = BoolGetDatum(slot_contents.active_pid != 0);

		if (slot_contents.active_pid != 0)
			values[i++] = Int32GetDatum(slot_contents.active_pid);
		else
			nulls[i++] = true;

		if (slot_contents.data.xmin != InvalidTransactionId)
			values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
		else
			nulls[i++] = true;

		if (slot_contents.data.catalog_xmin != InvalidTransactionId)
			values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
		else
			nulls[i++] = true;

		if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
			values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
		else
			nulls[i++] = true;

		if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
			values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
		else
			nulls[i++] = true;

		/*
		 * If invalidated_at is valid and restart_lsn is invalid, we know for
		 * certain that the slot has been invalidated.  Otherwise, test
		 * availability from restart_lsn.
		 */
		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
			!XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
			walstate = WALAVAIL_REMOVED;
		else
			walstate = GetWALAvailability(slot_contents.data.restart_lsn);

		/* Render the WAL availability state as text (NULL for no LSN) */
		switch (walstate)
		{
			case WALAVAIL_INVALID_LSN:
				nulls[i++] = true;
				break;

			case WALAVAIL_RESERVED:
				values[i++] = CStringGetTextDatum("reserved");
				break;

			case WALAVAIL_EXTENDED:
				values[i++] = CStringGetTextDatum("extended");
				break;

			case WALAVAIL_UNRESERVED:
				values[i++] = CStringGetTextDatum("unreserved");
				break;

			case WALAVAIL_REMOVED:

				/*
				 * If we read the restart_lsn long enough ago, maybe that file
				 * has been removed by now.  However, the walsender could have
				 * moved forward enough that it jumped to another file after
				 * we looked.  If checkpointer signalled the process to
				 * termination, then it's definitely lost; but if a process is
				 * still alive, then "unreserved" seems more appropriate.
				 *
				 * If we do change it, save the state for safe_wal_size below.
				 */
				if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
				{
					int			pid;

					/* Re-read the live slot, not our earlier snapshot */
					SpinLockAcquire(&slot->mutex);
					pid = slot->active_pid;
					slot_contents.data.restart_lsn = slot->data.restart_lsn;
					SpinLockRelease(&slot->mutex);
					if (pid != 0)
					{
						values[i++] = CStringGetTextDatum("unreserved");
						walstate = WALAVAIL_UNRESERVED;
						break;
					}
				}
				values[i++] = CStringGetTextDatum("lost");
				break;
		}

		/*
		 * safe_wal_size is only computed for slots that have not been lost,
		 * and only if there's a configured maximum size.
		 */
		if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
			nulls[i++] = true;
		else
		{
			XLogSegNo	targetSeg;
			uint64		slotKeepSegs;
			uint64		keepSegs;
			XLogSegNo	failSeg;
			XLogRecPtr	failLSN;

			XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);

			/* determine how many segments can be kept by slots */
			slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
			/* ditto for wal_keep_size */
			keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);

			/* if currpos reaches failLSN, we lose our segment */
			failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
			XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);

			/* Distance from the current write position to that point */
			values[i++] = Int64GetDatum(failLSN - currlsn);
		}

		values[i++] = BoolGetDatum(slot_contents.data.two_phase);

		/* Every column must have been accounted for exactly once */
		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	LWLockRelease(ReplicationSlotControlLock);

	tuplestore_donestoring(tupstore);

	/* The rows travel via rsinfo->setResult; the Datum itself is unused */
	return (Datum) 0;
}
451 
452 /*
453  * Helper function for advancing our physical replication slot forward.
454  *
455  * The LSN position to move to is compared simply to the slot's restart_lsn,
456  * knowing that any position older than that would be removed by successive
457  * checkpoints.
458  */
459 static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr moveto)460 pg_physical_replication_slot_advance(XLogRecPtr moveto)
461 {
462 	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
463 	XLogRecPtr	retlsn = startlsn;
464 
465 	Assert(moveto != InvalidXLogRecPtr);
466 
467 	if (startlsn < moveto)
468 	{
469 		SpinLockAcquire(&MyReplicationSlot->mutex);
470 		MyReplicationSlot->data.restart_lsn = moveto;
471 		SpinLockRelease(&MyReplicationSlot->mutex);
472 		retlsn = moveto;
473 
474 		/*
475 		 * Dirty the slot so as it is written out at the next checkpoint. Note
476 		 * that the LSN position advanced may still be lost in the event of a
477 		 * crash, but this makes the data consistent after a clean shutdown.
478 		 */
479 		ReplicationSlotMarkDirty();
480 	}
481 
482 	return retlsn;
483 }
484 
/*
 * Helper function for advancing our logical replication slot forward.
 *
 * Operates on the slot currently held by this backend (MyReplicationSlot)
 * and returns the slot's confirmed_flush as it stands after the advance.
 *
 * The slot's restart_lsn is used as start point for reading records, while
 * confirmed_flush is used as base point for the decoding context.
 *
 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
 * because we need to digest WAL to advance restart_lsn allowing to recycle
 * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
 * mode, no changes are generated anyway.
 */
static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
	LogicalDecodingContext *ctx;
	ResourceOwner old_resowner = CurrentResourceOwner;
	XLogRecPtr	retlsn;

	Assert(moveto != InvalidXLogRecPtr);

	PG_TRY();
	{
		/*
		 * Create our decoding context in fast_forward mode, passing start_lsn
		 * as InvalidXLogRecPtr, so that we start processing from my slot's
		 * confirmed_flush.
		 */
		ctx = CreateDecodingContext(InvalidXLogRecPtr,
									NIL,
									true,	/* fast_forward */
									XL_ROUTINE(.page_read = read_local_xlog_page,
											   .segment_open = wal_segment_open,
											   .segment_close = wal_segment_close),
									NULL, NULL, NULL);

		/*
		 * Start reading at the slot's restart_lsn, which we know to point to
		 * a valid record.
		 */
		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);

		/* invalidate non-timetravel entries */
		InvalidateSystemCaches();

		/* Keep decoding while the reader's end position is short of moveto */
		while (ctx->reader->EndRecPtr < moveto)
		{
			char	   *errm = NULL;
			XLogRecord *record;

			/*
			 * Read records.  No changes are generated in fast_forward mode,
			 * but snapbuilder/slot statuses are updated properly.
			 */
			record = XLogReadRecord(ctx->reader, &errm);
			if (errm)
				elog(ERROR, "%s", errm);

			/*
			 * Process the record.  Storage-level changes are ignored in
			 * fast_forward mode, but other modules (such as snapbuilder)
			 * might still have critical updates to do.
			 */
			if (record)
				LogicalDecodingProcessRecord(ctx, ctx->reader);

			/* Stop once the requested target has been reached */
			if (moveto <= ctx->reader->EndRecPtr)
				break;

			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * Logical decoding could have clobbered CurrentResourceOwner during
		 * transaction management, so restore the executor's value.  (This is
		 * a kluge, but it's not worth cleaning up right now.)
		 */
		CurrentResourceOwner = old_resowner;

		/* Only confirm a new position if the reader has an end position */
		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
		{
			LogicalConfirmReceivedLocation(moveto);

			/*
			 * If only the confirmed_flush LSN has changed the slot won't get
			 * marked as dirty by the above. Callers on the walsender
			 * interface are expected to keep track of their own progress and
			 * don't need it written out. But SQL-interface users cannot
			 * specify their own start positions and it's harder for them to
			 * keep track of their progress, so we should make more of an
			 * effort to save it for them.
			 *
			 * Dirty the slot so it is written out at the next checkpoint. The
			 * LSN position advanced to may still be lost on a crash but this
			 * makes the data consistent after a clean shutdown.
			 */
			ReplicationSlotMarkDirty();
		}

		/* Report whatever confirmed_flush the slot ended up with */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* free context, call shutdown callback */
		FreeDecodingContext(ctx);

		InvalidateSystemCaches();
	}
	PG_CATCH();
	{
		/* clear all timetravel entries */
		InvalidateSystemCaches();

		PG_RE_THROW();
	}
	PG_END_TRY();

	return retlsn;
}
603 
604 /*
605  * SQL function for moving the position in a replication slot.
606  */
607 Datum
pg_replication_slot_advance(PG_FUNCTION_ARGS)608 pg_replication_slot_advance(PG_FUNCTION_ARGS)
609 {
610 	Name		slotname = PG_GETARG_NAME(0);
611 	XLogRecPtr	moveto = PG_GETARG_LSN(1);
612 	XLogRecPtr	endlsn;
613 	XLogRecPtr	minlsn;
614 	TupleDesc	tupdesc;
615 	Datum		values[2];
616 	bool		nulls[2];
617 	HeapTuple	tuple;
618 	Datum		result;
619 
620 	Assert(!MyReplicationSlot);
621 
622 	check_permissions();
623 
624 	if (XLogRecPtrIsInvalid(moveto))
625 		ereport(ERROR,
626 				(errmsg("invalid target WAL LSN")));
627 
628 	/* Build a tuple descriptor for our result type */
629 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
630 		elog(ERROR, "return type must be a row type");
631 
632 	/*
633 	 * We can't move slot past what's been flushed/replayed so clamp the
634 	 * target position accordingly.
635 	 */
636 	if (!RecoveryInProgress())
637 		moveto = Min(moveto, GetFlushRecPtr());
638 	else
639 		moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
640 
641 	/* Acquire the slot so we "own" it */
642 	ReplicationSlotAcquire(NameStr(*slotname), true);
643 
644 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
645 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
646 		ereport(ERROR,
647 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
648 				 errmsg("replication slot \"%s\" cannot be advanced",
649 						NameStr(*slotname)),
650 				 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
651 
652 	/*
653 	 * Check if the slot is not moving backwards.  Physical slots rely simply
654 	 * on restart_lsn as a minimum point, while logical slots have confirmed
655 	 * consumption up to confirmed_flush, meaning that in both cases data
656 	 * older than that is not available anymore.
657 	 */
658 	if (OidIsValid(MyReplicationSlot->data.database))
659 		minlsn = MyReplicationSlot->data.confirmed_flush;
660 	else
661 		minlsn = MyReplicationSlot->data.restart_lsn;
662 
663 	if (moveto < minlsn)
664 		ereport(ERROR,
665 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
666 				 errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
667 						LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
668 
669 	/* Do the actual slot update, depending on the slot type */
670 	if (OidIsValid(MyReplicationSlot->data.database))
671 		endlsn = pg_logical_replication_slot_advance(moveto);
672 	else
673 		endlsn = pg_physical_replication_slot_advance(moveto);
674 
675 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
676 	nulls[0] = false;
677 
678 	/*
679 	 * Recompute the minimum LSN and xmin across all slots to adjust with the
680 	 * advancing potentially done.
681 	 */
682 	ReplicationSlotsComputeRequiredXmin(false);
683 	ReplicationSlotsComputeRequiredLSN();
684 
685 	ReplicationSlotRelease();
686 
687 	/* Return the reached position. */
688 	values[1] = LSNGetDatum(endlsn);
689 	nulls[1] = false;
690 
691 	tuple = heap_form_tuple(tupdesc, values, nulls);
692 	result = HeapTupleGetDatum(tuple);
693 
694 	PG_RETURN_DATUM(result);
695 }
696 
697 /*
698  * Helper function of copying a replication slot.
699  */
700 static Datum
copy_replication_slot(FunctionCallInfo fcinfo,bool logical_slot)701 copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
702 {
703 	Name		src_name = PG_GETARG_NAME(0);
704 	Name		dst_name = PG_GETARG_NAME(1);
705 	ReplicationSlot *src = NULL;
706 	ReplicationSlot first_slot_contents;
707 	ReplicationSlot second_slot_contents;
708 	XLogRecPtr	src_restart_lsn;
709 	bool		src_islogical;
710 	bool		temporary;
711 	char	   *plugin;
712 	Datum		values[2];
713 	bool		nulls[2];
714 	Datum		result;
715 	TupleDesc	tupdesc;
716 	HeapTuple	tuple;
717 
718 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
719 		elog(ERROR, "return type must be a row type");
720 
721 	check_permissions();
722 
723 	if (logical_slot)
724 		CheckLogicalDecodingRequirements();
725 	else
726 		CheckSlotRequirements();
727 
728 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
729 
730 	/*
731 	 * We need to prevent the source slot's reserved WAL from being removed,
732 	 * but we don't want to lock that slot for very long, and it can advance
733 	 * in the meantime.  So obtain the source slot's data, and create a new
734 	 * slot using its restart_lsn.  Afterwards we lock the source slot again
735 	 * and verify that the data we copied (name, type) has not changed
736 	 * incompatibly.  No inconvenient WAL removal can occur once the new slot
737 	 * is created -- but since WAL removal could have occurred before we
738 	 * managed to create the new slot, we advance the new slot's restart_lsn
739 	 * to the source slot's updated restart_lsn the second time we lock it.
740 	 */
741 	for (int i = 0; i < max_replication_slots; i++)
742 	{
743 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
744 
745 		if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
746 		{
747 			/* Copy the slot contents while holding spinlock */
748 			SpinLockAcquire(&s->mutex);
749 			first_slot_contents = *s;
750 			SpinLockRelease(&s->mutex);
751 			src = s;
752 			break;
753 		}
754 	}
755 
756 	LWLockRelease(ReplicationSlotControlLock);
757 
758 	if (src == NULL)
759 		ereport(ERROR,
760 				(errcode(ERRCODE_UNDEFINED_OBJECT),
761 				 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
762 
763 	src_islogical = SlotIsLogical(&first_slot_contents);
764 	src_restart_lsn = first_slot_contents.data.restart_lsn;
765 	temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
766 	plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
767 
768 	/* Check type of replication slot */
769 	if (src_islogical != logical_slot)
770 		ereport(ERROR,
771 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
772 				 src_islogical ?
773 				 errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
774 						NameStr(*src_name)) :
775 				 errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
776 						NameStr(*src_name))));
777 
778 	/* Copying non-reserved slot doesn't make sense */
779 	if (XLogRecPtrIsInvalid(src_restart_lsn))
780 		ereport(ERROR,
781 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
782 				 errmsg("cannot copy a replication slot that doesn't reserve WAL")));
783 
784 	/* Overwrite params from optional arguments */
785 	if (PG_NARGS() >= 3)
786 		temporary = PG_GETARG_BOOL(2);
787 	if (PG_NARGS() >= 4)
788 	{
789 		Assert(logical_slot);
790 		plugin = NameStr(*(PG_GETARG_NAME(3)));
791 	}
792 
793 	/* Create new slot and acquire it */
794 	if (logical_slot)
795 	{
796 		/*
797 		 * We must not try to read WAL, since we haven't reserved it yet --
798 		 * hence pass find_startpoint false.  confirmed_flush will be set
799 		 * below, by copying from the source slot.
800 		 */
801 		create_logical_replication_slot(NameStr(*dst_name),
802 										plugin,
803 										temporary,
804 										false,
805 										src_restart_lsn,
806 										false);
807 	}
808 	else
809 		create_physical_replication_slot(NameStr(*dst_name),
810 										 true,
811 										 temporary,
812 										 src_restart_lsn);
813 
814 	/*
815 	 * Update the destination slot to current values of the source slot;
816 	 * recheck that the source slot is still the one we saw previously.
817 	 */
818 	{
819 		TransactionId copy_effective_xmin;
820 		TransactionId copy_effective_catalog_xmin;
821 		TransactionId copy_xmin;
822 		TransactionId copy_catalog_xmin;
823 		XLogRecPtr	copy_restart_lsn;
824 		XLogRecPtr	copy_confirmed_flush;
825 		bool		copy_islogical;
826 		char	   *copy_name;
827 
828 		/* Copy data of source slot again */
829 		SpinLockAcquire(&src->mutex);
830 		second_slot_contents = *src;
831 		SpinLockRelease(&src->mutex);
832 
833 		copy_effective_xmin = second_slot_contents.effective_xmin;
834 		copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
835 
836 		copy_xmin = second_slot_contents.data.xmin;
837 		copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
838 		copy_restart_lsn = second_slot_contents.data.restart_lsn;
839 		copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
840 
841 		/* for existence check */
842 		copy_name = NameStr(second_slot_contents.data.name);
843 		copy_islogical = SlotIsLogical(&second_slot_contents);
844 
845 		/*
846 		 * Check if the source slot still exists and is valid. We regard it as
847 		 * invalid if the type of replication slot or name has been changed,
848 		 * or the restart_lsn either is invalid or has gone backward. (The
849 		 * restart_lsn could go backwards if the source slot is dropped and
850 		 * copied from an older slot during installation.)
851 		 *
852 		 * Since erroring out will release and drop the destination slot we
853 		 * don't need to release it here.
854 		 */
855 		if (copy_restart_lsn < src_restart_lsn ||
856 			src_islogical != copy_islogical ||
857 			strcmp(copy_name, NameStr(*src_name)) != 0)
858 			ereport(ERROR,
859 					(errmsg("could not copy replication slot \"%s\"",
860 							NameStr(*src_name)),
861 					 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
862 
863 		/* The source slot must have a consistent snapshot */
864 		if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
865 			ereport(ERROR,
866 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
867 					 errmsg("cannot copy unfinished logical replication slot \"%s\"",
868 							NameStr(*src_name)),
869 					 errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
870 
871 		/* Install copied values again */
872 		SpinLockAcquire(&MyReplicationSlot->mutex);
873 		MyReplicationSlot->effective_xmin = copy_effective_xmin;
874 		MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
875 
876 		MyReplicationSlot->data.xmin = copy_xmin;
877 		MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
878 		MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
879 		MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
880 		SpinLockRelease(&MyReplicationSlot->mutex);
881 
882 		ReplicationSlotMarkDirty();
883 		ReplicationSlotsComputeRequiredXmin(false);
884 		ReplicationSlotsComputeRequiredLSN();
885 		ReplicationSlotSave();
886 
887 #ifdef USE_ASSERT_CHECKING
888 		/* Check that the restart_lsn is available */
889 		{
890 			XLogSegNo	segno;
891 
892 			XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
893 			Assert(XLogGetLastRemovedSegno() < segno);
894 		}
895 #endif
896 	}
897 
898 	/* target slot fully created, mark as persistent if needed */
899 	if (logical_slot && !temporary)
900 		ReplicationSlotPersist();
901 
902 	/* All done.  Set up the return values */
903 	values[0] = NameGetDatum(dst_name);
904 	nulls[0] = false;
905 	if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
906 	{
907 		values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
908 		nulls[1] = false;
909 	}
910 	else
911 		nulls[1] = true;
912 
913 	tuple = heap_form_tuple(tupdesc, values, nulls);
914 	result = HeapTupleGetDatum(tuple);
915 
916 	ReplicationSlotRelease();
917 
918 	PG_RETURN_DATUM(result);
919 }
920 
921 /* The wrappers below are all to appease opr_sanity */
922 Datum
pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)923 pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
924 {
925 	return copy_replication_slot(fcinfo, true);
926 }
927 
928 Datum
pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)929 pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
930 {
931 	return copy_replication_slot(fcinfo, true);
932 }
933 
934 Datum
pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)935 pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
936 {
937 	return copy_replication_slot(fcinfo, true);
938 }
939 
940 Datum
pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)941 pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
942 {
943 	return copy_replication_slot(fcinfo, false);
944 }
945 
946 Datum
pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)947 pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
948 {
949 	return copy_replication_slot(fcinfo, false);
950 }
951