1 /*------------------------------------------------------------------------- 2 * 3 * commit_ts.c 4 * PostgreSQL commit timestamp manager 5 * 6 * This module is a pg_xact-like system that stores the commit timestamp 7 * for each transaction. 8 * 9 * XLOG interactions: this module generates an XLOG record whenever a new 10 * CommitTs page is initialized to zeroes. Also, one XLOG record is 11 * generated for setting of values when the caller requests it; this allows 12 * us to support values coming from places other than transaction commit. 13 * Other writes of CommitTS come from recording of transaction commit in 14 * xact.c, which generates its own XLOG records for these events and will 15 * re-perform the status update on redo; so we need make no additional XLOG 16 * entry here. 17 * 18 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group 19 * Portions Copyright (c) 1994, Regents of the University of California 20 * 21 * src/backend/access/transam/commit_ts.c 22 * 23 *------------------------------------------------------------------------- 24 */ 25 #include "postgres.h" 26 27 #include "access/commit_ts.h" 28 #include "access/htup_details.h" 29 #include "access/slru.h" 30 #include "access/transam.h" 31 #include "catalog/pg_type.h" 32 #include "funcapi.h" 33 #include "miscadmin.h" 34 #include "pg_trace.h" 35 #include "storage/shmem.h" 36 #include "utils/builtins.h" 37 #include "utils/snapmgr.h" 38 #include "utils/timestamp.h" 39 40 /* 41 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used 42 * everywhere else in Postgres. 43 * 44 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, 45 * CommitTs page numbering also wraps around at 46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at 47 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no 48 * explicit notice of that fact in this module, except when comparing segment 49 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes). 50 */ 51 52 /* 53 * We need 8+2 bytes per xact. Note that enlarging this struct might mean 54 * the largest possible file name is more than 5 chars long; see 55 * SlruScanDirectory. 56 */ 57 typedef struct CommitTimestampEntry 58 { 59 TimestampTz time; 60 RepOriginId nodeid; 61 } CommitTimestampEntry; 62 63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \ 64 sizeof(RepOriginId)) 65 66 #define COMMIT_TS_XACTS_PER_PAGE \ 67 (BLCKSZ / SizeOfCommitTimestampEntry) 68 69 #define TransactionIdToCTsPage(xid) \ 70 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE) 71 #define TransactionIdToCTsEntry(xid) \ 72 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE) 73 74 /* 75 * Link to shared-memory data structures for CommitTs control 76 */ 77 static SlruCtlData CommitTsCtlData; 78 79 #define CommitTsCtl (&CommitTsCtlData) 80 81 /* 82 * We keep a cache of the last value set in shared memory. 83 * 84 * This is also good place to keep the activation status. We keep this 85 * separate from the GUC so that the standby can activate the module if the 86 * primary has it active independently of the value of the GUC. 87 * 88 * This is protected by CommitTsLock. In some places, we use commitTsActive 89 * without acquiring the lock; where this happens, a comment explains the 90 * rationale for it. 91 */ 92 typedef struct CommitTimestampShared 93 { 94 TransactionId xidLastCommit; 95 CommitTimestampEntry dataLastCommit; 96 bool commitTsActive; 97 } CommitTimestampShared; 98 99 CommitTimestampShared *commitTsShared; 100 101 102 /* GUC variable */ 103 bool track_commit_timestamp; 104 105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, 106 TransactionId *subxids, TimestampTz ts, 107 RepOriginId nodeid, int pageno); 108 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, 109 RepOriginId nodeid, int slotno); 110 static void error_commit_ts_disabled(void); 111 static int ZeroCommitTsPage(int pageno, bool writeXlog); 112 static bool CommitTsPagePrecedes(int page1, int page2); 113 static void ActivateCommitTs(void); 114 static void DeactivateCommitTs(void); 115 static void WriteZeroPageXlogRec(int pageno); 116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid); 117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, 118 TransactionId *subxids, TimestampTz timestamp, 119 RepOriginId nodeid); 120 121 /* 122 * TransactionTreeSetCommitTsData 123 * 124 * Record the final commit timestamp of transaction entries in the commit log 125 * for a transaction and its subtransaction tree, as efficiently as possible. 126 * 127 * xid is the top level transaction id. 128 * 129 * subxids is an array of xids of length nsubxids, representing subtransactions 130 * in the tree of xid. In various cases nsubxids may be zero. 131 * The reason why tracking just the parent xid commit timestamp is not enough 132 * is that the subtrans SLRU does not stay valid across crashes (it's not 133 * permanent) so we need to keep the information about them here. If the 134 * subtrans implementation changes in the future, we might want to revisit the 135 * decision of storing timestamp info for each subxid. 136 * 137 * The write_xlog parameter tells us whether to include an XLog record of this 138 * or not. Normally, this is called from transaction commit routines (both 139 * normal and prepared) and the information will be stored in the transaction 140 * commit XLog record, and so they should pass "false" for this. The XLog redo 141 * code should use "false" here as well. Other callers probably want to pass 142 * true, so that the given values persist in case of crashes. 143 */ 144 void 145 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, 146 TransactionId *subxids, TimestampTz timestamp, 147 RepOriginId nodeid, bool write_xlog) 148 { 149 int i; 150 TransactionId headxid; 151 TransactionId newestXact; 152 153 /* 154 * No-op if the module is not active. 155 * 156 * An unlocked read here is fine, because in a standby (the only place 157 * where the flag can change in flight) this routine is only called by the 158 * recovery process, which is also the only process which can change the 159 * flag. 160 */ 161 if (!commitTsShared->commitTsActive) 162 return; 163 164 /* 165 * Comply with the WAL-before-data rule: if caller specified it wants this 166 * value to be recorded in WAL, do so before touching the data. 167 */ 168 if (write_xlog) 169 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid); 170 171 /* 172 * Figure out the latest Xid in this batch: either the last subxid if 173 * there's any, otherwise the parent xid. 174 */ 175 if (nsubxids > 0) 176 newestXact = subxids[nsubxids - 1]; 177 else 178 newestXact = xid; 179 180 /* 181 * We split the xids to set the timestamp to in groups belonging to the 182 * same SLRU page; the first element in each such set is its head. The 183 * first group has the main XID as the head; subsequent sets use the first 184 * subxid not on the previous page as head. This way, we only have to 185 * lock/modify each SLRU page once. 186 */ 187 for (i = 0, headxid = xid;;) 188 { 189 int pageno = TransactionIdToCTsPage(headxid); 190 int j; 191 192 for (j = i; j < nsubxids; j++) 193 { 194 if (TransactionIdToCTsPage(subxids[j]) != pageno) 195 break; 196 } 197 /* subxids[i..j] are on the same page as the head */ 198 199 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid, 200 pageno); 201 202 /* if we wrote out all subxids, we're done. */ 203 if (j + 1 >= nsubxids) 204 break; 205 206 /* 207 * Set the new head and skip over it, as well as over the subxids we 208 * just wrote. 209 */ 210 headxid = subxids[j]; 211 i += j - i + 1; 212 } 213 214 /* update the cached value in shared memory */ 215 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 216 commitTsShared->xidLastCommit = xid; 217 commitTsShared->dataLastCommit.time = timestamp; 218 commitTsShared->dataLastCommit.nodeid = nodeid; 219 220 /* and move forwards our endpoint, if needed */ 221 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact)) 222 ShmemVariableCache->newestCommitTsXid = newestXact; 223 LWLockRelease(CommitTsLock); 224 } 225 226 /* 227 * Record the commit timestamp of transaction entries in the commit log for all 228 * entries on a single page. Atomic only on this page. 229 */ 230 static void 231 SetXidCommitTsInPage(TransactionId xid, int nsubxids, 232 TransactionId *subxids, TimestampTz ts, 233 RepOriginId nodeid, int pageno) 234 { 235 int slotno; 236 int i; 237 238 LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); 239 240 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); 241 242 TransactionIdSetCommitTs(xid, ts, nodeid, slotno); 243 for (i = 0; i < nsubxids; i++) 244 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno); 245 246 CommitTsCtl->shared->page_dirty[slotno] = true; 247 248 LWLockRelease(CommitTsSLRULock); 249 } 250 251 /* 252 * Sets the commit timestamp of a single transaction. 253 * 254 * Must be called with CommitTsSLRULock held 255 */ 256 static void 257 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, 258 RepOriginId nodeid, int slotno) 259 { 260 int entryno = TransactionIdToCTsEntry(xid); 261 CommitTimestampEntry entry; 262 263 Assert(TransactionIdIsNormal(xid)); 264 265 entry.time = ts; 266 entry.nodeid = nodeid; 267 268 memcpy(CommitTsCtl->shared->page_buffer[slotno] + 269 SizeOfCommitTimestampEntry * entryno, 270 &entry, SizeOfCommitTimestampEntry); 271 } 272 273 /* 274 * Interrogate the commit timestamp of a transaction. 275 * 276 * The return value indicates whether a commit timestamp record was found for 277 * the given xid. The timestamp value is returned in *ts (which may not be 278 * null), and the origin node for the Xid is returned in *nodeid, if it's not 279 * null. 280 */ 281 bool 282 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, 283 RepOriginId *nodeid) 284 { 285 int pageno = TransactionIdToCTsPage(xid); 286 int entryno = TransactionIdToCTsEntry(xid); 287 int slotno; 288 CommitTimestampEntry entry; 289 TransactionId oldestCommitTsXid; 290 TransactionId newestCommitTsXid; 291 292 if (!TransactionIdIsValid(xid)) 293 ereport(ERROR, 294 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 295 errmsg("cannot retrieve commit timestamp for transaction %u", xid))); 296 else if (!TransactionIdIsNormal(xid)) 297 { 298 /* frozen and bootstrap xids are always committed far in the past */ 299 *ts = 0; 300 if (nodeid) 301 *nodeid = 0; 302 return false; 303 } 304 305 LWLockAcquire(CommitTsLock, LW_SHARED); 306 307 /* Error if module not enabled */ 308 if (!commitTsShared->commitTsActive) 309 error_commit_ts_disabled(); 310 311 /* 312 * If we're asked for the cached value, return that. Otherwise, fall 313 * through to read from SLRU. 314 */ 315 if (commitTsShared->xidLastCommit == xid) 316 { 317 *ts = commitTsShared->dataLastCommit.time; 318 if (nodeid) 319 *nodeid = commitTsShared->dataLastCommit.nodeid; 320 321 LWLockRelease(CommitTsLock); 322 return *ts != 0; 323 } 324 325 oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid; 326 newestCommitTsXid = ShmemVariableCache->newestCommitTsXid; 327 /* neither is invalid, or both are */ 328 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid)); 329 LWLockRelease(CommitTsLock); 330 331 /* 332 * Return empty if the requested value is outside our valid range. 333 */ 334 if (!TransactionIdIsValid(oldestCommitTsXid) || 335 TransactionIdPrecedes(xid, oldestCommitTsXid) || 336 TransactionIdPrecedes(newestCommitTsXid, xid)) 337 { 338 *ts = 0; 339 if (nodeid) 340 *nodeid = InvalidRepOriginId; 341 return false; 342 } 343 344 /* lock is acquired by SimpleLruReadPage_ReadOnly */ 345 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); 346 memcpy(&entry, 347 CommitTsCtl->shared->page_buffer[slotno] + 348 SizeOfCommitTimestampEntry * entryno, 349 SizeOfCommitTimestampEntry); 350 351 *ts = entry.time; 352 if (nodeid) 353 *nodeid = entry.nodeid; 354 355 LWLockRelease(CommitTsSLRULock); 356 return *ts != 0; 357 } 358 359 /* 360 * Return the Xid of the latest committed transaction. (As far as this module 361 * is concerned, anyway; it's up to the caller to ensure the value is useful 362 * for its purposes.) 363 * 364 * ts and extra are filled with the corresponding data; they can be passed 365 * as NULL if not wanted. 366 */ 367 TransactionId 368 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid) 369 { 370 TransactionId xid; 371 372 LWLockAcquire(CommitTsLock, LW_SHARED); 373 374 /* Error if module not enabled */ 375 if (!commitTsShared->commitTsActive) 376 error_commit_ts_disabled(); 377 378 xid = commitTsShared->xidLastCommit; 379 if (ts) 380 *ts = commitTsShared->dataLastCommit.time; 381 if (nodeid) 382 *nodeid = commitTsShared->dataLastCommit.nodeid; 383 LWLockRelease(CommitTsLock); 384 385 return xid; 386 } 387 388 static void 389 error_commit_ts_disabled(void) 390 { 391 ereport(ERROR, 392 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 393 errmsg("could not get commit timestamp data"), 394 RecoveryInProgress() ? 395 errhint("Make sure the configuration parameter \"%s\" is set on the master server.", 396 "track_commit_timestamp") : 397 errhint("Make sure the configuration parameter \"%s\" is set.", 398 "track_commit_timestamp"))); 399 } 400 401 /* 402 * SQL-callable wrapper to obtain commit time of a transaction 403 */ 404 Datum 405 pg_xact_commit_timestamp(PG_FUNCTION_ARGS) 406 { 407 TransactionId xid = PG_GETARG_UINT32(0); 408 TimestampTz ts; 409 bool found; 410 411 found = TransactionIdGetCommitTsData(xid, &ts, NULL); 412 413 if (!found) 414 PG_RETURN_NULL(); 415 416 PG_RETURN_TIMESTAMPTZ(ts); 417 } 418 419 420 Datum 421 pg_last_committed_xact(PG_FUNCTION_ARGS) 422 { 423 TransactionId xid; 424 TimestampTz ts; 425 Datum values[2]; 426 bool nulls[2]; 427 TupleDesc tupdesc; 428 HeapTuple htup; 429 430 /* and construct a tuple with our data */ 431 xid = GetLatestCommitTsData(&ts, NULL); 432 433 /* 434 * Construct a tuple descriptor for the result row. This must match this 435 * function's pg_proc entry! 436 */ 437 tupdesc = CreateTemplateTupleDesc(2); 438 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid", 439 XIDOID, -1, 0); 440 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp", 441 TIMESTAMPTZOID, -1, 0); 442 tupdesc = BlessTupleDesc(tupdesc); 443 444 if (!TransactionIdIsNormal(xid)) 445 { 446 memset(nulls, true, sizeof(nulls)); 447 } 448 else 449 { 450 values[0] = TransactionIdGetDatum(xid); 451 nulls[0] = false; 452 453 values[1] = TimestampTzGetDatum(ts); 454 nulls[1] = false; 455 } 456 457 htup = heap_form_tuple(tupdesc, values, nulls); 458 459 PG_RETURN_DATUM(HeapTupleGetDatum(htup)); 460 } 461 462 463 /* 464 * Number of shared CommitTS buffers. 465 * 466 * We use a very similar logic as for the number of CLOG buffers; see comments 467 * in CLOGShmemBuffers. 468 */ 469 Size 470 CommitTsShmemBuffers(void) 471 { 472 return Min(16, Max(4, NBuffers / 1024)); 473 } 474 475 /* 476 * Shared memory sizing for CommitTs 477 */ 478 Size 479 CommitTsShmemSize(void) 480 { 481 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) + 482 sizeof(CommitTimestampShared); 483 } 484 485 /* 486 * Initialize CommitTs at system startup (postmaster start or standalone 487 * backend) 488 */ 489 void 490 CommitTsShmemInit(void) 491 { 492 bool found; 493 494 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; 495 SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0, 496 CommitTsSLRULock, "pg_commit_ts", 497 LWTRANCHE_COMMITTS_BUFFER); 498 SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE); 499 500 commitTsShared = ShmemInitStruct("CommitTs shared", 501 sizeof(CommitTimestampShared), 502 &found); 503 504 if (!IsUnderPostmaster) 505 { 506 Assert(!found); 507 508 commitTsShared->xidLastCommit = InvalidTransactionId; 509 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); 510 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; 511 commitTsShared->commitTsActive = false; 512 } 513 else 514 Assert(found); 515 } 516 517 /* 518 * This function must be called ONCE on system install. 519 * 520 * (The CommitTs directory is assumed to have been created by initdb, and 521 * CommitTsShmemInit must have been called already.) 522 */ 523 void 524 BootStrapCommitTs(void) 525 { 526 /* 527 * Nothing to do here at present, unlike most other SLRU modules; segments 528 * are created when the server is started with this module enabled. See 529 * ActivateCommitTs. 530 */ 531 } 532 533 /* 534 * Initialize (or reinitialize) a page of CommitTs to zeroes. 535 * If writeXlog is true, also emit an XLOG record saying we did this. 536 * 537 * The page is not actually written, just set up in shared memory. 538 * The slot number of the new page is returned. 539 * 540 * Control lock must be held at entry, and will be held at exit. 541 */ 542 static int 543 ZeroCommitTsPage(int pageno, bool writeXlog) 544 { 545 int slotno; 546 547 slotno = SimpleLruZeroPage(CommitTsCtl, pageno); 548 549 if (writeXlog) 550 WriteZeroPageXlogRec(pageno); 551 552 return slotno; 553 } 554 555 /* 556 * This must be called ONCE during postmaster or standalone-backend startup, 557 * after StartupXLOG has initialized ShmemVariableCache->nextFullXid. 558 */ 559 void 560 StartupCommitTs(void) 561 { 562 ActivateCommitTs(); 563 } 564 565 /* 566 * This must be called ONCE during postmaster or standalone-backend startup, 567 * after recovery has finished. 568 */ 569 void 570 CompleteCommitTsInitialization(void) 571 { 572 /* 573 * If the feature is not enabled, turn it off for good. This also removes 574 * any leftover data. 575 * 576 * Conversely, we activate the module if the feature is enabled. This is 577 * necessary for primary and standby as the activation depends on the 578 * control file contents at the beginning of recovery or when a 579 * XLOG_PARAMETER_CHANGE is replayed. 580 */ 581 if (!track_commit_timestamp) 582 DeactivateCommitTs(); 583 else 584 ActivateCommitTs(); 585 } 586 587 /* 588 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE 589 * XLog record during recovery. 590 */ 591 void 592 CommitTsParameterChange(bool newvalue, bool oldvalue) 593 { 594 /* 595 * If the commit_ts module is disabled in this server and we get word from 596 * the master server that it is enabled there, activate it so that we can 597 * replay future WAL records involving it; also mark it as active on 598 * pg_control. If the old value was already set, we already did this, so 599 * don't do anything. 600 * 601 * If the module is disabled in the master, disable it here too, unless 602 * the module is enabled locally. 603 * 604 * Note this only runs in the recovery process, so an unlocked read is 605 * fine. 606 */ 607 if (newvalue) 608 { 609 if (!commitTsShared->commitTsActive) 610 ActivateCommitTs(); 611 } 612 else if (commitTsShared->commitTsActive) 613 DeactivateCommitTs(); 614 } 615 616 /* 617 * Activate this module whenever necessary. 618 * This must happen during postmaster or standalone-backend startup, 619 * or during WAL replay anytime the track_commit_timestamp setting is 620 * changed in the master. 621 * 622 * The reason why this SLRU needs separate activation/deactivation functions is 623 * that it can be enabled/disabled during start and the activation/deactivation 624 * on master is propagated to standby via replay. Other SLRUs don't have this 625 * property and they can be just initialized during normal startup. 626 * 627 * This is in charge of creating the currently active segment, if it's not 628 * already there. The reason for this is that the server might have been 629 * running with this module disabled for a while and thus might have skipped 630 * the normal creation point. 631 */ 632 static void 633 ActivateCommitTs(void) 634 { 635 TransactionId xid; 636 int pageno; 637 638 /* If we've done this already, there's nothing to do */ 639 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 640 if (commitTsShared->commitTsActive) 641 { 642 LWLockRelease(CommitTsLock); 643 return; 644 } 645 LWLockRelease(CommitTsLock); 646 647 xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); 648 pageno = TransactionIdToCTsPage(xid); 649 650 /* 651 * Re-Initialize our idea of the latest page number. 652 */ 653 LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); 654 CommitTsCtl->shared->latest_page_number = pageno; 655 LWLockRelease(CommitTsSLRULock); 656 657 /* 658 * If CommitTs is enabled, but it wasn't in the previous server run, we 659 * need to set the oldest and newest values to the next Xid; that way, we 660 * will not try to read data that might not have been set. 661 * 662 * XXX does this have a problem if a server is started with commitTs 663 * enabled, then started with commitTs disabled, then restarted with it 664 * enabled again? It doesn't look like it does, because there should be a 665 * checkpoint that sets the value to InvalidTransactionId at end of 666 * recovery; and so any chance of injecting new transactions without 667 * CommitTs values would occur after the oldestCommitTsXid has been set to 668 * Invalid temporarily. 669 */ 670 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 671 if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId) 672 { 673 ShmemVariableCache->oldestCommitTsXid = 674 ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId(); 675 } 676 LWLockRelease(CommitTsLock); 677 678 /* Create the current segment file, if necessary */ 679 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) 680 { 681 int slotno; 682 683 LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); 684 slotno = ZeroCommitTsPage(pageno, false); 685 SimpleLruWritePage(CommitTsCtl, slotno); 686 Assert(!CommitTsCtl->shared->page_dirty[slotno]); 687 LWLockRelease(CommitTsSLRULock); 688 } 689 690 /* Change the activation status in shared memory. */ 691 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 692 commitTsShared->commitTsActive = true; 693 LWLockRelease(CommitTsLock); 694 } 695 696 /* 697 * Deactivate this module. 698 * 699 * This must be called when the track_commit_timestamp parameter is turned off. 700 * This happens during postmaster or standalone-backend startup, or during WAL 701 * replay. 702 * 703 * Resets CommitTs into invalid state to make sure we don't hand back 704 * possibly-invalid data; also removes segments of old data. 705 */ 706 static void 707 DeactivateCommitTs(void) 708 { 709 /* 710 * Cleanup the status in the shared memory. 711 * 712 * We reset everything in the commitTsShared record to prevent user from 713 * getting confusing data about last committed transaction on the standby 714 * when the module was activated repeatedly on the primary. 715 */ 716 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 717 718 commitTsShared->commitTsActive = false; 719 commitTsShared->xidLastCommit = InvalidTransactionId; 720 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); 721 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; 722 723 ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId; 724 ShmemVariableCache->newestCommitTsXid = InvalidTransactionId; 725 726 LWLockRelease(CommitTsLock); 727 728 /* 729 * Remove *all* files. This is necessary so that there are no leftover 730 * files; in the case where this feature is later enabled after running 731 * with it disabled for some time there may be a gap in the file sequence. 732 * (We can probably tolerate out-of-sequence files, as they are going to 733 * be overwritten anyway when we wrap around, but it seems better to be 734 * tidy.) 735 */ 736 LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); 737 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); 738 LWLockRelease(CommitTsSLRULock); 739 } 740 741 /* 742 * This must be called ONCE during postmaster or standalone-backend shutdown 743 */ 744 void 745 ShutdownCommitTs(void) 746 { 747 /* Flush dirty CommitTs pages to disk */ 748 SimpleLruFlush(CommitTsCtl, false); 749 750 /* 751 * fsync pg_commit_ts to ensure that any files flushed previously are 752 * durably on disk. 753 */ 754 fsync_fname("pg_commit_ts", true); 755 } 756 757 /* 758 * Perform a checkpoint --- either during shutdown, or on-the-fly 759 */ 760 void 761 CheckPointCommitTs(void) 762 { 763 /* Flush dirty CommitTs pages to disk */ 764 SimpleLruFlush(CommitTsCtl, true); 765 } 766 767 /* 768 * Make sure that CommitTs has room for a newly-allocated XID. 769 * 770 * NB: this is called while holding XidGenLock. We want it to be very fast 771 * most of the time; even when it's not so fast, no actual I/O need happen 772 * unless we're forced to write out a dirty CommitTs or xlog page to make room 773 * in shared memory. 774 * 775 * NB: the current implementation relies on track_commit_timestamp being 776 * PGC_POSTMASTER. 777 */ 778 void 779 ExtendCommitTs(TransactionId newestXact) 780 { 781 int pageno; 782 783 /* 784 * Nothing to do if module not enabled. Note we do an unlocked read of 785 * the flag here, which is okay because this routine is only called from 786 * GetNewTransactionId, which is never called in a standby. 787 */ 788 Assert(!InRecovery); 789 if (!commitTsShared->commitTsActive) 790 return; 791 792 /* 793 * No work except at first XID of a page. But beware: just after 794 * wraparound, the first XID of page zero is FirstNormalTransactionId. 795 */ 796 if (TransactionIdToCTsEntry(newestXact) != 0 && 797 !TransactionIdEquals(newestXact, FirstNormalTransactionId)) 798 return; 799 800 pageno = TransactionIdToCTsPage(newestXact); 801 802 LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); 803 804 /* Zero the page and make an XLOG entry about it */ 805 ZeroCommitTsPage(pageno, !InRecovery); 806 807 LWLockRelease(CommitTsSLRULock); 808 } 809 810 /* 811 * Remove all CommitTs segments before the one holding the passed 812 * transaction ID. 813 * 814 * Note that we don't need to flush XLOG here. 815 */ 816 void 817 TruncateCommitTs(TransactionId oldestXact) 818 { 819 int cutoffPage; 820 821 /* 822 * The cutoff point is the start of the segment containing oldestXact. We 823 * pass the *page* containing oldestXact to SimpleLruTruncate. 824 */ 825 cutoffPage = TransactionIdToCTsPage(oldestXact); 826 827 /* Check to see if there's any files that could be removed */ 828 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence, 829 &cutoffPage)) 830 return; /* nothing to remove */ 831 832 /* Write XLOG record */ 833 WriteTruncateXlogRec(cutoffPage, oldestXact); 834 835 /* Now we can remove the old CommitTs segment(s) */ 836 SimpleLruTruncate(CommitTsCtl, cutoffPage); 837 } 838 839 /* 840 * Set the limit values between which commit TS can be consulted. 841 */ 842 void 843 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact) 844 { 845 /* 846 * Be careful not to overwrite values that are either further into the 847 * "future" or signal a disabled committs. 848 */ 849 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 850 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId) 851 { 852 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact)) 853 ShmemVariableCache->oldestCommitTsXid = oldestXact; 854 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid)) 855 ShmemVariableCache->newestCommitTsXid = newestXact; 856 } 857 else 858 { 859 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId); 860 ShmemVariableCache->oldestCommitTsXid = oldestXact; 861 ShmemVariableCache->newestCommitTsXid = newestXact; 862 } 863 LWLockRelease(CommitTsLock); 864 } 865 866 /* 867 * Move forwards the oldest commitTS value that can be consulted 868 */ 869 void 870 AdvanceOldestCommitTsXid(TransactionId oldestXact) 871 { 872 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); 873 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId && 874 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact)) 875 ShmemVariableCache->oldestCommitTsXid = oldestXact; 876 LWLockRelease(CommitTsLock); 877 } 878 879 880 /* 881 * Decide whether a commitTS page number is "older" for truncation purposes. 882 * Analogous to CLOGPagePrecedes(). 883 * 884 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This 885 * introduces differences compared to CLOG and the other SLRUs having (1 << 886 * 31) % per_page == 0. This function never tests exactly 887 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit, 888 * there are two possible counts of page boundaries between oldestXact and the 889 * latest XID assigned, depending on whether oldestXact is within the first 890 * 128 entries of its page. Since this function doesn't know the location of 891 * oldestXact within page2, it returns false for one page that actually is 892 * expendable. This is a wider (yet still negligible) version of the 893 * truncation opportunity that CLOGPagePrecedes() cannot recognize. 894 * 895 * For the sake of a worked example, number entries with decimal values such 896 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of 897 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1, 898 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2, 899 * because entry=2.85 is the border that toggles whether entries precede the 900 * last entry of the oldestXact page. While page 2 is expendable at 901 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9. 902 */ 903 static bool 904 CommitTsPagePrecedes(int page1, int page2) 905 { 906 TransactionId xid1; 907 TransactionId xid2; 908 909 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE; 910 xid1 += FirstNormalTransactionId + 1; 911 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE; 912 xid2 += FirstNormalTransactionId + 1; 913 914 return (TransactionIdPrecedes(xid1, xid2) && 915 TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1)); 916 } 917 918 919 /* 920 * Write a ZEROPAGE xlog record 921 */ 922 static void 923 WriteZeroPageXlogRec(int pageno) 924 { 925 XLogBeginInsert(); 926 XLogRegisterData((char *) (&pageno), sizeof(int)); 927 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); 928 } 929 930 /* 931 * Write a TRUNCATE xlog record 932 */ 933 static void 934 WriteTruncateXlogRec(int pageno, TransactionId oldestXid) 935 { 936 xl_commit_ts_truncate xlrec; 937 938 xlrec.pageno = pageno; 939 xlrec.oldestXid = oldestXid; 940 941 XLogBeginInsert(); 942 XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate); 943 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE); 944 } 945 946 /* 947 * Write a SETTS xlog record 948 */ 949 static void 950 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, 951 TransactionId *subxids, TimestampTz timestamp, 952 RepOriginId nodeid) 953 { 954 xl_commit_ts_set record; 955 956 record.timestamp = timestamp; 957 record.nodeid = nodeid; 958 record.mainxid = mainxid; 959 960 XLogBeginInsert(); 961 XLogRegisterData((char *) &record, 962 offsetof(xl_commit_ts_set, mainxid) + 963 sizeof(TransactionId)); 964 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId)); 965 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS); 966 } 967 968 /* 969 * CommitTS resource manager's routines 970 */ 971 void 972 commit_ts_redo(XLogReaderState *record) 973 { 974 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; 975 976 /* Backup blocks are not used in commit_ts records */ 977 Assert(!XLogRecHasAnyBlockRefs(record)); 978 979 if (info == COMMIT_TS_ZEROPAGE) 980 { 981 int pageno; 982 int slotno; 983 984 memcpy(&pageno, XLogRecGetData(record), sizeof(int)); 985 986 LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); 987 988 slotno = ZeroCommitTsPage(pageno, false); 989 SimpleLruWritePage(CommitTsCtl, slotno); 990 Assert(!CommitTsCtl->shared->page_dirty[slotno]); 991 992 LWLockRelease(CommitTsSLRULock); 993 } 994 else if (info == COMMIT_TS_TRUNCATE) 995 { 996 xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record); 997 998 AdvanceOldestCommitTsXid(trunc->oldestXid); 999 1000 /* 1001 * During XLOG replay, latest_page_number isn't set up yet; insert a 1002 * suitable value to bypass the sanity test in SimpleLruTruncate. 1003 */ 1004 CommitTsCtl->shared->latest_page_number = trunc->pageno; 1005 1006 SimpleLruTruncate(CommitTsCtl, trunc->pageno); 1007 } 1008 else if (info == COMMIT_TS_SETTS) 1009 { 1010 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record); 1011 int nsubxids; 1012 TransactionId *subxids; 1013 1014 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) / 1015 sizeof(TransactionId)); 1016 if (nsubxids > 0) 1017 { 1018 subxids = palloc(sizeof(TransactionId) * nsubxids); 1019 memcpy(subxids, 1020 XLogRecGetData(record) + SizeOfCommitTsSet, 1021 sizeof(TransactionId) * nsubxids); 1022 } 1023 else 1024 subxids = NULL; 1025 1026 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids, 1027 setts->timestamp, setts->nodeid, false); 1028 if (subxids) 1029 pfree(subxids); 1030 } 1031 else 1032 elog(PANIC, "commit_ts_redo: unknown op code %u", info); 1033 } 1034