1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *		Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/access/transam/twophase.c
11  *
12  * NOTES
13  *		Each global transaction is associated with a global transaction
14  *		identifier (GID). The client assigns a GID to a postgres
15  *		transaction with the PREPARE TRANSACTION command.
16  *
17  *		We keep all active global transactions in a shared memory array.
18  *		When the PREPARE TRANSACTION command is issued, the GID is
19  *		reserved for the transaction in the array. This is done before
20  *		a WAL entry is made, because the reservation checks for duplicate
21  *		GIDs and aborts the transaction if there already is a global
22  *		transaction in prepared state with the same GID.
23  *
24  *		A global transaction (gxact) also has dummy PGPROC; this is what keeps
25  *		the XID considered running by TransactionIdIsInProgress.  It is also
26  *		convenient as a PGPROC to hook the gxact's locks to.
27  *
28  *		Information to recover prepared transactions in case of crash is
29  *		now stored in WAL for the common case. In some cases there will be
30  *		an extended period between preparing a GXACT and commit/abort, in
31  *		which case we need to separately record prepared transaction data
32  *		in permanent storage. This includes locking information, pending
33  *		notifications etc. All that state information is written to the
34  *		per-transaction state file in the pg_twophase directory.
35  *		All prepared transactions will be written prior to shutdown.
36  *
37  *		Life track of state data is following:
38  *
39  *		* On PREPARE TRANSACTION backend writes state data only to the WAL and
40  *		  stores pointer to the start of the WAL record in
41  *		  gxact->prepare_start_lsn.
42  *		* If COMMIT occurs before checkpoint then backend reads data from WAL
43  *		  using prepare_start_lsn.
44  *		* On checkpoint state data copied to files in pg_twophase directory and
45  *		  fsynced
46  *		* If COMMIT happens after checkpoint then backend reads state data from
47  *		  files
48  *
49  *		During replay and replication, TwoPhaseState also holds information
50  *		about active prepared transactions that haven't been moved to disk yet.
51  *
52  *		Replay of twophase records happens by the following rules:
53  *
54  *		* At the beginning of recovery, pg_twophase is scanned once, filling
55  *		  TwoPhaseState with entries marked with gxact->inredo and
56  *		  gxact->ondisk.  Two-phase file data older than the XID horizon of
57  *		  the redo position are discarded.
58  *		* On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59  *		  gxact->inredo is set to true for such entries.
60  *		* On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61  *		  that have gxact->inredo set and are behind the redo_horizon. We
62  *		  save them to disk and then switch gxact->ondisk to true.
63  *		* On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64  *		  If gxact->ondisk is true, the corresponding entry from the disk
65  *		  is additionally deleted.
66  *		* RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67  *		  and PrescanPreparedTransactions() have been modified to go through
68  *		  gxact->inredo entries that have not made it to disk.
69  *
70  *-------------------------------------------------------------------------
71  */
72 #include "postgres.h"
73 
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
78 
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogutils.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
92 #include "funcapi.h"
93 #include "miscadmin.h"
94 #include "pg_trace.h"
95 #include "pgstat.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "storage/sinvaladt.h"
106 #include "storage/smgr.h"
107 #include "utils/builtins.h"
108 #include "utils/memutils.h"
109 #include "utils/timestamp.h"
110 
111 /*
112  * Directory where Two-phase commit files reside within PGDATA
113  */
114 #define TWOPHASE_DIR "pg_twophase"
115 
116 /* GUC variable, can't be changed after startup */
117 int			max_prepared_xacts = 0;
118 
119 /*
120  * This struct describes one global transaction that is in prepared state
121  * or attempting to become prepared.
122  *
123  * The lifecycle of a global transaction is:
124  *
125  * 1. After checking that the requested GID is not in use, set up an entry in
126  * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127  * and mark it as locked by my backend.
128  *
129  * 2. After successfully completing prepare, set valid = true and enter the
130  * referenced PGPROC into the global ProcArray.
131  *
132  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133  * valid and not locked, then mark the entry as locked by storing my current
134  * backend ID into locking_backend.  This prevents concurrent attempts to
135  * commit or rollback the same prepared xact.
136  *
137  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139  * the freelist.
140  *
141  * Note that if the preparing transaction fails between steps 1 and 2, the
142  * entry must be removed so that the GID and the GlobalTransaction struct
143  * can be reused.  See AtAbort_Twophase().
144  *
145  * typedef struct GlobalTransactionData *GlobalTransaction appears in
146  * twophase.h
147  */
148 
149 typedef struct GlobalTransactionData
150 {
151 	GlobalTransaction next;		/* list link for free list */
152 	int			pgprocno;		/* ID of associated dummy PGPROC */
153 	BackendId	dummyBackendId; /* similar to backend id for backends */
154 	TimestampTz prepared_at;	/* time of preparation */
155 
156 	/*
157 	 * Note that we need to keep track of two LSNs for each GXACT. We keep
158 	 * track of the start LSN because this is the address we must use to read
159 	 * state data back from WAL when committing a prepared GXACT. We keep
160 	 * track of the end LSN because that is the LSN we need to wait for prior
161 	 * to commit.
162 	 */
163 	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
164 	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
165 	TransactionId xid;			/* The GXACT id */
166 
167 	Oid			owner;			/* ID of user that executed the xact */
168 	BackendId	locking_backend;	/* backend currently working on the xact */
169 	bool		valid;			/* true if PGPROC entry is in proc array */
170 	bool		ondisk;			/* true if prepare state file is on disk */
171 	bool		inredo;			/* true if entry was added via xlog_redo */
172 	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */
173 }			GlobalTransactionData;
174 
175 /*
176  * Two Phase Commit shared state.  Access to this struct is protected
177  * by TwoPhaseStateLock.
178  */
179 typedef struct TwoPhaseStateData
180 {
181 	/* Head of linked list of free GlobalTransactionData structs */
182 	GlobalTransaction freeGXacts;
183 
184 	/* Number of valid prepXacts entries. */
185 	int			numPrepXacts;
186 
187 	/* There are max_prepared_xacts items in this array */
188 	GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
189 } TwoPhaseStateData;
190 
191 static TwoPhaseStateData *TwoPhaseState;
192 
193 /*
194  * Global transaction entry currently locked by us, if any.  Note that any
195  * access to the entry pointed to by this variable must be protected by
196  * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
197  * (since it's just local memory).
198  */
199 static GlobalTransaction MyLockedGxact = NULL;
200 
201 static bool twophaseExitRegistered = false;
202 
203 static void RecordTransactionCommitPrepared(TransactionId xid,
204 											int nchildren,
205 											TransactionId *children,
206 											int nrels,
207 											RelFileNode *rels,
208 											int ninvalmsgs,
209 											SharedInvalidationMessage *invalmsgs,
210 											bool initfileinval,
211 											const char *gid);
212 static void RecordTransactionAbortPrepared(TransactionId xid,
213 										   int nchildren,
214 										   TransactionId *children,
215 										   int nrels,
216 										   RelFileNode *rels,
217 										   const char *gid);
218 static void ProcessRecords(char *bufptr, TransactionId xid,
219 						   const TwoPhaseCallback callbacks[]);
220 static void RemoveGXact(GlobalTransaction gxact);
221 
222 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
223 static char *ProcessTwoPhaseBuffer(TransactionId xid,
224 								   XLogRecPtr prepare_start_lsn,
225 								   bool fromdisk, bool setParent, bool setNextXid);
226 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
227 								const char *gid, TimestampTz prepared_at, Oid owner,
228 								Oid databaseid);
229 static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
230 static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
231 
232 /*
233  * Initialization of shared memory
234  */
235 Size
TwoPhaseShmemSize(void)236 TwoPhaseShmemSize(void)
237 {
238 	Size		size;
239 
240 	/* Need the fixed struct, the array of pointers, and the GTD structs */
241 	size = offsetof(TwoPhaseStateData, prepXacts);
242 	size = add_size(size, mul_size(max_prepared_xacts,
243 								   sizeof(GlobalTransaction)));
244 	size = MAXALIGN(size);
245 	size = add_size(size, mul_size(max_prepared_xacts,
246 								   sizeof(GlobalTransactionData)));
247 
248 	return size;
249 }
250 
251 void
TwoPhaseShmemInit(void)252 TwoPhaseShmemInit(void)
253 {
254 	bool		found;
255 
256 	TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
257 									TwoPhaseShmemSize(),
258 									&found);
259 	if (!IsUnderPostmaster)
260 	{
261 		GlobalTransaction gxacts;
262 		int			i;
263 
264 		Assert(!found);
265 		TwoPhaseState->freeGXacts = NULL;
266 		TwoPhaseState->numPrepXacts = 0;
267 
268 		/*
269 		 * Initialize the linked list of free GlobalTransactionData structs
270 		 */
271 		gxacts = (GlobalTransaction)
272 			((char *) TwoPhaseState +
273 			 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
274 					  sizeof(GlobalTransaction) * max_prepared_xacts));
275 		for (i = 0; i < max_prepared_xacts; i++)
276 		{
277 			/* insert into linked list */
278 			gxacts[i].next = TwoPhaseState->freeGXacts;
279 			TwoPhaseState->freeGXacts = &gxacts[i];
280 
281 			/* associate it with a PGPROC assigned by InitProcGlobal */
282 			gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
283 
284 			/*
285 			 * Assign a unique ID for each dummy proc, so that the range of
286 			 * dummy backend IDs immediately follows the range of normal
287 			 * backend IDs. We don't dare to assign a real backend ID to dummy
288 			 * procs, because prepared transactions don't take part in cache
289 			 * invalidation like a real backend ID would imply, but having a
290 			 * unique ID for them is nevertheless handy. This arrangement
291 			 * allows you to allocate an array of size (MaxBackends +
292 			 * max_prepared_xacts + 1), and have a slot for every backend and
293 			 * prepared transaction. Currently multixact.c uses that
294 			 * technique.
295 			 */
296 			gxacts[i].dummyBackendId = MaxBackends + 1 + i;
297 		}
298 	}
299 	else
300 		Assert(found);
301 }
302 
303 /*
304  * Exit hook to unlock the global transaction entry we're working on.
305  */
306 static void
AtProcExit_Twophase(int code,Datum arg)307 AtProcExit_Twophase(int code, Datum arg)
308 {
309 	/* same logic as abort */
310 	AtAbort_Twophase();
311 }
312 
313 /*
314  * Abort hook to unlock the global transaction entry we're working on.
315  */
316 void
AtAbort_Twophase(void)317 AtAbort_Twophase(void)
318 {
319 	if (MyLockedGxact == NULL)
320 		return;
321 
322 	/*
323 	 * What to do with the locked global transaction entry?  If we were in the
324 	 * process of preparing the transaction, but haven't written the WAL
325 	 * record and state file yet, the transaction must not be considered as
326 	 * prepared.  Likewise, if we are in the process of finishing an
327 	 * already-prepared transaction, and fail after having already written the
328 	 * 2nd phase commit or rollback record to the WAL, the transaction should
329 	 * not be considered as prepared anymore.  In those cases, just remove the
330 	 * entry from shared memory.
331 	 *
332 	 * Otherwise, the entry must be left in place so that the transaction can
333 	 * be finished later, so just unlock it.
334 	 *
335 	 * If we abort during prepare, after having written the WAL record, we
336 	 * might not have transferred all locks and other state to the prepared
337 	 * transaction yet.  Likewise, if we abort during commit or rollback,
338 	 * after having written the WAL record, we might not have released all the
339 	 * resources held by the transaction yet.  In those cases, the in-memory
340 	 * state can be wrong, but it's too late to back out.
341 	 */
342 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
343 	if (!MyLockedGxact->valid)
344 		RemoveGXact(MyLockedGxact);
345 	else
346 		MyLockedGxact->locking_backend = InvalidBackendId;
347 	LWLockRelease(TwoPhaseStateLock);
348 
349 	MyLockedGxact = NULL;
350 }
351 
352 /*
353  * This is called after we have finished transferring state to the prepared
354  * PGPROC entry.
355  */
356 void
PostPrepare_Twophase(void)357 PostPrepare_Twophase(void)
358 {
359 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
360 	MyLockedGxact->locking_backend = InvalidBackendId;
361 	LWLockRelease(TwoPhaseStateLock);
362 
363 	MyLockedGxact = NULL;
364 }
365 
366 
367 /*
368  * MarkAsPreparing
369  *		Reserve the GID for the given transaction.
370  */
371 GlobalTransaction
MarkAsPreparing(TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)372 MarkAsPreparing(TransactionId xid, const char *gid,
373 				TimestampTz prepared_at, Oid owner, Oid databaseid)
374 {
375 	GlobalTransaction gxact;
376 	int			i;
377 
378 	if (strlen(gid) >= GIDSIZE)
379 		ereport(ERROR,
380 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
381 				 errmsg("transaction identifier \"%s\" is too long",
382 						gid)));
383 
384 	/* fail immediately if feature is disabled */
385 	if (max_prepared_xacts == 0)
386 		ereport(ERROR,
387 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388 				 errmsg("prepared transactions are disabled"),
389 				 errhint("Set max_prepared_transactions to a nonzero value.")));
390 
391 	/* on first call, register the exit hook */
392 	if (!twophaseExitRegistered)
393 	{
394 		before_shmem_exit(AtProcExit_Twophase, 0);
395 		twophaseExitRegistered = true;
396 	}
397 
398 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
399 
400 	/* Check for conflicting GID */
401 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
402 	{
403 		gxact = TwoPhaseState->prepXacts[i];
404 		if (strcmp(gxact->gid, gid) == 0)
405 		{
406 			ereport(ERROR,
407 					(errcode(ERRCODE_DUPLICATE_OBJECT),
408 					 errmsg("transaction identifier \"%s\" is already in use",
409 							gid)));
410 		}
411 	}
412 
413 	/* Get a free gxact from the freelist */
414 	if (TwoPhaseState->freeGXacts == NULL)
415 		ereport(ERROR,
416 				(errcode(ERRCODE_OUT_OF_MEMORY),
417 				 errmsg("maximum number of prepared transactions reached"),
418 				 errhint("Increase max_prepared_transactions (currently %d).",
419 						 max_prepared_xacts)));
420 	gxact = TwoPhaseState->freeGXacts;
421 	TwoPhaseState->freeGXacts = gxact->next;
422 
423 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
424 
425 	gxact->ondisk = false;
426 
427 	/* And insert it into the active array */
428 	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
429 	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
430 
431 	LWLockRelease(TwoPhaseStateLock);
432 
433 	return gxact;
434 }
435 
436 /*
437  * MarkAsPreparingGuts
438  *
439  * This uses a gxact struct and puts it into the active array.
440  * NOTE: this is also used when reloading a gxact after a crash; so avoid
441  * assuming that we can use very much backend context.
442  *
443  * Note: This function should be called with appropriate locks held.
444  */
445 static void
MarkAsPreparingGuts(GlobalTransaction gxact,TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)446 MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
447 					TimestampTz prepared_at, Oid owner, Oid databaseid)
448 {
449 	PGPROC	   *proc;
450 	int			i;
451 
452 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
453 
454 	Assert(gxact != NULL);
455 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
456 
457 	/* Initialize the PGPROC entry */
458 	MemSet(proc, 0, sizeof(PGPROC));
459 	proc->pgprocno = gxact->pgprocno;
460 	SHMQueueElemInit(&(proc->links));
461 	proc->waitStatus = PROC_WAIT_STATUS_OK;
462 	if (LocalTransactionIdIsValid(MyProc->lxid))
463 	{
464 		/* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
465 		proc->lxid = MyProc->lxid;
466 		proc->backendId = MyBackendId;
467 	}
468 	else
469 	{
470 		Assert(AmStartupProcess() || !IsPostmasterEnvironment);
471 		/* GetLockConflicts() uses this to specify a wait on the XID */
472 		proc->lxid = xid;
473 		proc->backendId = InvalidBackendId;
474 	}
475 	proc->xid = xid;
476 	Assert(proc->xmin == InvalidTransactionId);
477 	proc->delayChkpt = false;
478 	proc->statusFlags = 0;
479 	proc->pid = 0;
480 	proc->databaseId = databaseid;
481 	proc->roleId = owner;
482 	proc->tempNamespaceId = InvalidOid;
483 	proc->isBackgroundWorker = false;
484 	proc->lwWaiting = false;
485 	proc->lwWaitMode = 0;
486 	proc->waitLock = NULL;
487 	proc->waitProcLock = NULL;
488 	pg_atomic_init_u64(&proc->waitStart, 0);
489 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
490 		SHMQueueInit(&(proc->myProcLocks[i]));
491 	/* subxid data must be filled later by GXactLoadSubxactData */
492 	proc->subxidStatus.overflowed = false;
493 	proc->subxidStatus.count = 0;
494 
495 	gxact->prepared_at = prepared_at;
496 	gxact->xid = xid;
497 	gxact->owner = owner;
498 	gxact->locking_backend = MyBackendId;
499 	gxact->valid = false;
500 	gxact->inredo = false;
501 	strcpy(gxact->gid, gid);
502 
503 	/*
504 	 * Remember that we have this GlobalTransaction entry locked for us. If we
505 	 * abort after this, we must release it.
506 	 */
507 	MyLockedGxact = gxact;
508 }
509 
510 /*
511  * GXactLoadSubxactData
512  *
513  * If the transaction being persisted had any subtransactions, this must
514  * be called before MarkAsPrepared() to load information into the dummy
515  * PGPROC.
516  */
517 static void
GXactLoadSubxactData(GlobalTransaction gxact,int nsubxacts,TransactionId * children)518 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
519 					 TransactionId *children)
520 {
521 	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
522 
523 	/* We need no extra lock since the GXACT isn't valid yet */
524 	if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
525 	{
526 		proc->subxidStatus.overflowed = true;
527 		nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
528 	}
529 	if (nsubxacts > 0)
530 	{
531 		memcpy(proc->subxids.xids, children,
532 			   nsubxacts * sizeof(TransactionId));
533 		proc->subxidStatus.count = nsubxacts;
534 	}
535 }
536 
537 /*
538  * MarkAsPrepared
539  *		Mark the GXACT as fully valid, and enter it into the global ProcArray.
540  *
541  * lock_held indicates whether caller already holds TwoPhaseStateLock.
542  */
543 static void
MarkAsPrepared(GlobalTransaction gxact,bool lock_held)544 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
545 {
546 	/* Lock here may be overkill, but I'm not convinced of that ... */
547 	if (!lock_held)
548 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
549 	Assert(!gxact->valid);
550 	gxact->valid = true;
551 	if (!lock_held)
552 		LWLockRelease(TwoPhaseStateLock);
553 
554 	/*
555 	 * Put it into the global ProcArray so TransactionIdIsInProgress considers
556 	 * the XID as still running.
557 	 */
558 	ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
559 }
560 
561 /*
562  * LockGXact
563  *		Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
564  */
565 static GlobalTransaction
LockGXact(const char * gid,Oid user)566 LockGXact(const char *gid, Oid user)
567 {
568 	int			i;
569 
570 	/* on first call, register the exit hook */
571 	if (!twophaseExitRegistered)
572 	{
573 		before_shmem_exit(AtProcExit_Twophase, 0);
574 		twophaseExitRegistered = true;
575 	}
576 
577 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
578 
579 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
580 	{
581 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
582 		PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
583 
584 		/* Ignore not-yet-valid GIDs */
585 		if (!gxact->valid)
586 			continue;
587 		if (strcmp(gxact->gid, gid) != 0)
588 			continue;
589 
590 		/* Found it, but has someone else got it locked? */
591 		if (gxact->locking_backend != InvalidBackendId)
592 			ereport(ERROR,
593 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
594 					 errmsg("prepared transaction with identifier \"%s\" is busy",
595 							gid)));
596 
597 		if (user != gxact->owner && !superuser_arg(user))
598 			ereport(ERROR,
599 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
600 					 errmsg("permission denied to finish prepared transaction"),
601 					 errhint("Must be superuser or the user that prepared the transaction.")));
602 
603 		/*
604 		 * Note: it probably would be possible to allow committing from
605 		 * another database; but at the moment NOTIFY is known not to work and
606 		 * there may be some other issues as well.  Hence disallow until
607 		 * someone gets motivated to make it work.
608 		 */
609 		if (MyDatabaseId != proc->databaseId)
610 			ereport(ERROR,
611 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
612 					 errmsg("prepared transaction belongs to another database"),
613 					 errhint("Connect to the database where the transaction was prepared to finish it.")));
614 
615 		/* OK for me to lock it */
616 		gxact->locking_backend = MyBackendId;
617 		MyLockedGxact = gxact;
618 
619 		LWLockRelease(TwoPhaseStateLock);
620 
621 		return gxact;
622 	}
623 
624 	LWLockRelease(TwoPhaseStateLock);
625 
626 	ereport(ERROR,
627 			(errcode(ERRCODE_UNDEFINED_OBJECT),
628 			 errmsg("prepared transaction with identifier \"%s\" does not exist",
629 					gid)));
630 
631 	/* NOTREACHED */
632 	return NULL;
633 }
634 
635 /*
636  * RemoveGXact
637  *		Remove the prepared transaction from the shared memory array.
638  *
639  * NB: caller should have already removed it from ProcArray
640  */
641 static void
RemoveGXact(GlobalTransaction gxact)642 RemoveGXact(GlobalTransaction gxact)
643 {
644 	int			i;
645 
646 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
647 
648 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
649 	{
650 		if (gxact == TwoPhaseState->prepXacts[i])
651 		{
652 			/* remove from the active array */
653 			TwoPhaseState->numPrepXacts--;
654 			TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
655 
656 			/* and put it back in the freelist */
657 			gxact->next = TwoPhaseState->freeGXacts;
658 			TwoPhaseState->freeGXacts = gxact;
659 
660 			return;
661 		}
662 	}
663 
664 	elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
665 }
666 
667 /*
668  * Returns an array of all prepared transactions for the user-level
669  * function pg_prepared_xact.
670  *
671  * The returned array and all its elements are copies of internal data
672  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
673  *
674  * WARNING -- we return even those transactions that are not fully prepared
675  * yet.  The caller should filter them out if he doesn't want them.
676  *
677  * The returned array is palloc'd.
678  */
679 static int
GetPreparedTransactionList(GlobalTransaction * gxacts)680 GetPreparedTransactionList(GlobalTransaction *gxacts)
681 {
682 	GlobalTransaction array;
683 	int			num;
684 	int			i;
685 
686 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
687 
688 	if (TwoPhaseState->numPrepXacts == 0)
689 	{
690 		LWLockRelease(TwoPhaseStateLock);
691 
692 		*gxacts = NULL;
693 		return 0;
694 	}
695 
696 	num = TwoPhaseState->numPrepXacts;
697 	array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
698 	*gxacts = array;
699 	for (i = 0; i < num; i++)
700 		memcpy(array + i, TwoPhaseState->prepXacts[i],
701 			   sizeof(GlobalTransactionData));
702 
703 	LWLockRelease(TwoPhaseStateLock);
704 
705 	return num;
706 }
707 
708 
709 /* Working status for pg_prepared_xact */
710 typedef struct
711 {
712 	GlobalTransaction array;
713 	int			ngxacts;
714 	int			currIdx;
715 } Working_State;
716 
717 /*
718  * pg_prepared_xact
719  *		Produce a view with one row per prepared transaction.
720  *
721  * This function is here so we don't have to export the
722  * GlobalTransactionData struct definition.
723  */
724 Datum
pg_prepared_xact(PG_FUNCTION_ARGS)725 pg_prepared_xact(PG_FUNCTION_ARGS)
726 {
727 	FuncCallContext *funcctx;
728 	Working_State *status;
729 
730 	if (SRF_IS_FIRSTCALL())
731 	{
732 		TupleDesc	tupdesc;
733 		MemoryContext oldcontext;
734 
735 		/* create a function context for cross-call persistence */
736 		funcctx = SRF_FIRSTCALL_INIT();
737 
738 		/*
739 		 * Switch to memory context appropriate for multiple function calls
740 		 */
741 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
742 
743 		/* build tupdesc for result tuples */
744 		/* this had better match pg_prepared_xacts view in system_views.sql */
745 		tupdesc = CreateTemplateTupleDesc(5);
746 		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
747 						   XIDOID, -1, 0);
748 		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
749 						   TEXTOID, -1, 0);
750 		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
751 						   TIMESTAMPTZOID, -1, 0);
752 		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
753 						   OIDOID, -1, 0);
754 		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
755 						   OIDOID, -1, 0);
756 
757 		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
758 
759 		/*
760 		 * Collect all the 2PC status information that we will format and send
761 		 * out as a result set.
762 		 */
763 		status = (Working_State *) palloc(sizeof(Working_State));
764 		funcctx->user_fctx = (void *) status;
765 
766 		status->ngxacts = GetPreparedTransactionList(&status->array);
767 		status->currIdx = 0;
768 
769 		MemoryContextSwitchTo(oldcontext);
770 	}
771 
772 	funcctx = SRF_PERCALL_SETUP();
773 	status = (Working_State *) funcctx->user_fctx;
774 
775 	while (status->array != NULL && status->currIdx < status->ngxacts)
776 	{
777 		GlobalTransaction gxact = &status->array[status->currIdx++];
778 		PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
779 		Datum		values[5];
780 		bool		nulls[5];
781 		HeapTuple	tuple;
782 		Datum		result;
783 
784 		if (!gxact->valid)
785 			continue;
786 
787 		/*
788 		 * Form tuple with appropriate data.
789 		 */
790 		MemSet(values, 0, sizeof(values));
791 		MemSet(nulls, 0, sizeof(nulls));
792 
793 		values[0] = TransactionIdGetDatum(proc->xid);
794 		values[1] = CStringGetTextDatum(gxact->gid);
795 		values[2] = TimestampTzGetDatum(gxact->prepared_at);
796 		values[3] = ObjectIdGetDatum(gxact->owner);
797 		values[4] = ObjectIdGetDatum(proc->databaseId);
798 
799 		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
800 		result = HeapTupleGetDatum(tuple);
801 		SRF_RETURN_NEXT(funcctx, result);
802 	}
803 
804 	SRF_RETURN_DONE(funcctx);
805 }
806 
807 /*
808  * TwoPhaseGetGXact
809  *		Get the GlobalTransaction struct for a prepared transaction
810  *		specified by XID
811  *
812  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
813  * caller had better hold it.
814  */
815 static GlobalTransaction
TwoPhaseGetGXact(TransactionId xid,bool lock_held)816 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
817 {
818 	GlobalTransaction result = NULL;
819 	int			i;
820 
821 	static TransactionId cached_xid = InvalidTransactionId;
822 	static GlobalTransaction cached_gxact = NULL;
823 
824 	Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
825 
826 	/*
827 	 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
828 	 * repeatedly for the same XID.  We can save work with a simple cache.
829 	 */
830 	if (xid == cached_xid)
831 		return cached_gxact;
832 
833 	if (!lock_held)
834 		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
835 
836 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
837 	{
838 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
839 
840 		if (gxact->xid == xid)
841 		{
842 			result = gxact;
843 			break;
844 		}
845 	}
846 
847 	if (!lock_held)
848 		LWLockRelease(TwoPhaseStateLock);
849 
850 	if (result == NULL)			/* should not happen */
851 		elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
852 
853 	cached_xid = xid;
854 	cached_gxact = result;
855 
856 	return result;
857 }
858 
859 /*
860  * TwoPhaseGetXidByVirtualXID
861  *		Lookup VXID among xacts prepared since last startup.
862  *
863  * (This won't find recovered xacts.)  If more than one matches, return any
864  * and set "have_more" to true.  To witness multiple matches, a single
865  * BackendId must consume 2^32 LXIDs, with no intervening database restart.
866  */
867 TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,bool * have_more)868 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
869 						   bool *have_more)
870 {
871 	int			i;
872 	TransactionId result = InvalidTransactionId;
873 
874 	Assert(VirtualTransactionIdIsValid(vxid));
875 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
876 
877 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
878 	{
879 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
880 		PGPROC	   *proc;
881 		VirtualTransactionId proc_vxid;
882 
883 		if (!gxact->valid)
884 			continue;
885 		proc = &ProcGlobal->allProcs[gxact->pgprocno];
886 		GET_VXID_FROM_PGPROC(proc_vxid, *proc);
887 		if (VirtualTransactionIdEquals(vxid, proc_vxid))
888 		{
889 			/* Startup process sets proc->backendId to InvalidBackendId. */
890 			Assert(!gxact->inredo);
891 
892 			if (result != InvalidTransactionId)
893 			{
894 				*have_more = true;
895 				break;
896 			}
897 			result = gxact->xid;
898 		}
899 	}
900 
901 	LWLockRelease(TwoPhaseStateLock);
902 
903 	return result;
904 }
905 
906 /*
907  * TwoPhaseGetDummyBackendId
908  *		Get the dummy backend ID for prepared transaction specified by XID
909  *
910  * Dummy backend IDs are similar to real backend IDs of real backends.
911  * They start at MaxBackends + 1, and are unique across all currently active
912  * real backends and prepared transactions.  If lock_held is set to true,
913  * TwoPhaseStateLock will not be taken, so the caller had better hold it.
914  */
915 BackendId
TwoPhaseGetDummyBackendId(TransactionId xid,bool lock_held)916 TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
917 {
918 	GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
919 
920 	return gxact->dummyBackendId;
921 }
922 
923 /*
924  * TwoPhaseGetDummyProc
925  *		Get the PGPROC that represents a prepared transaction specified by XID
926  *
927  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
928  * caller had better hold it.
929  */
930 PGPROC *
TwoPhaseGetDummyProc(TransactionId xid,bool lock_held)931 TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
932 {
933 	GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
934 
935 	return &ProcGlobal->allProcs[gxact->pgprocno];
936 }
937 
938 /************************************************************************/
939 /* State file support													*/
940 /************************************************************************/
941 
942 #define TwoPhaseFilePath(path, xid) \
943 	snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
944 
945 /*
946  * 2PC state file format:
947  *
948  *	1. TwoPhaseFileHeader
949  *	2. TransactionId[] (subtransactions)
950  *	3. RelFileNode[] (files to be deleted at commit)
951  *	4. RelFileNode[] (files to be deleted at abort)
952  *	5. SharedInvalidationMessage[] (inval messages to be sent at commit)
953  *	6. TwoPhaseRecordOnDisk
954  *	7. ...
955  *	8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
956  *	9. checksum (CRC-32C)
957  *
958  * Each segment except the final checksum is MAXALIGN'd.
959  */
960 
961 /*
962  * Header for a 2PC state file
963  */
964 #define TWOPHASE_MAGIC	0x57F94534	/* format identifier */
965 
966 typedef xl_xact_prepare TwoPhaseFileHeader;
967 
968 /*
969  * Header for each record in a state file
970  *
971  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
972  * The rmgr data will be stored starting on a MAXALIGN boundary.
973  */
974 typedef struct TwoPhaseRecordOnDisk
975 {
976 	uint32		len;			/* length of rmgr data */
977 	TwoPhaseRmgrId rmid;		/* resource manager for this record */
978 	uint16		info;			/* flag bits for use by rmgr */
979 } TwoPhaseRecordOnDisk;
980 
981 /*
982  * During prepare, the state file is assembled in memory before writing it
983  * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
984  * for that.
985  */
986 typedef struct StateFileChunk
987 {
988 	char	   *data;
989 	uint32		len;
990 	struct StateFileChunk *next;
991 } StateFileChunk;
992 
993 static struct xllist
994 {
995 	StateFileChunk *head;		/* first data block in the chain */
996 	StateFileChunk *tail;		/* last block in chain */
997 	uint32		num_chunks;
998 	uint32		bytes_free;		/* free bytes left in tail block */
999 	uint32		total_len;		/* total data bytes in chain */
1000 }			records;
1001 
1002 
1003 /*
1004  * Append a block of data to records data structure.
1005  *
1006  * NB: each block is padded to a MAXALIGN multiple.  This must be
1007  * accounted for when the file is later read!
1008  *
1009  * The data is copied, so the caller is free to modify it afterwards.
1010  */
1011 static void
save_state_data(const void * data,uint32 len)1012 save_state_data(const void *data, uint32 len)
1013 {
1014 	uint32		padlen = MAXALIGN(len);
1015 
1016 	if (padlen > records.bytes_free)
1017 	{
1018 		records.tail->next = palloc0(sizeof(StateFileChunk));
1019 		records.tail = records.tail->next;
1020 		records.tail->len = 0;
1021 		records.tail->next = NULL;
1022 		records.num_chunks++;
1023 
1024 		records.bytes_free = Max(padlen, 512);
1025 		records.tail->data = palloc(records.bytes_free);
1026 	}
1027 
1028 	memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1029 	records.tail->len += padlen;
1030 	records.bytes_free -= padlen;
1031 	records.total_len += padlen;
1032 }
1033 
1034 /*
1035  * Start preparing a state file.
1036  *
1037  * Initializes data structure and inserts the 2PC file header record.
1038  */
1039 void
StartPrepare(GlobalTransaction gxact)1040 StartPrepare(GlobalTransaction gxact)
1041 {
1042 	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1043 	TransactionId xid = gxact->xid;
1044 	TwoPhaseFileHeader hdr;
1045 	TransactionId *children;
1046 	RelFileNode *commitrels;
1047 	RelFileNode *abortrels;
1048 	SharedInvalidationMessage *invalmsgs;
1049 
1050 	/* Initialize linked list */
1051 	records.head = palloc0(sizeof(StateFileChunk));
1052 	records.head->len = 0;
1053 	records.head->next = NULL;
1054 
1055 	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1056 	records.head->data = palloc(records.bytes_free);
1057 
1058 	records.tail = records.head;
1059 	records.num_chunks = 1;
1060 
1061 	records.total_len = 0;
1062 
1063 	/* Create header */
1064 	hdr.magic = TWOPHASE_MAGIC;
1065 	hdr.total_len = 0;			/* EndPrepare will fill this in */
1066 	hdr.xid = xid;
1067 	hdr.database = proc->databaseId;
1068 	hdr.prepared_at = gxact->prepared_at;
1069 	hdr.owner = gxact->owner;
1070 	hdr.nsubxacts = xactGetCommittedChildren(&children);
1071 	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1072 	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1073 	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1074 														  &hdr.initfileinval);
1075 	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
1076 
1077 	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1078 	save_state_data(gxact->gid, hdr.gidlen);
1079 
1080 	/*
1081 	 * Add the additional info about subxacts, deletable files and cache
1082 	 * invalidation messages.
1083 	 */
1084 	if (hdr.nsubxacts > 0)
1085 	{
1086 		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1087 		/* While we have the child-xact data, stuff it in the gxact too */
1088 		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1089 	}
1090 	if (hdr.ncommitrels > 0)
1091 	{
1092 		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1093 		pfree(commitrels);
1094 	}
1095 	if (hdr.nabortrels > 0)
1096 	{
1097 		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1098 		pfree(abortrels);
1099 	}
1100 	if (hdr.ninvalmsgs > 0)
1101 	{
1102 		save_state_data(invalmsgs,
1103 						hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1104 		pfree(invalmsgs);
1105 	}
1106 }
1107 
1108 /*
1109  * Finish preparing state data and writing it to WAL.
1110  */
1111 void
EndPrepare(GlobalTransaction gxact)1112 EndPrepare(GlobalTransaction gxact)
1113 {
1114 	TwoPhaseFileHeader *hdr;
1115 	StateFileChunk *record;
1116 	bool		replorigin;
1117 
1118 	/* Add the end sentinel to the list of 2PC records */
1119 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1120 						   NULL, 0);
1121 
1122 	/* Go back and fill in total_len in the file header record */
1123 	hdr = (TwoPhaseFileHeader *) records.head->data;
1124 	Assert(hdr->magic == TWOPHASE_MAGIC);
1125 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
1126 
1127 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1128 				  replorigin_session_origin != DoNotReplicateId);
1129 
1130 	if (replorigin)
1131 	{
1132 		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1133 		hdr->origin_lsn = replorigin_session_origin_lsn;
1134 		hdr->origin_timestamp = replorigin_session_origin_timestamp;
1135 	}
1136 	else
1137 	{
1138 		hdr->origin_lsn = InvalidXLogRecPtr;
1139 		hdr->origin_timestamp = 0;
1140 	}
1141 
1142 	/*
1143 	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
1144 	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1145 	 * where we write data to file and then re-read at commit time.
1146 	 */
1147 	if (hdr->total_len > MaxAllocSize)
1148 		ereport(ERROR,
1149 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1150 				 errmsg("two-phase state file maximum length exceeded")));
1151 
1152 	/*
1153 	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1154 	 * cover us, so no need to calculate a separate CRC.
1155 	 *
1156 	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
1157 	 * immediately after the WAL record is inserted could complete without
1158 	 * fsync'ing our state file.  (This is essentially the same kind of race
1159 	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1160 	 * uses delayChkpt for; see notes there.)
1161 	 *
1162 	 * We save the PREPARE record's location in the gxact for later use by
1163 	 * CheckPointTwoPhase.
1164 	 */
1165 	XLogEnsureRecordSpace(0, records.num_chunks);
1166 
1167 	START_CRIT_SECTION();
1168 
1169 	MyProc->delayChkpt = true;
1170 
1171 	XLogBeginInsert();
1172 	for (record = records.head; record != NULL; record = record->next)
1173 		XLogRegisterData(record->data, record->len);
1174 
1175 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1176 
1177 	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1178 
1179 	if (replorigin)
1180 	{
1181 		/* Move LSNs forward for this replication origin */
1182 		replorigin_session_advance(replorigin_session_origin_lsn,
1183 								   gxact->prepare_end_lsn);
1184 	}
1185 
1186 	XLogFlush(gxact->prepare_end_lsn);
1187 
1188 	/* If we crash now, we have prepared: WAL replay will fix things */
1189 
1190 	/* Store record's start location to read that later on Commit */
1191 	gxact->prepare_start_lsn = ProcLastRecPtr;
1192 
1193 	/*
1194 	 * Mark the prepared transaction as valid.  As soon as xact.c marks MyProc
1195 	 * as not running our XID (which it will do immediately after this
1196 	 * function returns), others can commit/rollback the xact.
1197 	 *
1198 	 * NB: a side effect of this is to make a dummy ProcArray entry for the
1199 	 * prepared XID.  This must happen before we clear the XID from MyProc /
1200 	 * ProcGlobal->xids[], else there is a window where the XID is not running
1201 	 * according to TransactionIdIsInProgress, and onlookers would be entitled
1202 	 * to assume the xact crashed.  Instead we have a window where the same
1203 	 * XID appears twice in ProcArray, which is OK.
1204 	 */
1205 	MarkAsPrepared(gxact, false);
1206 
1207 	/*
1208 	 * Now we can mark ourselves as out of the commit critical section: a
1209 	 * checkpoint starting after this will certainly see the gxact as a
1210 	 * candidate for fsyncing.
1211 	 */
1212 	MyProc->delayChkpt = false;
1213 
1214 	/*
1215 	 * Remember that we have this GlobalTransaction entry locked for us.  If
1216 	 * we crash after this point, it's too late to abort, but we must unlock
1217 	 * it so that the prepared transaction can be committed or rolled back.
1218 	 */
1219 	MyLockedGxact = gxact;
1220 
1221 	END_CRIT_SECTION();
1222 
1223 	/*
1224 	 * Wait for synchronous replication, if required.
1225 	 *
1226 	 * Note that at this stage we have marked the prepare, but still show as
1227 	 * running in the procarray (twice!) and continue to hold locks.
1228 	 */
1229 	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1230 
1231 	records.tail = records.head = NULL;
1232 	records.num_chunks = 0;
1233 }
1234 
1235 /*
1236  * Register a 2PC record to be written to state file.
1237  */
1238 void
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid,uint16 info,const void * data,uint32 len)1239 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1240 					   const void *data, uint32 len)
1241 {
1242 	TwoPhaseRecordOnDisk record;
1243 
1244 	record.rmid = rmid;
1245 	record.info = info;
1246 	record.len = len;
1247 	save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1248 	if (len > 0)
1249 		save_state_data(data, len);
1250 }
1251 
1252 
1253 /*
1254  * Read and validate the state file for xid.
1255  *
1256  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1257  * contents of the file, issuing an error when finding corrupted data.  If
1258  * missing_ok is true, which indicates that missing files can be safely
1259  * ignored, then return NULL.  This state can be reached when doing recovery.
1260  */
1261 static char *
ReadTwoPhaseFile(TransactionId xid,bool missing_ok)1262 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1263 {
1264 	char		path[MAXPGPATH];
1265 	char	   *buf;
1266 	TwoPhaseFileHeader *hdr;
1267 	int			fd;
1268 	struct stat stat;
1269 	uint32		crc_offset;
1270 	pg_crc32c	calc_crc,
1271 				file_crc;
1272 	int			r;
1273 
1274 	TwoPhaseFilePath(path, xid);
1275 
1276 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1277 	if (fd < 0)
1278 	{
1279 		if (missing_ok && errno == ENOENT)
1280 			return NULL;
1281 
1282 		ereport(ERROR,
1283 				(errcode_for_file_access(),
1284 				 errmsg("could not open file \"%s\": %m", path)));
1285 	}
1286 
1287 	/*
1288 	 * Check file length.  We can determine a lower bound pretty easily. We
1289 	 * set an upper bound to avoid palloc() failure on a corrupt file, though
1290 	 * we can't guarantee that we won't get an out of memory error anyway,
1291 	 * even on a valid file.
1292 	 */
1293 	if (fstat(fd, &stat))
1294 		ereport(ERROR,
1295 				(errcode_for_file_access(),
1296 				 errmsg("could not stat file \"%s\": %m", path)));
1297 
1298 	if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1299 						MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1300 						sizeof(pg_crc32c)) ||
1301 		stat.st_size > MaxAllocSize)
1302 		ereport(ERROR,
1303 				(errcode(ERRCODE_DATA_CORRUPTED),
1304 				 errmsg_plural("incorrect size of file \"%s\": %lld byte",
1305 							   "incorrect size of file \"%s\": %lld bytes",
1306 							   (long long int) stat.st_size, path,
1307 							   (long long int) stat.st_size)));
1308 
1309 	crc_offset = stat.st_size - sizeof(pg_crc32c);
1310 	if (crc_offset != MAXALIGN(crc_offset))
1311 		ereport(ERROR,
1312 				(errcode(ERRCODE_DATA_CORRUPTED),
1313 				 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1314 						path)));
1315 
1316 	/*
1317 	 * OK, slurp in the file.
1318 	 */
1319 	buf = (char *) palloc(stat.st_size);
1320 
1321 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1322 	r = read(fd, buf, stat.st_size);
1323 	if (r != stat.st_size)
1324 	{
1325 		if (r < 0)
1326 			ereport(ERROR,
1327 					(errcode_for_file_access(),
1328 					 errmsg("could not read file \"%s\": %m", path)));
1329 		else
1330 			ereport(ERROR,
1331 					(errmsg("could not read file \"%s\": read %d of %lld",
1332 							path, r, (long long int) stat.st_size)));
1333 	}
1334 
1335 	pgstat_report_wait_end();
1336 
1337 	if (CloseTransientFile(fd) != 0)
1338 		ereport(ERROR,
1339 				(errcode_for_file_access(),
1340 				 errmsg("could not close file \"%s\": %m", path)));
1341 
1342 	hdr = (TwoPhaseFileHeader *) buf;
1343 	if (hdr->magic != TWOPHASE_MAGIC)
1344 		ereport(ERROR,
1345 				(errcode(ERRCODE_DATA_CORRUPTED),
1346 				 errmsg("invalid magic number stored in file \"%s\"",
1347 						path)));
1348 
1349 	if (hdr->total_len != stat.st_size)
1350 		ereport(ERROR,
1351 				(errcode(ERRCODE_DATA_CORRUPTED),
1352 				 errmsg("invalid size stored in file \"%s\"",
1353 						path)));
1354 
1355 	INIT_CRC32C(calc_crc);
1356 	COMP_CRC32C(calc_crc, buf, crc_offset);
1357 	FIN_CRC32C(calc_crc);
1358 
1359 	file_crc = *((pg_crc32c *) (buf + crc_offset));
1360 
1361 	if (!EQ_CRC32C(calc_crc, file_crc))
1362 		ereport(ERROR,
1363 				(errcode(ERRCODE_DATA_CORRUPTED),
1364 				 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1365 						path)));
1366 
1367 	return buf;
1368 }
1369 
1370 
1371 /*
1372  * Reads 2PC data from xlog. During checkpoint this data will be moved to
1373  * twophase files and ReadTwoPhaseFile should be used instead.
1374  *
1375  * Note clearly that this function can access WAL during normal operation,
1376  * similarly to the way WALSender or Logical Decoding would do.  While
1377  * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
1378  * particularly if this routine is called for the end-of-recovery checkpoint
1379  * in the checkpointer itself, so save the current timeline number value
1380  * and restore it once done.
1381  */
1382 static void
XlogReadTwoPhaseData(XLogRecPtr lsn,char ** buf,int * len)1383 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1384 {
1385 	XLogRecord *record;
1386 	XLogReaderState *xlogreader;
1387 	char	   *errormsg;
1388 	TimeLineID	save_currtli = ThisTimeLineID;
1389 
1390 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1391 									XL_ROUTINE(.page_read = &read_local_xlog_page,
1392 											   .segment_open = &wal_segment_open,
1393 											   .segment_close = &wal_segment_close),
1394 									NULL);
1395 	if (!xlogreader)
1396 		ereport(ERROR,
1397 				(errcode(ERRCODE_OUT_OF_MEMORY),
1398 				 errmsg("out of memory"),
1399 				 errdetail("Failed while allocating a WAL reading processor.")));
1400 
1401 	XLogBeginRead(xlogreader, lsn);
1402 	record = XLogReadRecord(xlogreader, &errormsg);
1403 
1404 	/*
1405 	 * Restore immediately the timeline where it was previously, as
1406 	 * read_local_xlog_page() could have changed it if the record was read
1407 	 * while recovery was finishing or if the timeline has jumped in-between.
1408 	 */
1409 	ThisTimeLineID = save_currtli;
1410 
1411 	if (record == NULL)
1412 		ereport(ERROR,
1413 				(errcode_for_file_access(),
1414 				 errmsg("could not read two-phase state from WAL at %X/%X",
1415 						LSN_FORMAT_ARGS(lsn))));
1416 
1417 	if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1418 		(XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1419 		ereport(ERROR,
1420 				(errcode_for_file_access(),
1421 				 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1422 						LSN_FORMAT_ARGS(lsn))));
1423 
1424 	if (len != NULL)
1425 		*len = XLogRecGetDataLen(xlogreader);
1426 
1427 	*buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1428 	memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1429 
1430 	XLogReaderFree(xlogreader);
1431 }
1432 
1433 
1434 /*
1435  * Confirms an xid is prepared, during recovery
1436  */
1437 bool
StandbyTransactionIdIsPrepared(TransactionId xid)1438 StandbyTransactionIdIsPrepared(TransactionId xid)
1439 {
1440 	char	   *buf;
1441 	TwoPhaseFileHeader *hdr;
1442 	bool		result;
1443 
1444 	Assert(TransactionIdIsValid(xid));
1445 
1446 	if (max_prepared_xacts <= 0)
1447 		return false;			/* nothing to do */
1448 
1449 	/* Read and validate file */
1450 	buf = ReadTwoPhaseFile(xid, true);
1451 	if (buf == NULL)
1452 		return false;
1453 
1454 	/* Check header also */
1455 	hdr = (TwoPhaseFileHeader *) buf;
1456 	result = TransactionIdEquals(hdr->xid, xid);
1457 	pfree(buf);
1458 
1459 	return result;
1460 }
1461 
1462 /*
1463  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1464  */
1465 void
FinishPreparedTransaction(const char * gid,bool isCommit)1466 FinishPreparedTransaction(const char *gid, bool isCommit)
1467 {
1468 	GlobalTransaction gxact;
1469 	PGPROC	   *proc;
1470 	TransactionId xid;
1471 	char	   *buf;
1472 	char	   *bufptr;
1473 	TwoPhaseFileHeader *hdr;
1474 	TransactionId latestXid;
1475 	TransactionId *children;
1476 	RelFileNode *commitrels;
1477 	RelFileNode *abortrels;
1478 	RelFileNode *delrels;
1479 	int			ndelrels;
1480 	SharedInvalidationMessage *invalmsgs;
1481 
1482 	/*
1483 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
1484 	 * try to commit the same GID at once.
1485 	 */
1486 	gxact = LockGXact(gid, GetUserId());
1487 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
1488 	xid = gxact->xid;
1489 
1490 	/*
1491 	 * Read and validate 2PC state data. State data will typically be stored
1492 	 * in WAL files if the LSN is after the last checkpoint record, or moved
1493 	 * to disk if for some reason they have lived for a long time.
1494 	 */
1495 	if (gxact->ondisk)
1496 		buf = ReadTwoPhaseFile(xid, false);
1497 	else
1498 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1499 
1500 
1501 	/*
1502 	 * Disassemble the header area
1503 	 */
1504 	hdr = (TwoPhaseFileHeader *) buf;
1505 	Assert(TransactionIdEquals(hdr->xid, xid));
1506 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1507 	bufptr += MAXALIGN(hdr->gidlen);
1508 	children = (TransactionId *) bufptr;
1509 	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1510 	commitrels = (RelFileNode *) bufptr;
1511 	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1512 	abortrels = (RelFileNode *) bufptr;
1513 	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1514 	invalmsgs = (SharedInvalidationMessage *) bufptr;
1515 	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1516 
1517 	/* compute latestXid among all children */
1518 	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1519 
1520 	/* Prevent cancel/die interrupt while cleaning up */
1521 	HOLD_INTERRUPTS();
1522 
1523 	/*
1524 	 * The order of operations here is critical: make the XLOG entry for
1525 	 * commit or abort, then mark the transaction committed or aborted in
1526 	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1527 	 * TransactionIdIsInProgress will stop saying the prepared xact is in
1528 	 * progress), then run the post-commit or post-abort callbacks. The
1529 	 * callbacks will release the locks the transaction held.
1530 	 */
1531 	if (isCommit)
1532 		RecordTransactionCommitPrepared(xid,
1533 										hdr->nsubxacts, children,
1534 										hdr->ncommitrels, commitrels,
1535 										hdr->ninvalmsgs, invalmsgs,
1536 										hdr->initfileinval, gid);
1537 	else
1538 		RecordTransactionAbortPrepared(xid,
1539 									   hdr->nsubxacts, children,
1540 									   hdr->nabortrels, abortrels,
1541 									   gid);
1542 
1543 	ProcArrayRemove(proc, latestXid);
1544 
1545 	/*
1546 	 * In case we fail while running the callbacks, mark the gxact invalid so
1547 	 * no one else will try to commit/rollback, and so it will be recycled if
1548 	 * we fail after this point.  It is still locked by our backend so it
1549 	 * won't go away yet.
1550 	 *
1551 	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1552 	 */
1553 	gxact->valid = false;
1554 
1555 	/*
1556 	 * We have to remove any files that were supposed to be dropped. For
1557 	 * consistency with the regular xact.c code paths, must do this before
1558 	 * releasing locks, so do it before running the callbacks.
1559 	 *
1560 	 * NB: this code knows that we couldn't be dropping any temp rels ...
1561 	 */
1562 	if (isCommit)
1563 	{
1564 		delrels = commitrels;
1565 		ndelrels = hdr->ncommitrels;
1566 	}
1567 	else
1568 	{
1569 		delrels = abortrels;
1570 		ndelrels = hdr->nabortrels;
1571 	}
1572 
1573 	/* Make sure files supposed to be dropped are dropped */
1574 	DropRelationFiles(delrels, ndelrels, false);
1575 
1576 	/*
1577 	 * Handle cache invalidation messages.
1578 	 *
1579 	 * Relcache init file invalidation requires processing both before and
1580 	 * after we send the SI messages. See AtEOXact_Inval()
1581 	 */
1582 	if (hdr->initfileinval)
1583 		RelationCacheInitFilePreInvalidate();
1584 	SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1585 	if (hdr->initfileinval)
1586 		RelationCacheInitFilePostInvalidate();
1587 
1588 	/*
1589 	 * Acquire the two-phase lock.  We want to work on the two-phase callbacks
1590 	 * while holding it to avoid potential conflicts with other transactions
1591 	 * attempting to use the same GID, so the lock is released once the shared
1592 	 * memory state is cleared.
1593 	 */
1594 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1595 
1596 	/* And now do the callbacks */
1597 	if (isCommit)
1598 		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1599 	else
1600 		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1601 
1602 	PredicateLockTwoPhaseFinish(xid, isCommit);
1603 
1604 	/* Clear shared memory state */
1605 	RemoveGXact(gxact);
1606 
1607 	/*
1608 	 * Release the lock as all callbacks are called and shared memory cleanup
1609 	 * is done.
1610 	 */
1611 	LWLockRelease(TwoPhaseStateLock);
1612 
1613 	/* Count the prepared xact as committed or aborted */
1614 	AtEOXact_PgStat(isCommit, false);
1615 
1616 	/*
1617 	 * And now we can clean up any files we may have left.
1618 	 */
1619 	if (gxact->ondisk)
1620 		RemoveTwoPhaseFile(xid, true);
1621 
1622 	MyLockedGxact = NULL;
1623 
1624 	RESUME_INTERRUPTS();
1625 
1626 	pfree(buf);
1627 }
1628 
1629 /*
1630  * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1631  */
1632 static void
ProcessRecords(char * bufptr,TransactionId xid,const TwoPhaseCallback callbacks[])1633 ProcessRecords(char *bufptr, TransactionId xid,
1634 			   const TwoPhaseCallback callbacks[])
1635 {
1636 	for (;;)
1637 	{
1638 		TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1639 
1640 		Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1641 		if (record->rmid == TWOPHASE_RM_END_ID)
1642 			break;
1643 
1644 		bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1645 
1646 		if (callbacks[record->rmid] != NULL)
1647 			callbacks[record->rmid] (xid, record->info,
1648 									 (void *) bufptr, record->len);
1649 
1650 		bufptr += MAXALIGN(record->len);
1651 	}
1652 }
1653 
1654 /*
1655  * Remove the 2PC file for the specified XID.
1656  *
1657  * If giveWarning is false, do not complain about file-not-present;
1658  * this is an expected case during WAL replay.
1659  */
1660 static void
RemoveTwoPhaseFile(TransactionId xid,bool giveWarning)1661 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1662 {
1663 	char		path[MAXPGPATH];
1664 
1665 	TwoPhaseFilePath(path, xid);
1666 	if (unlink(path))
1667 		if (errno != ENOENT || giveWarning)
1668 			ereport(WARNING,
1669 					(errcode_for_file_access(),
1670 					 errmsg("could not remove file \"%s\": %m", path)));
1671 }
1672 
1673 /*
1674  * Recreates a state file. This is used in WAL replay and during
1675  * checkpoint creation.
1676  *
1677  * Note: content and len don't include CRC.
1678  */
1679 static void
RecreateTwoPhaseFile(TransactionId xid,void * content,int len)1680 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1681 {
1682 	char		path[MAXPGPATH];
1683 	pg_crc32c	statefile_crc;
1684 	int			fd;
1685 
1686 	/* Recompute CRC */
1687 	INIT_CRC32C(statefile_crc);
1688 	COMP_CRC32C(statefile_crc, content, len);
1689 	FIN_CRC32C(statefile_crc);
1690 
1691 	TwoPhaseFilePath(path, xid);
1692 
1693 	fd = OpenTransientFile(path,
1694 						   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1695 	if (fd < 0)
1696 		ereport(ERROR,
1697 				(errcode_for_file_access(),
1698 				 errmsg("could not recreate file \"%s\": %m", path)));
1699 
1700 	/* Write content and CRC */
1701 	errno = 0;
1702 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1703 	if (write(fd, content, len) != len)
1704 	{
1705 		/* if write didn't set errno, assume problem is no disk space */
1706 		if (errno == 0)
1707 			errno = ENOSPC;
1708 		ereport(ERROR,
1709 				(errcode_for_file_access(),
1710 				 errmsg("could not write file \"%s\": %m", path)));
1711 	}
1712 	if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1713 	{
1714 		/* if write didn't set errno, assume problem is no disk space */
1715 		if (errno == 0)
1716 			errno = ENOSPC;
1717 		ereport(ERROR,
1718 				(errcode_for_file_access(),
1719 				 errmsg("could not write file \"%s\": %m", path)));
1720 	}
1721 	pgstat_report_wait_end();
1722 
1723 	/*
1724 	 * We must fsync the file because the end-of-replay checkpoint will not do
1725 	 * so, there being no GXACT in shared memory yet to tell it to.
1726 	 */
1727 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1728 	if (pg_fsync(fd) != 0)
1729 		ereport(ERROR,
1730 				(errcode_for_file_access(),
1731 				 errmsg("could not fsync file \"%s\": %m", path)));
1732 	pgstat_report_wait_end();
1733 
1734 	if (CloseTransientFile(fd) != 0)
1735 		ereport(ERROR,
1736 				(errcode_for_file_access(),
1737 				 errmsg("could not close file \"%s\": %m", path)));
1738 }
1739 
1740 /*
1741  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1742  *
1743  * We must fsync the state file of any GXACT that is valid or has been
1744  * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1745  * horizon.  (If the gxact isn't valid yet, has not been generated in
1746  * redo, or has a later LSN, this checkpoint is not responsible for
1747  * fsyncing it.)
1748  *
1749  * This is deliberately run as late as possible in the checkpoint sequence,
1750  * because GXACTs ordinarily have short lifespans, and so it is quite
1751  * possible that GXACTs that were valid at checkpoint start will no longer
1752  * exist if we wait a little bit. With typical checkpoint settings this
1753  * will be about 3 minutes for an online checkpoint, so as a result we
1754  * expect that there will be no GXACTs that need to be copied to disk.
1755  *
1756  * If a GXACT remains valid across multiple checkpoints, it will already
1757  * be on disk so we don't bother to repeat that write.
1758  */
1759 void
CheckPointTwoPhase(XLogRecPtr redo_horizon)1760 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1761 {
1762 	int			i;
1763 	int			serialized_xacts = 0;
1764 
1765 	if (max_prepared_xacts <= 0)
1766 		return;					/* nothing to do */
1767 
1768 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1769 
1770 	/*
1771 	 * We are expecting there to be zero GXACTs that need to be copied to
1772 	 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1773 	 * simplicity. This prevents any new xacts from preparing while this
1774 	 * occurs, which shouldn't be a problem since the presence of long-lived
1775 	 * prepared xacts indicates the transaction manager isn't active.
1776 	 *
1777 	 * It's also possible to move I/O out of the lock, but on every error we
1778 	 * should check whether somebody committed our transaction in different
1779 	 * backend. Let's leave this optimization for future, if somebody will
1780 	 * spot that this place cause bottleneck.
1781 	 *
1782 	 * Note that it isn't possible for there to be a GXACT with a
1783 	 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1784 	 * because of the efforts with delayChkpt.
1785 	 */
1786 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1787 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1788 	{
1789 		/*
1790 		 * Note that we are using gxact not PGPROC so this works in recovery
1791 		 * also
1792 		 */
1793 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1794 
1795 		if ((gxact->valid || gxact->inredo) &&
1796 			!gxact->ondisk &&
1797 			gxact->prepare_end_lsn <= redo_horizon)
1798 		{
1799 			char	   *buf;
1800 			int			len;
1801 
1802 			XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1803 			RecreateTwoPhaseFile(gxact->xid, buf, len);
1804 			gxact->ondisk = true;
1805 			gxact->prepare_start_lsn = InvalidXLogRecPtr;
1806 			gxact->prepare_end_lsn = InvalidXLogRecPtr;
1807 			pfree(buf);
1808 			serialized_xacts++;
1809 		}
1810 	}
1811 	LWLockRelease(TwoPhaseStateLock);
1812 
1813 	/*
1814 	 * Flush unconditionally the parent directory to make any information
1815 	 * durable on disk.  Two-phase files could have been removed and those
1816 	 * removals need to be made persistent as well as any files newly created
1817 	 * previously since the last checkpoint.
1818 	 */
1819 	fsync_fname(TWOPHASE_DIR, true);
1820 
1821 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1822 
1823 	if (log_checkpoints && serialized_xacts > 0)
1824 		ereport(LOG,
1825 				(errmsg_plural("%u two-phase state file was written "
1826 							   "for a long-running prepared transaction",
1827 							   "%u two-phase state files were written "
1828 							   "for long-running prepared transactions",
1829 							   serialized_xacts,
1830 							   serialized_xacts)));
1831 }
1832 
1833 /*
1834  * restoreTwoPhaseData
1835  *
1836  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1837  * This is called once at the beginning of recovery, saving any extra
1838  * lookups in the future.  Two-phase files that are newer than the
1839  * minimum XID horizon are discarded on the way.
1840  */
1841 void
restoreTwoPhaseData(void)1842 restoreTwoPhaseData(void)
1843 {
1844 	DIR		   *cldir;
1845 	struct dirent *clde;
1846 
1847 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1848 	cldir = AllocateDir(TWOPHASE_DIR);
1849 	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1850 	{
1851 		if (strlen(clde->d_name) == 8 &&
1852 			strspn(clde->d_name, "0123456789ABCDEF") == 8)
1853 		{
1854 			TransactionId xid;
1855 			char	   *buf;
1856 
1857 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1858 
1859 			buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1860 										true, false, false);
1861 			if (buf == NULL)
1862 				continue;
1863 
1864 			PrepareRedoAdd(buf, InvalidXLogRecPtr,
1865 						   InvalidXLogRecPtr, InvalidRepOriginId);
1866 		}
1867 	}
1868 	LWLockRelease(TwoPhaseStateLock);
1869 	FreeDir(cldir);
1870 }
1871 
1872 /*
1873  * PrescanPreparedTransactions
1874  *
1875  * Scan the shared memory entries of TwoPhaseState and determine the range
1876  * of valid XIDs present.  This is run during database startup, after we
1877  * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
1878  * one more than the highest XID for which evidence exists in WAL.
1879  *
1880  * We throw away any prepared xacts with main XID beyond nextXid --- if any
1881  * are present, it suggests that the DBA has done a PITR recovery to an
1882  * earlier point in time without cleaning out pg_twophase.  We dare not
1883  * try to recover such prepared xacts since they likely depend on database
1884  * state that doesn't exist now.
1885  *
1886  * However, we will advance nextXid beyond any subxact XIDs belonging to
1887  * valid prepared xacts.  We need to do this since subxact commit doesn't
1888  * write a WAL entry, and so there might be no evidence in WAL of those
1889  * subxact XIDs.
1890  *
1891  * On corrupted two-phase files, fail immediately.  Keeping around broken
1892  * entries and let replay continue causes harm on the system, and a new
1893  * backup should be rolled in.
1894  *
1895  * Our other responsibility is to determine and return the oldest valid XID
1896  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1897  * This is needed to synchronize pg_subtrans startup properly.
1898  *
1899  * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1900  * top-level xids is stored in *xids_p. The number of entries in the array
1901  * is returned in *nxids_p.
1902  */
1903 TransactionId
PrescanPreparedTransactions(TransactionId ** xids_p,int * nxids_p)1904 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1905 {
1906 	FullTransactionId nextXid = ShmemVariableCache->nextXid;
1907 	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1908 	TransactionId result = origNextXid;
1909 	TransactionId *xids = NULL;
1910 	int			nxids = 0;
1911 	int			allocsize = 0;
1912 	int			i;
1913 
1914 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1915 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1916 	{
1917 		TransactionId xid;
1918 		char	   *buf;
1919 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1920 
1921 		Assert(gxact->inredo);
1922 
1923 		xid = gxact->xid;
1924 
1925 		buf = ProcessTwoPhaseBuffer(xid,
1926 									gxact->prepare_start_lsn,
1927 									gxact->ondisk, false, true);
1928 
1929 		if (buf == NULL)
1930 			continue;
1931 
1932 		/*
1933 		 * OK, we think this file is valid.  Incorporate xid into the
1934 		 * running-minimum result.
1935 		 */
1936 		if (TransactionIdPrecedes(xid, result))
1937 			result = xid;
1938 
1939 		if (xids_p)
1940 		{
1941 			if (nxids == allocsize)
1942 			{
1943 				if (nxids == 0)
1944 				{
1945 					allocsize = 10;
1946 					xids = palloc(allocsize * sizeof(TransactionId));
1947 				}
1948 				else
1949 				{
1950 					allocsize = allocsize * 2;
1951 					xids = repalloc(xids, allocsize * sizeof(TransactionId));
1952 				}
1953 			}
1954 			xids[nxids++] = xid;
1955 		}
1956 
1957 		pfree(buf);
1958 	}
1959 	LWLockRelease(TwoPhaseStateLock);
1960 
1961 	if (xids_p)
1962 	{
1963 		*xids_p = xids;
1964 		*nxids_p = nxids;
1965 	}
1966 
1967 	return result;
1968 }
1969 
1970 /*
1971  * StandbyRecoverPreparedTransactions
1972  *
1973  * Scan the shared memory entries of TwoPhaseState and setup all the required
1974  * information to allow standby queries to treat prepared transactions as still
1975  * active.
1976  *
1977  * This is never called at the end of recovery - we use
1978  * RecoverPreparedTransactions() at that point.
1979  *
1980  * The lack of calls to SubTransSetParent() calls here is by design;
1981  * those calls are made by RecoverPreparedTransactions() at the end of recovery
1982  * for those xacts that need this.
1983  */
1984 void
StandbyRecoverPreparedTransactions(void)1985 StandbyRecoverPreparedTransactions(void)
1986 {
1987 	int			i;
1988 
1989 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1990 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1991 	{
1992 		TransactionId xid;
1993 		char	   *buf;
1994 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1995 
1996 		Assert(gxact->inredo);
1997 
1998 		xid = gxact->xid;
1999 
2000 		buf = ProcessTwoPhaseBuffer(xid,
2001 									gxact->prepare_start_lsn,
2002 									gxact->ondisk, false, false);
2003 		if (buf != NULL)
2004 			pfree(buf);
2005 	}
2006 	LWLockRelease(TwoPhaseStateLock);
2007 }
2008 
2009 /*
2010  * RecoverPreparedTransactions
2011  *
2012  * Scan the shared memory entries of TwoPhaseState and reload the state for
2013  * each prepared transaction (reacquire locks, etc).
2014  *
2015  * This is run at the end of recovery, but before we allow backends to write
2016  * WAL.
2017  *
2018  * At the end of recovery the way we take snapshots will change. We now need
2019  * to mark all running transactions with their full SubTransSetParent() info
2020  * to allow normal snapshots to work correctly if snapshots overflow.
2021  * We do this here because by definition prepared transactions are the only
2022  * type of write transaction still running, so this is necessary and
2023  * complete.
2024  */
2025 void
RecoverPreparedTransactions(void)2026 RecoverPreparedTransactions(void)
2027 {
2028 	int			i;
2029 
2030 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2031 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2032 	{
2033 		TransactionId xid;
2034 		char	   *buf;
2035 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2036 		char	   *bufptr;
2037 		TwoPhaseFileHeader *hdr;
2038 		TransactionId *subxids;
2039 		const char *gid;
2040 
2041 		xid = gxact->xid;
2042 
2043 		/*
2044 		 * Reconstruct subtrans state for the transaction --- needed because
2045 		 * pg_subtrans is not preserved over a restart.  Note that we are
2046 		 * linking all the subtransactions directly to the top-level XID;
2047 		 * there may originally have been a more complex hierarchy, but
2048 		 * there's no need to restore that exactly. It's possible that
2049 		 * SubTransSetParent has been set before, if the prepared transaction
2050 		 * generated xid assignment records.
2051 		 */
2052 		buf = ProcessTwoPhaseBuffer(xid,
2053 									gxact->prepare_start_lsn,
2054 									gxact->ondisk, true, false);
2055 		if (buf == NULL)
2056 			continue;
2057 
2058 		ereport(LOG,
2059 				(errmsg("recovering prepared transaction %u from shared memory", xid)));
2060 
2061 		hdr = (TwoPhaseFileHeader *) buf;
2062 		Assert(TransactionIdEquals(hdr->xid, xid));
2063 		bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2064 		gid = (const char *) bufptr;
2065 		bufptr += MAXALIGN(hdr->gidlen);
2066 		subxids = (TransactionId *) bufptr;
2067 		bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2068 		bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2069 		bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2070 		bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2071 
2072 		/*
2073 		 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2074 		 * added in redo and already has a shmem entry for it.
2075 		 */
2076 		MarkAsPreparingGuts(gxact, xid, gid,
2077 							hdr->prepared_at,
2078 							hdr->owner, hdr->database);
2079 
2080 		/* recovered, so reset the flag for entries generated by redo */
2081 		gxact->inredo = false;
2082 
2083 		GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2084 		MarkAsPrepared(gxact, true);
2085 
2086 		LWLockRelease(TwoPhaseStateLock);
2087 
2088 		/*
2089 		 * Recover other state (notably locks) using resource managers.
2090 		 */
2091 		ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2092 
2093 		/*
2094 		 * Release locks held by the standby process after we process each
2095 		 * prepared transaction. As a result, we don't need too many
2096 		 * additional locks at any one time.
2097 		 */
2098 		if (InHotStandby)
2099 			StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2100 
2101 		/*
2102 		 * We're done with recovering this transaction. Clear MyLockedGxact,
2103 		 * like we do in PrepareTransaction() during normal operation.
2104 		 */
2105 		PostPrepare_Twophase();
2106 
2107 		pfree(buf);
2108 
2109 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2110 	}
2111 
2112 	LWLockRelease(TwoPhaseStateLock);
2113 }
2114 
2115 /*
2116  * ProcessTwoPhaseBuffer
2117  *
2118  * Given a transaction id, read it either from disk or read it directly
2119  * via shmem xlog record pointer using the provided "prepare_start_lsn".
2120  *
2121  * If setParent is true, set up subtransaction parent linkages.
2122  *
2123  * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2124  * value scanned.
2125  */
2126 static char *
ProcessTwoPhaseBuffer(TransactionId xid,XLogRecPtr prepare_start_lsn,bool fromdisk,bool setParent,bool setNextXid)2127 ProcessTwoPhaseBuffer(TransactionId xid,
2128 					  XLogRecPtr prepare_start_lsn,
2129 					  bool fromdisk,
2130 					  bool setParent, bool setNextXid)
2131 {
2132 	FullTransactionId nextXid = ShmemVariableCache->nextXid;
2133 	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
2134 	TransactionId *subxids;
2135 	char	   *buf;
2136 	TwoPhaseFileHeader *hdr;
2137 	int			i;
2138 
2139 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2140 
2141 	if (!fromdisk)
2142 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
2143 
2144 	/* Already processed? */
2145 	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2146 	{
2147 		if (fromdisk)
2148 		{
2149 			ereport(WARNING,
2150 					(errmsg("removing stale two-phase state file for transaction %u",
2151 							xid)));
2152 			RemoveTwoPhaseFile(xid, true);
2153 		}
2154 		else
2155 		{
2156 			ereport(WARNING,
2157 					(errmsg("removing stale two-phase state from memory for transaction %u",
2158 							xid)));
2159 			PrepareRedoRemove(xid, true);
2160 		}
2161 		return NULL;
2162 	}
2163 
2164 	/* Reject XID if too new */
2165 	if (TransactionIdFollowsOrEquals(xid, origNextXid))
2166 	{
2167 		if (fromdisk)
2168 		{
2169 			ereport(WARNING,
2170 					(errmsg("removing future two-phase state file for transaction %u",
2171 							xid)));
2172 			RemoveTwoPhaseFile(xid, true);
2173 		}
2174 		else
2175 		{
2176 			ereport(WARNING,
2177 					(errmsg("removing future two-phase state from memory for transaction %u",
2178 							xid)));
2179 			PrepareRedoRemove(xid, true);
2180 		}
2181 		return NULL;
2182 	}
2183 
2184 	if (fromdisk)
2185 	{
2186 		/* Read and validate file */
2187 		buf = ReadTwoPhaseFile(xid, false);
2188 	}
2189 	else
2190 	{
2191 		/* Read xlog data */
2192 		XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2193 	}
2194 
2195 	/* Deconstruct header */
2196 	hdr = (TwoPhaseFileHeader *) buf;
2197 	if (!TransactionIdEquals(hdr->xid, xid))
2198 	{
2199 		if (fromdisk)
2200 			ereport(ERROR,
2201 					(errcode(ERRCODE_DATA_CORRUPTED),
2202 					 errmsg("corrupted two-phase state file for transaction %u",
2203 							xid)));
2204 		else
2205 			ereport(ERROR,
2206 					(errcode(ERRCODE_DATA_CORRUPTED),
2207 					 errmsg("corrupted two-phase state in memory for transaction %u",
2208 							xid)));
2209 	}
2210 
2211 	/*
2212 	 * Examine subtransaction XIDs ... they should all follow main XID, and
2213 	 * they may force us to advance nextXid.
2214 	 */
2215 	subxids = (TransactionId *) (buf +
2216 								 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2217 								 MAXALIGN(hdr->gidlen));
2218 	for (i = 0; i < hdr->nsubxacts; i++)
2219 	{
2220 		TransactionId subxid = subxids[i];
2221 
2222 		Assert(TransactionIdFollows(subxid, xid));
2223 
2224 		/* update nextXid if needed */
2225 		if (setNextXid)
2226 			AdvanceNextFullTransactionIdPastXid(subxid);
2227 
2228 		if (setParent)
2229 			SubTransSetParent(subxid, xid);
2230 	}
2231 
2232 	return buf;
2233 }
2234 
2235 
2236 /*
2237  *	RecordTransactionCommitPrepared
2238  *
2239  * This is basically the same as RecordTransactionCommit (q.v. if you change
2240  * this function): in particular, we must set the delayChkpt flag to avoid a
2241  * race condition.
2242  *
2243  * We know the transaction made at least one XLOG entry (its PREPARE),
2244  * so it is never possible to optimize out the commit record.
2245  */
2246 static void
RecordTransactionCommitPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,int ninvalmsgs,SharedInvalidationMessage * invalmsgs,bool initfileinval,const char * gid)2247 RecordTransactionCommitPrepared(TransactionId xid,
2248 								int nchildren,
2249 								TransactionId *children,
2250 								int nrels,
2251 								RelFileNode *rels,
2252 								int ninvalmsgs,
2253 								SharedInvalidationMessage *invalmsgs,
2254 								bool initfileinval,
2255 								const char *gid)
2256 {
2257 	XLogRecPtr	recptr;
2258 	TimestampTz committs = GetCurrentTimestamp();
2259 	bool		replorigin;
2260 
2261 	/*
2262 	 * Are we using the replication origins feature?  Or, in other words, are
2263 	 * we replaying remote actions?
2264 	 */
2265 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2266 				  replorigin_session_origin != DoNotReplicateId);
2267 
2268 	START_CRIT_SECTION();
2269 
2270 	/* See notes in RecordTransactionCommit */
2271 	MyProc->delayChkpt = true;
2272 
2273 	/*
2274 	 * Emit the XLOG commit record. Note that we mark 2PC commits as
2275 	 * potentially having AccessExclusiveLocks since we don't know whether or
2276 	 * not they do.
2277 	 */
2278 	recptr = XactLogCommitRecord(committs,
2279 								 nchildren, children, nrels, rels,
2280 								 ninvalmsgs, invalmsgs,
2281 								 initfileinval,
2282 								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2283 								 xid, gid);
2284 
2285 
2286 	if (replorigin)
2287 		/* Move LSNs forward for this replication origin */
2288 		replorigin_session_advance(replorigin_session_origin_lsn,
2289 								   XactLastRecEnd);
2290 
2291 	/*
2292 	 * Record commit timestamp.  The value comes from plain commit timestamp
2293 	 * if replorigin is not enabled, or replorigin already set a value for us
2294 	 * in replorigin_session_origin_timestamp otherwise.
2295 	 *
2296 	 * We don't need to WAL-log anything here, as the commit record written
2297 	 * above already contains the data.
2298 	 */
2299 	if (!replorigin || replorigin_session_origin_timestamp == 0)
2300 		replorigin_session_origin_timestamp = committs;
2301 
2302 	TransactionTreeSetCommitTsData(xid, nchildren, children,
2303 								   replorigin_session_origin_timestamp,
2304 								   replorigin_session_origin);
2305 
2306 	/*
2307 	 * We don't currently try to sleep before flush here ... nor is there any
2308 	 * support for async commit of a prepared xact (the very idea is probably
2309 	 * a contradiction)
2310 	 */
2311 
2312 	/* Flush XLOG to disk */
2313 	XLogFlush(recptr);
2314 
2315 	/* Mark the transaction committed in pg_xact */
2316 	TransactionIdCommitTree(xid, nchildren, children);
2317 
2318 	/* Checkpoint can proceed now */
2319 	MyProc->delayChkpt = false;
2320 
2321 	END_CRIT_SECTION();
2322 
2323 	/*
2324 	 * Wait for synchronous replication, if required.
2325 	 *
2326 	 * Note that at this stage we have marked clog, but still show as running
2327 	 * in the procarray and continue to hold locks.
2328 	 */
2329 	SyncRepWaitForLSN(recptr, true);
2330 }
2331 
2332 /*
2333  *	RecordTransactionAbortPrepared
2334  *
2335  * This is basically the same as RecordTransactionAbort.
2336  *
2337  * We know the transaction made at least one XLOG entry (its PREPARE),
2338  * so it is never possible to optimize out the abort record.
2339  */
2340 static void
RecordTransactionAbortPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,const char * gid)2341 RecordTransactionAbortPrepared(TransactionId xid,
2342 							   int nchildren,
2343 							   TransactionId *children,
2344 							   int nrels,
2345 							   RelFileNode *rels,
2346 							   const char *gid)
2347 {
2348 	XLogRecPtr	recptr;
2349 	bool		replorigin;
2350 
2351 	/*
2352 	 * Are we using the replication origins feature?  Or, in other words, are
2353 	 * we replaying remote actions?
2354 	 */
2355 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2356 				  replorigin_session_origin != DoNotReplicateId);
2357 
2358 	/*
2359 	 * Catch the scenario where we aborted partway through
2360 	 * RecordTransactionCommitPrepared ...
2361 	 */
2362 	if (TransactionIdDidCommit(xid))
2363 		elog(PANIC, "cannot abort transaction %u, it was already committed",
2364 			 xid);
2365 
2366 	START_CRIT_SECTION();
2367 
2368 	/*
2369 	 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2370 	 * potentially having AccessExclusiveLocks since we don't know whether or
2371 	 * not they do.
2372 	 */
2373 	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2374 								nchildren, children,
2375 								nrels, rels,
2376 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2377 								xid, gid);
2378 
2379 	if (replorigin)
2380 		/* Move LSNs forward for this replication origin */
2381 		replorigin_session_advance(replorigin_session_origin_lsn,
2382 								   XactLastRecEnd);
2383 
2384 	/* Always flush, since we're about to remove the 2PC state file */
2385 	XLogFlush(recptr);
2386 
2387 	/*
2388 	 * Mark the transaction aborted in clog.  This is not absolutely necessary
2389 	 * but we may as well do it while we are here.
2390 	 */
2391 	TransactionIdAbortTree(xid, nchildren, children);
2392 
2393 	END_CRIT_SECTION();
2394 
2395 	/*
2396 	 * Wait for synchronous replication, if required.
2397 	 *
2398 	 * Note that at this stage we have marked clog, but still show as running
2399 	 * in the procarray and continue to hold locks.
2400 	 */
2401 	SyncRepWaitForLSN(recptr, false);
2402 }
2403 
2404 /*
2405  * PrepareRedoAdd
2406  *
2407  * Store pointers to the start/end of the WAL record along with the xid in
2408  * a gxact entry in shared memory TwoPhaseState structure.  If caller
2409  * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2410  * data, the entry is marked as located on disk.
2411  */
2412 void
PrepareRedoAdd(char * buf,XLogRecPtr start_lsn,XLogRecPtr end_lsn,RepOriginId origin_id)2413 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2414 			   XLogRecPtr end_lsn, RepOriginId origin_id)
2415 {
2416 	TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2417 	char	   *bufptr;
2418 	const char *gid;
2419 	GlobalTransaction gxact;
2420 
2421 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2422 	Assert(RecoveryInProgress());
2423 
2424 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2425 	gid = (const char *) bufptr;
2426 
2427 	/*
2428 	 * Reserve the GID for the given transaction in the redo code path.
2429 	 *
2430 	 * This creates a gxact struct and puts it into the active array.
2431 	 *
2432 	 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2433 	 * shared memory. Hence, we only fill up the bare minimum contents here.
2434 	 * The gxact also gets marked with gxact->inredo set to true to indicate
2435 	 * that it got added in the redo phase
2436 	 */
2437 
2438 	/* Get a free gxact from the freelist */
2439 	if (TwoPhaseState->freeGXacts == NULL)
2440 		ereport(ERROR,
2441 				(errcode(ERRCODE_OUT_OF_MEMORY),
2442 				 errmsg("maximum number of prepared transactions reached"),
2443 				 errhint("Increase max_prepared_transactions (currently %d).",
2444 						 max_prepared_xacts)));
2445 	gxact = TwoPhaseState->freeGXacts;
2446 	TwoPhaseState->freeGXacts = gxact->next;
2447 
2448 	gxact->prepared_at = hdr->prepared_at;
2449 	gxact->prepare_start_lsn = start_lsn;
2450 	gxact->prepare_end_lsn = end_lsn;
2451 	gxact->xid = hdr->xid;
2452 	gxact->owner = hdr->owner;
2453 	gxact->locking_backend = InvalidBackendId;
2454 	gxact->valid = false;
2455 	gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2456 	gxact->inredo = true;		/* yes, added in redo */
2457 	strcpy(gxact->gid, gid);
2458 
2459 	/* And insert it into the active array */
2460 	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2461 	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2462 
2463 	if (origin_id != InvalidRepOriginId)
2464 	{
2465 		/* recover apply progress */
2466 		replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2467 						   false /* backward */ , false /* WAL */ );
2468 	}
2469 
2470 	elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2471 }
2472 
2473 /*
2474  * PrepareRedoRemove
2475  *
2476  * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2477  * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2478  *
2479  * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2480  * is updated.
2481  */
2482 void
PrepareRedoRemove(TransactionId xid,bool giveWarning)2483 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2484 {
2485 	GlobalTransaction gxact = NULL;
2486 	int			i;
2487 	bool		found = false;
2488 
2489 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2490 	Assert(RecoveryInProgress());
2491 
2492 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2493 	{
2494 		gxact = TwoPhaseState->prepXacts[i];
2495 
2496 		if (gxact->xid == xid)
2497 		{
2498 			Assert(gxact->inredo);
2499 			found = true;
2500 			break;
2501 		}
2502 	}
2503 
2504 	/*
2505 	 * Just leave if there is nothing, this is expected during WAL replay.
2506 	 */
2507 	if (!found)
2508 		return;
2509 
2510 	/*
2511 	 * And now we can clean up any files we may have left.
2512 	 */
2513 	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2514 	if (gxact->ondisk)
2515 		RemoveTwoPhaseFile(xid, giveWarning);
2516 	RemoveGXact(gxact);
2517 }
2518