1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *		PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38 
39 /*
40  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50 
51 /*
52  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
58 	TimestampTz time;
59 	RepOriginId nodeid;
60 } CommitTimestampEntry;
61 
/*
 * Bytes copied per entry: everything up to and including nodeid, without
 * whatever padding the compiler may add after it.
 */
#define SizeOfCommitTimestampEntry \
	(offsetof(CommitTimestampEntry, nodeid) + sizeof(RepOriginId))

/* How many entries fit in one BLCKSZ-sized page */
#define COMMIT_TS_XACTS_PER_PAGE	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page holding a given xid, and the xid's entry index within that page */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72 
73 /*
74  * Link to shared-memory data structures for CommitTs control
75  */
76 static SlruCtlData CommitTsCtlData;
77 
78 #define CommitTsCtl (&CommitTsCtlData)
79 
80 /*
81  * We keep a cache of the last value set in shared memory.
82  *
83  * This is also good place to keep the activation status.  We keep this
84  * separate from the GUC so that the standby can activate the module if the
85  * primary has it active independently of the value of the GUC.
86  *
87  * This is protected by CommitTsLock.  In some places, we use commitTsActive
88  * without acquiring the lock; where this happens, a comment explains the
89  * rationale for it.
90  */
91 typedef struct CommitTimestampShared
92 {
93 	TransactionId xidLastCommit;
94 	CommitTimestampEntry dataLastCommit;
95 	bool		commitTsActive;
96 } CommitTimestampShared;
97 
98 CommitTimestampShared *commitTsShared;
99 
100 
101 /* GUC variable */
102 bool		track_commit_timestamp;
103 
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105 					 TransactionId *subxids, TimestampTz ts,
106 					 RepOriginId nodeid, int pageno);
107 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108 						 RepOriginId nodeid, int slotno);
109 static void error_commit_ts_disabled(void);
110 static int	ZeroCommitTsPage(int pageno, bool writeXlog);
111 static bool CommitTsPagePrecedes(int page1, int page2);
112 static void ActivateCommitTs(void);
113 static void DeactivateCommitTs(void);
114 static void WriteZeroPageXlogRec(int pageno);
115 static void WriteTruncateXlogRec(int pageno);
116 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
117 						 TransactionId *subxids, TimestampTz timestamp,
118 						 RepOriginId nodeid);
119 
120 /*
121  * TransactionTreeSetCommitTsData
122  *
123  * Record the final commit timestamp of transaction entries in the commit log
124  * for a transaction and its subtransaction tree, as efficiently as possible.
125  *
126  * xid is the top level transaction id.
127  *
128  * subxids is an array of xids of length nsubxids, representing subtransactions
129  * in the tree of xid. In various cases nsubxids may be zero.
130  * The reason why tracking just the parent xid commit timestamp is not enough
131  * is that the subtrans SLRU does not stay valid across crashes (it's not
132  * permanent) so we need to keep the information about them here. If the
133  * subtrans implementation changes in the future, we might want to revisit the
134  * decision of storing timestamp info for each subxid.
135  *
136  * The write_xlog parameter tells us whether to include an XLog record of this
137  * or not.  Normally, this is called from transaction commit routines (both
138  * normal and prepared) and the information will be stored in the transaction
139  * commit XLog record, and so they should pass "false" for this.  The XLog redo
140  * code should use "false" here as well.  Other callers probably want to pass
141  * true, so that the given values persist in case of crashes.
142  */
143 void
TransactionTreeSetCommitTsData(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid,bool write_xlog)144 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
145 							   TransactionId *subxids, TimestampTz timestamp,
146 							   RepOriginId nodeid, bool write_xlog)
147 {
148 	int			i;
149 	TransactionId headxid;
150 	TransactionId newestXact;
151 
152 	/*
153 	 * No-op if the module is not active.
154 	 *
155 	 * An unlocked read here is fine, because in a standby (the only place
156 	 * where the flag can change in flight) this routine is only called by the
157 	 * recovery process, which is also the only process which can change the
158 	 * flag.
159 	 */
160 	if (!commitTsShared->commitTsActive)
161 		return;
162 
163 	/*
164 	 * Comply with the WAL-before-data rule: if caller specified it wants this
165 	 * value to be recorded in WAL, do so before touching the data.
166 	 */
167 	if (write_xlog)
168 		WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
169 
170 	/*
171 	 * Figure out the latest Xid in this batch: either the last subxid if
172 	 * there's any, otherwise the parent xid.
173 	 */
174 	if (nsubxids > 0)
175 		newestXact = subxids[nsubxids - 1];
176 	else
177 		newestXact = xid;
178 
179 	/*
180 	 * We split the xids to set the timestamp to in groups belonging to the
181 	 * same SLRU page; the first element in each such set is its head.  The
182 	 * first group has the main XID as the head; subsequent sets use the first
183 	 * subxid not on the previous page as head.  This way, we only have to
184 	 * lock/modify each SLRU page once.
185 	 */
186 	for (i = 0, headxid = xid;;)
187 	{
188 		int			pageno = TransactionIdToCTsPage(headxid);
189 		int			j;
190 
191 		for (j = i; j < nsubxids; j++)
192 		{
193 			if (TransactionIdToCTsPage(subxids[j]) != pageno)
194 				break;
195 		}
196 		/* subxids[i..j] are on the same page as the head */
197 
198 		SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
199 							 pageno);
200 
201 		/* if we wrote out all subxids, we're done. */
202 		if (j + 1 >= nsubxids)
203 			break;
204 
205 		/*
206 		 * Set the new head and skip over it, as well as over the subxids we
207 		 * just wrote.
208 		 */
209 		headxid = subxids[j];
210 		i += j - i + 1;
211 	}
212 
213 	/* update the cached value in shared memory */
214 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
215 	commitTsShared->xidLastCommit = xid;
216 	commitTsShared->dataLastCommit.time = timestamp;
217 	commitTsShared->dataLastCommit.nodeid = nodeid;
218 
219 	/* and move forwards our endpoint, if needed */
220 	if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
221 		ShmemVariableCache->newestCommitTsXid = newestXact;
222 	LWLockRelease(CommitTsLock);
223 }
224 
225 /*
226  * Record the commit timestamp of transaction entries in the commit log for all
227  * entries on a single page.  Atomic only on this page.
228  */
229 static void
SetXidCommitTsInPage(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz ts,RepOriginId nodeid,int pageno)230 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
231 					 TransactionId *subxids, TimestampTz ts,
232 					 RepOriginId nodeid, int pageno)
233 {
234 	int			slotno;
235 	int			i;
236 
237 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
238 
239 	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
240 
241 	TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
242 	for (i = 0; i < nsubxids; i++)
243 		TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
244 
245 	CommitTsCtl->shared->page_dirty[slotno] = true;
246 
247 	LWLockRelease(CommitTsControlLock);
248 }
249 
250 /*
251  * Sets the commit timestamp of a single transaction.
252  *
253  * Must be called with CommitTsControlLock held
254  */
255 static void
TransactionIdSetCommitTs(TransactionId xid,TimestampTz ts,RepOriginId nodeid,int slotno)256 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
257 						 RepOriginId nodeid, int slotno)
258 {
259 	int			entryno = TransactionIdToCTsEntry(xid);
260 	CommitTimestampEntry entry;
261 
262 	Assert(TransactionIdIsNormal(xid));
263 
264 	entry.time = ts;
265 	entry.nodeid = nodeid;
266 
267 	memcpy(CommitTsCtl->shared->page_buffer[slotno] +
268 		   SizeOfCommitTimestampEntry * entryno,
269 		   &entry, SizeOfCommitTimestampEntry);
270 }
271 
272 /*
273  * Interrogate the commit timestamp of a transaction.
274  *
275  * The return value indicates whether a commit timestamp record was found for
276  * the given xid.  The timestamp value is returned in *ts (which may not be
277  * null), and the origin node for the Xid is returned in *nodeid, if it's not
278  * null.
279  */
280 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)281 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
282 							 RepOriginId *nodeid)
283 {
284 	int			pageno = TransactionIdToCTsPage(xid);
285 	int			entryno = TransactionIdToCTsEntry(xid);
286 	int			slotno;
287 	CommitTimestampEntry entry;
288 	TransactionId oldestCommitTsXid;
289 	TransactionId newestCommitTsXid;
290 
291 	if (!TransactionIdIsValid(xid))
292 		ereport(ERROR,
293 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
294 		errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
295 	else if (!TransactionIdIsNormal(xid))
296 	{
297 		/* frozen and bootstrap xids are always committed far in the past */
298 		*ts = 0;
299 		if (nodeid)
300 			*nodeid = 0;
301 		return false;
302 	}
303 
304 	LWLockAcquire(CommitTsLock, LW_SHARED);
305 
306 	/* Error if module not enabled */
307 	if (!commitTsShared->commitTsActive)
308 		error_commit_ts_disabled();
309 
310 	/*
311 	 * If we're asked for the cached value, return that.  Otherwise, fall
312 	 * through to read from SLRU.
313 	 */
314 	if (commitTsShared->xidLastCommit == xid)
315 	{
316 		*ts = commitTsShared->dataLastCommit.time;
317 		if (nodeid)
318 			*nodeid = commitTsShared->dataLastCommit.nodeid;
319 
320 		LWLockRelease(CommitTsLock);
321 		return *ts != 0;
322 	}
323 
324 	oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
325 	newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
326 	/* neither is invalid, or both are */
327 	Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
328 	LWLockRelease(CommitTsLock);
329 
330 	/*
331 	 * Return empty if the requested value is outside our valid range.
332 	 */
333 	if (!TransactionIdIsValid(oldestCommitTsXid) ||
334 		TransactionIdPrecedes(xid, oldestCommitTsXid) ||
335 		TransactionIdPrecedes(newestCommitTsXid, xid))
336 	{
337 		*ts = 0;
338 		if (nodeid)
339 			*nodeid = InvalidRepOriginId;
340 		return false;
341 	}
342 
343 	/* lock is acquired by SimpleLruReadPage_ReadOnly */
344 	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
345 	memcpy(&entry,
346 		   CommitTsCtl->shared->page_buffer[slotno] +
347 		   SizeOfCommitTimestampEntry * entryno,
348 		   SizeOfCommitTimestampEntry);
349 
350 	*ts = entry.time;
351 	if (nodeid)
352 		*nodeid = entry.nodeid;
353 
354 	LWLockRelease(CommitTsControlLock);
355 	return *ts != 0;
356 }
357 
358 /*
359  * Return the Xid of the latest committed transaction.  (As far as this module
360  * is concerned, anyway; it's up to the caller to ensure the value is useful
361  * for its purposes.)
362  *
363  * ts and extra are filled with the corresponding data; they can be passed
364  * as NULL if not wanted.
365  */
366 TransactionId
GetLatestCommitTsData(TimestampTz * ts,RepOriginId * nodeid)367 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
368 {
369 	TransactionId xid;
370 
371 	LWLockAcquire(CommitTsLock, LW_SHARED);
372 
373 	/* Error if module not enabled */
374 	if (!commitTsShared->commitTsActive)
375 		error_commit_ts_disabled();
376 
377 	xid = commitTsShared->xidLastCommit;
378 	if (ts)
379 		*ts = commitTsShared->dataLastCommit.time;
380 	if (nodeid)
381 		*nodeid = commitTsShared->dataLastCommit.nodeid;
382 	LWLockRelease(CommitTsLock);
383 
384 	return xid;
385 }
386 
387 static void
error_commit_ts_disabled(void)388 error_commit_ts_disabled(void)
389 {
390 	ereport(ERROR,
391 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
392 			 errmsg("could not get commit timestamp data"),
393 			 RecoveryInProgress() ?
394 			 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
395 					 "track_commit_timestamp") :
396 			 errhint("Make sure the configuration parameter \"%s\" is set.",
397 					 "track_commit_timestamp")));
398 }
399 
400 /*
401  * SQL-callable wrapper to obtain commit time of a transaction
402  */
403 Datum
pg_xact_commit_timestamp(PG_FUNCTION_ARGS)404 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
405 {
406 	TransactionId xid = PG_GETARG_UINT32(0);
407 	TimestampTz ts;
408 	bool		found;
409 
410 	found = TransactionIdGetCommitTsData(xid, &ts, NULL);
411 
412 	if (!found)
413 		PG_RETURN_NULL();
414 
415 	PG_RETURN_TIMESTAMPTZ(ts);
416 }
417 
418 
419 Datum
pg_last_committed_xact(PG_FUNCTION_ARGS)420 pg_last_committed_xact(PG_FUNCTION_ARGS)
421 {
422 	TransactionId xid;
423 	TimestampTz ts;
424 	Datum		values[2];
425 	bool		nulls[2];
426 	TupleDesc	tupdesc;
427 	HeapTuple	htup;
428 
429 	/* and construct a tuple with our data */
430 	xid = GetLatestCommitTsData(&ts, NULL);
431 
432 	/*
433 	 * Construct a tuple descriptor for the result row.  This must match this
434 	 * function's pg_proc entry!
435 	 */
436 	tupdesc = CreateTemplateTupleDesc(2, false);
437 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
438 					   XIDOID, -1, 0);
439 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
440 					   TIMESTAMPTZOID, -1, 0);
441 	tupdesc = BlessTupleDesc(tupdesc);
442 
443 	if (!TransactionIdIsNormal(xid))
444 	{
445 		memset(nulls, true, sizeof(nulls));
446 	}
447 	else
448 	{
449 		values[0] = TransactionIdGetDatum(xid);
450 		nulls[0] = false;
451 
452 		values[1] = TimestampTzGetDatum(ts);
453 		nulls[1] = false;
454 	}
455 
456 	htup = heap_form_tuple(tupdesc, values, nulls);
457 
458 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
459 }
460 
461 
462 /*
463  * Number of shared CommitTS buffers.
464  *
465  * We use a very similar logic as for the number of CLOG buffers; see comments
466  * in CLOGShmemBuffers.
467  */
468 Size
CommitTsShmemBuffers(void)469 CommitTsShmemBuffers(void)
470 {
471 	return Min(16, Max(4, NBuffers / 1024));
472 }
473 
474 /*
475  * Shared memory sizing for CommitTs
476  */
477 Size
CommitTsShmemSize(void)478 CommitTsShmemSize(void)
479 {
480 	return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
481 		sizeof(CommitTimestampShared);
482 }
483 
484 /*
485  * Initialize CommitTs at system startup (postmaster start or standalone
486  * backend)
487  */
488 void
CommitTsShmemInit(void)489 CommitTsShmemInit(void)
490 {
491 	bool		found;
492 
493 	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
494 	SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
495 				  CommitTsControlLock, "pg_commit_ts",
496 				  LWTRANCHE_COMMITTS_BUFFERS);
497 	SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
498 
499 	commitTsShared = ShmemInitStruct("CommitTs shared",
500 									 sizeof(CommitTimestampShared),
501 									 &found);
502 
503 	if (!IsUnderPostmaster)
504 	{
505 		Assert(!found);
506 
507 		commitTsShared->xidLastCommit = InvalidTransactionId;
508 		TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509 		commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510 		commitTsShared->commitTsActive = false;
511 	}
512 	else
513 		Assert(found);
514 }
515 
/*
 * Bootstrap hook; must be called ONCE on system install.
 *
 * (By then initdb is assumed to have created the CommitTs directory, and
 * CommitTsShmemInit must already have run.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Unlike most other SLRU modules, there is currently nothing to do at
	 * this point: segments get created when the server is started with this
	 * module enabled.  See ActivateCommitTs.
	 */
}
531 
532 /*
533  * Initialize (or reinitialize) a page of CommitTs to zeroes.
534  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
535  *
536  * The page is not actually written, just set up in shared memory.
537  * The slot number of the new page is returned.
538  *
539  * Control lock must be held at entry, and will be held at exit.
540  */
541 static int
ZeroCommitTsPage(int pageno,bool writeXlog)542 ZeroCommitTsPage(int pageno, bool writeXlog)
543 {
544 	int			slotno;
545 
546 	slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
547 
548 	if (writeXlog)
549 		WriteZeroPageXlogRec(pageno);
550 
551 	return slotno;
552 }
553 
/*
 * Startup hook: activates the module via ActivateCommitTs.
 *
 * Must be called ONCE during postmaster or standalone-backend startup, once
 * StartupXLOG has initialized ShmemVariableCache->nextXid.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
563 
564 /*
565  * This must be called ONCE during postmaster or standalone-backend startup,
566  * after recovery has finished.
567  */
568 void
CompleteCommitTsInitialization(void)569 CompleteCommitTsInitialization(void)
570 {
571 	/*
572 	 * If the feature is not enabled, turn it off for good.  This also removes
573 	 * any leftover data.
574 	 *
575 	 * Conversely, we activate the module if the feature is enabled.  This is
576 	 * necessary for primary and standby as the activation depends on the
577 	 * control file contents at the beginning of recovery or when a
578 	 * XLOG_PARAMETER_CHANGE is replayed.
579 	 */
580 	if (!track_commit_timestamp)
581 		DeactivateCommitTs();
582 	else
583 		ActivateCommitTs();
584 }
585 
586 /*
587  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
588  * XLog record during recovery.
589  */
590 void
CommitTsParameterChange(bool newvalue,bool oldvalue)591 CommitTsParameterChange(bool newvalue, bool oldvalue)
592 {
593 	/*
594 	 * If the commit_ts module is disabled in this server and we get word from
595 	 * the master server that it is enabled there, activate it so that we can
596 	 * replay future WAL records involving it; also mark it as active on
597 	 * pg_control.  If the old value was already set, we already did this, so
598 	 * don't do anything.
599 	 *
600 	 * If the module is disabled in the master, disable it here too, unless
601 	 * the module is enabled locally.
602 	 *
603 	 * Note this only runs in the recovery process, so an unlocked read is
604 	 * fine.
605 	 */
606 	if (newvalue)
607 	{
608 		if (!commitTsShared->commitTsActive)
609 			ActivateCommitTs();
610 	}
611 	else if (commitTsShared->commitTsActive)
612 		DeactivateCommitTs();
613 }
614 
615 /*
616  * Activate this module whenever necessary.
617  *		This must happen during postmaster or standalone-backend startup,
618  *		or during WAL replay anytime the track_commit_timestamp setting is
619  *		changed in the master.
620  *
621  * The reason why this SLRU needs separate activation/deactivation functions is
622  * that it can be enabled/disabled during start and the activation/deactivation
623  * on master is propagated to slave via replay. Other SLRUs don't have this
624  * property and they can be just initialized during normal startup.
625  *
626  * This is in charge of creating the currently active segment, if it's not
627  * already there.  The reason for this is that the server might have been
628  * running with this module disabled for a while and thus might have skipped
629  * the normal creation point.
630  */
631 static void
ActivateCommitTs(void)632 ActivateCommitTs(void)
633 {
634 	TransactionId xid;
635 	int			pageno;
636 
637 	/* If we've done this already, there's nothing to do */
638 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
639 	if (commitTsShared->commitTsActive)
640 	{
641 		LWLockRelease(CommitTsLock);
642 		return;
643 	}
644 	LWLockRelease(CommitTsLock);
645 
646 	xid = ShmemVariableCache->nextXid;
647 	pageno = TransactionIdToCTsPage(xid);
648 
649 	/*
650 	 * Re-Initialize our idea of the latest page number.
651 	 */
652 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
653 	CommitTsCtl->shared->latest_page_number = pageno;
654 	LWLockRelease(CommitTsControlLock);
655 
656 	/*
657 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
658 	 * need to set the oldest and newest values to the next Xid; that way, we
659 	 * will not try to read data that might not have been set.
660 	 *
661 	 * XXX does this have a problem if a server is started with commitTs
662 	 * enabled, then started with commitTs disabled, then restarted with it
663 	 * enabled again?  It doesn't look like it does, because there should be a
664 	 * checkpoint that sets the value to InvalidTransactionId at end of
665 	 * recovery; and so any chance of injecting new transactions without
666 	 * CommitTs values would occur after the oldestCommitTsXid has been set to
667 	 * Invalid temporarily.
668 	 */
669 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
670 	if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
671 	{
672 		ShmemVariableCache->oldestCommitTsXid =
673 			ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
674 	}
675 	LWLockRelease(CommitTsLock);
676 
677 	/* Create the current segment file, if necessary */
678 	if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
679 	{
680 		int			slotno;
681 
682 		LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
683 		slotno = ZeroCommitTsPage(pageno, false);
684 		SimpleLruWritePage(CommitTsCtl, slotno);
685 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
686 		LWLockRelease(CommitTsControlLock);
687 	}
688 
689 	/* Change the activation status in shared memory. */
690 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
691 	commitTsShared->commitTsActive = true;
692 	LWLockRelease(CommitTsLock);
693 }
694 
695 /*
696  * Deactivate this module.
697  *
698  * This must be called when the track_commit_timestamp parameter is turned off.
699  * This happens during postmaster or standalone-backend startup, or during WAL
700  * replay.
701  *
702  * Resets CommitTs into invalid state to make sure we don't hand back
703  * possibly-invalid data; also removes segments of old data.
704  */
705 static void
DeactivateCommitTs(void)706 DeactivateCommitTs(void)
707 {
708 	/*
709 	 * Cleanup the status in the shared memory.
710 	 *
711 	 * We reset everything in the commitTsShared record to prevent user from
712 	 * getting confusing data about last committed transaction on the standby
713 	 * when the module was activated repeatedly on the primary.
714 	 */
715 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
716 
717 	commitTsShared->commitTsActive = false;
718 	commitTsShared->xidLastCommit = InvalidTransactionId;
719 	TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
720 	commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
721 
722 	ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
723 	ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
724 
725 	LWLockRelease(CommitTsLock);
726 
727 	/*
728 	 * Remove *all* files.  This is necessary so that there are no leftover
729 	 * files; in the case where this feature is later enabled after running
730 	 * with it disabled for some time there may be a gap in the file sequence.
731 	 * (We can probably tolerate out-of-sequence files, as they are going to
732 	 * be overwritten anyway when we wrap around, but it seems better to be
733 	 * tidy.)
734 	 */
735 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
736 	(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
737 	LWLockRelease(CommitTsControlLock);
738 }
739 
740 /*
741  * This must be called ONCE during postmaster or standalone-backend shutdown
742  */
743 void
ShutdownCommitTs(void)744 ShutdownCommitTs(void)
745 {
746 	/* Flush dirty CommitTs pages to disk */
747 	SimpleLruFlush(CommitTsCtl, false);
748 }
749 
750 /*
751  * Perform a checkpoint --- either during shutdown, or on-the-fly
752  */
753 void
CheckPointCommitTs(void)754 CheckPointCommitTs(void)
755 {
756 	/* Flush dirty CommitTs pages to disk */
757 	SimpleLruFlush(CommitTsCtl, true);
758 }
759 
760 /*
761  * Make sure that CommitTs has room for a newly-allocated XID.
762  *
763  * NB: this is called while holding XidGenLock.  We want it to be very fast
764  * most of the time; even when it's not so fast, no actual I/O need happen
765  * unless we're forced to write out a dirty CommitTs or xlog page to make room
766  * in shared memory.
767  *
768  * NB: the current implementation relies on track_commit_timestamp being
769  * PGC_POSTMASTER.
770  */
771 void
ExtendCommitTs(TransactionId newestXact)772 ExtendCommitTs(TransactionId newestXact)
773 {
774 	int			pageno;
775 
776 	/*
777 	 * Nothing to do if module not enabled.  Note we do an unlocked read of
778 	 * the flag here, which is okay because this routine is only called from
779 	 * GetNewTransactionId, which is never called in a standby.
780 	 */
781 	Assert(!InRecovery);
782 	if (!commitTsShared->commitTsActive)
783 		return;
784 
785 	/*
786 	 * No work except at first XID of a page.  But beware: just after
787 	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
788 	 */
789 	if (TransactionIdToCTsEntry(newestXact) != 0 &&
790 		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
791 		return;
792 
793 	pageno = TransactionIdToCTsPage(newestXact);
794 
795 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
796 
797 	/* Zero the page and make an XLOG entry about it */
798 	ZeroCommitTsPage(pageno, !InRecovery);
799 
800 	LWLockRelease(CommitTsControlLock);
801 }
802 
803 /*
804  * Remove all CommitTs segments before the one holding the passed
805  * transaction ID.
806  *
807  * Note that we don't need to flush XLOG here.
808  */
809 void
TruncateCommitTs(TransactionId oldestXact)810 TruncateCommitTs(TransactionId oldestXact)
811 {
812 	int			cutoffPage;
813 
814 	/*
815 	 * The cutoff point is the start of the segment containing oldestXact. We
816 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
817 	 */
818 	cutoffPage = TransactionIdToCTsPage(oldestXact);
819 
820 	/* Check to see if there's any files that could be removed */
821 	if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
822 						   &cutoffPage))
823 		return;					/* nothing to remove */
824 
825 	/* Write XLOG record */
826 	WriteTruncateXlogRec(cutoffPage);
827 
828 	/* Now we can remove the old CommitTs segment(s) */
829 	SimpleLruTruncate(CommitTsCtl, cutoffPage);
830 }
831 
832 /*
833  * Set the limit values between which commit TS can be consulted.
834  */
835 void
SetCommitTsLimit(TransactionId oldestXact,TransactionId newestXact)836 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
837 {
838 	/*
839 	 * Be careful not to overwrite values that are either further into the
840 	 * "future" or signal a disabled committs.
841 	 */
842 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
843 	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
844 	{
845 		if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
846 			ShmemVariableCache->oldestCommitTsXid = oldestXact;
847 		if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
848 			ShmemVariableCache->newestCommitTsXid = newestXact;
849 	}
850 	else
851 	{
852 		Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
853 		ShmemVariableCache->oldestCommitTsXid = oldestXact;
854 		ShmemVariableCache->newestCommitTsXid = newestXact;
855 	}
856 	LWLockRelease(CommitTsLock);
857 }
858 
859 /*
860  * Move forwards the oldest commitTS value that can be consulted
861  */
862 void
AdvanceOldestCommitTsXid(TransactionId oldestXact)863 AdvanceOldestCommitTsXid(TransactionId oldestXact)
864 {
865 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
866 	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
867 	TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
868 		ShmemVariableCache->oldestCommitTsXid = oldestXact;
869 	LWLockRelease(CommitTsLock);
870 }
871 
872 
873 /*
874  * Decide whether a commitTS page number is "older" for truncation purposes.
875  * Analogous to CLOGPagePrecedes().
876  *
877  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
878  * introduces differences compared to CLOG and the other SLRUs having (1 <<
879  * 31) % per_page == 0.  This function never tests exactly
880  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
881  * there are two possible counts of page boundaries between oldestXact and the
882  * latest XID assigned, depending on whether oldestXact is within the first
883  * 128 entries of its page.  Since this function doesn't know the location of
884  * oldestXact within page2, it returns false for one page that actually is
885  * expendable.  This is a wider (yet still negligible) version of the
886  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
887  *
888  * For the sake of a worked example, number entries with decimal values such
889  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
890  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
891  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
892  * because entry=2.85 is the border that toggles whether entries precede the
893  * last entry of the oldestXact page.  While page 2 is expendable at
894  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
895  */
896 static bool
CommitTsPagePrecedes(int page1,int page2)897 CommitTsPagePrecedes(int page1, int page2)
898 {
899 	TransactionId xid1;
900 	TransactionId xid2;
901 
902 	xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
903 	xid1 += FirstNormalTransactionId + 1;
904 	xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
905 	xid2 += FirstNormalTransactionId + 1;
906 
907 	return (TransactionIdPrecedes(xid1, xid2) &&
908 			TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
909 }
910 
911 
912 /*
913  * Write a ZEROPAGE xlog record
914  */
915 static void
WriteZeroPageXlogRec(int pageno)916 WriteZeroPageXlogRec(int pageno)
917 {
918 	XLogBeginInsert();
919 	XLogRegisterData((char *) (&pageno), sizeof(int));
920 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
921 }
922 
923 /*
924  * Write a TRUNCATE xlog record
925  */
926 static void
WriteTruncateXlogRec(int pageno)927 WriteTruncateXlogRec(int pageno)
928 {
929 	XLogBeginInsert();
930 	XLogRegisterData((char *) (&pageno), sizeof(int));
931 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
932 }
933 
934 /*
935  * Write a SETTS xlog record
936  */
937 static void
WriteSetTimestampXlogRec(TransactionId mainxid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid)938 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
939 						 TransactionId *subxids, TimestampTz timestamp,
940 						 RepOriginId nodeid)
941 {
942 	xl_commit_ts_set record;
943 
944 	record.timestamp = timestamp;
945 	record.nodeid = nodeid;
946 	record.mainxid = mainxid;
947 
948 	XLogBeginInsert();
949 	XLogRegisterData((char *) &record,
950 					 offsetof(xl_commit_ts_set, mainxid) +
951 					 sizeof(TransactionId));
952 	XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
953 	XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
954 }
955 
956 /*
957  * CommitTS resource manager's routines
958  */
959 void
commit_ts_redo(XLogReaderState * record)960 commit_ts_redo(XLogReaderState *record)
961 {
962 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
963 
964 	/* Backup blocks are not used in commit_ts records */
965 	Assert(!XLogRecHasAnyBlockRefs(record));
966 
967 	if (info == COMMIT_TS_ZEROPAGE)
968 	{
969 		int			pageno;
970 		int			slotno;
971 
972 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
973 
974 		LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
975 
976 		slotno = ZeroCommitTsPage(pageno, false);
977 		SimpleLruWritePage(CommitTsCtl, slotno);
978 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
979 
980 		LWLockRelease(CommitTsControlLock);
981 	}
982 	else if (info == COMMIT_TS_TRUNCATE)
983 	{
984 		int			pageno;
985 
986 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
987 
988 		/*
989 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
990 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
991 		 */
992 		CommitTsCtl->shared->latest_page_number = pageno;
993 
994 		SimpleLruTruncate(CommitTsCtl, pageno);
995 	}
996 	else if (info == COMMIT_TS_SETTS)
997 	{
998 		xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
999 		int			nsubxids;
1000 		TransactionId *subxids;
1001 
1002 		nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1003 					sizeof(TransactionId));
1004 		if (nsubxids > 0)
1005 		{
1006 			subxids = palloc(sizeof(TransactionId) * nsubxids);
1007 			memcpy(subxids,
1008 				   XLogRecGetData(record) + SizeOfCommitTsSet,
1009 				   sizeof(TransactionId) * nsubxids);
1010 		}
1011 		else
1012 			subxids = NULL;
1013 
1014 		TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1015 									   setts->timestamp, setts->nodeid, false);
1016 		if (subxids)
1017 			pfree(subxids);
1018 	}
1019 	else
1020 		elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1021 }
1022