1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *		PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39 
40 /*
41  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51 
52 /*
53  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
57 typedef struct CommitTimestampEntry
58 {
59 	TimestampTz time;
60 	RepOriginId nodeid;
61 } CommitTimestampEntry;
62 
/*
 * Number of bytes of a CommitTimestampEntry that are actually copied to and
 * from a page: everything up to and including nodeid, without any padding
 * the compiler might add after the last member.
 */
#define SizeOfCommitTimestampEntry \
	(offsetof(CommitTimestampEntry, nodeid) + sizeof(RepOriginId))

/* How many entries fit on a single BLCKSZ-sized page */
#define COMMIT_TS_XACTS_PER_PAGE	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding the entry for a given xid */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

/* Index of a given xid's entry within its page */
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
77 static SlruCtlData CommitTsCtlData;
78 
79 #define CommitTsCtl (&CommitTsCtlData)
80 
81 /*
82  * We keep a cache of the last value set in shared memory.
83  *
84  * This is also good place to keep the activation status.  We keep this
85  * separate from the GUC so that the standby can activate the module if the
86  * primary has it active independently of the value of the GUC.
87  *
88  * This is protected by CommitTsLock.  In some places, we use commitTsActive
89  * without acquiring the lock; where this happens, a comment explains the
90  * rationale for it.
91  */
92 typedef struct CommitTimestampShared
93 {
94 	TransactionId xidLastCommit;
95 	CommitTimestampEntry dataLastCommit;
96 	bool		commitTsActive;
97 } CommitTimestampShared;
98 
99 CommitTimestampShared *commitTsShared;
100 
101 
102 /* GUC variable */
103 bool		track_commit_timestamp;
104 
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106 					 TransactionId *subxids, TimestampTz ts,
107 					 RepOriginId nodeid, int pageno);
108 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109 						 RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int	ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
118 						 TransactionId *subxids, TimestampTz timestamp,
119 						 RepOriginId nodeid);
120 
121 /*
122  * TransactionTreeSetCommitTsData
123  *
124  * Record the final commit timestamp of transaction entries in the commit log
125  * for a transaction and its subtransaction tree, as efficiently as possible.
126  *
127  * xid is the top level transaction id.
128  *
129  * subxids is an array of xids of length nsubxids, representing subtransactions
130  * in the tree of xid. In various cases nsubxids may be zero.
131  * The reason why tracking just the parent xid commit timestamp is not enough
132  * is that the subtrans SLRU does not stay valid across crashes (it's not
133  * permanent) so we need to keep the information about them here. If the
134  * subtrans implementation changes in the future, we might want to revisit the
135  * decision of storing timestamp info for each subxid.
136  *
137  * The write_xlog parameter tells us whether to include an XLog record of this
138  * or not.  Normally, this is called from transaction commit routines (both
139  * normal and prepared) and the information will be stored in the transaction
140  * commit XLog record, and so they should pass "false" for this.  The XLog redo
141  * code should use "false" here as well.  Other callers probably want to pass
142  * true, so that the given values persist in case of crashes.
143  */
144 void
TransactionTreeSetCommitTsData(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid,bool write_xlog)145 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
146 							   TransactionId *subxids, TimestampTz timestamp,
147 							   RepOriginId nodeid, bool write_xlog)
148 {
149 	int			i;
150 	TransactionId headxid;
151 	TransactionId newestXact;
152 
153 	/*
154 	 * No-op if the module is not active.
155 	 *
156 	 * An unlocked read here is fine, because in a standby (the only place
157 	 * where the flag can change in flight) this routine is only called by the
158 	 * recovery process, which is also the only process which can change the
159 	 * flag.
160 	 */
161 	if (!commitTsShared->commitTsActive)
162 		return;
163 
164 	/*
165 	 * Comply with the WAL-before-data rule: if caller specified it wants this
166 	 * value to be recorded in WAL, do so before touching the data.
167 	 */
168 	if (write_xlog)
169 		WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170 
171 	/*
172 	 * Figure out the latest Xid in this batch: either the last subxid if
173 	 * there's any, otherwise the parent xid.
174 	 */
175 	if (nsubxids > 0)
176 		newestXact = subxids[nsubxids - 1];
177 	else
178 		newestXact = xid;
179 
180 	/*
181 	 * We split the xids to set the timestamp to in groups belonging to the
182 	 * same SLRU page; the first element in each such set is its head.  The
183 	 * first group has the main XID as the head; subsequent sets use the first
184 	 * subxid not on the previous page as head.  This way, we only have to
185 	 * lock/modify each SLRU page once.
186 	 */
187 	for (i = 0, headxid = xid;;)
188 	{
189 		int			pageno = TransactionIdToCTsPage(headxid);
190 		int			j;
191 
192 		for (j = i; j < nsubxids; j++)
193 		{
194 			if (TransactionIdToCTsPage(subxids[j]) != pageno)
195 				break;
196 		}
197 		/* subxids[i..j] are on the same page as the head */
198 
199 		SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200 							 pageno);
201 
202 		/* if we wrote out all subxids, we're done. */
203 		if (j + 1 >= nsubxids)
204 			break;
205 
206 		/*
207 		 * Set the new head and skip over it, as well as over the subxids we
208 		 * just wrote.
209 		 */
210 		headxid = subxids[j];
211 		i += j - i + 1;
212 	}
213 
214 	/* update the cached value in shared memory */
215 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216 	commitTsShared->xidLastCommit = xid;
217 	commitTsShared->dataLastCommit.time = timestamp;
218 	commitTsShared->dataLastCommit.nodeid = nodeid;
219 
220 	/* and move forwards our endpoint, if needed */
221 	if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
222 		ShmemVariableCache->newestCommitTsXid = newestXact;
223 	LWLockRelease(CommitTsLock);
224 }
225 
226 /*
227  * Record the commit timestamp of transaction entries in the commit log for all
228  * entries on a single page.  Atomic only on this page.
229  */
230 static void
SetXidCommitTsInPage(TransactionId xid,int nsubxids,TransactionId * subxids,TimestampTz ts,RepOriginId nodeid,int pageno)231 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232 					 TransactionId *subxids, TimestampTz ts,
233 					 RepOriginId nodeid, int pageno)
234 {
235 	int			slotno;
236 	int			i;
237 
238 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239 
240 	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 
242 	TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243 	for (i = 0; i < nsubxids; i++)
244 		TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 
246 	CommitTsCtl->shared->page_dirty[slotno] = true;
247 
248 	LWLockRelease(CommitTsControlLock);
249 }
250 
251 /*
252  * Sets the commit timestamp of a single transaction.
253  *
254  * Must be called with CommitTsControlLock held
255  */
256 static void
TransactionIdSetCommitTs(TransactionId xid,TimestampTz ts,RepOriginId nodeid,int slotno)257 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
258 						 RepOriginId nodeid, int slotno)
259 {
260 	int			entryno = TransactionIdToCTsEntry(xid);
261 	CommitTimestampEntry entry;
262 
263 	Assert(TransactionIdIsNormal(xid));
264 
265 	entry.time = ts;
266 	entry.nodeid = nodeid;
267 
268 	memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269 		   SizeOfCommitTimestampEntry * entryno,
270 		   &entry, SizeOfCommitTimestampEntry);
271 }
272 
273 /*
274  * Interrogate the commit timestamp of a transaction.
275  *
276  * The return value indicates whether a commit timestamp record was found for
277  * the given xid.  The timestamp value is returned in *ts (which may not be
278  * null), and the origin node for the Xid is returned in *nodeid, if it's not
279  * null.
280  */
281 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)282 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283 							 RepOriginId *nodeid)
284 {
285 	int			pageno = TransactionIdToCTsPage(xid);
286 	int			entryno = TransactionIdToCTsEntry(xid);
287 	int			slotno;
288 	CommitTimestampEntry entry;
289 	TransactionId oldestCommitTsXid;
290 	TransactionId newestCommitTsXid;
291 
292 	if (!TransactionIdIsValid(xid))
293 		ereport(ERROR,
294 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 				 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 	else if (!TransactionIdIsNormal(xid))
297 	{
298 		/* frozen and bootstrap xids are always committed far in the past */
299 		*ts = 0;
300 		if (nodeid)
301 			*nodeid = 0;
302 		return false;
303 	}
304 
305 	LWLockAcquire(CommitTsLock, LW_SHARED);
306 
307 	/* Error if module not enabled */
308 	if (!commitTsShared->commitTsActive)
309 		error_commit_ts_disabled();
310 
311 	/*
312 	 * If we're asked for the cached value, return that.  Otherwise, fall
313 	 * through to read from SLRU.
314 	 */
315 	if (commitTsShared->xidLastCommit == xid)
316 	{
317 		*ts = commitTsShared->dataLastCommit.time;
318 		if (nodeid)
319 			*nodeid = commitTsShared->dataLastCommit.nodeid;
320 
321 		LWLockRelease(CommitTsLock);
322 		return *ts != 0;
323 	}
324 
325 	oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326 	newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327 	/* neither is invalid, or both are */
328 	Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329 	LWLockRelease(CommitTsLock);
330 
331 	/*
332 	 * Return empty if the requested value is outside our valid range.
333 	 */
334 	if (!TransactionIdIsValid(oldestCommitTsXid) ||
335 		TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336 		TransactionIdPrecedes(newestCommitTsXid, xid))
337 	{
338 		*ts = 0;
339 		if (nodeid)
340 			*nodeid = InvalidRepOriginId;
341 		return false;
342 	}
343 
344 	/* lock is acquired by SimpleLruReadPage_ReadOnly */
345 	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346 	memcpy(&entry,
347 		   CommitTsCtl->shared->page_buffer[slotno] +
348 		   SizeOfCommitTimestampEntry * entryno,
349 		   SizeOfCommitTimestampEntry);
350 
351 	*ts = entry.time;
352 	if (nodeid)
353 		*nodeid = entry.nodeid;
354 
355 	LWLockRelease(CommitTsControlLock);
356 	return *ts != 0;
357 }
358 
359 /*
360  * Return the Xid of the latest committed transaction.  (As far as this module
361  * is concerned, anyway; it's up to the caller to ensure the value is useful
362  * for its purposes.)
363  *
364  * ts and extra are filled with the corresponding data; they can be passed
365  * as NULL if not wanted.
366  */
367 TransactionId
GetLatestCommitTsData(TimestampTz * ts,RepOriginId * nodeid)368 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
369 {
370 	TransactionId xid;
371 
372 	LWLockAcquire(CommitTsLock, LW_SHARED);
373 
374 	/* Error if module not enabled */
375 	if (!commitTsShared->commitTsActive)
376 		error_commit_ts_disabled();
377 
378 	xid = commitTsShared->xidLastCommit;
379 	if (ts)
380 		*ts = commitTsShared->dataLastCommit.time;
381 	if (nodeid)
382 		*nodeid = commitTsShared->dataLastCommit.nodeid;
383 	LWLockRelease(CommitTsLock);
384 
385 	return xid;
386 }
387 
388 static void
error_commit_ts_disabled(void)389 error_commit_ts_disabled(void)
390 {
391 	ereport(ERROR,
392 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 			 errmsg("could not get commit timestamp data"),
394 			 RecoveryInProgress() ?
395 			 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396 					 "track_commit_timestamp") :
397 			 errhint("Make sure the configuration parameter \"%s\" is set.",
398 					 "track_commit_timestamp")));
399 }
400 
401 /*
402  * SQL-callable wrapper to obtain commit time of a transaction
403  */
404 Datum
pg_xact_commit_timestamp(PG_FUNCTION_ARGS)405 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
406 {
407 	TransactionId xid = PG_GETARG_UINT32(0);
408 	TimestampTz ts;
409 	bool		found;
410 
411 	found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412 
413 	if (!found)
414 		PG_RETURN_NULL();
415 
416 	PG_RETURN_TIMESTAMPTZ(ts);
417 }
418 
419 
420 Datum
pg_last_committed_xact(PG_FUNCTION_ARGS)421 pg_last_committed_xact(PG_FUNCTION_ARGS)
422 {
423 	TransactionId xid;
424 	TimestampTz ts;
425 	Datum		values[2];
426 	bool		nulls[2];
427 	TupleDesc	tupdesc;
428 	HeapTuple	htup;
429 
430 	/* and construct a tuple with our data */
431 	xid = GetLatestCommitTsData(&ts, NULL);
432 
433 	/*
434 	 * Construct a tuple descriptor for the result row.  This must match this
435 	 * function's pg_proc entry!
436 	 */
437 	tupdesc = CreateTemplateTupleDesc(2, false);
438 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
439 					   XIDOID, -1, 0);
440 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441 					   TIMESTAMPTZOID, -1, 0);
442 	tupdesc = BlessTupleDesc(tupdesc);
443 
444 	if (!TransactionIdIsNormal(xid))
445 	{
446 		memset(nulls, true, sizeof(nulls));
447 	}
448 	else
449 	{
450 		values[0] = TransactionIdGetDatum(xid);
451 		nulls[0] = false;
452 
453 		values[1] = TimestampTzGetDatum(ts);
454 		nulls[1] = false;
455 	}
456 
457 	htup = heap_form_tuple(tupdesc, values, nulls);
458 
459 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
460 }
461 
462 
463 /*
464  * Number of shared CommitTS buffers.
465  *
466  * We use a very similar logic as for the number of CLOG buffers; see comments
467  * in CLOGShmemBuffers.
468  */
469 Size
CommitTsShmemBuffers(void)470 CommitTsShmemBuffers(void)
471 {
472 	return Min(16, Max(4, NBuffers / 1024));
473 }
474 
475 /*
476  * Shared memory sizing for CommitTs
477  */
478 Size
CommitTsShmemSize(void)479 CommitTsShmemSize(void)
480 {
481 	return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
482 		sizeof(CommitTimestampShared);
483 }
484 
485 /*
486  * Initialize CommitTs at system startup (postmaster start or standalone
487  * backend)
488  */
489 void
CommitTsShmemInit(void)490 CommitTsShmemInit(void)
491 {
492 	bool		found;
493 
494 	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495 	SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496 				  CommitTsControlLock, "pg_commit_ts",
497 				  LWTRANCHE_COMMITTS_BUFFERS);
498 	SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
499 
500 	commitTsShared = ShmemInitStruct("CommitTs shared",
501 									 sizeof(CommitTimestampShared),
502 									 &found);
503 
504 	if (!IsUnderPostmaster)
505 	{
506 		Assert(!found);
507 
508 		commitTsShared->xidLastCommit = InvalidTransactionId;
509 		TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
510 		commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
511 		commitTsShared->commitTsActive = false;
512 	}
513 	else
514 		Assert(found);
515 }
516 
/*
 * One-time hook to be called on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules there is nothing
	 * to create at bootstrap time: segments are created when the server is
	 * started with this module enabled.  See ActivateCommitTs.
	 */
}
532 
533 /*
534  * Initialize (or reinitialize) a page of CommitTs to zeroes.
535  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
536  *
537  * The page is not actually written, just set up in shared memory.
538  * The slot number of the new page is returned.
539  *
540  * Control lock must be held at entry, and will be held at exit.
541  */
542 static int
ZeroCommitTsPage(int pageno,bool writeXlog)543 ZeroCommitTsPage(int pageno, bool writeXlog)
544 {
545 	int			slotno;
546 
547 	slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
548 
549 	if (writeXlog)
550 		WriteZeroPageXlogRec(pageno);
551 
552 	return slotno;
553 }
554 
/*
 * Startup hook: to be called ONCE during postmaster or standalone-backend
 * startup, after StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
564 
565 /*
566  * This must be called ONCE during postmaster or standalone-backend startup,
567  * after recovery has finished.
568  */
569 void
CompleteCommitTsInitialization(void)570 CompleteCommitTsInitialization(void)
571 {
572 	/*
573 	 * If the feature is not enabled, turn it off for good.  This also removes
574 	 * any leftover data.
575 	 *
576 	 * Conversely, we activate the module if the feature is enabled.  This is
577 	 * necessary for primary and standby as the activation depends on the
578 	 * control file contents at the beginning of recovery or when a
579 	 * XLOG_PARAMETER_CHANGE is replayed.
580 	 */
581 	if (!track_commit_timestamp)
582 		DeactivateCommitTs();
583 	else
584 		ActivateCommitTs();
585 }
586 
587 /*
588  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
589  * XLog record during recovery.
590  */
591 void
CommitTsParameterChange(bool newvalue,bool oldvalue)592 CommitTsParameterChange(bool newvalue, bool oldvalue)
593 {
594 	/*
595 	 * If the commit_ts module is disabled in this server and we get word from
596 	 * the master server that it is enabled there, activate it so that we can
597 	 * replay future WAL records involving it; also mark it as active on
598 	 * pg_control.  If the old value was already set, we already did this, so
599 	 * don't do anything.
600 	 *
601 	 * If the module is disabled in the master, disable it here too, unless
602 	 * the module is enabled locally.
603 	 *
604 	 * Note this only runs in the recovery process, so an unlocked read is
605 	 * fine.
606 	 */
607 	if (newvalue)
608 	{
609 		if (!commitTsShared->commitTsActive)
610 			ActivateCommitTs();
611 	}
612 	else if (commitTsShared->commitTsActive)
613 		DeactivateCommitTs();
614 }
615 
616 /*
617  * Activate this module whenever necessary.
618  *		This must happen during postmaster or standalone-backend startup,
619  *		or during WAL replay anytime the track_commit_timestamp setting is
620  *		changed in the master.
621  *
622  * The reason why this SLRU needs separate activation/deactivation functions is
623  * that it can be enabled/disabled during start and the activation/deactivation
624  * on master is propagated to standby via replay. Other SLRUs don't have this
625  * property and they can be just initialized during normal startup.
626  *
627  * This is in charge of creating the currently active segment, if it's not
628  * already there.  The reason for this is that the server might have been
629  * running with this module disabled for a while and thus might have skipped
630  * the normal creation point.
631  */
632 static void
ActivateCommitTs(void)633 ActivateCommitTs(void)
634 {
635 	TransactionId xid;
636 	int			pageno;
637 
638 	/* If we've done this already, there's nothing to do */
639 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640 	if (commitTsShared->commitTsActive)
641 	{
642 		LWLockRelease(CommitTsLock);
643 		return;
644 	}
645 	LWLockRelease(CommitTsLock);
646 
647 	xid = ShmemVariableCache->nextXid;
648 	pageno = TransactionIdToCTsPage(xid);
649 
650 	/*
651 	 * Re-Initialize our idea of the latest page number.
652 	 */
653 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654 	CommitTsCtl->shared->latest_page_number = pageno;
655 	LWLockRelease(CommitTsControlLock);
656 
657 	/*
658 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
659 	 * need to set the oldest and newest values to the next Xid; that way, we
660 	 * will not try to read data that might not have been set.
661 	 *
662 	 * XXX does this have a problem if a server is started with commitTs
663 	 * enabled, then started with commitTs disabled, then restarted with it
664 	 * enabled again?  It doesn't look like it does, because there should be a
665 	 * checkpoint that sets the value to InvalidTransactionId at end of
666 	 * recovery; and so any chance of injecting new transactions without
667 	 * CommitTs values would occur after the oldestCommitTsXid has been set to
668 	 * Invalid temporarily.
669 	 */
670 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
671 	if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
672 	{
673 		ShmemVariableCache->oldestCommitTsXid =
674 			ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
675 	}
676 	LWLockRelease(CommitTsLock);
677 
678 	/* Create the current segment file, if necessary */
679 	if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
680 	{
681 		int			slotno;
682 
683 		LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
684 		slotno = ZeroCommitTsPage(pageno, false);
685 		SimpleLruWritePage(CommitTsCtl, slotno);
686 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
687 		LWLockRelease(CommitTsControlLock);
688 	}
689 
690 	/* Change the activation status in shared memory. */
691 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
692 	commitTsShared->commitTsActive = true;
693 	LWLockRelease(CommitTsLock);
694 }
695 
696 /*
697  * Deactivate this module.
698  *
699  * This must be called when the track_commit_timestamp parameter is turned off.
700  * This happens during postmaster or standalone-backend startup, or during WAL
701  * replay.
702  *
703  * Resets CommitTs into invalid state to make sure we don't hand back
704  * possibly-invalid data; also removes segments of old data.
705  */
706 static void
DeactivateCommitTs(void)707 DeactivateCommitTs(void)
708 {
709 	/*
710 	 * Cleanup the status in the shared memory.
711 	 *
712 	 * We reset everything in the commitTsShared record to prevent user from
713 	 * getting confusing data about last committed transaction on the standby
714 	 * when the module was activated repeatedly on the primary.
715 	 */
716 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
717 
718 	commitTsShared->commitTsActive = false;
719 	commitTsShared->xidLastCommit = InvalidTransactionId;
720 	TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
721 	commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
722 
723 	ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
724 	ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
725 
726 	LWLockRelease(CommitTsLock);
727 
728 	/*
729 	 * Remove *all* files.  This is necessary so that there are no leftover
730 	 * files; in the case where this feature is later enabled after running
731 	 * with it disabled for some time there may be a gap in the file sequence.
732 	 * (We can probably tolerate out-of-sequence files, as they are going to
733 	 * be overwritten anyway when we wrap around, but it seems better to be
734 	 * tidy.)
735 	 */
736 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
737 	(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
738 	LWLockRelease(CommitTsControlLock);
739 }
740 
741 /*
742  * This must be called ONCE during postmaster or standalone-backend shutdown
743  */
744 void
ShutdownCommitTs(void)745 ShutdownCommitTs(void)
746 {
747 	/* Flush dirty CommitTs pages to disk */
748 	SimpleLruFlush(CommitTsCtl, false);
749 
750 	/*
751 	 * fsync pg_commit_ts to ensure that any files flushed previously are
752 	 * durably on disk.
753 	 */
754 	fsync_fname("pg_commit_ts", true);
755 }
756 
757 /*
758  * Perform a checkpoint --- either during shutdown, or on-the-fly
759  */
760 void
CheckPointCommitTs(void)761 CheckPointCommitTs(void)
762 {
763 	/* Flush dirty CommitTs pages to disk */
764 	SimpleLruFlush(CommitTsCtl, true);
765 }
766 
767 /*
768  * Make sure that CommitTs has room for a newly-allocated XID.
769  *
770  * NB: this is called while holding XidGenLock.  We want it to be very fast
771  * most of the time; even when it's not so fast, no actual I/O need happen
772  * unless we're forced to write out a dirty CommitTs or xlog page to make room
773  * in shared memory.
774  *
775  * NB: the current implementation relies on track_commit_timestamp being
776  * PGC_POSTMASTER.
777  */
778 void
ExtendCommitTs(TransactionId newestXact)779 ExtendCommitTs(TransactionId newestXact)
780 {
781 	int			pageno;
782 
783 	/*
784 	 * Nothing to do if module not enabled.  Note we do an unlocked read of
785 	 * the flag here, which is okay because this routine is only called from
786 	 * GetNewTransactionId, which is never called in a standby.
787 	 */
788 	Assert(!InRecovery);
789 	if (!commitTsShared->commitTsActive)
790 		return;
791 
792 	/*
793 	 * No work except at first XID of a page.  But beware: just after
794 	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
795 	 */
796 	if (TransactionIdToCTsEntry(newestXact) != 0 &&
797 		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
798 		return;
799 
800 	pageno = TransactionIdToCTsPage(newestXact);
801 
802 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
803 
804 	/* Zero the page and make an XLOG entry about it */
805 	ZeroCommitTsPage(pageno, !InRecovery);
806 
807 	LWLockRelease(CommitTsControlLock);
808 }
809 
810 /*
811  * Remove all CommitTs segments before the one holding the passed
812  * transaction ID.
813  *
814  * Note that we don't need to flush XLOG here.
815  */
816 void
TruncateCommitTs(TransactionId oldestXact)817 TruncateCommitTs(TransactionId oldestXact)
818 {
819 	int			cutoffPage;
820 
821 	/*
822 	 * The cutoff point is the start of the segment containing oldestXact. We
823 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
824 	 */
825 	cutoffPage = TransactionIdToCTsPage(oldestXact);
826 
827 	/* Check to see if there's any files that could be removed */
828 	if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
829 						   &cutoffPage))
830 		return;					/* nothing to remove */
831 
832 	/* Write XLOG record */
833 	WriteTruncateXlogRec(cutoffPage, oldestXact);
834 
835 	/* Now we can remove the old CommitTs segment(s) */
836 	SimpleLruTruncate(CommitTsCtl, cutoffPage);
837 }
838 
839 /*
840  * Set the limit values between which commit TS can be consulted.
841  */
842 void
SetCommitTsLimit(TransactionId oldestXact,TransactionId newestXact)843 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
844 {
845 	/*
846 	 * Be careful not to overwrite values that are either further into the
847 	 * "future" or signal a disabled committs.
848 	 */
849 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
850 	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
851 	{
852 		if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
853 			ShmemVariableCache->oldestCommitTsXid = oldestXact;
854 		if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
855 			ShmemVariableCache->newestCommitTsXid = newestXact;
856 	}
857 	else
858 	{
859 		Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
860 		ShmemVariableCache->oldestCommitTsXid = oldestXact;
861 		ShmemVariableCache->newestCommitTsXid = newestXact;
862 	}
863 	LWLockRelease(CommitTsLock);
864 }
865 
866 /*
867  * Move forwards the oldest commitTS value that can be consulted
868  */
869 void
AdvanceOldestCommitTsXid(TransactionId oldestXact)870 AdvanceOldestCommitTsXid(TransactionId oldestXact)
871 {
872 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
873 	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
874 		TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
875 		ShmemVariableCache->oldestCommitTsXid = oldestXact;
876 	LWLockRelease(CommitTsLock);
877 }
878 
879 
880 /*
881  * Decide whether a commitTS page number is "older" for truncation purposes.
882  * Analogous to CLOGPagePrecedes().
883  *
884  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
885  * introduces differences compared to CLOG and the other SLRUs having (1 <<
886  * 31) % per_page == 0.  This function never tests exactly
887  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
888  * there are two possible counts of page boundaries between oldestXact and the
889  * latest XID assigned, depending on whether oldestXact is within the first
890  * 128 entries of its page.  Since this function doesn't know the location of
891  * oldestXact within page2, it returns false for one page that actually is
892  * expendable.  This is a wider (yet still negligible) version of the
893  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
894  *
895  * For the sake of a worked example, number entries with decimal values such
896  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
897  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
898  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
899  * because entry=2.85 is the border that toggles whether entries precede the
900  * last entry of the oldestXact page.  While page 2 is expendable at
901  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
902  */
903 static bool
CommitTsPagePrecedes(int page1,int page2)904 CommitTsPagePrecedes(int page1, int page2)
905 {
906 	TransactionId xid1;
907 	TransactionId xid2;
908 
909 	xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
910 	xid1 += FirstNormalTransactionId + 1;
911 	xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
912 	xid2 += FirstNormalTransactionId + 1;
913 
914 	return (TransactionIdPrecedes(xid1, xid2) &&
915 			TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
916 }
917 
918 
919 /*
920  * Write a ZEROPAGE xlog record
921  */
922 static void
WriteZeroPageXlogRec(int pageno)923 WriteZeroPageXlogRec(int pageno)
924 {
925 	XLogBeginInsert();
926 	XLogRegisterData((char *) (&pageno), sizeof(int));
927 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
928 }
929 
930 /*
931  * Write a TRUNCATE xlog record
932  */
933 static void
WriteTruncateXlogRec(int pageno,TransactionId oldestXid)934 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
935 {
936 	xl_commit_ts_truncate xlrec;
937 
938 	xlrec.pageno = pageno;
939 	xlrec.oldestXid = oldestXid;
940 
941 	XLogBeginInsert();
942 	XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
943 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
944 }
945 
946 /*
947  * Write a SETTS xlog record
948  */
949 static void
WriteSetTimestampXlogRec(TransactionId mainxid,int nsubxids,TransactionId * subxids,TimestampTz timestamp,RepOriginId nodeid)950 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
951 						 TransactionId *subxids, TimestampTz timestamp,
952 						 RepOriginId nodeid)
953 {
954 	xl_commit_ts_set record;
955 
956 	record.timestamp = timestamp;
957 	record.nodeid = nodeid;
958 	record.mainxid = mainxid;
959 
960 	XLogBeginInsert();
961 	XLogRegisterData((char *) &record,
962 					 offsetof(xl_commit_ts_set, mainxid) +
963 					 sizeof(TransactionId));
964 	XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
965 	XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
966 }
967 
968 /*
969  * CommitTS resource manager's routines
970  */
971 void
commit_ts_redo(XLogReaderState * record)972 commit_ts_redo(XLogReaderState *record)
973 {
974 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
975 
976 	/* Backup blocks are not used in commit_ts records */
977 	Assert(!XLogRecHasAnyBlockRefs(record));
978 
979 	if (info == COMMIT_TS_ZEROPAGE)
980 	{
981 		int			pageno;
982 		int			slotno;
983 
984 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
985 
986 		LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
987 
988 		slotno = ZeroCommitTsPage(pageno, false);
989 		SimpleLruWritePage(CommitTsCtl, slotno);
990 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
991 
992 		LWLockRelease(CommitTsControlLock);
993 	}
994 	else if (info == COMMIT_TS_TRUNCATE)
995 	{
996 		xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
997 
998 		AdvanceOldestCommitTsXid(trunc->oldestXid);
999 
1000 		/*
1001 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
1002 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
1003 		 */
1004 		CommitTsCtl->shared->latest_page_number = trunc->pageno;
1005 
1006 		SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1007 	}
1008 	else if (info == COMMIT_TS_SETTS)
1009 	{
1010 		xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1011 		int			nsubxids;
1012 		TransactionId *subxids;
1013 
1014 		nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1015 					sizeof(TransactionId));
1016 		if (nsubxids > 0)
1017 		{
1018 			subxids = palloc(sizeof(TransactionId) * nsubxids);
1019 			memcpy(subxids,
1020 				   XLogRecGetData(record) + SizeOfCommitTsSet,
1021 				   sizeof(TransactionId) * nsubxids);
1022 		}
1023 		else
1024 			subxids = NULL;
1025 
1026 		TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1027 									   setts->timestamp, setts->nodeid, false);
1028 		if (subxids)
1029 			pfree(subxids);
1030 	}
1031 	else
1032 		elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1033 }
1034