1 /*-------------------------------------------------------------------------
2 *
3 * commit_ts.c
4 * PostgreSQL commit timestamp manager
5 *
6 * This module is a pg_clog-like system that stores the commit timestamp
7 * for each transaction.
8 *
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
16 * entry here.
17 *
18 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
20 *
21 * src/backend/access/transam/commit_ts.c
22 *
23 *-------------------------------------------------------------------------
24 */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38
39 /*
40 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41 * everywhere else in Postgres.
42 *
43 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44 * CommitTs page numbering also wraps around at
45 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47 * explicit notice of that fact in this module, except when comparing segment
48 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49 */
50
51 /*
52 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53 * the largest possible file name is more than 5 chars long; see
54 * SlruScanDirectory.
55 */
56 typedef struct CommitTimestampEntry
57 {
58 TimestampTz time;
59 RepOriginId nodeid;
60 } CommitTimestampEntry;
61
/* Bytes used per entry: everything up to and including the nodeid field. */
#define SizeOfCommitTimestampEntry \
    (offsetof(CommitTimestampEntry, nodeid) + sizeof(RepOriginId))

/* Number of entries that fit in one BLCKSZ-sized CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE \
    (BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding a given xid, and the xid's entry index in that page. */
#define TransactionIdToCTsPage(xid) \
    ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
#define TransactionIdToCTsEntry(xid) \
    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72
