1 /*-------------------------------------------------------------------------
2  * xid8funcs.c
3  *
4  *	Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are exposed to user sessions.
7  * This is important because xid8s frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
 * These functions are used to support the txid_XXX functions and the newer
 * pg_current_xact_id, pg_current_snapshot and related fmgr functions, since
 * the only difference between them is whether they expose xid8 or int8 values
 * to users.  The txid_XXX variants should eventually be dropped.
16  *
17  *
18  *	Copyright (c) 2003-2020, PostgreSQL Global Development Group
19  *	Author: Jan Wieck, Afilias USA INC.
20  *	64-bit txids: Marko Kreen, Skype Technologies
21  *
22  *	src/backend/utils/adt/xid8funcs.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 
27 #include "postgres.h"
28 
29 #include "access/clog.h"
30 #include "access/transam.h"
31 #include "access/xact.h"
32 #include "access/xlog.h"
33 #include "funcapi.h"
34 #include "lib/qunique.h"
35 #include "libpq/pqformat.h"
36 #include "miscadmin.h"
37 #include "postmaster/postmaster.h"
38 #include "storage/lwlock.h"
39 #include "utils/builtins.h"
40 #include "utils/memutils.h"
41 #include "utils/snapmgr.h"
42 #include "utils/xid8.h"
43 
44 
/*
 * Threshold for choosing the lookup strategy in is_visible_fxid().
 *
 * When this symbol is defined, a snapshot whose xip array holds more than
 * this many entries is searched with bsearch(); smaller snapshots are
 * scanned linearly.  Leave it undefined to always scan linearly.
 */
