1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *		PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CommitTs page is initialized to zeroes, and another whenever old CommitTs
 * segments are truncated away.  All other writes of CommitTs come from
 * recording of transaction commit in xact.c, which generates its own XLOG
 * records for these events and will re-perform the status update on redo;
 * so we need not make any additional XLOG entry here.
17  *
18  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39 
40 /*
41  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51 
52 /*
53  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
57 typedef struct CommitTimestampEntry
58 {
59 	TimestampTz time;
60 	RepOriginId nodeid;
61 } CommitTimestampEntry;
62 
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64 									sizeof(RepOriginId))
65 
66 #define COMMIT_TS_XACTS_PER_PAGE \
67 	(BLCKSZ / SizeOfCommitTimestampEntry)
68 
69 #define TransactionIdToCTsPage(xid) \
70 	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 #define TransactionIdToCTsEntry(xid)	\
72 	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
77 static SlruCtlData CommitTsCtlData;
78 
79 #define CommitTsCtl (&CommitTsCtlData)
80 
81 /*
82  * We keep a cache of the last value set in shared memory.
83  *
84  * This is also good place to keep the activation status.  We keep this
85  * separate from the GUC so that the standby can activate the module if the
86  * primary has it active independently of the value of the GUC.
87  *
88  * This is protected by CommitTsLock.  In some places, we use commitTsActive
89  * without acquiring the lock; where this happens, a comment explains the
90  * rationale for it.
91  */
92 typedef struct CommitTimestampShared
93 {
94 	TransactionId xidLastCommit;
95 	CommitTimestampEntry dataLastCommit;
96 	bool		commitTsActive;
97 } CommitTimestampShared;
98 
99 CommitTimestampShared *commitTsShared;
100 
101 
102 /* GUC variable */
103 bool		track_commit_timestamp;
104 
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106 								 TransactionId *subxids, TimestampTz ts,
107 								 RepOriginId nodeid, int pageno);
108 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109 									 RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int	ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 
118 /*
119  * TransactionTreeSetCommitTsData
120  *
121  * Record the final commit timestamp of transaction entries in the commit log
122  * for a transaction and its subtransaction tree, as efficiently as possible.
123  *
124  * xid is the top level transaction id.
125  *
126  * subxids is an array of xids of length nsubxids, representing subtransactions
127  * in the tree of xid. In various cases nsubxids may be zero.
128  * The reason why tracking just the parent xid commit timestamp is not enough
129  * is that the subtrans SLRU does not stay valid across crashes (it's not
130  * permanent) so we need to keep the information about them here. If the
131  * subtrans implementation changes in the future, we might want to revisit the
132  * decision of storing timestamp info for each subxid.
133  */
134 void
TransactionTreeSetCommitTsData(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid)135 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
136 							   TransactionId *subxids, TimestampTz timestamp,
137 							   RepOriginId nodeid)
138 {
139 	int			i;
140 	TransactionId headxid;
141 	TransactionId newestXact;
142 
143 	/*
144 	 * No-op if the module is not active.
145 	 *
146 	 * An unlocked read here is fine, because in a standby (the only place
147 	 * where the flag can change in flight) this routine is only called by the
148 	 * recovery process, which is also the only process which can change the
149 	 * flag.
150 	 */
151 	if (!commitTsShared->commitTsActive)
152 		return;
153 
154 	/*
155 	 * Figure out the latest Xid in this batch: either the last subxid if
156 	 * there's any, otherwise the parent xid.
157 	 */
158 	if (nsubxids > 0)
159 		newestXact = subxids[nsubxids - 1];
160 	else
161 		newestXact = xid;
162 
163 	/*
164 	 * We split the xids to set the timestamp to in groups belonging to the
165 	 * same SLRU page; the first element in each such set is its head.  The
166 	 * first group has the main XID as the head; subsequent sets use the first
167 	 * subxid not on the previous page as head.  This way, we only have to
168 	 * lock/modify each SLRU page once.
169 	 */
170 	for (i = 0, headxid = xid;;)
171 	{
172 		int			pageno = TransactionIdToCTsPage(headxid);
173 		int			j;
174 
175 		for (j = i; j < nsubxids; j++)
176 		{
177 			if (TransactionIdToCTsPage(subxids[j]) != pageno)
178 				break;
179 		}
180 		/* subxids[i..j] are on the same page as the head */
181 
182 		SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
183 							 pageno);
184 
185 		/* if we wrote out all subxids, we're done. */
186 		if (j + 1 >= nsubxids)
187 			break;
188 
189 		/*
190 		 * Set the new head and skip over it, as well as over the subxids we
191 		 * just wrote.
192 		 */
193 		headxid = subxids[j];
194 		i += j - i + 1;
195 	}
196 
197 	/* update the cached value in shared memory */
198 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
199 	commitTsShared->xidLastCommit = xid;
200 	commitTsShared->dataLastCommit.time = timestamp;
201 	commitTsShared->dataLastCommit.nodeid = nodeid;
202 
203 	/* and move forwards our endpoint, if needed */
204 	if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
205 		ShmemVariableCache->newestCommitTsXid = newestXact;
206 	LWLockRelease(CommitTsLock);
207 }
208 
209 /*
210  * Record the commit timestamp of transaction entries in the commit log for all
211  * entries on a single page.  Atomic only on this page.
212  */
213 static void
SetXidCommitTsInPage(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz ts,RepOriginId nodeid,int pageno)214 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
215 					 TransactionId *subxids, TimestampTz ts,
216 					 RepOriginId nodeid, int pageno)
217 {
218 	int			slotno;
219 	int			i;
220 
221 	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
222 
223 	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
224 
225 	TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
226 	for (i = 0; i < nsubxids; i++)
227 		TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
228 
229 	CommitTsCtl->shared->page_dirty[slotno] = true;
230 
231 	LWLockRelease(CommitTsSLRULock);
232 }
233 
234 /*
235  * Sets the commit timestamp of a single transaction.
236  *
237  * Must be called with CommitTsSLRULock held
238  */
239 static void
TransactionIdSetCommitTs(TransactionId xid,TimestampTz ts,RepOriginId nodeid,int slotno)240 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
241 						 RepOriginId nodeid, int slotno)
242 {
243 	int			entryno = TransactionIdToCTsEntry(xid);
244 	CommitTimestampEntry entry;
245 
246 	Assert(TransactionIdIsNormal(xid));
247 
248 	entry.time = ts;
249 	entry.nodeid = nodeid;
250 
251 	memcpy(CommitTsCtl->shared->page_buffer[slotno] +
252 		   SizeOfCommitTimestampEntry * entryno,
253 		   &entry, SizeOfCommitTimestampEntry);
254 }
255 
256 /*
257  * Interrogate the commit timestamp of a transaction.
258  *
259  * The return value indicates whether a commit timestamp record was found for
260  * the given xid.  The timestamp value is returned in *ts (which may not be
261  * null), and the origin node for the Xid is returned in *nodeid, if it's not
262  * null.
263  */
264 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)265 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
266 							 RepOriginId *nodeid)
267 {
268 	int			pageno = TransactionIdToCTsPage(xid);
269 	int			entryno = TransactionIdToCTsEntry(xid);
270 	int			slotno;
271 	CommitTimestampEntry entry;
272 	TransactionId oldestCommitTsXid;
273 	TransactionId newestCommitTsXid;
274 
275 	if (!TransactionIdIsValid(xid))
276 		ereport(ERROR,
277 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
278 				 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
279 	else if (!TransactionIdIsNormal(xid))
280 	{
281 		/* frozen and bootstrap xids are always committed far in the past */
282 		*ts = 0;
283 		if (nodeid)
284 			*nodeid = 0;
285 		return false;
286 	}
287 
288 	LWLockAcquire(CommitTsLock, LW_SHARED);
289 
290 	/* Error if module not enabled */
291 	if (!commitTsShared->commitTsActive)
292 		error_commit_ts_disabled();
293 
294 	/*
295 	 * If we're asked for the cached value, return that.  Otherwise, fall
296 	 * through to read from SLRU.
297 	 */
298 	if (commitTsShared->xidLastCommit == xid)
299 	{
300 		*ts = commitTsShared->dataLastCommit.time;
301 		if (nodeid)
302 			*nodeid = commitTsShared->dataLastCommit.nodeid;
303 
304 		LWLockRelease(CommitTsLock);
305 		return *ts != 0;
306 	}
307 
308 	oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
309 	newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
310 	/* neither is invalid, or both are */
311 	Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
312 	LWLockRelease(CommitTsLock);
313 
314 	/*
315 	 * Return empty if the requested value is outside our valid range.
316 	 */
317 	if (!TransactionIdIsValid(oldestCommitTsXid) ||
318 		TransactionIdPrecedes(xid, oldestCommitTsXid) ||
319 		TransactionIdPrecedes(newestCommitTsXid, xid))
320 	{
321 		*ts = 0;
322 		if (nodeid)
323 			*nodeid = InvalidRepOriginId;
324 		return false;
325 	}
326 
327 	/* lock is acquired by SimpleLruReadPage_ReadOnly */
328 	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
329 	memcpy(&entry,
330 		   CommitTsCtl->shared->page_buffer[slotno] +
331 		   SizeOfCommitTimestampEntry * entryno,
332 		   SizeOfCommitTimestampEntry);
333 
334 	*ts = entry.time;
335 	if (nodeid)
336 		*nodeid = entry.nodeid;
337 
338 	LWLockRelease(CommitTsSLRULock);
339 	return *ts != 0;
340 }
341 
342 /*
343  * Return the Xid of the latest committed transaction.  (As far as this module
344  * is concerned, anyway; it's up to the caller to ensure the value is useful
345  * for its purposes.)
346  *
347  * ts and nodeid are filled with the corresponding data; they can be passed
348  * as NULL if not wanted.
349  */
350 TransactionId
GetLatestCommitTsData(TimestampTz * ts,RepOriginId * nodeid)351 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
352 {
353 	TransactionId xid;
354 
355 	LWLockAcquire(CommitTsLock, LW_SHARED);
356 
357 	/* Error if module not enabled */
358 	if (!commitTsShared->commitTsActive)
359 		error_commit_ts_disabled();
360 
361 	xid = commitTsShared->xidLastCommit;
362 	if (ts)
363 		*ts = commitTsShared->dataLastCommit.time;
364 	if (nodeid)
365 		*nodeid = commitTsShared->dataLastCommit.nodeid;
366 	LWLockRelease(CommitTsLock);
367 
368 	return xid;
369 }
370 
371 static void
error_commit_ts_disabled(void)372 error_commit_ts_disabled(void)
373 {
374 	ereport(ERROR,
375 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 			 errmsg("could not get commit timestamp data"),
377 			 RecoveryInProgress() ?
378 			 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
379 					 "track_commit_timestamp") :
380 			 errhint("Make sure the configuration parameter \"%s\" is set.",
381 					 "track_commit_timestamp")));
382 }
383 
384 /*
385  * SQL-callable wrapper to obtain commit time of a transaction
386  */
387 Datum
pg_xact_commit_timestamp(PG_FUNCTION_ARGS)388 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
389 {
390 	TransactionId xid = PG_GETARG_TRANSACTIONID(0);
391 	TimestampTz ts;
392 	bool		found;
393 
394 	found = TransactionIdGetCommitTsData(xid, &ts, NULL);
395 
396 	if (!found)
397 		PG_RETURN_NULL();
398 
399 	PG_RETURN_TIMESTAMPTZ(ts);
400 }
401 
402 
403 /*
404  * pg_last_committed_xact
405  *
406  * SQL-callable wrapper to obtain some information about the latest
407  * committed transaction: transaction ID, timestamp and replication
408  * origin.
409  */
410 Datum
pg_last_committed_xact(PG_FUNCTION_ARGS)411 pg_last_committed_xact(PG_FUNCTION_ARGS)
412 {
413 	TransactionId xid;
414 	RepOriginId nodeid;
415 	TimestampTz ts;
416 	Datum		values[3];
417 	bool		nulls[3];
418 	TupleDesc	tupdesc;
419 	HeapTuple	htup;
420 
421 	/* and construct a tuple with our data */
422 	xid = GetLatestCommitTsData(&ts, &nodeid);
423 
424 	/*
425 	 * Construct a tuple descriptor for the result row.  This must match this
426 	 * function's pg_proc entry!
427 	 */
428 	tupdesc = CreateTemplateTupleDesc(3);
429 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
430 					   XIDOID, -1, 0);
431 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
432 					   TIMESTAMPTZOID, -1, 0);
433 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
434 					   OIDOID, -1, 0);
435 	tupdesc = BlessTupleDesc(tupdesc);
436 
437 	if (!TransactionIdIsNormal(xid))
438 	{
439 		memset(nulls, true, sizeof(nulls));
440 	}
441 	else
442 	{
443 		values[0] = TransactionIdGetDatum(xid);
444 		nulls[0] = false;
445 
446 		values[1] = TimestampTzGetDatum(ts);
447 		nulls[1] = false;
448 
449 		values[2] = ObjectIdGetDatum((Oid) nodeid);
450 		nulls[2] = false;
451 	}
452 
453 	htup = heap_form_tuple(tupdesc, values, nulls);
454 
455 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
456 }
457 
458 /*
459  * pg_xact_commit_timestamp_origin
460  *
461  * SQL-callable wrapper to obtain commit timestamp and replication origin
462  * of a given transaction.
463  */
464 Datum
pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)465 pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
466 {
467 	TransactionId xid = PG_GETARG_TRANSACTIONID(0);
468 	RepOriginId nodeid;
469 	TimestampTz ts;
470 	Datum		values[2];
471 	bool		nulls[2];
472 	TupleDesc	tupdesc;
473 	HeapTuple	htup;
474 	bool		found;
475 
476 	found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
477 
478 	/*
479 	 * Construct a tuple descriptor for the result row.  This must match this
480 	 * function's pg_proc entry!
481 	 */
482 	tupdesc = CreateTemplateTupleDesc(2);
483 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
484 					   TIMESTAMPTZOID, -1, 0);
485 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
486 					   OIDOID, -1, 0);
487 	tupdesc = BlessTupleDesc(tupdesc);
488 
489 	if (!found)
490 	{
491 		memset(nulls, true, sizeof(nulls));
492 	}
493 	else
494 	{
495 		values[0] = TimestampTzGetDatum(ts);
496 		nulls[0] = false;
497 
498 		values[1] = ObjectIdGetDatum((Oid) nodeid);
499 		nulls[1] = false;
500 	}
501 
502 	htup = heap_form_tuple(tupdesc, values, nulls);
503 
504 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
505 }
506 
507 /*
508  * Number of shared CommitTS buffers.
509  *
510  * We use a very similar logic as for the number of CLOG buffers; see comments
511  * in CLOGShmemBuffers.
512  */
513 Size
CommitTsShmemBuffers(void)514 CommitTsShmemBuffers(void)
515 {
516 	return Min(16, Max(4, NBuffers / 1024));
517 }
518 
519 /*
520  * Shared memory sizing for CommitTs
521  */
522 Size
CommitTsShmemSize(void)523 CommitTsShmemSize(void)
524 {
525 	return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
526 		sizeof(CommitTimestampShared);
527 }
528 
529 /*
530  * Initialize CommitTs at system startup (postmaster start or standalone
531  * backend)
532  */
533 void
CommitTsShmemInit(void)534 CommitTsShmemInit(void)
535 {
536 	bool		found;
537 
538 	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
539 	SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
540 				  CommitTsSLRULock, "pg_commit_ts",
541 				  LWTRANCHE_COMMITTS_BUFFER,
542 				  SYNC_HANDLER_COMMIT_TS);
543 	SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
544 
545 	commitTsShared = ShmemInitStruct("CommitTs shared",
546 									 sizeof(CommitTimestampShared),
547 									 &found);
548 
549 	if (!IsUnderPostmaster)
550 	{
551 		Assert(!found);
552 
553 		commitTsShared->xidLastCommit = InvalidTransactionId;
554 		TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
555 		commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
556 		commitTsShared->commitTsActive = false;
557 	}
558 	else
559 		Assert(found);
560 }
561 
/*
 * Called ONCE at system install.
 *
 * Assumes initdb has created the CommitTs directory and that
 * CommitTsShmemInit has already run.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty, unlike most other SLRU modules: segments are
	 * created when the server starts with this module enabled (see
	 * ActivateCommitTs).
	 */
}
577 
578 /*
579  * Initialize (or reinitialize) a page of CommitTs to zeroes.
580  * If writeXlog is true, also emit an XLOG record saying we did this.
581  *
582  * The page is not actually written, just set up in shared memory.
583  * The slot number of the new page is returned.
584  *
585  * Control lock must be held at entry, and will be held at exit.
586  */
587 static int
ZeroCommitTsPage(int pageno,bool writeXlog)588 ZeroCommitTsPage(int pageno, bool writeXlog)
589 {
590 	int			slotno;
591 
592 	slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
593 
594 	if (writeXlog)
595 		WriteZeroPageXlogRec(pageno);
596 
597 	return slotno;
598 }
599 
/*
 * Called ONCE during postmaster or standalone-backend startup, after
 * StartupXLOG has initialized ShmemVariableCache->nextXid.  It simply
 * activates the module.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
609 
610 /*
611  * This must be called ONCE during postmaster or standalone-backend startup,
612  * after recovery has finished.
613  */
614 void
CompleteCommitTsInitialization(void)615 CompleteCommitTsInitialization(void)
616 {
617 	/*
618 	 * If the feature is not enabled, turn it off for good.  This also removes
619 	 * any leftover data.
620 	 *
621 	 * Conversely, we activate the module if the feature is enabled.  This is
622 	 * necessary for primary and standby as the activation depends on the
623 	 * control file contents at the beginning of recovery or when a
624 	 * XLOG_PARAMETER_CHANGE is replayed.
625 	 */
626 	if (!track_commit_timestamp)
627 		DeactivateCommitTs();
628 	else
629 		ActivateCommitTs();
630 }
631 
632 /*
633  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
634  * XLog record during recovery.
635  */
636 void
CommitTsParameterChange(bool newvalue,bool oldvalue)637 CommitTsParameterChange(bool newvalue, bool oldvalue)
638 {
639 	/*
640 	 * If the commit_ts module is disabled in this server and we get word from
641 	 * the primary server that it is enabled there, activate it so that we can
642 	 * replay future WAL records involving it; also mark it as active on
643 	 * pg_control.  If the old value was already set, we already did this, so
644 	 * don't do anything.
645 	 *
646 	 * If the module is disabled in the primary, disable it here too, unless
647 	 * the module is enabled locally.
648 	 *
649 	 * Note this only runs in the recovery process, so an unlocked read is
650 	 * fine.
651 	 */
652 	if (newvalue)
653 	{
654 		if (!commitTsShared->commitTsActive)
655 			ActivateCommitTs();
656 	}
657 	else if (commitTsShared->commitTsActive)
658 		DeactivateCommitTs();
659 }
660 
661 /*
662  * Activate this module whenever necessary.
663  *		This must happen during postmaster or standalone-backend startup,
664  *		or during WAL replay anytime the track_commit_timestamp setting is
665  *		changed in the primary.
666  *
667  * The reason why this SLRU needs separate activation/deactivation functions is
668  * that it can be enabled/disabled during start and the activation/deactivation
669  * on the primary is propagated to the standby via replay. Other SLRUs don't
670  * have this property and they can be just initialized during normal startup.
671  *
672  * This is in charge of creating the currently active segment, if it's not
673  * already there.  The reason for this is that the server might have been
674  * running with this module disabled for a while and thus might have skipped
675  * the normal creation point.
676  */
677 static void
ActivateCommitTs(void)678 ActivateCommitTs(void)
679 {
680 	TransactionId xid;
681 	int			pageno;
682 
683 	/* If we've done this already, there's nothing to do */
684 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
685 	if (commitTsShared->commitTsActive)
686 	{
687 		LWLockRelease(CommitTsLock);
688 		return;
689 	}
690 	LWLockRelease(CommitTsLock);
691 
692 	xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
693 	pageno = TransactionIdToCTsPage(xid);
694 
695 	/*
696 	 * Re-Initialize our idea of the latest page number.
697 	 */
698 	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
699 	CommitTsCtl->shared->latest_page_number = pageno;
700 	LWLockRelease(CommitTsSLRULock);
701 
702 	/*
703 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
704 	 * need to set the oldest and newest values to the next Xid; that way, we
705 	 * will not try to read data that might not have been set.
706 	 *
707 	 * XXX does this have a problem if a server is started with commitTs
708 	 * enabled, then started with commitTs disabled, then restarted with it
709 	 * enabled again?  It doesn't look like it does, because there should be a
710 	 * checkpoint that sets the value to InvalidTransactionId at end of
711 	 * recovery; and so any chance of injecting new transactions without
712 	 * CommitTs values would occur after the oldestCommitTsXid has been set to
713 	 * Invalid temporarily.
714 	 */
715 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
716 	if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
717 	{
718 		ShmemVariableCache->oldestCommitTsXid =
719 			ShmemVariableCache->newestCommitTsXid = ReadNextTransactionId();
720 	}
721 	LWLockRelease(CommitTsLock);
722 
723 	/* Create the current segment file, if necessary */
724 	if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
725 	{
726 		int			slotno;
727 
728 		LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
729 		slotno = ZeroCommitTsPage(pageno, false);
730 		SimpleLruWritePage(CommitTsCtl, slotno);
731 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
732 		LWLockRelease(CommitTsSLRULock);
733 	}
734 
735 	/* Change the activation status in shared memory. */
736 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
737 	commitTsShared->commitTsActive = true;
738 	LWLockRelease(CommitTsLock);
739 }
740 
741 /*
742  * Deactivate this module.
743  *
744  * This must be called when the track_commit_timestamp parameter is turned off.
745  * This happens during postmaster or standalone-backend startup, or during WAL
746  * replay.
747  *
748  * Resets CommitTs into invalid state to make sure we don't hand back
749  * possibly-invalid data; also removes segments of old data.
750  */
751 static void
DeactivateCommitTs(void)752 DeactivateCommitTs(void)
753 {
754 	/*
755 	 * Cleanup the status in the shared memory.
756 	 *
757 	 * We reset everything in the commitTsShared record to prevent user from
758 	 * getting confusing data about last committed transaction on the standby
759 	 * when the module was activated repeatedly on the primary.
760 	 */
761 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
762 
763 	commitTsShared->commitTsActive = false;
764 	commitTsShared->xidLastCommit = InvalidTransactionId;
765 	TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
766 	commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
767 
768 	ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
769 	ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
770 
771 	LWLockRelease(CommitTsLock);
772 
773 	/*
774 	 * Remove *all* files.  This is necessary so that there are no leftover
775 	 * files; in the case where this feature is later enabled after running
776 	 * with it disabled for some time there may be a gap in the file sequence.
777 	 * (We can probably tolerate out-of-sequence files, as they are going to
778 	 * be overwritten anyway when we wrap around, but it seems better to be
779 	 * tidy.)
780 	 */
781 	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
782 	(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
783 	LWLockRelease(CommitTsSLRULock);
784 }
785 
786 /*
787  * Perform a checkpoint --- either during shutdown, or on-the-fly
788  */
789 void
CheckPointCommitTs(void)790 CheckPointCommitTs(void)
791 {
792 	/*
793 	 * Write dirty CommitTs pages to disk.  This may result in sync requests
794 	 * queued for later handling by ProcessSyncRequests(), as part of the
795 	 * checkpoint.
796 	 */
797 	SimpleLruWriteAll(CommitTsCtl, true);
798 }
799 
800 /*
801  * Make sure that CommitTs has room for a newly-allocated XID.
802  *
803  * NB: this is called while holding XidGenLock.  We want it to be very fast
804  * most of the time; even when it's not so fast, no actual I/O need happen
805  * unless we're forced to write out a dirty CommitTs or xlog page to make room
806  * in shared memory.
807  *
808  * NB: the current implementation relies on track_commit_timestamp being
809  * PGC_POSTMASTER.
810  */
811 void
ExtendCommitTs(TransactionId newestXact)812 ExtendCommitTs(TransactionId newestXact)
813 {
814 	int			pageno;
815 
816 	/*
817 	 * Nothing to do if module not enabled.  Note we do an unlocked read of
818 	 * the flag here, which is okay because this routine is only called from
819 	 * GetNewTransactionId, which is never called in a standby.
820 	 */
821 	Assert(!InRecovery);
822 	if (!commitTsShared->commitTsActive)
823 		return;
824 
825 	/*
826 	 * No work except at first XID of a page.  But beware: just after
827 	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
828 	 */
829 	if (TransactionIdToCTsEntry(newestXact) != 0 &&
830 		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
831 		return;
832 
833 	pageno = TransactionIdToCTsPage(newestXact);
834 
835 	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
836 
837 	/* Zero the page and make an XLOG entry about it */
838 	ZeroCommitTsPage(pageno, !InRecovery);
839 
840 	LWLockRelease(CommitTsSLRULock);
841 }
842 
843 /*
844  * Remove all CommitTs segments before the one holding the passed
845  * transaction ID.
846  *
847  * Note that we don't need to flush XLOG here.
848  */
849 void
TruncateCommitTs(TransactionId oldestXact)850 TruncateCommitTs(TransactionId oldestXact)
851 {
852 	int			cutoffPage;
853 
854 	/*
855 	 * The cutoff point is the start of the segment containing oldestXact. We
856 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
857 	 */
858 	cutoffPage = TransactionIdToCTsPage(oldestXact);
859 
860 	/* Check to see if there's any files that could be removed */
861 	if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
862 						   &cutoffPage))
863 		return;					/* nothing to remove */
864 
865 	/* Write XLOG record */
866 	WriteTruncateXlogRec(cutoffPage, oldestXact);
867 
868 	/* Now we can remove the old CommitTs segment(s) */
869 	SimpleLruTruncate(CommitTsCtl, cutoffPage);
870 }
871 
872 /*
873  * Set the limit values between which commit TS can be consulted.
874  */
875 void
SetCommitTsLimit(TransactionId oldestXact,TransactionId newestXact)876 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
877 {
878 	/*
879 	 * Be careful not to overwrite values that are either further into the
880 	 * "future" or signal a disabled committs.
881 	 */
882 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
883 	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
884 	{
885 		if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
886 			ShmemVariableCache->oldestCommitTsXid = oldestXact;
887 		if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
888 			ShmemVariableCache->newestCommitTsXid = newestXact;
889 	}
890 	else
891 	{
892 		Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
893 		ShmemVariableCache->oldestCommitTsXid = oldestXact;
894 		ShmemVariableCache->newestCommitTsXid = newestXact;
895 	}
896 	LWLockRelease(CommitTsLock);
897 }
898 
899 /*
900  * Move forwards the oldest commitTS value that can be consulted
901  */
902 void
AdvanceOldestCommitTsXid(TransactionId oldestXact)903 AdvanceOldestCommitTsXid(TransactionId oldestXact)
904 {
905 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
906 	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
907 		TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
908 		ShmemVariableCache->oldestCommitTsXid = oldestXact;
909 	LWLockRelease(CommitTsLock);
910 }
911 
912 
913 /*
914  * Decide whether a commitTS page number is "older" for truncation purposes.
915  * Analogous to CLOGPagePrecedes().
916  *
917  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
918  * introduces differences compared to CLOG and the other SLRUs having (1 <<
919  * 31) % per_page == 0.  This function never tests exactly
920  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
921  * there are two possible counts of page boundaries between oldestXact and the
922  * latest XID assigned, depending on whether oldestXact is within the first
923  * 128 entries of its page.  Since this function doesn't know the location of
924  * oldestXact within page2, it returns false for one page that actually is
925  * expendable.  This is a wider (yet still negligible) version of the
926  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
927  *
928  * For the sake of a worked example, number entries with decimal values such
929  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
930  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
931  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
932  * because entry=2.85 is the border that toggles whether entries precede the
933  * last entry of the oldestXact page.  While page 2 is expendable at
934  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
935  */
936 static bool
CommitTsPagePrecedes(int page1,int page2)937 CommitTsPagePrecedes(int page1, int page2)
938 {
939 	TransactionId xid1;
940 	TransactionId xid2;
941 
942 	xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
943 	xid1 += FirstNormalTransactionId + 1;
944 	xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
945 	xid2 += FirstNormalTransactionId + 1;
946 
947 	return (TransactionIdPrecedes(xid1, xid2) &&
948 			TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
949 }
950 
951 
952 /*
953  * Write a ZEROPAGE xlog record
954  */
955 static void
WriteZeroPageXlogRec(int pageno)956 WriteZeroPageXlogRec(int pageno)
957 {
958 	XLogBeginInsert();
959 	XLogRegisterData((char *) (&pageno), sizeof(int));
960 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
961 }
962 
963 /*
964  * Write a TRUNCATE xlog record
965  */
966 static void
WriteTruncateXlogRec(int pageno,TransactionId oldestXid)967 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
968 {
969 	xl_commit_ts_truncate xlrec;
970 
971 	xlrec.pageno = pageno;
972 	xlrec.oldestXid = oldestXid;
973 
974 	XLogBeginInsert();
975 	XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
976 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
977 }
978 
979 /*
980  * CommitTS resource manager's routines
981  */
982 void
commit_ts_redo(XLogReaderState * record)983 commit_ts_redo(XLogReaderState *record)
984 {
985 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
986 
987 	/* Backup blocks are not used in commit_ts records */
988 	Assert(!XLogRecHasAnyBlockRefs(record));
989 
990 	if (info == COMMIT_TS_ZEROPAGE)
991 	{
992 		int			pageno;
993 		int			slotno;
994 
995 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
996 
997 		LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
998 
999 		slotno = ZeroCommitTsPage(pageno, false);
1000 		SimpleLruWritePage(CommitTsCtl, slotno);
1001 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1002 
1003 		LWLockRelease(CommitTsSLRULock);
1004 	}
1005 	else if (info == COMMIT_TS_TRUNCATE)
1006 	{
1007 		xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1008 
1009 		AdvanceOldestCommitTsXid(trunc->oldestXid);
1010 
1011 		/*
1012 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
1013 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
1014 		 */
1015 		CommitTsCtl->shared->latest_page_number = trunc->pageno;
1016 
1017 		SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1018 	}
1019 	else
1020 		elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1021 }
1022 
1023 /*
1024  * Entrypoint for sync.c to sync commit_ts files.
1025  */
1026 int
committssyncfiletag(const FileTag * ftag,char * path)1027 committssyncfiletag(const FileTag *ftag, char *path)
1028 {
1029 	return SlruSyncFileTag(CommitTsCtl, ftag, path);
1030 }
1031