1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *	Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *	Copyright (c) 2003-2019, PostgreSQL Global Development Group
14  *	Author: Jan Wieck, Afilias USA INC.
15  *	64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *	src/backend/utils/adt/txid.c
18  *
19  *-------------------------------------------------------------------------
20  */
21 
22 #include "postgres.h"
23 
24 #include "access/clog.h"
25 #include "access/transam.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "funcapi.h"
29 #include "miscadmin.h"
30 #include "libpq/pqformat.h"
31 #include "postmaster/postmaster.h"
32 #include "storage/lwlock.h"
33 #include "utils/builtins.h"
34 #include "utils/memutils.h"
35 #include "utils/snapmgr.h"
36 
37 
38 /* txid will be signed int8 in database, so must limit to 63 bits */
39 #define MAX_TXID   ((uint64) PG_INT64_MAX)
40 
41 /* Use unsigned variant internally */
42 typedef uint64 txid;
43 
44 /* sprintf format code for uint64 */
45 #define TXID_FMT UINT64_FORMAT
46 
47 /*
48  * If defined, use bsearch() function for searching for txids in snapshots
49  * that have more than the specified number of values.
50  */
51 #define USE_BSEARCH_IF_NXIP_GREATER 30
52 
53 
54 /*
55  * Snapshot containing 8byte txids.
56  */
57 typedef struct
58 {
59 	/*
60 	 * 4-byte length hdr, should not be touched directly.
61 	 *
62 	 * Explicit embedding is ok as we want always correct alignment anyway.
63 	 */
64 	int32		__varsz;
65 
66 	uint32		nxip;			/* number of txids in xip array */
67 	txid		xmin;
68 	txid		xmax;
69 	/* in-progress txids, xmin <= xip[i] < xmax: */
70 	txid		xip[FLEXIBLE_ARRAY_MEMBER];
71 } TxidSnapshot;
72 
73 #define TXID_SNAPSHOT_SIZE(nxip) \
74 	(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
75 #define TXID_SNAPSHOT_MAX_NXIP \
76 	((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
77 
78 /*
79  * Epoch values from xact.c
80  */
81 typedef struct
82 {
83 	TransactionId last_xid;
84 	uint32		epoch;
85 } TxidEpoch;
86 
87 
88 /*
89  * Fetch epoch data from xact.c.
90  */
91 static void
load_xid_epoch(TxidEpoch * state)92 load_xid_epoch(TxidEpoch *state)
93 {
94 	FullTransactionId fullXid = ReadNextFullTransactionId();
95 
96 	state->last_xid = XidFromFullTransactionId(fullXid);
97 	state->epoch = EpochFromFullTransactionId(fullXid);
98 }
99 
100 /*
101  * Helper to get a TransactionId from a 64-bit xid with wraparound detection.
102  *
103  * It is an ERROR if the xid is in the future.  Otherwise, returns true if
104  * the transaction is still new enough that we can determine whether it
105  * committed and false otherwise.  If *extracted_xid is not NULL, it is set
106  * to the low 32 bits of the transaction ID (i.e. the actual XID, without the
107  * epoch).
108  *
109  * The caller must hold CLogTruncationLock since it's dealing with arbitrary
110  * XIDs, and must continue to hold it until it's done with any clog lookups
111  * relating to those XIDs.
112  */
113 static bool
TransactionIdInRecentPast(uint64 xid_with_epoch,TransactionId * extracted_xid)114 TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
115 {
116 	uint32		xid_epoch = (uint32) (xid_with_epoch >> 32);
117 	TransactionId xid = (TransactionId) xid_with_epoch;
118 	uint32		now_epoch;
119 	TransactionId now_epoch_next_xid;
120 	FullTransactionId now_fullxid;
121 
122 	now_fullxid = ReadNextFullTransactionId();
123 	now_epoch_next_xid = XidFromFullTransactionId(now_fullxid);
124 	now_epoch = EpochFromFullTransactionId(now_fullxid);
125 
126 	if (extracted_xid != NULL)
127 		*extracted_xid = xid;
128 
129 	if (!TransactionIdIsValid(xid))
130 		return false;
131 
132 	/* For non-normal transaction IDs, we can ignore the epoch. */
133 	if (!TransactionIdIsNormal(xid))
134 		return true;
135 
136 	/* If the transaction ID is in the future, throw an error. */
137 	if (xid_with_epoch >= U64FromFullTransactionId(now_fullxid))
138 		ereport(ERROR,
139 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
140 				 errmsg("transaction ID %s is in the future",
141 						psprintf(UINT64_FORMAT, xid_with_epoch))));
142 
143 	/*
144 	 * ShmemVariableCache->oldestClogXid is protected by CLogTruncationLock,
145 	 * but we don't acquire that lock here.  Instead, we require the caller to
146 	 * acquire it, because the caller is presumably going to look up the
147 	 * returned XID.  If we took and released the lock within this function, a
148 	 * CLOG truncation could occur before the caller finished with the XID.
149 	 */
150 	Assert(LWLockHeldByMe(CLogTruncationLock));
151 
152 	/*
153 	 * If the transaction ID has wrapped around, it's definitely too old to
154 	 * determine the commit status.  Otherwise, we can compare it to
155 	 * ShmemVariableCache->oldestClogXid to determine whether the relevant
156 	 * CLOG entry is guaranteed to still exist.
157 	 */
158 	if (xid_epoch + 1 < now_epoch
159 		|| (xid_epoch + 1 == now_epoch && xid < now_epoch_next_xid)
160 		|| TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid))
161 		return false;
162 
163 	return true;
164 }
165 
166 /*
167  * do a TransactionId -> txid conversion for an XID near the given epoch
168  */
169 static txid
convert_xid(TransactionId xid,const TxidEpoch * state)170 convert_xid(TransactionId xid, const TxidEpoch *state)
171 {
172 	uint64		epoch;
173 
174 	/* return special xid's as-is */
175 	if (!TransactionIdIsNormal(xid))
176 		return (txid) xid;
177 
178 	/* xid can be on either side when near wrap-around */
179 	epoch = (uint64) state->epoch;
180 	if (xid > state->last_xid &&
181 		TransactionIdPrecedes(xid, state->last_xid))
182 		epoch--;
183 	else if (xid < state->last_xid &&
184 			 TransactionIdFollows(xid, state->last_xid))
185 		epoch++;
186 
187 	return (epoch << 32) | xid;
188 }
189 
190 /*
191  * txid comparator for qsort/bsearch
192  */
193 static int
cmp_txid(const void * aa,const void * bb)194 cmp_txid(const void *aa, const void *bb)
195 {
196 	txid		a = *(const txid *) aa;
197 	txid		b = *(const txid *) bb;
198 
199 	if (a < b)
200 		return -1;
201 	if (a > b)
202 		return 1;
203 	return 0;
204 }
205 
206 /*
207  * Sort a snapshot's txids, so we can use bsearch() later.  Also remove
208  * any duplicates.
209  *
210  * For consistency of on-disk representation, we always sort even if bsearch
211  * will not be used.
212  */
213 static void
sort_snapshot(TxidSnapshot * snap)214 sort_snapshot(TxidSnapshot *snap)
215 {
216 	txid		last = 0;
217 	int			nxip,
218 				idx1,
219 				idx2;
220 
221 	if (snap->nxip > 1)
222 	{
223 		qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
224 
225 		/* remove duplicates */
226 		nxip = snap->nxip;
227 		idx1 = idx2 = 0;
228 		while (idx1 < nxip)
229 		{
230 			if (snap->xip[idx1] != last)
231 				last = snap->xip[idx2++] = snap->xip[idx1];
232 			else
233 				snap->nxip--;
234 			idx1++;
235 		}
236 	}
237 }
238 
239 /*
240  * check txid visibility.
241  */
242 static bool
is_visible_txid(txid value,const TxidSnapshot * snap)243 is_visible_txid(txid value, const TxidSnapshot *snap)
244 {
245 	if (value < snap->xmin)
246 		return true;
247 	else if (value >= snap->xmax)
248 		return false;
249 #ifdef USE_BSEARCH_IF_NXIP_GREATER
250 	else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
251 	{
252 		void	   *res;
253 
254 		res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
255 		/* if found, transaction is still in progress */
256 		return (res) ? false : true;
257 	}
258 #endif
259 	else
260 	{
261 		uint32		i;
262 
263 		for (i = 0; i < snap->nxip; i++)
264 		{
265 			if (value == snap->xip[i])
266 				return false;
267 		}
268 		return true;
269 	}
270 }
271 
272 /*
273  * helper functions to use StringInfo for TxidSnapshot creation.
274  */
275 
276 static StringInfo
buf_init(txid xmin,txid xmax)277 buf_init(txid xmin, txid xmax)
278 {
279 	TxidSnapshot snap;
280 	StringInfo	buf;
281 
282 	snap.xmin = xmin;
283 	snap.xmax = xmax;
284 	snap.nxip = 0;
285 
286 	buf = makeStringInfo();
287 	appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
288 	return buf;
289 }
290 
291 static void
buf_add_txid(StringInfo buf,txid xid)292 buf_add_txid(StringInfo buf, txid xid)
293 {
294 	TxidSnapshot *snap = (TxidSnapshot *) buf->data;
295 
296 	/* do this before possible realloc */
297 	snap->nxip++;
298 
299 	appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
300 }
301 
302 static TxidSnapshot *
buf_finalize(StringInfo buf)303 buf_finalize(StringInfo buf)
304 {
305 	TxidSnapshot *snap = (TxidSnapshot *) buf->data;
306 
307 	SET_VARSIZE(snap, buf->len);
308 
309 	/* buf is not needed anymore */
310 	buf->data = NULL;
311 	pfree(buf);
312 
313 	return snap;
314 }
315 
316 /*
317  * simple number parser.
318  *
319  * We return 0 on error, which is invalid value for txid.
320  */
321 static txid
str2txid(const char * s,const char ** endp)322 str2txid(const char *s, const char **endp)
323 {
324 	txid		val = 0;
325 	txid		cutoff = MAX_TXID / 10;
326 	txid		cutlim = MAX_TXID % 10;
327 
328 	for (; *s; s++)
329 	{
330 		unsigned	d;
331 
332 		if (*s < '0' || *s > '9')
333 			break;
334 		d = *s - '0';
335 
336 		/*
337 		 * check for overflow
338 		 */
339 		if (val > cutoff || (val == cutoff && d > cutlim))
340 		{
341 			val = 0;
342 			break;
343 		}
344 
345 		val = val * 10 + d;
346 	}
347 	if (endp)
348 		*endp = s;
349 	return val;
350 }
351 
352 /*
353  * parse snapshot from cstring
354  */
355 static TxidSnapshot *
parse_snapshot(const char * str)356 parse_snapshot(const char *str)
357 {
358 	txid		xmin;
359 	txid		xmax;
360 	txid		last_val = 0,
361 				val;
362 	const char *str_start = str;
363 	const char *endp;
364 	StringInfo	buf;
365 
366 	xmin = str2txid(str, &endp);
367 	if (*endp != ':')
368 		goto bad_format;
369 	str = endp + 1;
370 
371 	xmax = str2txid(str, &endp);
372 	if (*endp != ':')
373 		goto bad_format;
374 	str = endp + 1;
375 
376 	/* it should look sane */
377 	if (xmin == 0 || xmax == 0 || xmin > xmax)
378 		goto bad_format;
379 
380 	/* allocate buffer */
381 	buf = buf_init(xmin, xmax);
382 
383 	/* loop over values */
384 	while (*str != '\0')
385 	{
386 		/* read next value */
387 		val = str2txid(str, &endp);
388 		str = endp;
389 
390 		/* require the input to be in order */
391 		if (val < xmin || val >= xmax || val < last_val)
392 			goto bad_format;
393 
394 		/* skip duplicates */
395 		if (val != last_val)
396 			buf_add_txid(buf, val);
397 		last_val = val;
398 
399 		if (*str == ',')
400 			str++;
401 		else if (*str != '\0')
402 			goto bad_format;
403 	}
404 
405 	return buf_finalize(buf);
406 
407 bad_format:
408 	ereport(ERROR,
409 			(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
410 			 errmsg("invalid input syntax for type %s: \"%s\"",
411 					"txid_snapshot", str_start)));
412 	return NULL;				/* keep compiler quiet */
413 }
414 
/*
 * Public functions.
 *
 * txid_current(), txid_current_if_assigned(), txid_current_snapshot() and
 * txid_status() are the only ones that communicate with core xid machinery.
 * All the others work on data returned by them.
 */
422 
423 /*
424  * txid_current() returns int8
425  *
426  *	Return the current toplevel transaction ID as TXID
427  *	If the current transaction does not have one, one is assigned.
428  *
429  *	This value has the epoch as the high 32 bits and the 32-bit xid
430  *	as the low 32 bits.
431  */
432 Datum
txid_current(PG_FUNCTION_ARGS)433 txid_current(PG_FUNCTION_ARGS)
434 {
435 	txid		val;
436 	TxidEpoch	state;
437 
438 	/*
439 	 * Must prevent during recovery because if an xid is not assigned we try
440 	 * to assign one, which would fail. Programs already rely on this function
441 	 * to always return a valid current xid, so we should not change this to
442 	 * return NULL or similar invalid xid.
443 	 */
444 	PreventCommandDuringRecovery("txid_current()");
445 
446 	load_xid_epoch(&state);
447 
448 	val = convert_xid(GetTopTransactionId(), &state);
449 
450 	PG_RETURN_INT64(val);
451 }
452 
453 /*
454  * Same as txid_current() but doesn't assign a new xid if there isn't one
455  * yet.
456  */
457 Datum
txid_current_if_assigned(PG_FUNCTION_ARGS)458 txid_current_if_assigned(PG_FUNCTION_ARGS)
459 {
460 	txid		val;
461 	TxidEpoch	state;
462 	TransactionId topxid = GetTopTransactionIdIfAny();
463 
464 	if (topxid == InvalidTransactionId)
465 		PG_RETURN_NULL();
466 
467 	load_xid_epoch(&state);
468 
469 	val = convert_xid(topxid, &state);
470 
471 	PG_RETURN_INT64(val);
472 }
473 
474 /*
475  * txid_current_snapshot() returns txid_snapshot
476  *
477  *		Return current snapshot in TXID format
478  *
479  * Note that only top-transaction XIDs are included in the snapshot.
480  */
481 Datum
txid_current_snapshot(PG_FUNCTION_ARGS)482 txid_current_snapshot(PG_FUNCTION_ARGS)
483 {
484 	TxidSnapshot *snap;
485 	uint32		nxip,
486 				i;
487 	TxidEpoch	state;
488 	Snapshot	cur;
489 
490 	cur = GetActiveSnapshot();
491 	if (cur == NULL)
492 		elog(ERROR, "no active snapshot set");
493 
494 	load_xid_epoch(&state);
495 
496 	/*
497 	 * Compile-time limits on the procarray (MAX_BACKENDS processes plus
498 	 * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
499 	 */
500 	StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
501 					 "possible overflow in txid_current_snapshot()");
502 
503 	/* allocate */
504 	nxip = cur->xcnt;
505 	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
506 
507 	/* fill */
508 	snap->xmin = convert_xid(cur->xmin, &state);
509 	snap->xmax = convert_xid(cur->xmax, &state);
510 	snap->nxip = nxip;
511 	for (i = 0; i < nxip; i++)
512 		snap->xip[i] = convert_xid(cur->xip[i], &state);
513 
514 	/*
515 	 * We want them guaranteed to be in ascending order.  This also removes
516 	 * any duplicate xids.  Normally, an XID can only be assigned to one
517 	 * backend, but when preparing a transaction for two-phase commit, there
518 	 * is a transient state when both the original backend and the dummy
519 	 * PGPROC entry reserved for the prepared transaction hold the same XID.
520 	 */
521 	sort_snapshot(snap);
522 
523 	/* set size after sorting, because it may have removed duplicate xips */
524 	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
525 
526 	PG_RETURN_POINTER(snap);
527 }
528 
529 /*
530  * txid_snapshot_in(cstring) returns txid_snapshot
531  *
532  *		input function for type txid_snapshot
533  */
534 Datum
txid_snapshot_in(PG_FUNCTION_ARGS)535 txid_snapshot_in(PG_FUNCTION_ARGS)
536 {
537 	char	   *str = PG_GETARG_CSTRING(0);
538 	TxidSnapshot *snap;
539 
540 	snap = parse_snapshot(str);
541 
542 	PG_RETURN_POINTER(snap);
543 }
544 
545 /*
546  * txid_snapshot_out(txid_snapshot) returns cstring
547  *
548  *		output function for type txid_snapshot
549  */
550 Datum
txid_snapshot_out(PG_FUNCTION_ARGS)551 txid_snapshot_out(PG_FUNCTION_ARGS)
552 {
553 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
554 	StringInfoData str;
555 	uint32		i;
556 
557 	initStringInfo(&str);
558 
559 	appendStringInfo(&str, TXID_FMT ":", snap->xmin);
560 	appendStringInfo(&str, TXID_FMT ":", snap->xmax);
561 
562 	for (i = 0; i < snap->nxip; i++)
563 	{
564 		if (i > 0)
565 			appendStringInfoChar(&str, ',');
566 		appendStringInfo(&str, TXID_FMT, snap->xip[i]);
567 	}
568 
569 	PG_RETURN_CSTRING(str.data);
570 }
571 
572 /*
573  * txid_snapshot_recv(internal) returns txid_snapshot
574  *
575  *		binary input function for type txid_snapshot
576  *
577  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
578  */
579 Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)580 txid_snapshot_recv(PG_FUNCTION_ARGS)
581 {
582 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
583 	TxidSnapshot *snap;
584 	txid		last = 0;
585 	int			nxip;
586 	int			i;
587 	txid		xmin,
588 				xmax;
589 
590 	/* load and validate nxip */
591 	nxip = pq_getmsgint(buf, 4);
592 	if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
593 		goto bad_format;
594 
595 	xmin = pq_getmsgint64(buf);
596 	xmax = pq_getmsgint64(buf);
597 	if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
598 		goto bad_format;
599 
600 	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
601 	snap->xmin = xmin;
602 	snap->xmax = xmax;
603 
604 	for (i = 0; i < nxip; i++)
605 	{
606 		txid		cur = pq_getmsgint64(buf);
607 
608 		if (cur < last || cur < xmin || cur >= xmax)
609 			goto bad_format;
610 
611 		/* skip duplicate xips */
612 		if (cur == last)
613 		{
614 			i--;
615 			nxip--;
616 			continue;
617 		}
618 
619 		snap->xip[i] = cur;
620 		last = cur;
621 	}
622 	snap->nxip = nxip;
623 	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
624 	PG_RETURN_POINTER(snap);
625 
626 bad_format:
627 	ereport(ERROR,
628 			(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
629 			 errmsg("invalid external txid_snapshot data")));
630 	PG_RETURN_POINTER(NULL);	/* keep compiler quiet */
631 }
632 
633 /*
634  * txid_snapshot_send(txid_snapshot) returns bytea
635  *
636  *		binary output function for type txid_snapshot
637  *
638  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
639  */
640 Datum
txid_snapshot_send(PG_FUNCTION_ARGS)641 txid_snapshot_send(PG_FUNCTION_ARGS)
642 {
643 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
644 	StringInfoData buf;
645 	uint32		i;
646 
647 	pq_begintypsend(&buf);
648 	pq_sendint32(&buf, snap->nxip);
649 	pq_sendint64(&buf, snap->xmin);
650 	pq_sendint64(&buf, snap->xmax);
651 	for (i = 0; i < snap->nxip; i++)
652 		pq_sendint64(&buf, snap->xip[i]);
653 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
654 }
655 
656 /*
657  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
658  *
659  *		is txid visible in snapshot ?
660  */
661 Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)662 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
663 {
664 	txid		value = PG_GETARG_INT64(0);
665 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
666 
667 	PG_RETURN_BOOL(is_visible_txid(value, snap));
668 }
669 
670 /*
671  * txid_snapshot_xmin(txid_snapshot) returns int8
672  *
673  *		return snapshot's xmin
674  */
675 Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)676 txid_snapshot_xmin(PG_FUNCTION_ARGS)
677 {
678 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
679 
680 	PG_RETURN_INT64(snap->xmin);
681 }
682 
683 /*
684  * txid_snapshot_xmax(txid_snapshot) returns int8
685  *
686  *		return snapshot's xmax
687  */
688 Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)689 txid_snapshot_xmax(PG_FUNCTION_ARGS)
690 {
691 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
692 
693 	PG_RETURN_INT64(snap->xmax);
694 }
695 
696 /*
697  * txid_snapshot_xip(txid_snapshot) returns setof int8
698  *
699  *		return in-progress TXIDs in snapshot.
700  */
701 Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)702 txid_snapshot_xip(PG_FUNCTION_ARGS)
703 {
704 	FuncCallContext *fctx;
705 	TxidSnapshot *snap;
706 	txid		value;
707 
708 	/* on first call initialize snap_state and get copy of snapshot */
709 	if (SRF_IS_FIRSTCALL())
710 	{
711 		TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
712 
713 		fctx = SRF_FIRSTCALL_INIT();
714 
715 		/* make a copy of user snapshot */
716 		snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
717 		memcpy(snap, arg, VARSIZE(arg));
718 
719 		fctx->user_fctx = snap;
720 	}
721 
722 	/* return values one-by-one */
723 	fctx = SRF_PERCALL_SETUP();
724 	snap = fctx->user_fctx;
725 	if (fctx->call_cntr < snap->nxip)
726 	{
727 		value = snap->xip[fctx->call_cntr];
728 		SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
729 	}
730 	else
731 	{
732 		SRF_RETURN_DONE(fctx);
733 	}
734 }
735 
736 /*
737  * Report the status of a recent transaction ID, or null for wrapped,
738  * truncated away or otherwise too old XIDs.
739  *
740  * The passed epoch-qualified xid is treated as a normal xid, not a
741  * multixact id.
742  *
743  * If it points to a committed subxact the result is the subxact status even
744  * though the parent xact may still be in progress or may have aborted.
745  */
746 Datum
txid_status(PG_FUNCTION_ARGS)747 txid_status(PG_FUNCTION_ARGS)
748 {
749 	const char *status;
750 	uint64		xid_with_epoch = PG_GETARG_INT64(0);
751 	TransactionId xid;
752 
753 	/*
754 	 * We must protect against concurrent truncation of clog entries to avoid
755 	 * an I/O error on SLRU lookup.
756 	 */
757 	LWLockAcquire(CLogTruncationLock, LW_SHARED);
758 	if (TransactionIdInRecentPast(xid_with_epoch, &xid))
759 	{
760 		Assert(TransactionIdIsValid(xid));
761 
762 		if (TransactionIdIsCurrentTransactionId(xid))
763 			status = "in progress";
764 		else if (TransactionIdDidCommit(xid))
765 			status = "committed";
766 		else if (TransactionIdDidAbort(xid))
767 			status = "aborted";
768 		else
769 		{
770 			/*
771 			 * The xact is not marked as either committed or aborted in clog.
772 			 *
773 			 * It could be a transaction that ended without updating clog or
774 			 * writing an abort record due to a crash. We can safely assume
775 			 * it's aborted if it isn't committed and is older than our
776 			 * snapshot xmin.
777 			 *
778 			 * Otherwise it must be in-progress (or have been at the time we
779 			 * checked commit/abort status).
780 			 */
781 			if (TransactionIdPrecedes(xid, GetActiveSnapshot()->xmin))
782 				status = "aborted";
783 			else
784 				status = "in progress";
785 		}
786 	}
787 	else
788 	{
789 		status = NULL;
790 	}
791 	LWLockRelease(CLogTruncationLock);
792 
793 	if (status == NULL)
794 		PG_RETURN_NULL();
795 	else
796 		PG_RETURN_TEXT_P(cstring_to_text(status));
797 }
798