73 /*
74 * Link to shared-memory data structures for CommitTs control
75 */
76 static SlruCtlData CommitTsCtlData;
77
78 #define CommitTsCtl (&CommitTsCtlData)
79
80 /*
81 * We keep a cache of the last value set in shared memory.
82 *
83 * This is also good place to keep the activation status. We keep this
84 * separate from the GUC so that the standby can activate the module if the
85 * primary has it active independently of the value of the GUC.
86 *
87 * This is protected by CommitTsLock. In some places, we use commitTsActive
88 * without acquiring the lock; where this happens, a comment explains the
89 * rationale for it.
90 */
91 typedef struct CommitTimestampShared
92 {
93 TransactionId xidLastCommit;
94 CommitTimestampEntry dataLastCommit;
95 bool commitTsActive;
96 } CommitTimestampShared;
97
98 CommitTimestampShared *commitTsShared;
99
100
101 /* GUC variable */
102 bool track_commit_timestamp;
103
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105 TransactionId *subxids, TimestampTz ts,
106 RepOriginId nodeid, int pageno);
107 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108 RepOriginId nodeid, int slotno);
109 static void error_commit_ts_disabled(void);
110 static int ZeroCommitTsPage(int pageno, bool writeXlog);
111 static bool CommitTsPagePrecedes(int page1, int page2);
112 static void ActivateCommitTs(void);
113 static void DeactivateCommitTs(void);
114 static void WriteZeroPageXlogRec(int pageno);
115 static void WriteTruncateXlogRec(int pageno);
116 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
117 TransactionId *subxids, TimestampTz timestamp,
118 RepOriginId nodeid);
119
120 /*
121 * TransactionTreeSetCommitTsData
122 *
123 * Record the final commit timestamp of transaction entries in the commit log
124 * for a transaction and its subtransaction tree, as efficiently as possible.
125 *
126 * xid is the top level transaction id.
127 *
128 * subxids is an array of xids of length nsubxids, representing subtransactions
129 * in the tree of xid. In various cases nsubxids may be zero.
130 * The reason why tracking just the parent xid commit timestamp is not enough
131 * is that the subtrans SLRU does not stay valid across crashes (it's not
132 * permanent) so we need to keep the information about them here. If the
133 * subtrans implementation changes in the future, we might want to revisit the
134 * decision of storing timestamp info for each subxid.
135 *
136 * The write_xlog parameter tells us whether to include an XLog record of this
137 * or not. Normally, this is called from transaction commit routines (both
138 * normal and prepared) and the information will be stored in the transaction
139 * commit XLog record, and so they should pass "false" for this. The XLog redo
140 * code should use "false" here as well. Other callers probably want to pass
141 * true, so that the given values persist in case of crashes.
142 */
143 void
TransactionTreeSetCommitTsData(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid,bool write_xlog)144 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
145 TransactionId *subxids, TimestampTz timestamp,
146 RepOriginId nodeid, bool write_xlog)
147 {
148 int i;
149 TransactionId headxid;
150 TransactionId newestXact;
151
152 /*
153 * No-op if the module is not active.
154 *
155 * An unlocked read here is fine, because in a standby (the only place
156 * where the flag can change in flight) this routine is only called by the
157 * recovery process, which is also the only process which can change the
158 * flag.
159 */
160 if (!commitTsShared->commitTsActive)
161 return;
162
163 /*
164 * Comply with the WAL-before-data rule: if caller specified it wants this
165 * value to be recorded in WAL, do so before touching the data.
166 */
167 if (write_xlog)
168 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
169
170 /*
171 * Figure out the latest Xid in this batch: either the last subxid if
172 * there's any, otherwise the parent xid.
173 */
174 if (nsubxids > 0)
175 newestXact = subxids[nsubxids - 1];
176 else
177 newestXact = xid;
178
179 /*
180 * We split the xids to set the timestamp to in groups belonging to the
181 * same SLRU page; the first element in each such set is its head. The
182 * first group has the main XID as the head; subsequent sets use the first
183 * subxid not on the previous page as head. This way, we only have to
184 * lock/modify each SLRU page once.
185 */
186 for (i = 0, headxid = xid;;)
187 {
188 int pageno = TransactionIdToCTsPage(headxid);
189 int j;
190
191 for (j = i; j < nsubxids; j++)
192 {
193 if (TransactionIdToCTsPage(subxids[j]) != pageno)
194 break;
195 }
196 /* subxids[i..j] are on the same page as the head */
197
198 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
199 pageno);
200
201 /* if we wrote out all subxids, we're done. */
202 if (j + 1 >= nsubxids)
203 break;
204
205 /*
206 * Set the new head and skip over it, as well as over the subxids we
207 * just wrote.
208 */
209 headxid = subxids[j];
210 i += j - i + 1;
211 }
212
213 /* update the cached value in shared memory */
214 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
215 commitTsShared->xidLastCommit = xid;
216 commitTsShared->dataLastCommit.time = timestamp;
217 commitTsShared->dataLastCommit.nodeid = nodeid;
218
219 /* and move forwards our endpoint, if needed */
220 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
221 ShmemVariableCache->newestCommitTsXid = newestXact;
222 LWLockRelease(CommitTsLock);
223 }
224
225 /*
226 * Record the commit timestamp of transaction entries in the commit log for all
227 * entries on a single page. Atomic only on this page.
228 */
229 static void
SetXidCommitTsInPage(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz ts,RepOriginId nodeid,int pageno)230 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
231 TransactionId *subxids, TimestampTz ts,
232 RepOriginId nodeid, int pageno)
233 {
234 int slotno;
235 int i;
236
237 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
238
239 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
240
241 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
242 for (i = 0; i < nsubxids; i++)
243 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
244
245 CommitTsCtl->shared->page_dirty[slotno] = true;
246
247 LWLockRelease(CommitTsControlLock);
248 }
249
250 /*
251 * Sets the commit timestamp of a single transaction.
252 *
253 * Must be called with CommitTsControlLock held
254 */
255 static void
TransactionIdSetCommitTs(TransactionId xid,TimestampTz ts,RepOriginId nodeid,int slotno)256 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
257 RepOriginId nodeid, int slotno)
258 {
259 int entryno = TransactionIdToCTsEntry(xid);
260 CommitTimestampEntry entry;
261
262 Assert(TransactionIdIsNormal(xid));
263
264 entry.time = ts;
265 entry.nodeid = nodeid;
266
267 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
268 SizeOfCommitTimestampEntry * entryno,
269 &entry, SizeOfCommitTimestampEntry);
270 }
271
272 /*
273 * Interrogate the commit timestamp of a transaction.
274 *
275 * The return value indicates whether a commit timestamp record was found for
276 * the given xid. The timestamp value is returned in *ts (which may not be
277 * null), and the origin node for the Xid is returned in *nodeid, if it's not
278 * null.
279 */
280 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)281 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
282 RepOriginId *nodeid)
283 {
284 int pageno = TransactionIdToCTsPage(xid);
285 int entryno = TransactionIdToCTsEntry(xid);
286 int slotno;
287 CommitTimestampEntry entry;
288 TransactionId oldestCommitTsXid;
289 TransactionId newestCommitTsXid;
290
291 if (!TransactionIdIsValid(xid))
292 ereport(ERROR,
293 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
294 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
295 else if (!TransactionIdIsNormal(xid))
296 {
297 /* frozen and bootstrap xids are always committed far in the past */
298 *ts = 0;
299 if (nodeid)
300 *nodeid = 0;
301 return false;
302 }
303
304 LWLockAcquire(CommitTsLock, LW_SHARED);
305
306 /* Error if module not enabled */
307 if (!commitTsShared->commitTsActive)
308 error_commit_ts_disabled();
309
310 /*
311 * If we're asked for the cached value, return that. Otherwise, fall
312 * through to read from SLRU.
313 */
314 if (commitTsShared->xidLastCommit == xid)
315 {
316 *ts = commitTsShared->dataLastCommit.time;
317 if (nodeid)
318 *nodeid = commitTsShared->dataLastCommit.nodeid;
319
320 LWLockRelease(CommitTsLock);
321 return *ts != 0;
322 }
323
324 oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
325 newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
326 /* neither is invalid, or both are */
327 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
328 LWLockRelease(CommitTsLock);
329
330 /*
331 * Return empty if the requested value is outside our valid range.
332 */
333 if (!TransactionIdIsValid(oldestCommitTsXid) ||
334 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
335 TransactionIdPrecedes(newestCommitTsXid, xid))
336 {
337 *ts = 0;
338 if (nodeid)
339 *nodeid = InvalidRepOriginId;
340 return false;
341 }
342
343 /* lock is acquired by SimpleLruReadPage_ReadOnly */
344 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
345 memcpy(&entry,
346 CommitTsCtl->shared->page_buffer[slotno] +
347 SizeOfCommitTimestampEntry * entryno,
348 SizeOfCommitTimestampEntry);
349
350 *ts = entry.time;
351 if (nodeid)
352 *nodeid = entry.nodeid;
353
354 LWLockRelease(CommitTsControlLock);
355 return *ts != 0;
356 }
357
358 /*
359 * Return the Xid of the latest committed transaction. (As far as this module
360 * is concerned, anyway; it's up to the caller to ensure the value is useful
361 * for its purposes.)
362 *
363 * ts and extra are filled with the corresponding data; they can be passed
364 * as NULL if not wanted.
365 */
366 TransactionId
GetLatestCommitTsData(TimestampTz * ts,RepOriginId * nodeid)367 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
368 {
369 TransactionId xid;
370
371 LWLockAcquire(CommitTsLock, LW_SHARED);
372
373 /* Error if module not enabled */
374 if (!commitTsShared->commitTsActive)
375 error_commit_ts_disabled();
376
377 xid = commitTsShared->xidLastCommit;
378 if (ts)
379 *ts = commitTsShared->dataLastCommit.time;
380 if (nodeid)
381 *nodeid = commitTsShared->dataLastCommit.nodeid;
382 LWLockRelease(CommitTsLock);
383
384 return xid;
385 }
386
387 static void
error_commit_ts_disabled(void)388 error_commit_ts_disabled(void)
389 {
390 ereport(ERROR,
391 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
392 errmsg("could not get commit timestamp data"),
393 RecoveryInProgress() ?
394 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
395 "track_commit_timestamp") :
396 errhint("Make sure the configuration parameter \"%s\" is set.",
397 "track_commit_timestamp")));
398 }
399
400 /*
401 * SQL-callable wrapper to obtain commit time of a transaction
402 */
403 Datum
pg_xact_commit_timestamp(PG_FUNCTION_ARGS)404 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
405 {
406 TransactionId xid = PG_GETARG_UINT32(0);
407 TimestampTz ts;
408 bool found;
409
410 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
411
412 if (!found)
413 PG_RETURN_NULL();
414
415 PG_RETURN_TIMESTAMPTZ(ts);
416 }
417
418
419 Datum
pg_last_committed_xact(PG_FUNCTION_ARGS)420 pg_last_committed_xact(PG_FUNCTION_ARGS)
421 {
422 TransactionId xid;
423 TimestampTz ts;
424 Datum values[2];
425 bool nulls[2];
426 TupleDesc tupdesc;
427 HeapTuple htup;
428
429 /* and construct a tuple with our data */
430 xid = GetLatestCommitTsData(&ts, NULL);
431
432 /*
433 * Construct a tuple descriptor for the result row. This must match this
434 * function's pg_proc entry!
435 */
436 tupdesc = CreateTemplateTupleDesc(2, false);
437 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
438 XIDOID, -1, 0);
439 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
440 TIMESTAMPTZOID, -1, 0);
441 tupdesc = BlessTupleDesc(tupdesc);
442
443 if (!TransactionIdIsNormal(xid))
444 {
445 memset(nulls, true, sizeof(nulls));
446 }
447 else
448 {
449 values[0] = TransactionIdGetDatum(xid);
450 nulls[0] = false;
451
452 values[1] = TimestampTzGetDatum(ts);
453 nulls[1] = false;
454 }
455
456 htup = heap_form_tuple(tupdesc, values, nulls);
457
458 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
459 }
460
461
462 /*
463 * Number of shared CommitTS buffers.
464 *
465 * We use a very similar logic as for the number of CLOG buffers; see comments
466 * in CLOGShmemBuffers.
467 */
468 Size
CommitTsShmemBuffers(void)469 CommitTsShmemBuffers(void)
470 {
471 return Min(16, Max(4, NBuffers / 1024));
472 }
473
474 /*
475 * Shared memory sizing for CommitTs
476 */
477 Size
CommitTsShmemSize(void)478 CommitTsShmemSize(void)
479 {
480 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
481 sizeof(CommitTimestampShared);
482 }
483
484 /*
485 * Initialize CommitTs at system startup (postmaster start or standalone
486 * backend)
487 */
488 void
CommitTsShmemInit(void)489 CommitTsShmemInit(void)
490 {
491 bool found;
492
493 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
494 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
495 CommitTsControlLock, "pg_commit_ts",
496 LWTRANCHE_COMMITTS_BUFFERS);
497 SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
498
499 commitTsShared = ShmemInitStruct("CommitTs shared",
500 sizeof(CommitTimestampShared),
501 &found);
502
503 if (!IsUnderPostmaster)
504 {
505 Assert(!found);
506
507 commitTsShared->xidLastCommit = InvalidTransactionId;
508 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510 commitTsShared->commitTsActive = false;
511 }
512 else
513 Assert(found);
514 }
515
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is intentionally empty at present.
 */
