1 /*-------------------------------------------------------------------------
2 *
3 * commit_ts.c
4 * PostgreSQL commit timestamp manager
5 *
6 * This module is a pg_xact-like system that stores the commit timestamp
7 * for each transaction.
8 *
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
16 * entry here.
17 *
18 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
20 *
21 * src/backend/access/transam/commit_ts.c
22 *
23 *-------------------------------------------------------------------------
24 */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39
40 /*
41 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42 * everywhere else in Postgres.
43 *
44 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45 * CommitTs page numbering also wraps around at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48 * explicit notice of that fact in this module, except when comparing segment
49 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50 */
51
52 /*
53 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54 * the largest possible file name is more than 5 chars long; see
55 * SlruScanDirectory.
56 */
57 typedef struct CommitTimestampEntry
58 {
59 TimestampTz time;
60 RepOriginId nodeid;
61 } CommitTimestampEntry;
62
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64 sizeof(RepOriginId))
65
/* How many commit timestamp entries fit in one BLCKSZ-sized page. */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding the entry for a given transaction ID. */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

/* Index of a transaction ID's entry within its page. */
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73
74 /*
75 * Link to shared-memory data structures for CommitTs control
76 */
77 static SlruCtlData CommitTsCtlData;
78
79 #define CommitTsCtl (&CommitTsCtlData)
80
81 /*
82 * We keep a cache of the last value set in shared memory.
83 *
84 * This is also good place to keep the activation status. We keep this
85 * separate from the GUC so that the standby can activate the module if the
86 * primary has it active independently of the value of the GUC.
87 *
88 * This is protected by CommitTsLock. In some places, we use commitTsActive
89 * without acquiring the lock; where this happens, a comment explains the
90 * rationale for it.
91 */
92 typedef struct CommitTimestampShared
93 {
94 TransactionId xidLastCommit;
95 CommitTimestampEntry dataLastCommit;
96 bool commitTsActive;
97 } CommitTimestampShared;
98
99 CommitTimestampShared *commitTsShared;
100
101
102 /* GUC variable */
103 bool track_commit_timestamp;
104
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106 TransactionId *subxids, TimestampTz ts,
107 RepOriginId nodeid, int pageno);
108 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109 RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
118 TransactionId *subxids, TimestampTz timestamp,
119 RepOriginId nodeid);
120
121 /*
122 * TransactionTreeSetCommitTsData
123 *
124 * Record the final commit timestamp of transaction entries in the commit log
125 * for a transaction and its subtransaction tree, as efficiently as possible.
126 *
127 * xid is the top level transaction id.
128 *
129 * subxids is an array of xids of length nsubxids, representing subtransactions
130 * in the tree of xid. In various cases nsubxids may be zero.
131 * The reason why tracking just the parent xid commit timestamp is not enough
132 * is that the subtrans SLRU does not stay valid across crashes (it's not
133 * permanent) so we need to keep the information about them here. If the
134 * subtrans implementation changes in the future, we might want to revisit the
135 * decision of storing timestamp info for each subxid.
136 *
137 * The write_xlog parameter tells us whether to include an XLog record of this
138 * or not. Normally, this is called from transaction commit routines (both
139 * normal and prepared) and the information will be stored in the transaction
140 * commit XLog record, and so they should pass "false" for this. The XLog redo
141 * code should use "false" here as well. Other callers probably want to pass
142 * true, so that the given values persist in case of crashes.
143 */
144 void
TransactionTreeSetCommitTsData(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid,bool write_xlog)145 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
146 TransactionId *subxids, TimestampTz timestamp,
147 RepOriginId nodeid, bool write_xlog)
148 {
149 int i;
150 TransactionId headxid;
151 TransactionId newestXact;
152
153 /*
154 * No-op if the module is not active.
155 *
156 * An unlocked read here is fine, because in a standby (the only place
157 * where the flag can change in flight) this routine is only called by the
158 * recovery process, which is also the only process which can change the
159 * flag.
160 */
161 if (!commitTsShared->commitTsActive)
162 return;
163
164 /*
165 * Comply with the WAL-before-data rule: if caller specified it wants this
166 * value to be recorded in WAL, do so before touching the data.
167 */
168 if (write_xlog)
169 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170
171 /*
172 * Figure out the latest Xid in this batch: either the last subxid if
173 * there's any, otherwise the parent xid.
174 */
175 if (nsubxids > 0)
176 newestXact = subxids[nsubxids - 1];
177 else
178 newestXact = xid;
179
180 /*
181 * We split the xids to set the timestamp to in groups belonging to the
182 * same SLRU page; the first element in each such set is its head. The
183 * first group has the main XID as the head; subsequent sets use the first
184 * subxid not on the previous page as head. This way, we only have to
185 * lock/modify each SLRU page once.
186 */
187 for (i = 0, headxid = xid;;)
188 {
189 int pageno = TransactionIdToCTsPage(headxid);
190 int j;
191
192 for (j = i; j < nsubxids; j++)
193 {
194 if (TransactionIdToCTsPage(subxids[j]) != pageno)
195 break;
196 }
197 /* subxids[i..j] are on the same page as the head */
198
199 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200 pageno);
201
202 /* if we wrote out all subxids, we're done. */
203 if (j + 1 >= nsubxids)
204 break;
205
206 /*
207 * Set the new head and skip over it, as well as over the subxids we
208 * just wrote.
209 */
210 headxid = subxids[j];
211 i += j - i + 1;
212 }
213
214 /* update the cached value in shared memory */
215 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216 commitTsShared->xidLastCommit = xid;
217 commitTsShared->dataLastCommit.time = timestamp;
218 commitTsShared->dataLastCommit.nodeid = nodeid;
219
220 /* and move forwards our endpoint, if needed */
221 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
222 ShmemVariableCache->newestCommitTsXid = newestXact;
223 LWLockRelease(CommitTsLock);
224 }
225
226 /*
227 * Record the commit timestamp of transaction entries in the commit log for all
228 * entries on a single page. Atomic only on this page.
229 */
230 static void
SetXidCommitTsInPage(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz ts,RepOriginId nodeid,int pageno)231 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232 TransactionId *subxids, TimestampTz ts,
233 RepOriginId nodeid, int pageno)
234 {
235 int slotno;
236 int i;
237
238 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239
240 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241
242 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243 for (i = 0; i < nsubxids; i++)
244 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245
246 CommitTsCtl->shared->page_dirty[slotno] = true;
247
248 LWLockRelease(CommitTsControlLock);
249 }
250
251 /*
252 * Sets the commit timestamp of a single transaction.
253 *
254 * Must be called with CommitTsControlLock held
255 */
256 static void
TransactionIdSetCommitTs(TransactionId xid,TimestampTz ts,RepOriginId nodeid,int slotno)257 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
258 RepOriginId nodeid, int slotno)
259 {
260 int entryno = TransactionIdToCTsEntry(xid);
261 CommitTimestampEntry entry;
262
263 Assert(TransactionIdIsNormal(xid));
264
265 entry.time = ts;
266 entry.nodeid = nodeid;
267
268 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269 SizeOfCommitTimestampEntry * entryno,
270 &entry, SizeOfCommitTimestampEntry);
271 }
272
273 /*
274 * Interrogate the commit timestamp of a transaction.
275 *
276 * The return value indicates whether a commit timestamp record was found for
277 * the given xid. The timestamp value is returned in *ts (which may not be
278 * null), and the origin node for the Xid is returned in *nodeid, if it's not
279 * null.
280 */
281 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)282 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283 RepOriginId *nodeid)
284 {
285 int pageno = TransactionIdToCTsPage(xid);
286 int entryno = TransactionIdToCTsEntry(xid);
287 int slotno;
288 CommitTimestampEntry entry;
289 TransactionId oldestCommitTsXid;
290 TransactionId newestCommitTsXid;
291
292 if (!TransactionIdIsValid(xid))
293 ereport(ERROR,
294 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 else if (!TransactionIdIsNormal(xid))
297 {
298 /* frozen and bootstrap xids are always committed far in the past */
299 *ts = 0;
300 if (nodeid)
301 *nodeid = 0;
302 return false;
303 }
304
305 LWLockAcquire(CommitTsLock, LW_SHARED);
306
307 /* Error if module not enabled */
308 if (!commitTsShared->commitTsActive)
309 error_commit_ts_disabled();
310
311 /*
312 * If we're asked for the cached value, return that. Otherwise, fall
313 * through to read from SLRU.
314 */
315 if (commitTsShared->xidLastCommit == xid)
316 {
317 *ts = commitTsShared->dataLastCommit.time;
318 if (nodeid)
319 *nodeid = commitTsShared->dataLastCommit.nodeid;
320
321 LWLockRelease(CommitTsLock);
322 return *ts != 0;
323 }
324
325 oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326 newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327 /* neither is invalid, or both are */
328 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329 LWLockRelease(CommitTsLock);
330
331 /*
332 * Return empty if the requested value is outside our valid range.
333 */
334 if (!TransactionIdIsValid(oldestCommitTsXid) ||
335 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336 TransactionIdPrecedes(newestCommitTsXid, xid))
337 {
338 *ts = 0;
339 if (nodeid)
340 *nodeid = InvalidRepOriginId;
341 return false;
342 }
343
344 /* lock is acquired by SimpleLruReadPage_ReadOnly */
345 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346 memcpy(&entry,
347 CommitTsCtl->shared->page_buffer[slotno] +
348 SizeOfCommitTimestampEntry * entryno,
349 SizeOfCommitTimestampEntry);
350
351 *ts = entry.time;
352 if (nodeid)
353 *nodeid = entry.nodeid;
354
355 LWLockRelease(CommitTsControlLock);
356 return *ts != 0;
357 }
358
359 /*
360 * Return the Xid of the latest committed transaction. (As far as this module
361 * is concerned, anyway; it's up to the caller to ensure the value is useful
362 * for its purposes.)
363 *
364 * ts and extra are filled with the corresponding data; they can be passed
365 * as NULL if not wanted.
366 */
367 TransactionId
GetLatestCommitTsData(TimestampTz * ts,RepOriginId * nodeid)368 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
369 {
370 TransactionId xid;
371
372 LWLockAcquire(CommitTsLock, LW_SHARED);
373
374 /* Error if module not enabled */
375 if (!commitTsShared->commitTsActive)
376 error_commit_ts_disabled();
377
378 xid = commitTsShared->xidLastCommit;
379 if (ts)
380 *ts = commitTsShared->dataLastCommit.time;
381 if (nodeid)
382 *nodeid = commitTsShared->dataLastCommit.nodeid;
383 LWLockRelease(CommitTsLock);
384
385 return xid;
386 }
387
388 static void
error_commit_ts_disabled(void)389 error_commit_ts_disabled(void)
390 {
391 ereport(ERROR,
392 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 errmsg("could not get commit timestamp data"),
394 RecoveryInProgress() ?
395 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396 "track_commit_timestamp") :
397 errhint("Make sure the configuration parameter \"%s\" is set.",
398 "track_commit_timestamp")));
399 }
400
401 /*
402 * SQL-callable wrapper to obtain commit time of a transaction
403 */
404 Datum
pg_xact_commit_timestamp(PG_FUNCTION_ARGS)405 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
406 {
407 TransactionId xid = PG_GETARG_UINT32(0);
408 TimestampTz ts;
409 bool found;
410
411 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412
413 if (!found)
414 PG_RETURN_NULL();
415
416 PG_RETURN_TIMESTAMPTZ(ts);
417 }
418
419
420 Datum
pg_last_committed_xact(PG_FUNCTION_ARGS)421 pg_last_committed_xact(PG_FUNCTION_ARGS)
422 {
423 TransactionId xid;
424 TimestampTz ts;
425 Datum values[2];
426 bool nulls[2];
427 TupleDesc tupdesc;
428 HeapTuple htup;
429
430 /* and construct a tuple with our data */
431 xid = GetLatestCommitTsData(&ts, NULL);
432
433 /*
434 * Construct a tuple descriptor for the result row. This must match this
435 * function's pg_proc entry!
436 */
437 tupdesc = CreateTemplateTupleDesc(2, false);
438 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
439 XIDOID, -1, 0);
440 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441 TIMESTAMPTZOID, -1, 0);
442 tupdesc = BlessTupleDesc(tupdesc);
443
444 if (!TransactionIdIsNormal(xid))
445 {
446 memset(nulls, true, sizeof(nulls));
447 }
448 else
449 {
450 values[0] = TransactionIdGetDatum(xid);
451 nulls[0] = false;
452
453 values[1] = TimestampTzGetDatum(ts);
454 nulls[1] = false;
455 }
456
457 htup = heap_form_tuple(tupdesc, values, nulls);
458
459 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
460 }
461
462
463 /*
464 * Number of shared CommitTS buffers.
465 *
466 * We use a very similar logic as for the number of CLOG buffers; see comments
467 * in CLOGShmemBuffers.
468 */
469 Size
CommitTsShmemBuffers(void)470 CommitTsShmemBuffers(void)
471 {
472 return Min(16, Max(4, NBuffers / 1024));
473 }
474
475 /*
476 * Shared memory sizing for CommitTs
477 */
478 Size
CommitTsShmemSize(void)479 CommitTsShmemSize(void)
480 {
481 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
482 sizeof(CommitTimestampShared);
483 }
484
485 /*
486 * Initialize CommitTs at system startup (postmaster start or standalone
487 * backend)
488 */
489 void
CommitTsShmemInit(void)490 CommitTsShmemInit(void)
491 {
492 bool found;
493
494 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496 CommitTsControlLock, "pg_commit_ts",
497 LWTRANCHE_COMMITTS_BUFFERS);
498 SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
499
500 commitTsShared = ShmemInitStruct("CommitTs shared",
501 sizeof(CommitTimestampShared),
502 &found);
503
504 if (!IsUnderPostmaster)
505 {
506 Assert(!found);
507
508 commitTsShared->xidLastCommit = InvalidTransactionId;
509 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
510 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
511 commitTsShared->commitTsActive = false;
512 }
513 else
514 Assert(found);
515 }
516
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules, there is nothing
	 * to do here at present: segments are created when the server is started
	 * with this module enabled.  See ActivateCommitTs.
	 */
}
532
533 /*
534 * Initialize (or reinitialize) a page of CommitTs to zeroes.
535 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
536 *
537 * The page is not actually written, just set up in shared memory.
538 * The slot number of the new page is returned.
539 *
540 * Control lock must be held at entry, and will be held at exit.
541 */
542 static int
ZeroCommitTsPage(int pageno,bool writeXlog)543 ZeroCommitTsPage(int pageno, bool writeXlog)
544 {
545 int slotno;
546
547 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
548
549 if (writeXlog)
550 WriteZeroPageXlogRec(pageno);
551
552 return slotno;
553 }
554
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
564
565 /*
566 * This must be called ONCE during postmaster or standalone-backend startup,
567 * after recovery has finished.
568 */
569 void
CompleteCommitTsInitialization(void)570 CompleteCommitTsInitialization(void)
571 {
572 /*
573 * If the feature is not enabled, turn it off for good. This also removes
574 * any leftover data.
575 *
576 * Conversely, we activate the module if the feature is enabled. This is
577 * necessary for primary and standby as the activation depends on the
578 * control file contents at the beginning of recovery or when a
579 * XLOG_PARAMETER_CHANGE is replayed.
580 */
581 if (!track_commit_timestamp)
582 DeactivateCommitTs();
583 else
584 ActivateCommitTs();
585 }
586
587 /*
588 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
589 * XLog record during recovery.
590 */
591 void
CommitTsParameterChange(bool newvalue,bool oldvalue)592 CommitTsParameterChange(bool newvalue, bool oldvalue)
593 {
594 /*
595 * If the commit_ts module is disabled in this server and we get word from
596 * the master server that it is enabled there, activate it so that we can
597 * replay future WAL records involving it; also mark it as active on
598 * pg_control. If the old value was already set, we already did this, so
599 * don't do anything.
600 *
601 * If the module is disabled in the master, disable it here too, unless
602 * the module is enabled locally.
603 *
604 * Note this only runs in the recovery process, so an unlocked read is
605 * fine.
606 */
607 if (newvalue)
608 {
609 if (!commitTsShared->commitTsActive)
610 ActivateCommitTs();
611 }
612 else if (commitTsShared->commitTsActive)
613 DeactivateCommitTs();
614 }
615
616 /*
617 * Activate this module whenever necessary.
618 * This must happen during postmaster or standalone-backend startup,
619 * or during WAL replay anytime the track_commit_timestamp setting is
620 * changed in the master.
621 *
622 * The reason why this SLRU needs separate activation/deactivation functions is
623 * that it can be enabled/disabled during start and the activation/deactivation
624 * on master is propagated to standby via replay. Other SLRUs don't have this
625 * property and they can be just initialized during normal startup.
626 *
627 * This is in charge of creating the currently active segment, if it's not
628 * already there. The reason for this is that the server might have been
629 * running with this module disabled for a while and thus might have skipped
630 * the normal creation point.
631 */
632 static void
ActivateCommitTs(void)633 ActivateCommitTs(void)
634 {
635 TransactionId xid;
636 int pageno;
637
638 /* If we've done this already, there's nothing to do */
639 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640 if (commitTsShared->commitTsActive)
641 {
642 LWLockRelease(CommitTsLock);
643 return;
644 }
645 LWLockRelease(CommitTsLock);
646
647 xid = ShmemVariableCache->nextXid;
648 pageno = TransactionIdToCTsPage(xid);
649
650 /*
651 * Re-Initialize our idea of the latest page number.
652 */
653 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654 CommitTsCtl->shared->latest_page_number = pageno;
655 LWLockRelease(CommitTsControlLock);
656
657 /*
658 * If CommitTs is enabled, but it wasn't in the previous server run, we
659 * need to set the oldest and newest values to the next Xid; that way, we
660 * will not try to read data that might not have been set.
661 *
662 * XXX does this have a problem if a server is started with commitTs
663 * enabled, then started with commitTs disabled, then restarted with it
664 * enabled again? It doesn't look like it does, because there should be a
665 * checkpoint that sets the value to InvalidTransactionId at end of
666 * recovery; and so any chance of injecting new transactions without
667 * CommitTs values would occur after the oldestCommitTsXid has been set to
668 * Invalid temporarily.
669 */
670 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
671 if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
672 {
673 ShmemVariableCache->oldestCommitTsXid =
674 ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
675 }
676 LWLockRelease(CommitTsLock);
677
678 /* Create the current segment file, if necessary */
679 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
680 {
681 int slotno;
682
683 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
684 slotno = ZeroCommitTsPage(pageno, false);
685 SimpleLruWritePage(CommitTsCtl, slotno);
686 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
687 LWLockRelease(CommitTsControlLock);
688 }
689
690 /* Change the activation status in shared memory. */
691 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
692 commitTsShared->commitTsActive = true;
693 LWLockRelease(CommitTsLock);
694 }
695
696 /*
697 * Deactivate this module.
698 *
699 * This must be called when the track_commit_timestamp parameter is turned off.
700 * This happens during postmaster or standalone-backend startup, or during WAL
701 * replay.
702 *
703 * Resets CommitTs into invalid state to make sure we don't hand back
704 * possibly-invalid data; also removes segments of old data.
705 */
706 static void
DeactivateCommitTs(void)707 DeactivateCommitTs(void)
708 {
709 /*
710 * Cleanup the status in the shared memory.
711 *
712 * We reset everything in the commitTsShared record to prevent user from
713 * getting confusing data about last committed transaction on the standby
714 * when the module was activated repeatedly on the primary.
715 */
716 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
717
718 commitTsShared->commitTsActive = false;
719 commitTsShared->xidLastCommit = InvalidTransactionId;
720 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
721 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
722
723 ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
724 ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
725
726 LWLockRelease(CommitTsLock);
727
728 /*
729 * Remove *all* files. This is necessary so that there are no leftover
730 * files; in the case where this feature is later enabled after running
731 * with it disabled for some time there may be a gap in the file sequence.
732 * (We can probably tolerate out-of-sequence files, as they are going to
733 * be overwritten anyway when we wrap around, but it seems better to be
734 * tidy.)
735 */
736 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
737 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
738 LWLockRelease(CommitTsControlLock);
739 }
740
741 /*
742 * This must be called ONCE during postmaster or standalone-backend shutdown
743 */
744 void
ShutdownCommitTs(void)745 ShutdownCommitTs(void)
746 {
747 /* Flush dirty CommitTs pages to disk */
748 SimpleLruFlush(CommitTsCtl, false);
749
750 /*
751 * fsync pg_commit_ts to ensure that any files flushed previously are
752 * durably on disk.
753 */
754 fsync_fname("pg_commit_ts", true);
755 }
756
757 /*
758 * Perform a checkpoint --- either during shutdown, or on-the-fly
759 */
760 void
CheckPointCommitTs(void)761 CheckPointCommitTs(void)
762 {
763 /* Flush dirty CommitTs pages to disk */
764 SimpleLruFlush(CommitTsCtl, true);
765 }
766
767 /*
768 * Make sure that CommitTs has room for a newly-allocated XID.
769 *
770 * NB: this is called while holding XidGenLock. We want it to be very fast
771 * most of the time; even when it's not so fast, no actual I/O need happen
772 * unless we're forced to write out a dirty CommitTs or xlog page to make room
773 * in shared memory.
774 *
775 * NB: the current implementation relies on track_commit_timestamp being
776 * PGC_POSTMASTER.
777 */
778 void
ExtendCommitTs(TransactionId newestXact)779 ExtendCommitTs(TransactionId newestXact)
780 {
781 int pageno;
782
783 /*
784 * Nothing to do if module not enabled. Note we do an unlocked read of
785 * the flag here, which is okay because this routine is only called from
786 * GetNewTransactionId, which is never called in a standby.
787 */
788 Assert(!InRecovery);
789 if (!commitTsShared->commitTsActive)
790 return;
791
792 /*
793 * No work except at first XID of a page. But beware: just after
794 * wraparound, the first XID of page zero is FirstNormalTransactionId.
795 */
796 if (TransactionIdToCTsEntry(newestXact) != 0 &&
797 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
798 return;
799
800 pageno = TransactionIdToCTsPage(newestXact);
801
802 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
803
804 /* Zero the page and make an XLOG entry about it */
805 ZeroCommitTsPage(pageno, !InRecovery);
806
807 LWLockRelease(CommitTsControlLock);
808 }
809
810 /*
811 * Remove all CommitTs segments before the one holding the passed
812 * transaction ID.
813 *
814 * Note that we don't need to flush XLOG here.
815 */
816 void
TruncateCommitTs(TransactionId oldestXact)817 TruncateCommitTs(TransactionId oldestXact)
818 {
819 int cutoffPage;
820
821 /*
822 * The cutoff point is the start of the segment containing oldestXact. We
823 * pass the *page* containing oldestXact to SimpleLruTruncate.
824 */
825 cutoffPage = TransactionIdToCTsPage(oldestXact);
826
827 /* Check to see if there's any files that could be removed */
828 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
829 &cutoffPage))
830 return; /* nothing to remove */
831
832 /* Write XLOG record */
833 WriteTruncateXlogRec(cutoffPage, oldestXact);
834
835 /* Now we can remove the old CommitTs segment(s) */
836 SimpleLruTruncate(CommitTsCtl, cutoffPage);
837 }
838
839 /*
840 * Set the limit values between which commit TS can be consulted.
841 */
842 void
SetCommitTsLimit(TransactionId oldestXact,TransactionId newestXact)843 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
844 {
845 /*
846 * Be careful not to overwrite values that are either further into the
847 * "future" or signal a disabled committs.
848 */
849 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
850 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
851 {
852 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
853 ShmemVariableCache->oldestCommitTsXid = oldestXact;
854 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
855 ShmemVariableCache->newestCommitTsXid = newestXact;
856 }
857 else
858 {
859 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
860 ShmemVariableCache->oldestCommitTsXid = oldestXact;
861 ShmemVariableCache->newestCommitTsXid = newestXact;
862 }
863 LWLockRelease(CommitTsLock);
864 }
865
866 /*
867 * Move forwards the oldest commitTS value that can be consulted
868 */
869 void
AdvanceOldestCommitTsXid(TransactionId oldestXact)870 AdvanceOldestCommitTsXid(TransactionId oldestXact)
871 {
872 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
873 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
874 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
875 ShmemVariableCache->oldestCommitTsXid = oldestXact;
876 LWLockRelease(CommitTsLock);
877 }
878
879
880 /*
881 * Decide whether a commitTS page number is "older" for truncation purposes.
882 * Analogous to CLOGPagePrecedes().
883 *
884 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
885 * introduces differences compared to CLOG and the other SLRUs having (1 <<
886 * 31) % per_page == 0. This function never tests exactly
887 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
888 * there are two possible counts of page boundaries between oldestXact and the
889 * latest XID assigned, depending on whether oldestXact is within the first
890 * 128 entries of its page. Since this function doesn't know the location of
891 * oldestXact within page2, it returns false for one page that actually is
892 * expendable. This is a wider (yet still negligible) version of the
893 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
894 *
895 * For the sake of a worked example, number entries with decimal values such
896 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
897 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
898 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
899 * because entry=2.85 is the border that toggles whether entries precede the
900 * last entry of the oldestXact page. While page 2 is expendable at
901 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
902 */
903 static bool
CommitTsPagePrecedes(int page1,int page2)904 CommitTsPagePrecedes(int page1, int page2)
905 {
906 TransactionId xid1;
907 TransactionId xid2;
908
909 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
910 xid1 += FirstNormalTransactionId + 1;
911 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
912 xid2 += FirstNormalTransactionId + 1;
913
914 return (TransactionIdPrecedes(xid1, xid2) &&
915 TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
916 }
917
918
919 /*
920 * Write a ZEROPAGE xlog record
921 */
922 static void
WriteZeroPageXlogRec(int pageno)923 WriteZeroPageXlogRec(int pageno)
924 {
925 XLogBeginInsert();
926 XLogRegisterData((char *) (&pageno), sizeof(int));
927 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
928 }
929
930 /*
931 * Write a TRUNCATE xlog record
932 */
933 static void
WriteTruncateXlogRec(int pageno,TransactionId oldestXid)934 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
935 {
936 xl_commit_ts_truncate xlrec;
937
938 xlrec.pageno = pageno;
939 xlrec.oldestXid = oldestXid;
940
941 XLogBeginInsert();
942 XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
943 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
944 }
945
946 /*
947 * Write a SETTS xlog record
948 */
949 static void
WriteSetTimestampXlogRec(TransactionId mainxid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid)950 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
951 TransactionId *subxids, TimestampTz timestamp,
952 RepOriginId nodeid)
953 {
954 xl_commit_ts_set record;
955
956 record.timestamp = timestamp;
957 record.nodeid = nodeid;
958 record.mainxid = mainxid;
959
960 XLogBeginInsert();
961 XLogRegisterData((char *) &record,
962 offsetof(xl_commit_ts_set, mainxid) +
963 sizeof(TransactionId));
964 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
965 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
966 }
967
968 /*
969 * CommitTS resource manager's routines
970 */
971 void
commit_ts_redo(XLogReaderState * record)972 commit_ts_redo(XLogReaderState *record)
973 {
974 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
975
976 /* Backup blocks are not used in commit_ts records */
977 Assert(!XLogRecHasAnyBlockRefs(record));
978
979 if (info == COMMIT_TS_ZEROPAGE)
980 {
981 int pageno;
982 int slotno;
983
984 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
985
986 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
987
988 slotno = ZeroCommitTsPage(pageno, false);
989 SimpleLruWritePage(CommitTsCtl, slotno);
990 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
991
992 LWLockRelease(CommitTsControlLock);
993 }
994 else if (info == COMMIT_TS_TRUNCATE)
995 {
996 xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
997
998 AdvanceOldestCommitTsXid(trunc->oldestXid);
999
1000 /*
1001 * During XLOG replay, latest_page_number isn't set up yet; insert a
1002 * suitable value to bypass the sanity test in SimpleLruTruncate.
1003 */
1004 CommitTsCtl->shared->latest_page_number = trunc->pageno;
1005
1006 SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1007 }
1008 else if (info == COMMIT_TS_SETTS)
1009 {
1010 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1011 int nsubxids;
1012 TransactionId *subxids;
1013
1014 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1015 sizeof(TransactionId));
1016 if (nsubxids > 0)
1017 {
1018 subxids = palloc(sizeof(TransactionId) * nsubxids);
1019 memcpy(subxids,
1020 XLogRecGetData(record) + SizeOfCommitTsSet,
1021 sizeof(TransactionId) * nsubxids);
1022 }
1023 else
1024 subxids = NULL;
1025
1026 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1027 setts->timestamp, setts->nodeid, false);
1028 if (subxids)
1029 pfree(subxids);
1030 }
1031 else
1032 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1033 }
1034