#define USE_BSEARCH_IF_NXIP_GREATER 30
50 
51 
52 /*
53  * Snapshot containing FullTransactionIds.
54  */
55 typedef struct
56 {
57 	/*
58 	 * 4-byte length hdr, should not be touched directly.
59 	 *
60 	 * Explicit embedding is ok as we want always correct alignment anyway.
61 	 */
62 	int32		__varsz;
63 
64 	uint32		nxip;			/* number of fxids in xip array */
65 	FullTransactionId xmin;
66 	FullTransactionId xmax;
67 	/* in-progress fxids, xmin <= xip[i] < xmax: */
68 	FullTransactionId xip[FLEXIBLE_ARRAY_MEMBER];
69 } pg_snapshot;
70 
71 #define PG_SNAPSHOT_SIZE(nxip) \
72 	(offsetof(pg_snapshot, xip) + sizeof(FullTransactionId) * (nxip))
73 #define PG_SNAPSHOT_MAX_NXIP \
74 	((MaxAllocSize - offsetof(pg_snapshot, xip)) / sizeof(FullTransactionId))
75 
76 /*
77  * Helper to get a TransactionId from a 64-bit xid with wraparound detection.
78  *
79  * It is an ERROR if the xid is in the future.  Otherwise, returns true if
80  * the transaction is still new enough that we can determine whether it
81  * committed and false otherwise.  If *extracted_xid is not NULL, it is set
82  * to the low 32 bits of the transaction ID (i.e. the actual XID, without the
83  * epoch).
84  *
85  * The caller must hold XactTruncationLock since it's dealing with arbitrary
86  * XIDs, and must continue to hold it until it's done with any clog lookups
87  * relating to those XIDs.
88  */
89 static bool
TransactionIdInRecentPast(FullTransactionId fxid,TransactionId * extracted_xid)90 TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
91 {
92 	uint32		xid_epoch = EpochFromFullTransactionId(fxid);
93 	TransactionId xid = XidFromFullTransactionId(fxid);
94 	uint32		now_epoch;
95 	TransactionId now_epoch_next_xid;
96 	FullTransactionId now_fullxid;
97 
98 	now_fullxid = ReadNextFullTransactionId();
99 	now_epoch_next_xid = XidFromFullTransactionId(now_fullxid);
100 	now_epoch = EpochFromFullTransactionId(now_fullxid);
101 
102 	if (extracted_xid != NULL)
103 		*extracted_xid = xid;
104 
105 	if (!TransactionIdIsValid(xid))
106 		return false;
107 
108 	/* For non-normal transaction IDs, we can ignore the epoch. */
109 	if (!TransactionIdIsNormal(xid))
110 		return true;
111 
112 	/* If the transaction ID is in the future, throw an error. */
113 	if (!FullTransactionIdPrecedes(fxid, now_fullxid))
114 		ereport(ERROR,
115 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
116 				 errmsg("transaction ID %s is in the future",
117 						psprintf(UINT64_FORMAT,
118 								 U64FromFullTransactionId(fxid)))));
119 
120 	/*
121 	 * ShmemVariableCache->oldestClogXid is protected by XactTruncationLock,
122 	 * but we don't acquire that lock here.  Instead, we require the caller to
123 	 * acquire it, because the caller is presumably going to look up the
124 	 * returned XID.  If we took and released the lock within this function, a
125 	 * CLOG truncation could occur before the caller finished with the XID.
126 	 */
127 	Assert(LWLockHeldByMe(XactTruncationLock));
128 
129 	/*
130 	 * If the transaction ID has wrapped around, it's definitely too old to
131 	 * determine the commit status.  Otherwise, we can compare it to
132 	 * ShmemVariableCache->oldestClogXid to determine whether the relevant
133 	 * CLOG entry is guaranteed to still exist.
134 	 */
135 	if (xid_epoch + 1 < now_epoch
136 		|| (xid_epoch + 1 == now_epoch && xid < now_epoch_next_xid)
137 		|| TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid))
138 		return false;
139 
140 	return true;
141 }
142 
143 /*
144  * Convert a TransactionId obtained from a snapshot held by the caller to a
145  * FullTransactionId.  Use next_fxid as a reference FullTransactionId, so that
146  * we can compute the high order bits.  It must have been obtained by the
147  * caller with ReadNextFullTransactionId() after the snapshot was created.
148  */
149 static FullTransactionId
widen_snapshot_xid(TransactionId xid,FullTransactionId next_fxid)150 widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
151 {
152 	TransactionId next_xid = XidFromFullTransactionId(next_fxid);
153 	uint32		epoch = EpochFromFullTransactionId(next_fxid);
154 
155 	/* Special transaction ID. */
156 	if (!TransactionIdIsNormal(xid))
157 		return FullTransactionIdFromEpochAndXid(0, xid);
158 
159 	/*
160 	 * The 64 bit result must be <= next_fxid, since next_fxid hadn't been
161 	 * issued yet when the snapshot was created.  Every TransactionId in the
162 	 * snapshot must therefore be from the same epoch as next_fxid, or the
163 	 * epoch before.  We know this because next_fxid is never allow to get
164 	 * more than one epoch ahead of the TransactionIds in any snapshot.
165 	 */
166 	if (xid > next_xid)
167 		epoch--;
168 
169 	return FullTransactionIdFromEpochAndXid(epoch, xid);
170 }
171 
172 /*
173  * txid comparator for qsort/bsearch
174  */
175 static int
cmp_fxid(const void * aa,const void * bb)176 cmp_fxid(const void *aa, const void *bb)
177 {
178 	FullTransactionId a = *(const FullTransactionId *) aa;
179 	FullTransactionId b = *(const FullTransactionId *) bb;
180 
181 	if (FullTransactionIdPrecedes(a, b))
182 		return -1;
183 	if (FullTransactionIdPrecedes(b, a))
184 		return 1;
185 	return 0;
186 }
187 
188 /*
189  * Sort a snapshot's txids, so we can use bsearch() later.  Also remove
190  * any duplicates.
191  *
192  * For consistency of on-disk representation, we always sort even if bsearch
193  * will not be used.
194  */
195 static void
sort_snapshot(pg_snapshot * snap)196 sort_snapshot(pg_snapshot *snap)
197 {
198 	if (snap->nxip > 1)
199 	{
200 		qsort(snap->xip, snap->nxip, sizeof(FullTransactionId), cmp_fxid);
201 		snap->nxip = qunique(snap->xip, snap->nxip, sizeof(FullTransactionId),
202 							 cmp_fxid);
203 	}
204 }
205 
206 /*
207  * check fxid visibility.
208  */
209 static bool
is_visible_fxid(FullTransactionId value,const pg_snapshot * snap)210 is_visible_fxid(FullTransactionId value, const pg_snapshot *snap)
211 {
212 	if (FullTransactionIdPrecedes(value, snap->xmin))
213 		return true;
214 	else if (!FullTransactionIdPrecedes(value, snap->xmax))
215 		return false;
216 #ifdef USE_BSEARCH_IF_NXIP_GREATER
217 	else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
218 	{
219 		void	   *res;
220 
221 		res = bsearch(&value, snap->xip, snap->nxip, sizeof(FullTransactionId),
222 					  cmp_fxid);
223 		/* if found, transaction is still in progress */
224 		return (res) ? false : true;
225 	}
226 #endif
227 	else
228 	{
229 		uint32		i;
230 
231 		for (i = 0; i < snap->nxip; i++)
232 		{
233 			if (FullTransactionIdEquals(value, snap->xip[i]))
234 				return false;
235 		}
236 		return true;
237 	}
238 }
239 
240 /*
241  * helper functions to use StringInfo for pg_snapshot creation.
242  */
243 
244 static StringInfo
buf_init(FullTransactionId xmin,FullTransactionId xmax)245 buf_init(FullTransactionId xmin, FullTransactionId xmax)
246 {
247 	pg_snapshot snap;
248 	StringInfo	buf;
249 
250 	snap.xmin = xmin;
251 	snap.xmax = xmax;
252 	snap.nxip = 0;
253 
254 	buf = makeStringInfo();
255 	appendBinaryStringInfo(buf, (char *) &snap, PG_SNAPSHOT_SIZE(0));
256 	return buf;
257 }
258 
259 static void
buf_add_txid(StringInfo buf,FullTransactionId fxid)260 buf_add_txid(StringInfo buf, FullTransactionId fxid)
261 {
262 	pg_snapshot *snap = (pg_snapshot *) buf->data;
263 
264 	/* do this before possible realloc */
265 	snap->nxip++;
266 
267 	appendBinaryStringInfo(buf, (char *) &fxid, sizeof(fxid));
268 }
269 
270 static pg_snapshot *
buf_finalize(StringInfo buf)271 buf_finalize(StringInfo buf)
272 {
273 	pg_snapshot *snap = (pg_snapshot *) buf->data;
274 
275 	SET_VARSIZE(snap, buf->len);
276 
277 	/* buf is not needed anymore */
278 	buf->data = NULL;
279 	pfree(buf);
280 
281 	return snap;
282 }
283 
284 /*
285  * parse snapshot from cstring
286  */
287 static pg_snapshot *
parse_snapshot(const char * str)288 parse_snapshot(const char *str)
289 {
290 	FullTransactionId xmin;
291 	FullTransactionId xmax;
292 	FullTransactionId last_val = InvalidFullTransactionId;
293 	FullTransactionId val;
294 	const char *str_start = str;
295 	char	   *endp;
296 	StringInfo	buf;
297 
298 	xmin = FullTransactionIdFromU64(pg_strtouint64(str, &endp, 10));
299 	if (*endp != ':')
300 		goto bad_format;
301 	str = endp + 1;
302 
303 	xmax = FullTransactionIdFromU64(pg_strtouint64(str, &endp, 10));
304 	if (*endp != ':')
305 		goto bad_format;
306 	str = endp + 1;
307 
308 	/* it should look sane */
309 	if (!FullTransactionIdIsValid(xmin) ||
310 		!FullTransactionIdIsValid(xmax) ||
311 		FullTransactionIdPrecedes(xmax, xmin))
312 		goto bad_format;
313 
314 	/* allocate buffer */
315 	buf = buf_init(xmin, xmax);
316 
317 	/* loop over values */
318 	while (*str != '\0')
319 	{
320 		/* read next value */
321 		val = FullTransactionIdFromU64(pg_strtouint64(str, &endp, 10));
322 		str = endp;
323 
324 		/* require the input to be in order */
325 		if (FullTransactionIdPrecedes(val, xmin) ||
326 			FullTransactionIdFollowsOrEquals(val, xmax) ||
327 			FullTransactionIdPrecedes(val, last_val))
328 			goto bad_format;
329 
330 		/* skip duplicates */
331 		if (!FullTransactionIdEquals(val, last_val))
332 			buf_add_txid(buf, val);
333 		last_val = val;
334 
335 		if (*str == ',')
336 			str++;
337 		else if (*str != '\0')
338 			goto bad_format;
339 	}
340 
341 	return buf_finalize(buf);
342 
343 bad_format:
344 	ereport(ERROR,
345 			(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
346 			 errmsg("invalid input syntax for type %s: \"%s\"",
347 					"pg_snapshot", str_start)));
348 	return NULL;				/* keep compiler quiet */
349 }
350 
351 /*
352  * pg_current_xact_id() returns xid8
353  *
354  *	Return the current toplevel full transaction ID.
355  *	If the current transaction does not have one, one is assigned.
356  */
357 Datum
pg_current_xact_id(PG_FUNCTION_ARGS)358 pg_current_xact_id(PG_FUNCTION_ARGS)
359 {
360 	/*
361 	 * Must prevent during recovery because if an xid is not assigned we try
362 	 * to assign one, which would fail. Programs already rely on this function
363 	 * to always return a valid current xid, so we should not change this to
364 	 * return NULL or similar invalid xid.
365 	 */
366 	PreventCommandDuringRecovery("pg_current_xact_id()");
367 
368 	PG_RETURN_FULLTRANSACTIONID(GetTopFullTransactionId());
369 }
370 
371 /*
372  * Same as pg_current_xact_if_assigned() but doesn't assign a new xid if there
373  * isn't one yet.
374  */
375 Datum
pg_current_xact_id_if_assigned(PG_FUNCTION_ARGS)376 pg_current_xact_id_if_assigned(PG_FUNCTION_ARGS)
377 {
378 	FullTransactionId topfxid = GetTopFullTransactionIdIfAny();
379 
380 	if (!FullTransactionIdIsValid(topfxid))
381 		PG_RETURN_NULL();
382 
383 	PG_RETURN_FULLTRANSACTIONID(topfxid);
384 }
385 
386 /*
387  * pg_current_snapshot() returns pg_snapshot
388  *
389  *		Return current snapshot
390  *
391  * Note that only top-transaction XIDs are included in the snapshot.
392  */
393 Datum
pg_current_snapshot(PG_FUNCTION_ARGS)394 pg_current_snapshot(PG_FUNCTION_ARGS)
395 {
396 	pg_snapshot *snap;
397 	uint32		nxip,
398 				i;
399 	Snapshot	cur;
400 	FullTransactionId next_fxid = ReadNextFullTransactionId();
401 
402 	cur = GetActiveSnapshot();
403 	if (cur == NULL)
404 		elog(ERROR, "no active snapshot set");
405 
406 	/*
407 	 * Compile-time limits on the procarray (MAX_BACKENDS processes plus
408 	 * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
409 	 */
410 	StaticAssertStmt(MAX_BACKENDS * 2 <= PG_SNAPSHOT_MAX_NXIP,
411 					 "possible overflow in pg_current_snapshot()");
412 
413 	/* allocate */
414 	nxip = cur->xcnt;
415 	snap = palloc(PG_SNAPSHOT_SIZE(nxip));
416 
417 	/* fill */
418 	snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
419 	snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
420 	snap->nxip = nxip;
421 	for (i = 0; i < nxip; i++)
422 		snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
423 
424 	/*
425 	 * We want them guaranteed to be in ascending order.  This also removes
426 	 * any duplicate xids.  Normally, an XID can only be assigned to one
427 	 * backend, but when preparing a transaction for two-phase commit, there
428 	 * is a transient state when both the original backend and the dummy
429 	 * PGPROC entry reserved for the prepared transaction hold the same XID.
430 	 */
431 	sort_snapshot(snap);
432 
433 	/* set size after sorting, because it may have removed duplicate xips */
434 	SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(snap->nxip));
435 
436 	PG_RETURN_POINTER(snap);
437 }
438 
439 /*
440  * pg_snapshot_in(cstring) returns pg_snapshot
441  *
442  *		input function for type pg_snapshot
443  */
444 Datum
pg_snapshot_in(PG_FUNCTION_ARGS)445 pg_snapshot_in(PG_FUNCTION_ARGS)
446 {
447 	char	   *str = PG_GETARG_CSTRING(0);
448 	pg_snapshot *snap;
449 
450 	snap = parse_snapshot(str);
451 
452 	PG_RETURN_POINTER(snap);
453 }
454 
455 /*
456  * pg_snapshot_out(pg_snapshot) returns cstring
457  *
458  *		output function for type pg_snapshot
459  */
460 Datum
pg_snapshot_out(PG_FUNCTION_ARGS)461 pg_snapshot_out(PG_FUNCTION_ARGS)
462 {
463 	pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
464 	StringInfoData str;
465 	uint32		i;
466 
467 	initStringInfo(&str);
468 
469 	appendStringInfo(&str, UINT64_FORMAT ":",
470 					 U64FromFullTransactionId(snap->xmin));
471 	appendStringInfo(&str, UINT64_FORMAT ":",
472 					 U64FromFullTransactionId(snap->xmax));
473 
474 	for (i = 0; i < snap->nxip; i++)
475 	{
476 		if (i > 0)
477 			appendStringInfoChar(&str, ',');
478 		appendStringInfo(&str, UINT64_FORMAT,
479 						 U64FromFullTransactionId(snap->xip[i]));
480 	}
481 
482 	PG_RETURN_CSTRING(str.data);
483 }
484 
485 /*
486  * pg_snapshot_recv(internal) returns pg_snapshot
487  *
488  *		binary input function for type pg_snapshot
489  *
490  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
491  */
492 Datum
pg_snapshot_recv(PG_FUNCTION_ARGS)493 pg_snapshot_recv(PG_FUNCTION_ARGS)
494 {
495 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
496 	pg_snapshot *snap;
497 	FullTransactionId last = InvalidFullTransactionId;
498 	int			nxip;
499 	int			i;
500 	FullTransactionId xmin;
501 	FullTransactionId xmax;
502 
503 	/* load and validate nxip */
504 	nxip = pq_getmsgint(buf, 4);
505 	if (nxip < 0 || nxip > PG_SNAPSHOT_MAX_NXIP)
506 		goto bad_format;
507 
508 	xmin = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
509 	xmax = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
510 	if (!FullTransactionIdIsValid(xmin) ||
511 		!FullTransactionIdIsValid(xmax) ||
512 		FullTransactionIdPrecedes(xmax, xmin))
513 		goto bad_format;
514 
515 	snap = palloc(PG_SNAPSHOT_SIZE(nxip));
516 	snap->xmin = xmin;
517 	snap->xmax = xmax;
518 
519 	for (i = 0; i < nxip; i++)
520 	{
521 		FullTransactionId cur =
522 		FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
523 
524 		if (FullTransactionIdPrecedes(cur, last) ||
525 			FullTransactionIdPrecedes(cur, xmin) ||
526 			FullTransactionIdPrecedes(xmax, cur))
527 			goto bad_format;
528 
529 		/* skip duplicate xips */
530 		if (FullTransactionIdEquals(cur, last))
531 		{
532 			i--;
533 			nxip--;
534 			continue;
535 		}
536 
537 		snap->xip[i] = cur;
538 		last = cur;
539 	}
540 	snap->nxip = nxip;
541 	SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(nxip));
542 	PG_RETURN_POINTER(snap);
543 
544 bad_format:
545 	ereport(ERROR,
546 			(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
547 			 errmsg("invalid external pg_snapshot data")));
548 	PG_RETURN_POINTER(NULL);	/* keep compiler quiet */
549 }
550 
551 /*
552  * pg_snapshot_send(pg_snapshot) returns bytea
553  *
554  *		binary output function for type pg_snapshot
555  *
556  *		format: int4 nxip, u64 xmin, u64 xmax, u64 xip...
557  */
558 Datum
pg_snapshot_send(PG_FUNCTION_ARGS)559 pg_snapshot_send(PG_FUNCTION_ARGS)
560 {
561 	pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
562 	StringInfoData buf;
563 	uint32		i;
564 
565 	pq_begintypsend(&buf);
566 	pq_sendint32(&buf, snap->nxip);
567 	pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmin));
568 	pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmax));
569 	for (i = 0; i < snap->nxip; i++)
570 		pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xip[i]));
571 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
572 }
573 
574 /*
575  * pg_visible_in_snapshot(xid8, pg_snapshot) returns bool
576  *
577  *		is txid visible in snapshot ?
578  */
579 Datum
pg_visible_in_snapshot(PG_FUNCTION_ARGS)580 pg_visible_in_snapshot(PG_FUNCTION_ARGS)
581 {
582 	FullTransactionId value = PG_GETARG_FULLTRANSACTIONID(0);
583 	pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(1);
584 
585 	PG_RETURN_BOOL(is_visible_fxid(value, snap));
586 }
587 
588 /*
589  * pg_snapshot_xmin(pg_snapshot) returns xid8
590  *
591  *		return snapshot's xmin
592  */
593 Datum
pg_snapshot_xmin(PG_FUNCTION_ARGS)594 pg_snapshot_xmin(PG_FUNCTION_ARGS)
595 {
596 	pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
597 
598 	PG_RETURN_FULLTRANSACTIONID(snap->xmin);
599 }
600 
601 /*
602  * pg_snapshot_xmax(pg_snapshot) returns xid8
603  *
604  *		return snapshot's xmax
605  */
606 Datum
pg_snapshot_xmax(PG_FUNCTION_ARGS)607 pg_snapshot_xmax(PG_FUNCTION_ARGS)
608 {
609 	pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
610 
611 	PG_RETURN_FULLTRANSACTIONID(snap->xmax);
612 }
613 
614 /*
615  * pg_snapshot_xip(pg_snapshot) returns setof xid8
616  *
617  *		return in-progress xid8s in snapshot.
618  */
619 Datum
pg_snapshot_xip(PG_FUNCTION_ARGS)620 pg_snapshot_xip(PG_FUNCTION_ARGS)
621 {
622 	FuncCallContext *fctx;
623 	pg_snapshot *snap;
624 	FullTransactionId value;
625 
626 	/* on first call initialize fctx and get copy of snapshot */
627 	if (SRF_IS_FIRSTCALL())
628 	{
629 		pg_snapshot *arg = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
630 
631 		fctx = SRF_FIRSTCALL_INIT();
632 
633 		/* make a copy of user snapshot */
634 		snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
635 		memcpy(snap, arg, VARSIZE(arg));
636 
637 		fctx->user_fctx = snap;
638 	}
639 
640 	/* return values one-by-one */
641 	fctx = SRF_PERCALL_SETUP();
642 	snap = fctx->user_fctx;
643 	if (fctx->call_cntr < snap->nxip)
644 	{
645 		value = snap->xip[fctx->call_cntr];
646 		SRF_RETURN_NEXT(fctx, FullTransactionIdGetDatum(value));
647 	}
648 	else
649 	{
650 		SRF_RETURN_DONE(fctx);
651 	}
652 }
653 
654 /*
655  * Report the status of a recent transaction ID, or null for wrapped,
656  * truncated away or otherwise too old XIDs.
657  *
658  * The passed epoch-qualified xid is treated as a normal xid, not a
659  * multixact id.
660  *
661  * If it points to a committed subxact the result is the subxact status even
662  * though the parent xact may still be in progress or may have aborted.
663  */
664 Datum
pg_xact_status(PG_FUNCTION_ARGS)665 pg_xact_status(PG_FUNCTION_ARGS)
666 {
667 	const char *status;
668 	FullTransactionId fxid = PG_GETARG_FULLTRANSACTIONID(0);
669 	TransactionId xid;
670 
671 	/*
672 	 * We must protect against concurrent truncation of clog entries to avoid
673 	 * an I/O error on SLRU lookup.
674 	 */
675 	LWLockAcquire(XactTruncationLock, LW_SHARED);
676 	if (TransactionIdInRecentPast(fxid, &xid))
677 	{
678 		Assert(TransactionIdIsValid(xid));
679 
680 		if (TransactionIdIsCurrentTransactionId(xid))
681 			status = "in progress";
682 		else if (TransactionIdDidCommit(xid))
683 			status = "committed";
684 		else if (TransactionIdDidAbort(xid))
685 			status = "aborted";
686 		else
687 		{
688 			/*
689 			 * The xact is not marked as either committed or aborted in clog.
690 			 *
691 			 * It could be a transaction that ended without updating clog or
692 			 * writing an abort record due to a crash. We can safely assume
693 			 * it's aborted if it isn't committed and is older than our
694 			 * snapshot xmin.
695 			 *
696 			 * Otherwise it must be in-progress (or have been at the time we
697 			 * checked commit/abort status).
698 			 */
699 			if (TransactionIdPrecedes(xid, GetActiveSnapshot()->xmin))
700 				status = "aborted";
701 			else
702 				status = "in progress";
703 		}
704 	}
705 	else
706 	{
707 		status = NULL;
708 	}
709 	LWLockRelease(XactTruncationLock);
710 
711 	if (status == NULL)
712 		PG_RETURN_NULL();
713 	else
714 		PG_RETURN_TEXT_P(cstring_to_text(status));
715 }
716