1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *	Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *	Copyright (c) 2003-2016, PostgreSQL Global Development Group
14  *	Author: Jan Wieck, Afilias USA INC.
15  *	64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *	src/backend/utils/adt/txid.c
18  *
19  *-------------------------------------------------------------------------
20  */
21 
22 #include "postgres.h"
23 
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "access/xlog.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "libpq/pqformat.h"
30 #include "postmaster/postmaster.h"
31 #include "utils/builtins.h"
32 #include "utils/memutils.h"
33 #include "utils/snapmgr.h"
34 
35 
36 /* txid will be signed int8 in database, so must limit to 63 bits */
37 #define MAX_TXID   ((uint64) PG_INT64_MAX)
38 
39 /* Use unsigned variant internally */
40 typedef uint64 txid;
41 
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
44 
45 /*
46  * If defined, use bsearch() function for searching for txids in snapshots
47  * that have more than the specified number of values.
48  */
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
50 
51 
52 /*
53  * Snapshot containing 8byte txids.
54  */
55 typedef struct
56 {
57 	/*
58 	 * 4-byte length hdr, should not be touched directly.
59 	 *
60 	 * Explicit embedding is ok as we want always correct alignment anyway.
61 	 */
62 	int32		__varsz;
63 
64 	uint32		nxip;			/* number of txids in xip array */
65 	txid		xmin;
66 	txid		xmax;
67 	/* in-progress txids, xmin <= xip[i] < xmax: */
68 	txid		xip[FLEXIBLE_ARRAY_MEMBER];
69 } TxidSnapshot;
70 
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72 	(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
73 #define TXID_SNAPSHOT_MAX_NXIP \
74 	((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
75 
76 /*
77  * Epoch values from xact.c
78  */
79 typedef struct
80 {
81 	TransactionId last_xid;
82 	uint32		epoch;
83 } TxidEpoch;
84 
85 
86 /*
87  * Fetch epoch data from xact.c.
88  */
89 static void
load_xid_epoch(TxidEpoch * state)90 load_xid_epoch(TxidEpoch *state)
91 {
92 	GetNextXidAndEpoch(&state->last_xid, &state->epoch);
93 }
94 
95 /*
96  * do a TransactionId -> txid conversion for an XID near the given epoch
97  */
98 static txid
convert_xid(TransactionId xid,const TxidEpoch * state)99 convert_xid(TransactionId xid, const TxidEpoch *state)
100 {
101 	uint64		epoch;
102 
103 	/* return special xid's as-is */
104 	if (!TransactionIdIsNormal(xid))
105 		return (txid) xid;
106 
107 	/* xid can be on either side when near wrap-around */
108 	epoch = (uint64) state->epoch;
109 	if (xid > state->last_xid &&
110 		TransactionIdPrecedes(xid, state->last_xid))
111 		epoch--;
112 	else if (xid < state->last_xid &&
113 			 TransactionIdFollows(xid, state->last_xid))
114 		epoch++;
115 
116 	return (epoch << 32) | xid;
117 }
118 
119 /*
120  * txid comparator for qsort/bsearch
121  */
122 static int
cmp_txid(const void * aa,const void * bb)123 cmp_txid(const void *aa, const void *bb)
124 {
125 	txid		a = *(const txid *) aa;
126 	txid		b = *(const txid *) bb;
127 
128 	if (a < b)
129 		return -1;
130 	if (a > b)
131 		return 1;
132 	return 0;
133 }
134 
135 /*
136  * Sort a snapshot's txids, so we can use bsearch() later.  Also remove
137  * any duplicates.
138  *
139  * For consistency of on-disk representation, we always sort even if bsearch
140  * will not be used.
141  */
142 static void
sort_snapshot(TxidSnapshot * snap)143 sort_snapshot(TxidSnapshot *snap)
144 {
145 	txid		last = 0;
146 	int			nxip,
147 				idx1,
148 				idx2;
149 
150 	if (snap->nxip > 1)
151 	{
152 		qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
153 
154 		/* remove duplicates */
155 		nxip = snap->nxip;
156 		idx1 = idx2 = 0;
157 		while (idx1 < nxip)
158 		{
159 			if (snap->xip[idx1] != last)
160 				last = snap->xip[idx2++] = snap->xip[idx1];
161 			else
162 				snap->nxip--;
163 			idx1++;
164 		}
165 	}
166 }
167 
168 /*
169  * check txid visibility.
170  */
171 static bool
is_visible_txid(txid value,const TxidSnapshot * snap)172 is_visible_txid(txid value, const TxidSnapshot *snap)
173 {
174 	if (value < snap->xmin)
175 		return true;
176 	else if (value >= snap->xmax)
177 		return false;
178 #ifdef USE_BSEARCH_IF_NXIP_GREATER
179 	else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
180 	{
181 		void	   *res;
182 
183 		res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
184 		/* if found, transaction is still in progress */
185 		return (res) ? false : true;
186 	}
187 #endif
188 	else
189 	{
190 		uint32		i;
191 
192 		for (i = 0; i < snap->nxip; i++)
193 		{
194 			if (value == snap->xip[i])
195 				return false;
196 		}
197 		return true;
198 	}
199 }
200 
201 /*
202  * helper functions to use StringInfo for TxidSnapshot creation.
203  */
204 
205 static StringInfo
buf_init(txid xmin,txid xmax)206 buf_init(txid xmin, txid xmax)
207 {
208 	TxidSnapshot snap;
209 	StringInfo	buf;
210 
211 	snap.xmin = xmin;
212 	snap.xmax = xmax;
213 	snap.nxip = 0;
214 
215 	buf = makeStringInfo();
216 	appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
217 	return buf;
218 }
219 
220 static void
buf_add_txid(StringInfo buf,txid xid)221 buf_add_txid(StringInfo buf, txid xid)
222 {
223 	TxidSnapshot *snap = (TxidSnapshot *) buf->data;
224 
225 	/* do this before possible realloc */
226 	snap->nxip++;
227 
228 	appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
229 }
230 
231 static TxidSnapshot *
buf_finalize(StringInfo buf)232 buf_finalize(StringInfo buf)
233 {
234 	TxidSnapshot *snap = (TxidSnapshot *) buf->data;
235 
236 	SET_VARSIZE(snap, buf->len);
237 
238 	/* buf is not needed anymore */
239 	buf->data = NULL;
240 	pfree(buf);
241 
242 	return snap;
243 }
244 
245 /*
246  * simple number parser.
247  *
248  * We return 0 on error, which is invalid value for txid.
249  */
250 static txid
str2txid(const char * s,const char ** endp)251 str2txid(const char *s, const char **endp)
252 {
253 	txid		val = 0;
254 	txid		cutoff = MAX_TXID / 10;
255 	txid		cutlim = MAX_TXID % 10;
256 
257 	for (; *s; s++)
258 	{
259 		unsigned	d;
260 
261 		if (*s < '0' || *s > '9')
262 			break;
263 		d = *s - '0';
264 
265 		/*
266 		 * check for overflow
267 		 */
268 		if (val > cutoff || (val == cutoff && d > cutlim))
269 		{
270 			val = 0;
271 			break;
272 		}
273 
274 		val = val * 10 + d;
275 	}
276 	if (endp)
277 		*endp = s;
278 	return val;
279 }
280 
281 /*
282  * parse snapshot from cstring
283  */
284 static TxidSnapshot *
parse_snapshot(const char * str)285 parse_snapshot(const char *str)
286 {
287 	txid		xmin;
288 	txid		xmax;
289 	txid		last_val = 0,
290 				val;
291 	const char *str_start = str;
292 	const char *endp;
293 	StringInfo	buf;
294 
295 	xmin = str2txid(str, &endp);
296 	if (*endp != ':')
297 		goto bad_format;
298 	str = endp + 1;
299 
300 	xmax = str2txid(str, &endp);
301 	if (*endp != ':')
302 		goto bad_format;
303 	str = endp + 1;
304 
305 	/* it should look sane */
306 	if (xmin == 0 || xmax == 0 || xmin > xmax)
307 		goto bad_format;
308 
309 	/* allocate buffer */
310 	buf = buf_init(xmin, xmax);
311 
312 	/* loop over values */
313 	while (*str != '\0')
314 	{
315 		/* read next value */
316 		val = str2txid(str, &endp);
317 		str = endp;
318 
319 		/* require the input to be in order */
320 		if (val < xmin || val >= xmax || val < last_val)
321 			goto bad_format;
322 
323 		/* skip duplicates */
324 		if (val != last_val)
325 			buf_add_txid(buf, val);
326 		last_val = val;
327 
328 		if (*str == ',')
329 			str++;
330 		else if (*str != '\0')
331 			goto bad_format;
332 	}
333 
334 	return buf_finalize(buf);
335 
336 bad_format:
337 	ereport(ERROR,
338 			(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
339 			 errmsg("invalid input syntax for type txid_snapshot: \"%s\"",
340 					str_start)));
341 	return NULL;				/* keep compiler quiet */
342 }
343 
344 /*
345  * Public functions.
346  *
347  * txid_current() and txid_current_snapshot() are the only ones that
348  * communicate with core xid machinery.  All the others work on data
349  * returned by them.
350  */
351 
352 /*
353  * txid_current() returns int8
354  *
355  *	Return the current toplevel transaction ID as TXID
356  *	If the current transaction does not have one, one is assigned.
357  */
358 Datum
txid_current(PG_FUNCTION_ARGS)359 txid_current(PG_FUNCTION_ARGS)
360 {
361 	txid		val;
362 	TxidEpoch	state;
363 
364 	/*
365 	 * Must prevent during recovery because if an xid is not assigned we try
366 	 * to assign one, which would fail. Programs already rely on this function
367 	 * to always return a valid current xid, so we should not change this to
368 	 * return NULL or similar invalid xid.
369 	 */
370 	PreventCommandDuringRecovery("txid_current()");
371 
372 	load_xid_epoch(&state);
373 
374 	val = convert_xid(GetTopTransactionId(), &state);
375 
376 	PG_RETURN_INT64(val);
377 }
378 
379 /*
380  * txid_current_snapshot() returns txid_snapshot
381  *
382  *		Return current snapshot in TXID format
383  *
384  * Note that only top-transaction XIDs are included in the snapshot.
385  */
386 Datum
txid_current_snapshot(PG_FUNCTION_ARGS)387 txid_current_snapshot(PG_FUNCTION_ARGS)
388 {
389 	TxidSnapshot *snap;
390 	uint32		nxip,
391 				i;
392 	TxidEpoch	state;
393 	Snapshot	cur;
394 
395 	cur = GetActiveSnapshot();
396 	if (cur == NULL)
397 		elog(ERROR, "no active snapshot set");
398 
399 	load_xid_epoch(&state);
400 
401 	/*
402 	 * Compile-time limits on the procarray (MAX_BACKENDS processes plus
403 	 * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
404 	 */
405 	StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
406 					 "possible overflow in txid_current_snapshot()");
407 
408 	/* allocate */
409 	nxip = cur->xcnt;
410 	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
411 
412 	/* fill */
413 	snap->xmin = convert_xid(cur->xmin, &state);
414 	snap->xmax = convert_xid(cur->xmax, &state);
415 	snap->nxip = nxip;
416 	for (i = 0; i < nxip; i++)
417 		snap->xip[i] = convert_xid(cur->xip[i], &state);
418 
419 	/*
420 	 * We want them guaranteed to be in ascending order.  This also removes
421 	 * any duplicate xids.  Normally, an XID can only be assigned to one
422 	 * backend, but when preparing a transaction for two-phase commit, there
423 	 * is a transient state when both the original backend and the dummy
424 	 * PGPROC entry reserved for the prepared transaction hold the same XID.
425 	 */
426 	sort_snapshot(snap);
427 
428 	/* set size after sorting, because it may have removed duplicate xips */
429 	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
430 
431 	PG_RETURN_POINTER(snap);
432 }
433 
434 /*
435  * txid_snapshot_in(cstring) returns txid_snapshot
436  *
437  *		input function for type txid_snapshot
438  */
439 Datum
txid_snapshot_in(PG_FUNCTION_ARGS)440 txid_snapshot_in(PG_FUNCTION_ARGS)
441 {
442 	char	   *str = PG_GETARG_CSTRING(0);
443 	TxidSnapshot *snap;
444 
445 	snap = parse_snapshot(str);
446 
447 	PG_RETURN_POINTER(snap);
448 }
449 
450 /*
451  * txid_snapshot_out(txid_snapshot) returns cstring
452  *
453  *		output function for type txid_snapshot
454  */
455 Datum
txid_snapshot_out(PG_FUNCTION_ARGS)456 txid_snapshot_out(PG_FUNCTION_ARGS)
457 {
458 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
459 	StringInfoData str;
460 	uint32		i;
461 
462 	initStringInfo(&str);
463 
464 	appendStringInfo(&str, TXID_FMT ":", snap->xmin);
465 	appendStringInfo(&str, TXID_FMT ":", snap->xmax);
466 
467 	for (i = 0; i < snap->nxip; i++)
468 	{
469 		if (i > 0)
470 			appendStringInfoChar(&str, ',');
471 		appendStringInfo(&str, TXID_FMT, snap->xip[i]);
472 	}
473 
474 	PG_RETURN_CSTRING(str.data);
475 }
476 
477 /*
478  * txid_snapshot_recv(internal) returns txid_snapshot
479  *
480  *		binary input function for type txid_snapshot
481  *
482  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
483  */
484 Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)485 txid_snapshot_recv(PG_FUNCTION_ARGS)
486 {
487 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
488 	TxidSnapshot *snap;
489 	txid		last = 0;
490 	int			nxip;
491 	int			i;
492 	txid		xmin,
493 				xmax;
494 
495 	/* load and validate nxip */
496 	nxip = pq_getmsgint(buf, 4);
497 	if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
498 		goto bad_format;
499 
500 	xmin = pq_getmsgint64(buf);
501 	xmax = pq_getmsgint64(buf);
502 	if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
503 		goto bad_format;
504 
505 	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
506 	snap->xmin = xmin;
507 	snap->xmax = xmax;
508 
509 	for (i = 0; i < nxip; i++)
510 	{
511 		txid		cur = pq_getmsgint64(buf);
512 
513 		if (cur < last || cur < xmin || cur >= xmax)
514 			goto bad_format;
515 
516 		/* skip duplicate xips */
517 		if (cur == last)
518 		{
519 			i--;
520 			nxip--;
521 			continue;
522 		}
523 
524 		snap->xip[i] = cur;
525 		last = cur;
526 	}
527 	snap->nxip = nxip;
528 	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
529 	PG_RETURN_POINTER(snap);
530 
531 bad_format:
532 	ereport(ERROR,
533 			(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
534 			 errmsg("invalid external txid_snapshot data")));
535 	PG_RETURN_POINTER(NULL);	/* keep compiler quiet */
536 }
537 
538 /*
539  * txid_snapshot_send(txid_snapshot) returns bytea
540  *
541  *		binary output function for type txid_snapshot
542  *
543  *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
544  */
545 Datum
txid_snapshot_send(PG_FUNCTION_ARGS)546 txid_snapshot_send(PG_FUNCTION_ARGS)
547 {
548 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
549 	StringInfoData buf;
550 	uint32		i;
551 
552 	pq_begintypsend(&buf);
553 	pq_sendint(&buf, snap->nxip, 4);
554 	pq_sendint64(&buf, snap->xmin);
555 	pq_sendint64(&buf, snap->xmax);
556 	for (i = 0; i < snap->nxip; i++)
557 		pq_sendint64(&buf, snap->xip[i]);
558 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
559 }
560 
561 /*
562  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
563  *
564  *		is txid visible in snapshot ?
565  */
566 Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)567 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
568 {
569 	txid		value = PG_GETARG_INT64(0);
570 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
571 
572 	PG_RETURN_BOOL(is_visible_txid(value, snap));
573 }
574 
575 /*
576  * txid_snapshot_xmin(txid_snapshot) returns int8
577  *
578  *		return snapshot's xmin
579  */
580 Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)581 txid_snapshot_xmin(PG_FUNCTION_ARGS)
582 {
583 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
584 
585 	PG_RETURN_INT64(snap->xmin);
586 }
587 
588 /*
589  * txid_snapshot_xmax(txid_snapshot) returns int8
590  *
591  *		return snapshot's xmax
592  */
593 Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)594 txid_snapshot_xmax(PG_FUNCTION_ARGS)
595 {
596 	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
597 
598 	PG_RETURN_INT64(snap->xmax);
599 }
600 
601 /*
602  * txid_snapshot_xip(txid_snapshot) returns setof int8
603  *
604  *		return in-progress TXIDs in snapshot.
605  */
606 Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)607 txid_snapshot_xip(PG_FUNCTION_ARGS)
608 {
609 	FuncCallContext *fctx;
610 	TxidSnapshot *snap;
611 	txid		value;
612 
613 	/* on first call initialize snap_state and get copy of snapshot */
614 	if (SRF_IS_FIRSTCALL())
615 	{
616 		TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
617 
618 		fctx = SRF_FIRSTCALL_INIT();
619 
620 		/* make a copy of user snapshot */
621 		snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
622 		memcpy(snap, arg, VARSIZE(arg));
623 
624 		fctx->user_fctx = snap;
625 	}
626 
627 	/* return values one-by-one */
628 	fctx = SRF_PERCALL_SETUP();
629 	snap = fctx->user_fctx;
630 	if (fctx->call_cntr < snap->nxip)
631 	{
632 		value = snap->xip[fctx->call_cntr];
633 		SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
634 	}
635 	else
636 	{
637 		SRF_RETURN_DONE(fctx);
638 	}
639 }
640