1 /*-------------------------------------------------------------------------
2 * txid.c
3 *
4 * Export internal transaction IDs to user level.
5 *
6 * Note that only top-level transaction IDs are ever converted to TXID.
7 * This is important because TXIDs frequently persist beyond the global
8 * xmin horizon, or may even be shipped to other machines, so we cannot
9 * rely on being able to correlate subtransaction IDs with their parents
10 * via functions such as SubTransGetTopmostTransaction().
11 *
12 *
13 * Copyright (c) 2003-2016, PostgreSQL Global Development Group
14 * Author: Jan Wieck, Afilias USA INC.
15 * 64-bit txids: Marko Kreen, Skype Technologies
16 *
17 * src/backend/utils/adt/txid.c
18 *
19 *-------------------------------------------------------------------------
20 */
21
22 #include "postgres.h"
23
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "access/xlog.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "libpq/pqformat.h"
30 #include "postmaster/postmaster.h"
31 #include "utils/builtins.h"
32 #include "utils/memutils.h"
33 #include "utils/snapmgr.h"
34
35
36 /* txid will be signed int8 in database, so must limit to 63 bits */
37 #define MAX_TXID ((uint64) PG_INT64_MAX)
38
39 /* Use unsigned variant internally */
40 typedef uint64 txid;
41
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
44
45 /*
46 * If defined, use bsearch() function for searching for txids in snapshots
47 * that have more than the specified number of values.
48 */
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
50
51
52 /*
53 * Snapshot containing 8byte txids.
54 */
55 typedef struct
56 {
57 /*
58 * 4-byte length hdr, should not be touched directly.
59 *
60 * Explicit embedding is ok as we want always correct alignment anyway.
61 */
62 int32 __varsz;
63
64 uint32 nxip; /* number of txids in xip array */
65 txid xmin;
66 txid xmax;
67 /* in-progress txids, xmin <= xip[i] < xmax: */
68 txid xip[FLEXIBLE_ARRAY_MEMBER];
69 } TxidSnapshot;
70
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72 (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
73 #define TXID_SNAPSHOT_MAX_NXIP \
74 ((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
75
76 /*
77 * Epoch values from xact.c
78 */
79 typedef struct
80 {
81 TransactionId last_xid;
82 uint32 epoch;
83 } TxidEpoch;
84
85
86 /*
87 * Fetch epoch data from xact.c.
88 */
89 static void
load_xid_epoch(TxidEpoch * state)90 load_xid_epoch(TxidEpoch *state)
91 {
92 GetNextXidAndEpoch(&state->last_xid, &state->epoch);
93 }
94
95 /*
96 * do a TransactionId -> txid conversion for an XID near the given epoch
97 */
98 static txid
convert_xid(TransactionId xid,const TxidEpoch * state)99 convert_xid(TransactionId xid, const TxidEpoch *state)
100 {
101 uint64 epoch;
102
103 /* return special xid's as-is */
104 if (!TransactionIdIsNormal(xid))
105 return (txid) xid;
106
107 /* xid can be on either side when near wrap-around */
108 epoch = (uint64) state->epoch;
109 if (xid > state->last_xid &&
110 TransactionIdPrecedes(xid, state->last_xid))
111 epoch--;
112 else if (xid < state->last_xid &&
113 TransactionIdFollows(xid, state->last_xid))
114 epoch++;
115
116 return (epoch << 32) | xid;
117 }
118
119 /*
120 * txid comparator for qsort/bsearch
121 */
122 static int
cmp_txid(const void * aa,const void * bb)123 cmp_txid(const void *aa, const void *bb)
124 {
125 txid a = *(const txid *) aa;
126 txid b = *(const txid *) bb;
127
128 if (a < b)
129 return -1;
130 if (a > b)
131 return 1;
132 return 0;
133 }
134
135 /*
136 * Sort a snapshot's txids, so we can use bsearch() later. Also remove
137 * any duplicates.
138 *
139 * For consistency of on-disk representation, we always sort even if bsearch
140 * will not be used.
141 */
142 static void
sort_snapshot(TxidSnapshot * snap)143 sort_snapshot(TxidSnapshot *snap)
144 {
145 txid last = 0;
146 int nxip,
147 idx1,
148 idx2;
149
150 if (snap->nxip > 1)
151 {
152 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
153
154 /* remove duplicates */
155 nxip = snap->nxip;
156 idx1 = idx2 = 0;
157 while (idx1 < nxip)
158 {
159 if (snap->xip[idx1] != last)
160 last = snap->xip[idx2++] = snap->xip[idx1];
161 else
162 snap->nxip--;
163 idx1++;
164 }
165 }
166 }
167
168 /*
169 * check txid visibility.
170 */
171 static bool
is_visible_txid(txid value,const TxidSnapshot * snap)172 is_visible_txid(txid value, const TxidSnapshot *snap)
173 {
174 if (value < snap->xmin)
175 return true;
176 else if (value >= snap->xmax)
177 return false;
178 #ifdef USE_BSEARCH_IF_NXIP_GREATER
179 else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
180 {
181 void *res;
182
183 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
184 /* if found, transaction is still in progress */
185 return (res) ? false : true;
186 }
187 #endif
188 else
189 {
190 uint32 i;
191
192 for (i = 0; i < snap->nxip; i++)
193 {
194 if (value == snap->xip[i])
195 return false;
196 }
197 return true;
198 }
199 }
200
201 /*
202 * helper functions to use StringInfo for TxidSnapshot creation.
203 */
204
205 static StringInfo
buf_init(txid xmin,txid xmax)206 buf_init(txid xmin, txid xmax)
207 {
208 TxidSnapshot snap;
209 StringInfo buf;
210
211 snap.xmin = xmin;
212 snap.xmax = xmax;
213 snap.nxip = 0;
214
215 buf = makeStringInfo();
216 appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
217 return buf;
218 }
219
220 static void
buf_add_txid(StringInfo buf,txid xid)221 buf_add_txid(StringInfo buf, txid xid)
222 {
223 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
224
225 /* do this before possible realloc */
226 snap->nxip++;
227
228 appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
229 }
230
231 static TxidSnapshot *
buf_finalize(StringInfo buf)232 buf_finalize(StringInfo buf)
233 {
234 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
235
236 SET_VARSIZE(snap, buf->len);
237
238 /* buf is not needed anymore */
239 buf->data = NULL;
240 pfree(buf);
241
242 return snap;
243 }
244
245 /*
246 * simple number parser.
247 *
248 * We return 0 on error, which is invalid value for txid.
249 */
250 static txid
str2txid(const char * s,const char ** endp)251 str2txid(const char *s, const char **endp)
252 {
253 txid val = 0;
254 txid cutoff = MAX_TXID / 10;
255 txid cutlim = MAX_TXID % 10;
256
257 for (; *s; s++)
258 {
259 unsigned d;
260
261 if (*s < '0' || *s > '9')
262 break;
263 d = *s - '0';
264
265 /*
266 * check for overflow
267 */
268 if (val > cutoff || (val == cutoff && d > cutlim))
269 {
270 val = 0;
271 break;
272 }
273
274 val = val * 10 + d;
275 }
276 if (endp)
277 *endp = s;
278 return val;
279 }
280
281 /*
282 * parse snapshot from cstring
283 */
284 static TxidSnapshot *
parse_snapshot(const char * str)285 parse_snapshot(const char *str)
286 {
287 txid xmin;
288 txid xmax;
289 txid last_val = 0,
290 val;
291 const char *str_start = str;
292 const char *endp;
293 StringInfo buf;
294
295 xmin = str2txid(str, &endp);
296 if (*endp != ':')
297 goto bad_format;
298 str = endp + 1;
299
300 xmax = str2txid(str, &endp);
301 if (*endp != ':')
302 goto bad_format;
303 str = endp + 1;
304
305 /* it should look sane */
306 if (xmin == 0 || xmax == 0 || xmin > xmax)
307 goto bad_format;
308
309 /* allocate buffer */
310 buf = buf_init(xmin, xmax);
311
312 /* loop over values */
313 while (*str != '\0')
314 {
315 /* read next value */
316 val = str2txid(str, &endp);
317 str = endp;
318
319 /* require the input to be in order */
320 if (val < xmin || val >= xmax || val < last_val)
321 goto bad_format;
322
323 /* skip duplicates */
324 if (val != last_val)
325 buf_add_txid(buf, val);
326 last_val = val;
327
328 if (*str == ',')
329 str++;
330 else if (*str != '\0')
331 goto bad_format;
332 }
333
334 return buf_finalize(buf);
335
336 bad_format:
337 ereport(ERROR,
338 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
339 errmsg("invalid input syntax for type txid_snapshot: \"%s\"",
340 str_start)));
341 return NULL; /* keep compiler quiet */
342 }
343
344 /*
345 * Public functions.
346 *
347 * txid_current() and txid_current_snapshot() are the only ones that
348 * communicate with core xid machinery. All the others work on data
349 * returned by them.
350 */
351
352 /*
353 * txid_current() returns int8
354 *
355 * Return the current toplevel transaction ID as TXID
356 * If the current transaction does not have one, one is assigned.
357 */
358 Datum
txid_current(PG_FUNCTION_ARGS)359 txid_current(PG_FUNCTION_ARGS)
360 {
361 txid val;
362 TxidEpoch state;
363
364 /*
365 * Must prevent during recovery because if an xid is not assigned we try
366 * to assign one, which would fail. Programs already rely on this function
367 * to always return a valid current xid, so we should not change this to
368 * return NULL or similar invalid xid.
369 */
370 PreventCommandDuringRecovery("txid_current()");
371
372 load_xid_epoch(&state);
373
374 val = convert_xid(GetTopTransactionId(), &state);
375
376 PG_RETURN_INT64(val);
377 }
378
379 /*
380 * txid_current_snapshot() returns txid_snapshot
381 *
382 * Return current snapshot in TXID format
383 *
384 * Note that only top-transaction XIDs are included in the snapshot.
385 */
386 Datum
txid_current_snapshot(PG_FUNCTION_ARGS)387 txid_current_snapshot(PG_FUNCTION_ARGS)
388 {
389 TxidSnapshot *snap;
390 uint32 nxip,
391 i;
392 TxidEpoch state;
393 Snapshot cur;
394
395 cur = GetActiveSnapshot();
396 if (cur == NULL)
397 elog(ERROR, "no active snapshot set");
398
399 load_xid_epoch(&state);
400
401 /*
402 * Compile-time limits on the procarray (MAX_BACKENDS processes plus
403 * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
404 */
405 StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
406 "possible overflow in txid_current_snapshot()");
407
408 /* allocate */
409 nxip = cur->xcnt;
410 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
411
412 /* fill */
413 snap->xmin = convert_xid(cur->xmin, &state);
414 snap->xmax = convert_xid(cur->xmax, &state);
415 snap->nxip = nxip;
416 for (i = 0; i < nxip; i++)
417 snap->xip[i] = convert_xid(cur->xip[i], &state);
418
419 /*
420 * We want them guaranteed to be in ascending order. This also removes
421 * any duplicate xids. Normally, an XID can only be assigned to one
422 * backend, but when preparing a transaction for two-phase commit, there
423 * is a transient state when both the original backend and the dummy
424 * PGPROC entry reserved for the prepared transaction hold the same XID.
425 */
426 sort_snapshot(snap);
427
428 /* set size after sorting, because it may have removed duplicate xips */
429 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
430
431 PG_RETURN_POINTER(snap);
432 }
433
434 /*
435 * txid_snapshot_in(cstring) returns txid_snapshot
436 *
437 * input function for type txid_snapshot
438 */
439 Datum
txid_snapshot_in(PG_FUNCTION_ARGS)440 txid_snapshot_in(PG_FUNCTION_ARGS)
441 {
442 char *str = PG_GETARG_CSTRING(0);
443 TxidSnapshot *snap;
444
445 snap = parse_snapshot(str);
446
447 PG_RETURN_POINTER(snap);
448 }
449
450 /*
451 * txid_snapshot_out(txid_snapshot) returns cstring
452 *
453 * output function for type txid_snapshot
454 */
455 Datum
txid_snapshot_out(PG_FUNCTION_ARGS)456 txid_snapshot_out(PG_FUNCTION_ARGS)
457 {
458 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
459 StringInfoData str;
460 uint32 i;
461
462 initStringInfo(&str);
463
464 appendStringInfo(&str, TXID_FMT ":", snap->xmin);
465 appendStringInfo(&str, TXID_FMT ":", snap->xmax);
466
467 for (i = 0; i < snap->nxip; i++)
468 {
469 if (i > 0)
470 appendStringInfoChar(&str, ',');
471 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
472 }
473
474 PG_RETURN_CSTRING(str.data);
475 }
476
477 /*
478 * txid_snapshot_recv(internal) returns txid_snapshot
479 *
480 * binary input function for type txid_snapshot
481 *
482 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
483 */
484 Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)485 txid_snapshot_recv(PG_FUNCTION_ARGS)
486 {
487 StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
488 TxidSnapshot *snap;
489 txid last = 0;
490 int nxip;
491 int i;
492 txid xmin,
493 xmax;
494
495 /* load and validate nxip */
496 nxip = pq_getmsgint(buf, 4);
497 if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
498 goto bad_format;
499
500 xmin = pq_getmsgint64(buf);
501 xmax = pq_getmsgint64(buf);
502 if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
503 goto bad_format;
504
505 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
506 snap->xmin = xmin;
507 snap->xmax = xmax;
508
509 for (i = 0; i < nxip; i++)
510 {
511 txid cur = pq_getmsgint64(buf);
512
513 if (cur < last || cur < xmin || cur >= xmax)
514 goto bad_format;
515
516 /* skip duplicate xips */
517 if (cur == last)
518 {
519 i--;
520 nxip--;
521 continue;
522 }
523
524 snap->xip[i] = cur;
525 last = cur;
526 }
527 snap->nxip = nxip;
528 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
529 PG_RETURN_POINTER(snap);
530
531 bad_format:
532 ereport(ERROR,
533 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
534 errmsg("invalid external txid_snapshot data")));
535 PG_RETURN_POINTER(NULL); /* keep compiler quiet */
536 }
537
538 /*
539 * txid_snapshot_send(txid_snapshot) returns bytea
540 *
541 * binary output function for type txid_snapshot
542 *
543 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
544 */
545 Datum
txid_snapshot_send(PG_FUNCTION_ARGS)546 txid_snapshot_send(PG_FUNCTION_ARGS)
547 {
548 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
549 StringInfoData buf;
550 uint32 i;
551
552 pq_begintypsend(&buf);
553 pq_sendint(&buf, snap->nxip, 4);
554 pq_sendint64(&buf, snap->xmin);
555 pq_sendint64(&buf, snap->xmax);
556 for (i = 0; i < snap->nxip; i++)
557 pq_sendint64(&buf, snap->xip[i]);
558 PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
559 }
560
561 /*
562 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
563 *
564 * is txid visible in snapshot ?
565 */
566 Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)567 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
568 {
569 txid value = PG_GETARG_INT64(0);
570 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
571
572 PG_RETURN_BOOL(is_visible_txid(value, snap));
573 }
574
575 /*
576 * txid_snapshot_xmin(txid_snapshot) returns int8
577 *
578 * return snapshot's xmin
579 */
580 Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)581 txid_snapshot_xmin(PG_FUNCTION_ARGS)
582 {
583 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
584
585 PG_RETURN_INT64(snap->xmin);
586 }
587
588 /*
589 * txid_snapshot_xmax(txid_snapshot) returns int8
590 *
591 * return snapshot's xmax
592 */
593 Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)594 txid_snapshot_xmax(PG_FUNCTION_ARGS)
595 {
596 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
597
598 PG_RETURN_INT64(snap->xmax);
599 }
600
601 /*
602 * txid_snapshot_xip(txid_snapshot) returns setof int8
603 *
604 * return in-progress TXIDs in snapshot.
605 */
606 Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)607 txid_snapshot_xip(PG_FUNCTION_ARGS)
608 {
609 FuncCallContext *fctx;
610 TxidSnapshot *snap;
611 txid value;
612
613 /* on first call initialize snap_state and get copy of snapshot */
614 if (SRF_IS_FIRSTCALL())
615 {
616 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
617
618 fctx = SRF_FIRSTCALL_INIT();
619
620 /* make a copy of user snapshot */
621 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
622 memcpy(snap, arg, VARSIZE(arg));
623
624 fctx->user_fctx = snap;
625 }
626
627 /* return values one-by-one */
628 fctx = SRF_PERCALL_SETUP();
629 snap = fctx->user_fctx;
630 if (fctx->call_cntr < snap->nxip)
631 {
632 value = snap->xip[fctx->call_cntr];
633 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
634 }
635 else
636 {
637 SRF_RETURN_DONE(fctx);
638 }
639 }
640