1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *	Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *	Copyright (c) 2003-2018, PostgreSQL Global Development Group
14  *	Author: Jan Wieck, Afilias USA INC.
15  *	64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *	src/backend/utils/adt/txid.c
18  *
19  *-------------------------------------------------------------------------
20  */
21 
22 #include "postgres.h"
23 
24 #include "access/clog.h"
25 #include "access/transam.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "funcapi.h"
29 #include "miscadmin.h"
30 #include "libpq/pqformat.h"
31 #include "postmaster/postmaster.h"
32 #include "storage/lwlock.h"
33 #include "utils/builtins.h"
34 #include "utils/memutils.h"
35 #include "utils/snapmgr.h"
36 
37 
38 /* txid will be signed int8 in database, so must limit to 63 bits */
39 #define MAX_TXID   ((uint64) PG_INT64_MAX)
40 
41 /* Use unsigned variant internally */
42 typedef uint64 txid;
43 
44 /* sprintf format code for uint64 */
45 #define TXID_FMT UINT64_FORMAT
46 
47 /*
48  * If defined, use bsearch() function for searching for txids in snapshots
49  * that have more than the specified number of values.
50  */
51 #define USE_BSEARCH_IF_NXIP_GREATER 30
52 
53 
54 /*
55  * Snapshot containing 8byte txids.
56  */
57 typedef struct
58 {
59 	/*
60 	 * 4-byte length hdr, should not be touched directly.
61 	 *
62 	 * Explicit embedding is ok as we want always correct alignment anyway.
63 	 */
64 	int32		__varsz;
65 
66 	uint32		nxip;			/* number of txids in xip array */
67 	txid		xmin;
68 	txid		xmax;
69 	/* in-progress txids, xmin <= xip[i] < xmax: */
70 	txid		xip[FLEXIBLE_ARRAY_MEMBER];
71 } TxidSnapshot;
72 
73 #define TXID_SNAPSHOT_SIZE(nxip) \
74 	(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
75 #define TXID_SNAPSHOT_MAX_NXIP \
76 	((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
77 
78 /*
79  * Epoch values from xact.c
80  */
81 typedef struct
82 {
83 	TransactionId last_xid;
84 	uint32		epoch;
85 } TxidEpoch;
86 
87 
88 /*
89  * Fetch epoch data from xact.c.
90  */
91 static void
load_xid_epoch(TxidEpoch * state)92 load_xid_epoch(TxidEpoch *state)
93 {
94 	GetNextXidAndEpoch(&state->last_xid, &state->epoch);
95 }
96 
97 /*
98  * Helper to get a TransactionId from a 64-bit xid with wraparound detection.
99  *
100  * It is an ERROR if the xid is in the future.  Otherwise, returns true if
101  * the transaction is still new enough that we can determine whether it
102  * committed and false otherwise.  If *extracted_xid is not NULL, it is set
103  * to the low 32 bits of the transaction ID (i.e. the actual XID, without the
104  * epoch).
105  *
106  * The caller must hold CLogTruncationLock since it's dealing with arbitrary
107  * XIDs, and must continue to hold it until it's done with any clog lookups
108  * relating to those XIDs.
109  */
110 static bool
TransactionIdInRecentPast(uint64 xid_with_epoch,TransactionId * extracted_xid)111 TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
112 {
113 	uint32		xid_epoch = (uint32) (xid_with_epoch >> 32);
114 	TransactionId xid = (TransactionId) xid_with_epoch;
115 	uint32		now_epoch;
116 	TransactionId now_epoch_next_xid;
117 
118 	GetNextXidAndEpoch(&now_epoch_next_xid, &now_epoch);
119 
120 	if (extracted_xid != NULL)
121 		*extracted_xid = xid;
122 
123 	if (!TransactionIdIsValid(xid))
124 		return false;
125 
126 	/* For non-normal transaction IDs, we can ignore the epoch. */
127 	if (!TransactionIdIsNormal(xid))
128 		return true;
129 
130 	/* If the transaction ID is in the future, throw an error. */
131 	if (xid_epoch > now_epoch
132 		|| (xid_epoch == now_epoch && xid >= now_epoch_next_xid))
133 		ereport(ERROR,
134 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
135 				 errmsg("transaction ID %s is in the future",
136 						psprintf(UINT64_FORMAT, xid_with_epoch))));
137 
138 	/*
139 	 * ShmemVariableCache->oldestClogXid is protected by CLogTruncationLock,
140 	 * but we don't acquire that lock here.  Instead, we require the caller to
141 	 * acquire it, because the caller is presumably going to look up the
142 	 * returned XID.  If we took and released the lock within this function, a
143 	 * CLOG truncation could occur before the caller finished with the XID.
144 	 */
145 	Assert(LWLockHeldByMe(CLogTruncationLock));
146 
147 	/*
148 	 * If the transaction ID has wrapped around, it's definitely too old to
149 	 * determine the commit status.  Otherwise, we can compare it to
150 	 * ShmemVariableCache->oldestClogXid to determine whether the relevant
151 	 * CLOG entry is guaranteed to still exist.
152 	 */
153 	if (xid_epoch + 1 < now_epoch
154 		|| (xid_epoch + 1 == now_epoch && xid < now_epoch_next_xid)
155 		|| TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid))
156 		return false;
157 
158 	return true;
159 }
160 
161 /*
162  * do a TransactionId -> txid conversion for an XID near the given epoch
163  */
164 static txid
convert_xid(TransactionId xid,const TxidEpoch * state)165 convert_xid(TransactionId xid, const TxidEpoch *state)
166 {
167 	uint64		epoch;
168 
169 	/* return special xid's as-is */
170 	if (!TransactionIdIsNormal(xid))
171 		return (txid) xid;
172 
173 	/* xid can be on either side when near wrap-around */
174 	epoch = (uint64) state->epoch;
175 	if (xid > state->last_xid &&
176 		TransactionIdPrecedes(xid, state->last_xid))
177 		epoch--;
178 	else if (xid < state->last_xid &&
179 			 TransactionIdFollows(xid, state->last_xid))
180 		epoch++;
181 
182 	return (epoch << 32) | xid;
183 }
184 
185 /*
186  * txid comparator for qsort/bsearch
187  */
188 static int
cmp_txid(const void * aa,const void * bb)189 cmp_txid(const void *aa, const void *bb)
190 {
191 	txid		a = *(const txid *) aa;
192 	txid		b = *(const txid *) bb;
193 
194 	if (a < b)
195 		return -1;
196 	if (a > b)
197 		return 1;
198 	return 0;
199 }
200 
201 /*
202  * Sort a snapshot's txids, so we can use bsearch() later.  Also remove
203  * any duplicates.
204  *
205  * For consistency of on-disk representation, we always sort even if bsearch
206  * will not be used.
207  */
208 static void
sort_snapshot(TxidSnapshot * snap)209 sort_snapshot(TxidSnapshot *snap)
210 {
211 	txid		last = 0;
212 	int			nxip,
213 				idx1,
214 				idx2;
215 
216 	if (snap->nxip > 1)
217 	{
218 		qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
219 
220 		/* remove duplicates */
221 		nxip = snap->nxip;
222 		idx1 = idx2 = 0;
223 		while (idx1 < nxip)
224 		{
225 			if (snap->xip[idx1] != last)
226 				last = snap->xip[idx2++] = snap->xip[idx1];
227 			else
228 				snap->nxip--;
229 			idx1++;
230 		}
231 	}
232 }
233 
234 /*
235  * check txid visibility.
236  */
237 static bool
is_visible_txid(txid value,const TxidSnapshot * snap)238 is_visible_txid(txid value, const TxidSnapshot *snap)
239 {
240 	if (value < snap->xmin)
241 		return true;
242 	else if (value >= snap->xmax)
243 		return false;
244 #ifdef USE_BSEARCH_IF_NXIP_GREATER
245 	else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
246 	{
247 		void	   *res;
248 
249 		res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
250 		/* if found, transaction is still in progress */
251 		return (res) ? false : true;
252 	}
253 #endif
254 	else
255 	{
256 		uint32		i;
257 
258 		for (i = 0; i < snap->nxip; i++)
259 		{
260 			if (value == snap->xip[i])
261 				return false;
262 		}
263 		return true;
264 	}
265 }
266 
267 /*
268  * helper functions to use StringInfo for TxidSnapshot creation.
269  */
270 
271 static StringInfo
buf_init(txid xmin,txid xmax)272 buf_init(txid xmin, txid xmax)
273 {
274 	TxidSnapshot snap;
275 	StringInfo	buf;
276 
277 	snap.xmin = xmin;
278 	snap.xmax = xmax;
279 	snap.nxip = 0;
280 
281 	buf = makeStringInfo();
282 	appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
283 	return buf;
284 }
285 
286 static void
buf_add_txid(StringInfo buf,txid xid)287 buf_add_txid(StringInfo buf, txid xid)
288 {
289 	TxidSnapshot *snap = (TxidSnapshot *) buf->data;
290 
291 	/* do this before possible realloc */
292 	snap->nxip++;
293 
294 	appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
295 }
296 
297 static TxidSnapshot *
buf_finalize(StringInfo buf)298 buf_finalize(StringInfo buf)
299 {
300 	TxidSnapshot *snap = (TxidSnapshot *) buf->data;
301 
302 	SET_VARSIZE(snap, buf->len);
303 
304 	/* buf is not needed anymore */
305 	buf->data = NULL;
306 	pfree(buf);
307 
308 	return snap;
309 }
310 
311 /*
312  * simple number parser.
313  *
314  * We return 0 on error, which is invalid value for txid.
315  */
316 static txid
str2txid(const char * s,const char ** endp)317 str2txid(const char *s, const char **endp)
318 {
319 	txid		val = 0;
320 	txid		cutoff = MAX_TXID / 10;
321 	txid		cutlim = MAX_TXID % 10;
322 
323 	for (; *s; s++)
324 	{
325 		unsigned	d;
326 
327 		if (*s < '0' || *s > '9')
328 			break;
329 		d = *s - '0';
330 
331 		/*
332 		 * check for overflow
333 		 */
334 		if (val > cutoff || (val == cutoff && d > cutlim))
335 		{
336 			val = 0;
337 			break;
338 		}
339 
340 		val = val * 10 + d;
341 	}
342 	if (endp)
343 		*endp = s;
344 	return val;
345 }
346 
347 /*
348  * parse snapshot from cstring
349  */
350 static TxidSnapshot *
parse_snapshot(const char * str)351 parse_snapshot(const char *str)
352 {
353 	txid		xmin;
354 	txid		xmax;
355 	txid		last_val = 0,
356 				val;
357 	const char *str_start = str;
358 	const char *endp;
359 	StringInfo	buf;
360 
361 	xmin = str2txid(str, &endp);
362 	if (*endp != ':')
363 		goto bad_format;
364 	str = endp + 1;
365 
366 	xmax = str2txid(str, &endp);
367 	if (*endp != ':')
368 		goto bad_format;
369 	str = endp + 1;
370 
371 	/* it should look sane */
372 	if (xmin == 0 || xmax == 0 || xmin > xmax)
373 		goto bad_format;
374 
375 	/* allocate buffer */
376 	buf = buf_init(xmin, xmax);
377 
378 	/* loop over values */
379 	while (*str != '\0')
380 	{
381 		/* read next value */
382 		val = str2txid(str, &endp);
383 		str = endp;
384 
385 		/* require the input to be in order */
386 		if (val < xmin || val >= xmax || val < last_val)
387 			goto bad_format;
388 
389 		/* skip duplicates */
390 		if (val != last_val)
391 			buf_add_txid(buf, val);
392 		last_val = val;
393 
394 		if (*str == ',')
395 			str++;
396 		else if (*str != '\0')
397 			goto bad_format;
398 	}
399 
400 	return buf_finalize(buf);
401 
402 bad_format:
403 	ereport(ERROR,
404 			(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
405 			 errmsg("invalid input syntax for type %s: \"%s\"",
406 					"txid_snapshot", str_start)));
407 	return NULL;				/* keep compiler quiet */
408 }
409 
410 /*
411  * Public functions.
412  *
413  * txid_current() and txid_current_snapshot() are the only ones that
414  * communicate with core xid machinery.  All the others work on data
415  * returned by them.
416  */
417 
418 /*
419  * txid_current() returns int8
420  *
421  *	Return the current toplevel transaction ID as TXID
422  *	If the current transaction does not have one, one is assigned.
423  *
424  *	This value has the epoch as the high 32 bits and the 32-bit xid
425  *	as the low 32 bits.
426  */
427 Datum
txid_current(PG_FUNCTION_ARGS)428 txid_current(PG_FUNCTION_ARGS)
429 {
430 	txid		val;
431 	TxidEpoch	state;
432 
433 	/*
434 	 * Must prevent during recovery because if an xid is not assigned we try
435 	 * to assign one, which would fail. Programs already rely on this function
436 	 * to always return a valid current xid, so we should not change this to
437 	 * return NULL or similar invalid xid.
438 	 */
439 	PreventCommandDuringRecovery("txid_current()");
440 
441 	load_xid_epoch(&state);
442 
443 	val = convert_xid(GetTopTransactionId(), &state);
444 
445 	PG_RETURN_INT64(val);
446 }
447 
448 /*
449  * Same as txid_current() but doesn't assign a new xid if there isn't one
450  * yet.
451  */
452 Datum
txid_current_if_assigned(PG_FUNCTION_ARGS)453 txid_current_if_assigned(PG_FUNCTION_ARGS)
454 {
455 	txid		val;
456 	TxidEpoch	state;
457 	TransactionId topxid = GetTopTransactionIdIfAny();
458 
459 	if (topxid == InvalidTransactionId)
460 		PG_RETURN_NULL();
461 
462 	load_xid_epoch(&state);
463 
464 	val = convert_xid(topxid, &state);
465 
466 	PG_RETURN_INT64(val);
467 }
468 
469 /*
470  * txid_current_snapshot() returns txid_snapshot
471  *
472  *		Return current snapshot in TXID format
473  *
474  * Note that only top-transaction XIDs are included in the snapshot.
475  */
476 Datum
txid_current_snapshot(PG_FUNCTION_ARGS)477 txid_current_snapshot(PG_FUNCTION_ARGS)
478 {
479 	TxidSnapshot *snap;
480 	uint32		nxip,
481 				i;
482 	TxidEpoch	state;
483 	Snapshot	cur;
484 
485 	cur = GetActiveSnapshot();
486 	if (cur == NULL)
487 		elog(ERROR, "no active snapshot set");
488 
489 	load_xid_epoch(&state);
490 
491 	/*
492 	 * Compile-time limits on the procarray (MAX_BACKENDS processes plus
493 	 * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
494 	 */
495 	StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
496 					 "possible overflow in txid_current_snapshot()");
497 
498 	/* allocate */
499 	nxip = cur->xcnt;
500 	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
501 
502 	/* fill */
503 	snap->xmin = convert_xid(cur->xmin, &state);
504 	snap->xmax = convert_xid(cur->xmax, &state);
505 	snap->nxip = nxip;
506 	for (i = 0; i < nxip; i++)
507 		snap->xip[i] = convert_xid(cur->xip[i], &state);
508 
509 	/*
510 	 * We want them guaranteed to be in ascending order.  This also removes
511 	 * any duplicate xids.  Normally, an XID can only be assigned to one
512 	 * backend, but when preparing a transaction for two-phase commit, there
513 	 * is a transient state when both the original backend and the dummy
514 	 * PGPROC entry reserved for the prepared transaction hold the same XID.
515 	 */
516 	sort_snapshot(snap);
517 
518 	/* set size after sorting, because it may have removed duplicate xips */
519 	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
520 
521 	PG_RETURN_POINTER(snap);
522 }
523 
524 /*
525  * txid_snapshot_in(cstring) returns txid_snapshot
526  *
527  *		input function for type txid_snapshot
528  */
529 Datum
txid_snapshot_in(PG_FUNCTION_ARGS)530 txid_snapshot_in(PG_FUNCTION_ARGS)
531 {
532 	char	   *str = PG_GETARG_CSTRING(0);
533 	TxidSnapshot *snap;
534 
535 	snap = parse_snapshot(str);
536 
537 	PG_RETURN_POINTER(snap);
538 }
539 
540 /*
541  * txid_snapshot_out(txid_snapshot) returns cstring
542  *
543  *		output function for type txid_snapshot
544  */
545 Datum
txid_snapshot_out(PG_FUNCTION_ARGS)546 txid_snapshot_out(PG_FUNCTION_ARGS)
547 {
548 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
549 	StringInfoData str;
550 	uint32		i;
551 
552 	initStringInfo(&str);
553 
554 	appendStringInfo(&str, TXID_FMT ":", snap->xmin);
555 	appendStringInfo(&str, TXID_FMT ":", snap->xmax);
556 
557 	for (i = 0; i < snap->nxip; i++)
558 	{
559 		if (i > 0)
560 			appendStringInfoChar(&str, ',');
561 		appendStringInfo(&str, TXID_FMT, snap->xip[i]);
562 	}
563 
564 	PG_RETURN_CSTRING(str.data);
565 }
566 
567 /*
568  * txid_snapshot_recv(internal) returns txid_snapshot
569  *
570  *		binary input function for type txid_snapshot
571  *
572  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
573  */
574 Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)575 txid_snapshot_recv(PG_FUNCTION_ARGS)
576 {
577 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
578 	TxidSnapshot *snap;
579 	txid		last = 0;
580 	int			nxip;
581 	int			i;
582 	txid		xmin,
583 				xmax;
584 
585 	/* load and validate nxip */
586 	nxip = pq_getmsgint(buf, 4);
587 	if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
588 		goto bad_format;
589 
590 	xmin = pq_getmsgint64(buf);
591 	xmax = pq_getmsgint64(buf);
592 	if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
593 		goto bad_format;
594 
595 	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
596 	snap->xmin = xmin;
597 	snap->xmax = xmax;
598 
599 	for (i = 0; i < nxip; i++)
600 	{
601 		txid		cur = pq_getmsgint64(buf);
602 
603 		if (cur < last || cur < xmin || cur >= xmax)
604 			goto bad_format;
605 
606 		/* skip duplicate xips */
607 		if (cur == last)
608 		{
609 			i--;
610 			nxip--;
611 			continue;
612 		}
613 
614 		snap->xip[i] = cur;
615 		last = cur;
616 	}
617 	snap->nxip = nxip;
618 	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
619 	PG_RETURN_POINTER(snap);
620 
621 bad_format:
622 	ereport(ERROR,
623 			(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
624 			 errmsg("invalid external txid_snapshot data")));
625 	PG_RETURN_POINTER(NULL);	/* keep compiler quiet */
626 }
627 
628 /*
629  * txid_snapshot_send(txid_snapshot) returns bytea
630  *
631  *		binary output function for type txid_snapshot
632  *
633  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
634  */
635 Datum
txid_snapshot_send(PG_FUNCTION_ARGS)636 txid_snapshot_send(PG_FUNCTION_ARGS)
637 {
638 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
639 	StringInfoData buf;
640 	uint32		i;
641 
642 	pq_begintypsend(&buf);
643 	pq_sendint32(&buf, snap->nxip);
644 	pq_sendint64(&buf, snap->xmin);
645 	pq_sendint64(&buf, snap->xmax);
646 	for (i = 0; i < snap->nxip; i++)
647 		pq_sendint64(&buf, snap->xip[i]);
648 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
649 }
650 
651 /*
652  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
653  *
654  *		is txid visible in snapshot ?
655  */
656 Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)657 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
658 {
659 	txid		value = PG_GETARG_INT64(0);
660 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
661 
662 	PG_RETURN_BOOL(is_visible_txid(value, snap));
663 }
664 
665 /*
666  * txid_snapshot_xmin(txid_snapshot) returns int8
667  *
668  *		return snapshot's xmin
669  */
670 Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)671 txid_snapshot_xmin(PG_FUNCTION_ARGS)
672 {
673 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
674 
675 	PG_RETURN_INT64(snap->xmin);
676 }
677 
678 /*
679  * txid_snapshot_xmax(txid_snapshot) returns int8
680  *
681  *		return snapshot's xmax
682  */
683 Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)684 txid_snapshot_xmax(PG_FUNCTION_ARGS)
685 {
686 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
687 
688 	PG_RETURN_INT64(snap->xmax);
689 }
690 
691 /*
692  * txid_snapshot_xip(txid_snapshot) returns setof int8
693  *
694  *		return in-progress TXIDs in snapshot.
695  */
696 Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)697 txid_snapshot_xip(PG_FUNCTION_ARGS)
698 {
699 	FuncCallContext *fctx;
700 	TxidSnapshot *snap;
701 	txid		value;
702 
703 	/* on first call initialize snap_state and get copy of snapshot */
704 	if (SRF_IS_FIRSTCALL())
705 	{
706 		TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
707 
708 		fctx = SRF_FIRSTCALL_INIT();
709 
710 		/* make a copy of user snapshot */
711 		snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
712 		memcpy(snap, arg, VARSIZE(arg));
713 
714 		fctx->user_fctx = snap;
715 	}
716 
717 	/* return values one-by-one */
718 	fctx = SRF_PERCALL_SETUP();
719 	snap = fctx->user_fctx;
720 	if (fctx->call_cntr < snap->nxip)
721 	{
722 		value = snap->xip[fctx->call_cntr];
723 		SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
724 	}
725 	else
726 	{
727 		SRF_RETURN_DONE(fctx);
728 	}
729 }
730 
731 /*
732  * Report the status of a recent transaction ID, or null for wrapped,
733  * truncated away or otherwise too old XIDs.
734  *
735  * The passed epoch-qualified xid is treated as a normal xid, not a
736  * multixact id.
737  *
738  * If it points to a committed subxact the result is the subxact status even
739  * though the parent xact may still be in progress or may have aborted.
740  */
741 Datum
txid_status(PG_FUNCTION_ARGS)742 txid_status(PG_FUNCTION_ARGS)
743 {
744 	const char *status;
745 	uint64		xid_with_epoch = PG_GETARG_INT64(0);
746 	TransactionId xid;
747 
748 	/*
749 	 * We must protect against concurrent truncation of clog entries to avoid
750 	 * an I/O error on SLRU lookup.
751 	 */
752 	LWLockAcquire(CLogTruncationLock, LW_SHARED);
753 	if (TransactionIdInRecentPast(xid_with_epoch, &xid))
754 	{
755 		Assert(TransactionIdIsValid(xid));
756 
757 		if (TransactionIdIsCurrentTransactionId(xid))
758 			status = "in progress";
759 		else if (TransactionIdDidCommit(xid))
760 			status = "committed";
761 		else if (TransactionIdDidAbort(xid))
762 			status = "aborted";
763 		else
764 		{
765 			/*
766 			 * The xact is not marked as either committed or aborted in clog.
767 			 *
768 			 * It could be a transaction that ended without updating clog or
769 			 * writing an abort record due to a crash. We can safely assume
770 			 * it's aborted if it isn't committed and is older than our
771 			 * snapshot xmin.
772 			 *
773 			 * Otherwise it must be in-progress (or have been at the time we
774 			 * checked commit/abort status).
775 			 */
776 			if (TransactionIdPrecedes(xid, GetActiveSnapshot()->xmin))
777 				status = "aborted";
778 			else
779 				status = "in progress";
780 		}
781 	}
782 	else
783 	{
784 		status = NULL;
785 	}
786 	LWLockRelease(CLogTruncationLock);
787 
788 	if (status == NULL)
789 		PG_RETURN_NULL();
790 	else
791 		PG_RETURN_TEXT_P(cstring_to_text(status));
792 }
793