void
BootStrapCommitTs(void)
{
    /*
     * Unlike most other SLRU modules, there is nothing to do here: segments
     * are created when the server is started with this module enabled.  See
     * ActivateCommitTs.
     */
}
531
532 /*
533 * Initialize (or reinitialize) a page of CommitTs to zeroes.
534 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
535 *
536 * The page is not actually written, just set up in shared memory.
537 * The slot number of the new page is returned.
538 *
539 * Control lock must be held at entry, and will be held at exit.
540 */
541 static int
ZeroCommitTsPage(int pageno,bool writeXlog)542 ZeroCommitTsPage(int pageno, bool writeXlog)
543 {
544 int slotno;
545
546 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
547
548 if (writeXlog)
549 WriteZeroPageXlogRec(pageno);
550
551 return slotno;
552 }
553
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
    ActivateCommitTs();
}
563
564 /*
565 * This must be called ONCE during postmaster or standalone-backend startup,
566 * after recovery has finished.
567 */
568 void
CompleteCommitTsInitialization(void)569 CompleteCommitTsInitialization(void)
570 {
571 /*
572 * If the feature is not enabled, turn it off for good. This also removes
573 * any leftover data.
574 *
575 * Conversely, we activate the module if the feature is enabled. This is
576 * necessary for primary and standby as the activation depends on the
577 * control file contents at the beginning of recovery or when a
578 * XLOG_PARAMETER_CHANGE is replayed.
579 */
580 if (!track_commit_timestamp)
581 DeactivateCommitTs();
582 else
583 ActivateCommitTs();
584 }
585
586 /*
587 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
588 * XLog record during recovery.
589 */
590 void
CommitTsParameterChange(bool newvalue,bool oldvalue)591 CommitTsParameterChange(bool newvalue, bool oldvalue)
592 {
593 /*
594 * If the commit_ts module is disabled in this server and we get word from
595 * the master server that it is enabled there, activate it so that we can
596 * replay future WAL records involving it; also mark it as active on
597 * pg_control. If the old value was already set, we already did this, so
598 * don't do anything.
599 *
600 * If the module is disabled in the master, disable it here too, unless
601 * the module is enabled locally.
602 *
603 * Note this only runs in the recovery process, so an unlocked read is
604 * fine.
605 */
606 if (newvalue)
607 {
608 if (!commitTsShared->commitTsActive)
609 ActivateCommitTs();
610 }
611 else if (commitTsShared->commitTsActive)
612 DeactivateCommitTs();
613 }
614
615 /*
616 * Activate this module whenever necessary.
617 * This must happen during postmaster or standalone-backend startup,
618 * or during WAL replay anytime the track_commit_timestamp setting is
619 * changed in the master.
620 *
621 * The reason why this SLRU needs separate activation/deactivation functions is
622 * that it can be enabled/disabled during start and the activation/deactivation
623 * on master is propagated to slave via replay. Other SLRUs don't have this
624 * property and they can be just initialized during normal startup.
625 *
626 * This is in charge of creating the currently active segment, if it's not
627 * already there. The reason for this is that the server might have been
628 * running with this module disabled for a while and thus might have skipped
629 * the normal creation point.
630 */
631 static void
ActivateCommitTs(void)632 ActivateCommitTs(void)
633 {
634 TransactionId xid;
635 int pageno;
636
637 /* If we've done this already, there's nothing to do */
638 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
639 if (commitTsShared->commitTsActive)
640 {
641 LWLockRelease(CommitTsLock);
642 return;
643 }
644 LWLockRelease(CommitTsLock);
645
646 xid = ShmemVariableCache->nextXid;
647 pageno = TransactionIdToCTsPage(xid);
648
649 /*
650 * Re-Initialize our idea of the latest page number.
651 */
652 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
653 CommitTsCtl->shared->latest_page_number = pageno;
654 LWLockRelease(CommitTsControlLock);
655
656 /*
657 * If CommitTs is enabled, but it wasn't in the previous server run, we
658 * need to set the oldest and newest values to the next Xid; that way, we
659 * will not try to read data that might not have been set.
660 *
661 * XXX does this have a problem if a server is started with commitTs
662 * enabled, then started with commitTs disabled, then restarted with it
663 * enabled again? It doesn't look like it does, because there should be a
664 * checkpoint that sets the value to InvalidTransactionId at end of
665 * recovery; and so any chance of injecting new transactions without
666 * CommitTs values would occur after the oldestCommitTsXid has been set to
667 * Invalid temporarily.
668 */
669 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
670 if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
671 {
672 ShmemVariableCache->oldestCommitTsXid =
673 ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
674 }
675 LWLockRelease(CommitTsLock);
676
677 /* Create the current segment file, if necessary */
678 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
679 {
680 int slotno;
681
682 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
683 slotno = ZeroCommitTsPage(pageno, false);
684 SimpleLruWritePage(CommitTsCtl, slotno);
685 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
686 LWLockRelease(CommitTsControlLock);
687 }
688
689 /* Change the activation status in shared memory. */
690 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
691 commitTsShared->commitTsActive = true;
692 LWLockRelease(CommitTsLock);
693 }
694
695 /*
696 * Deactivate this module.
697 *
698 * This must be called when the track_commit_timestamp parameter is turned off.
699 * This happens during postmaster or standalone-backend startup, or during WAL
700 * replay.
701 *
702 * Resets CommitTs into invalid state to make sure we don't hand back
703 * possibly-invalid data; also removes segments of old data.
704 */
705 static void
DeactivateCommitTs(void)706 DeactivateCommitTs(void)
707 {
708 /*
709 * Cleanup the status in the shared memory.
710 *
711 * We reset everything in the commitTsShared record to prevent user from
712 * getting confusing data about last committed transaction on the standby
713 * when the module was activated repeatedly on the primary.
714 */
715 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
716
717 commitTsShared->commitTsActive = false;
718 commitTsShared->xidLastCommit = InvalidTransactionId;
719 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
720 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
721
722 ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
723 ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
724
725 LWLockRelease(CommitTsLock);
726
727 /*
728 * Remove *all* files. This is necessary so that there are no leftover
729 * files; in the case where this feature is later enabled after running
730 * with it disabled for some time there may be a gap in the file sequence.
731 * (We can probably tolerate out-of-sequence files, as they are going to
732 * be overwritten anyway when we wrap around, but it seems better to be
733 * tidy.)
734 */
735 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
736 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
737 LWLockRelease(CommitTsControlLock);
738 }
739
740 /*
741 * This must be called ONCE during postmaster or standalone-backend shutdown
742 */
743 void
ShutdownCommitTs(void)744 ShutdownCommitTs(void)
745 {
746 /* Flush dirty CommitTs pages to disk */
747 SimpleLruFlush(CommitTsCtl, false);
748 }
749
750 /*
751 * Perform a checkpoint --- either during shutdown, or on-the-fly
752 */
753 void
CheckPointCommitTs(void)754 CheckPointCommitTs(void)
755 {
756 /* Flush dirty CommitTs pages to disk */
757 SimpleLruFlush(CommitTsCtl, true);
758 }
759
760 /*
761 * Make sure that CommitTs has room for a newly-allocated XID.
762 *
763 * NB: this is called while holding XidGenLock. We want it to be very fast
764 * most of the time; even when it's not so fast, no actual I/O need happen
765 * unless we're forced to write out a dirty CommitTs or xlog page to make room
766 * in shared memory.
767 *
768 * NB: the current implementation relies on track_commit_timestamp being
769 * PGC_POSTMASTER.
770 */
771 void
ExtendCommitTs(TransactionId newestXact)772 ExtendCommitTs(TransactionId newestXact)
773 {
774 int pageno;
775
776 /*
777 * Nothing to do if module not enabled. Note we do an unlocked read of
778 * the flag here, which is okay because this routine is only called from
779 * GetNewTransactionId, which is never called in a standby.
780 */
781 Assert(!InRecovery);
782 if (!commitTsShared->commitTsActive)
783 return;
784
785 /*
786 * No work except at first XID of a page. But beware: just after
787 * wraparound, the first XID of page zero is FirstNormalTransactionId.
788 */
789 if (TransactionIdToCTsEntry(newestXact) != 0 &&
790 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
791 return;
792
793 pageno = TransactionIdToCTsPage(newestXact);
794
795 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
796
797 /* Zero the page and make an XLOG entry about it */
798 ZeroCommitTsPage(pageno, !InRecovery);
799
800 LWLockRelease(CommitTsControlLock);
801 }
802
803 /*
804 * Remove all CommitTs segments before the one holding the passed
805 * transaction ID.
806 *
807 * Note that we don't need to flush XLOG here.
808 */
809 void
TruncateCommitTs(TransactionId oldestXact)810 TruncateCommitTs(TransactionId oldestXact)
811 {
812 int cutoffPage;
813
814 /*
815 * The cutoff point is the start of the segment containing oldestXact. We
816 * pass the *page* containing oldestXact to SimpleLruTruncate.
817 */
818 cutoffPage = TransactionIdToCTsPage(oldestXact);
819
820 /* Check to see if there's any files that could be removed */
821 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
822 &cutoffPage))
823 return; /* nothing to remove */
824
825 /* Write XLOG record */
826 WriteTruncateXlogRec(cutoffPage);
827
828 /* Now we can remove the old CommitTs segment(s) */
829 SimpleLruTruncate(CommitTsCtl, cutoffPage);
830 }
831
832 /*
833 * Set the limit values between which commit TS can be consulted.
834 */
835 void
SetCommitTsLimit(TransactionId oldestXact,TransactionId newestXact)836 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
837 {
838 /*
839 * Be careful not to overwrite values that are either further into the
840 * "future" or signal a disabled committs.
841 */
842 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
843 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
844 {
845 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
846 ShmemVariableCache->oldestCommitTsXid = oldestXact;
847 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
848 ShmemVariableCache->newestCommitTsXid = newestXact;
849 }
850 else
851 {
852 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
853 ShmemVariableCache->oldestCommitTsXid = oldestXact;
854 ShmemVariableCache->newestCommitTsXid = newestXact;
855 }
856 LWLockRelease(CommitTsLock);
857 }
858
859 /*
860 * Move forwards the oldest commitTS value that can be consulted
861 */
862 void
AdvanceOldestCommitTsXid(TransactionId oldestXact)863 AdvanceOldestCommitTsXid(TransactionId oldestXact)
864 {
865 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
866 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
867 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
868 ShmemVariableCache->oldestCommitTsXid = oldestXact;
869 LWLockRelease(CommitTsLock);
870 }
871
872
873 /*
874 * Decide whether a commitTS page number is "older" for truncation purposes.
875 * Analogous to CLOGPagePrecedes().
876 *
877 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
878 * introduces differences compared to CLOG and the other SLRUs having (1 <<
879 * 31) % per_page == 0. This function never tests exactly
880 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
881 * there are two possible counts of page boundaries between oldestXact and the
882 * latest XID assigned, depending on whether oldestXact is within the first
883 * 128 entries of its page. Since this function doesn't know the location of
884 * oldestXact within page2, it returns false for one page that actually is
885 * expendable. This is a wider (yet still negligible) version of the
886 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
887 *
888 * For the sake of a worked example, number entries with decimal values such
889 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
890 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
891 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
892 * because entry=2.85 is the border that toggles whether entries precede the
893 * last entry of the oldestXact page. While page 2 is expendable at
894 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
895 */
896 static bool
CommitTsPagePrecedes(int page1,int page2)897 CommitTsPagePrecedes(int page1, int page2)
898 {
899 TransactionId xid1;
900 TransactionId xid2;
901
902 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
903 xid1 += FirstNormalTransactionId + 1;
904 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
905 xid2 += FirstNormalTransactionId + 1;
906
907 return (TransactionIdPrecedes(xid1, xid2) &&
908 TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
909 }
910
911
912 /*
913 * Write a ZEROPAGE xlog record
914 */
915 static void
WriteZeroPageXlogRec(int pageno)916 WriteZeroPageXlogRec(int pageno)
917 {
918 XLogBeginInsert();
919 XLogRegisterData((char *) (&pageno), sizeof(int));
920 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
921 }
922
923 /*
924 * Write a TRUNCATE xlog record
925 */
926 static void
WriteTruncateXlogRec(int pageno)927 WriteTruncateXlogRec(int pageno)
928 {
929 XLogBeginInsert();
930 XLogRegisterData((char *) (&pageno), sizeof(int));
931 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
932 }
933
934 /*
935 * Write a SETTS xlog record
936 */
937 static void
WriteSetTimestampXlogRec(TransactionId mainxid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid)938 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
939 TransactionId *subxids, TimestampTz timestamp,
940 RepOriginId nodeid)
941 {
942 xl_commit_ts_set record;
943
944 record.timestamp = timestamp;
945 record.nodeid = nodeid;
946 record.mainxid = mainxid;
947
948 XLogBeginInsert();
949 XLogRegisterData((char *) &record,
950 offsetof(xl_commit_ts_set, mainxid) +
951 sizeof(TransactionId));
952 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
953 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
954 }
955
956 /*
957 * CommitTS resource manager's routines
958 */
959 void
commit_ts_redo(XLogReaderState * record)960 commit_ts_redo(XLogReaderState *record)
961 {
962 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
963
964 /* Backup blocks are not used in commit_ts records */
965 Assert(!XLogRecHasAnyBlockRefs(record));
966
967 if (info == COMMIT_TS_ZEROPAGE)
968 {
969 int pageno;
970 int slotno;
971
972 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
973
974 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
975
976 slotno = ZeroCommitTsPage(pageno, false);
977 SimpleLruWritePage(CommitTsCtl, slotno);
978 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
979
980 LWLockRelease(CommitTsControlLock);
981 }
982 else if (info == COMMIT_TS_TRUNCATE)
983 {
984 int pageno;
985
986 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
987
988 /*
989 * During XLOG replay, latest_page_number isn't set up yet; insert a
990 * suitable value to bypass the sanity test in SimpleLruTruncate.
991 */
992 CommitTsCtl->shared->latest_page_number = pageno;
993
994 SimpleLruTruncate(CommitTsCtl, pageno);
995 }
996 else if (info == COMMIT_TS_SETTS)
997 {
998 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
999 int nsubxids;
1000 TransactionId *subxids;
1001
1002 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1003 sizeof(TransactionId));
1004 if (nsubxids > 0)
1005 {
1006 subxids = palloc(sizeof(TransactionId) * nsubxids);
1007 memcpy(subxids,
1008 XLogRecGetData(record) + SizeOfCommitTsSet,
1009 sizeof(TransactionId) * nsubxids);
1010 }
1011 else
1012 subxids = NULL;
1013
1014 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1015 setts->timestamp, setts->nodeid, false);
1016 if (subxids)
1017 pfree(subxids);
1018 }
1019 else
1020 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1021 }
1022