1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *		Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/access/transam/twophase.c
11  *
12  * NOTES
13  *		Each global transaction is associated with a global transaction
14  *		identifier (GID). The client assigns a GID to a postgres
15  *		transaction with the PREPARE TRANSACTION command.
16  *
17  *		We keep all active global transactions in a shared memory array.
18  *		When the PREPARE TRANSACTION command is issued, the GID is
19  *		reserved for the transaction in the array. This is done before
20  *		a WAL entry is made, because the reservation checks for duplicate
21  *		GIDs and aborts the transaction if there already is a global
22  *		transaction in prepared state with the same GID.
23  *
24  *		A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25  *		what keeps the XID considered running by TransactionIdIsInProgress.
26  *		It is also convenient as a PGPROC to hook the gxact's locks to.
27  *
28  *		Information to recover prepared transactions in case of crash is
29  *		now stored in WAL for the common case. In some cases there will be
30  *		an extended period between preparing a GXACT and commit/abort, in
31  *		which case we need to separately record prepared transaction data
32  *		in permanent storage. This includes locking information, pending
33  *		notifications etc. All that state information is written to the
34  *		per-transaction state file in the pg_twophase directory.
35  *		All prepared transactions will be written prior to shutdown.
36  *
 *		The life cycle of the state data is as follows:
38  *
39  *		* On PREPARE TRANSACTION backend writes state data only to the WAL and
40  *		  stores pointer to the start of the WAL record in
41  *		  gxact->prepare_start_lsn.
42  *		* If COMMIT occurs before checkpoint then backend reads data from WAL
43  *		  using prepare_start_lsn.
 *		* On checkpoint, state data is copied to files in the pg_twophase
 *		  directory and fsynced.
46  *		* If COMMIT happens after checkpoint then backend reads state data from
47  *		  files
48  *
49  *		During replay and replication, TwoPhaseState also holds information
50  *		about active prepared transactions that haven't been moved to disk yet.
51  *
52  *		Replay of twophase records happens by the following rules:
53  *
54  *		* At the beginning of recovery, pg_twophase is scanned once, filling
55  *		  TwoPhaseState with entries marked with gxact->inredo and
56  *		  gxact->ondisk.  Two-phase file data older than the XID horizon of
57  *		  the redo position are discarded.
58  *		* On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59  *		  gxact->inredo is set to true for such entries.
60  *		* On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61  *		  that have gxact->inredo set and are behind the redo_horizon. We
62  *		  save them to disk and then switch gxact->ondisk to true.
63  *		* On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64  *		  If gxact->ondisk is true, the corresponding entry from the disk
65  *		  is additionally deleted.
66  *		* RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67  *		  and PrescanPreparedTransactions() have been modified to go through
68  *		  gxact->inredo entries that have not made it to disk.
69  *
70  *-------------------------------------------------------------------------
71  */
72 #include "postgres.h"
73 
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
78 
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogutils.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
92 #include "funcapi.h"
93 #include "miscadmin.h"
94 #include "pg_trace.h"
95 #include "pgstat.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "storage/sinvaladt.h"
106 #include "storage/smgr.h"
107 #include "utils/builtins.h"
108 #include "utils/memutils.h"
109 #include "utils/timestamp.h"
110 
/*
 * Directory where Two-phase commit files reside within PGDATA.  The
 * TwoPhaseFilePath() macro builds per-transaction file names underneath it.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable, can't be changed after startup.  It determines how many
 * GlobalTransactionData structs TwoPhaseShmemSize() reserves room for; when
 * it is zero, MarkAsPreparing() rejects every request with "prepared
 * transactions are disabled".
 */
int			max_prepared_xacts = 0;
118 
119 /*
120  * This struct describes one global transaction that is in prepared state
121  * or attempting to become prepared.
122  *
123  * The lifecycle of a global transaction is:
124  *
125  * 1. After checking that the requested GID is not in use, set up an entry in
126  * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127  * and mark it as locked by my backend.
128  *
129  * 2. After successfully completing prepare, set valid = true and enter the
130  * referenced PGPROC into the global ProcArray.
131  *
132  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133  * valid and not locked, then mark the entry as locked by storing my current
134  * backend ID into locking_backend.  This prevents concurrent attempts to
135  * commit or rollback the same prepared xact.
136  *
137  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139  * the freelist.
140  *
141  * Note that if the preparing transaction fails between steps 1 and 2, the
142  * entry must be removed so that the GID and the GlobalTransaction struct
143  * can be reused.  See AtAbort_Twophase().
144  *
145  * typedef struct GlobalTransactionData *GlobalTransaction appears in
146  * twophase.h
147  */
148 
typedef struct GlobalTransactionData
{
	GlobalTransaction next;		/* list link for free list */
	int			pgprocno;		/* ID of associated dummy PGPROC; used as the
								 * index into ProcGlobal->allProcs and
								 * ProcGlobal->allPgXact */
	BackendId	dummyBackendId; /* similar to backend id for backends;
								 * assigned once in TwoPhaseShmemInit() */
	TimestampTz prepared_at;	/* time of preparation */

	/*
	 * Note that we need to keep track of two LSNs for each GXACT. We keep
	 * track of the start LSN because this is the address we must use to read
	 * state data back from WAL when committing a prepared GXACT. We keep
	 * track of the end LSN because that is the LSN we need to wait for prior
	 * to commit.
	 */
	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
	TransactionId xid;			/* The GXACT id */

	Oid			owner;			/* ID of user that executed the xact */
	BackendId	locking_backend;	/* backend currently working on the xact,
									 * or InvalidBackendId when unlocked */
	bool		valid;			/* true if PGPROC entry is in proc array */
	bool		ondisk;			/* true if prepare state file is on disk */
	bool		inredo;			/* true if entry was added via xlog_redo */
	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact,
								 * stored as a NUL-terminated string */
}			GlobalTransactionData;
174 
175 /*
176  * Two Phase Commit shared state.  Access to this struct is protected
177  * by TwoPhaseStateLock.
178  */
typedef struct TwoPhaseStateData
{
	/* Head of linked list of free GlobalTransactionData structs */
	GlobalTransaction freeGXacts;

	/*
	 * Number of prepXacts entries currently in use.  This includes entries
	 * that are still being prepared (gxact->valid is false), since
	 * MarkAsPreparing() inserts them before they become valid.
	 */
	int			numPrepXacts;

	/* There are max_prepared_xacts items in this array */
	GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
} TwoPhaseStateData;
190 
/* Pointer to the shared-memory table; assigned by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;

/*
 * Global transaction entry currently locked by us, if any.  Note that any
 * access to the entry pointed to by this variable must be protected by
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 * (since it's just local memory).
 */
static GlobalTransaction MyLockedGxact = NULL;

/*
 * Set once this backend has registered AtProcExit_Twophase() with
 * before_shmem_exit(), so that the hook is registered at most once.
 */
static bool twophaseExitRegistered = false;

/* Forward declarations of file-local routines */
static void RecordTransactionCommitPrepared(TransactionId xid,
											int nchildren,
											TransactionId *children,
											int nrels,
											RelFileNode *rels,
											int ninvalmsgs,
											SharedInvalidationMessage *invalmsgs,
											bool initfileinval,
											const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
										   int nchildren,
										   TransactionId *children,
										   int nrels,
										   RelFileNode *rels,
										   const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
						   const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);

static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(TransactionId xid,
								   XLogRecPtr prepare_start_lsn,
								   bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
								const char *gid, TimestampTz prepared_at, Oid owner,
								Oid databaseid);
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
231 
232 /*
233  * Initialization of shared memory
234  */
235 Size
TwoPhaseShmemSize(void)236 TwoPhaseShmemSize(void)
237 {
238 	Size		size;
239 
240 	/* Need the fixed struct, the array of pointers, and the GTD structs */
241 	size = offsetof(TwoPhaseStateData, prepXacts);
242 	size = add_size(size, mul_size(max_prepared_xacts,
243 								   sizeof(GlobalTransaction)));
244 	size = MAXALIGN(size);
245 	size = add_size(size, mul_size(max_prepared_xacts,
246 								   sizeof(GlobalTransactionData)));
247 
248 	return size;
249 }
250 
251 void
TwoPhaseShmemInit(void)252 TwoPhaseShmemInit(void)
253 {
254 	bool		found;
255 
256 	TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
257 									TwoPhaseShmemSize(),
258 									&found);
259 	if (!IsUnderPostmaster)
260 	{
261 		GlobalTransaction gxacts;
262 		int			i;
263 
264 		Assert(!found);
265 		TwoPhaseState->freeGXacts = NULL;
266 		TwoPhaseState->numPrepXacts = 0;
267 
268 		/*
269 		 * Initialize the linked list of free GlobalTransactionData structs
270 		 */
271 		gxacts = (GlobalTransaction)
272 			((char *) TwoPhaseState +
273 			 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
274 					  sizeof(GlobalTransaction) * max_prepared_xacts));
275 		for (i = 0; i < max_prepared_xacts; i++)
276 		{
277 			/* insert into linked list */
278 			gxacts[i].next = TwoPhaseState->freeGXacts;
279 			TwoPhaseState->freeGXacts = &gxacts[i];
280 
281 			/* associate it with a PGPROC assigned by InitProcGlobal */
282 			gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
283 
284 			/*
285 			 * Assign a unique ID for each dummy proc, so that the range of
286 			 * dummy backend IDs immediately follows the range of normal
287 			 * backend IDs. We don't dare to assign a real backend ID to dummy
288 			 * procs, because prepared transactions don't take part in cache
289 			 * invalidation like a real backend ID would imply, but having a
290 			 * unique ID for them is nevertheless handy. This arrangement
291 			 * allows you to allocate an array of size (MaxBackends +
292 			 * max_prepared_xacts + 1), and have a slot for every backend and
293 			 * prepared transaction. Currently multixact.c uses that
294 			 * technique.
295 			 */
296 			gxacts[i].dummyBackendId = MaxBackends + 1 + i;
297 		}
298 	}
299 	else
300 		Assert(found);
301 }
302 
303 /*
304  * Exit hook to unlock the global transaction entry we're working on.
305  */
306 static void
AtProcExit_Twophase(int code,Datum arg)307 AtProcExit_Twophase(int code, Datum arg)
308 {
309 	/* same logic as abort */
310 	AtAbort_Twophase();
311 }
312 
313 /*
314  * Abort hook to unlock the global transaction entry we're working on.
315  */
316 void
AtAbort_Twophase(void)317 AtAbort_Twophase(void)
318 {
319 	if (MyLockedGxact == NULL)
320 		return;
321 
322 	/*
323 	 * What to do with the locked global transaction entry?  If we were in the
324 	 * process of preparing the transaction, but haven't written the WAL
325 	 * record and state file yet, the transaction must not be considered as
326 	 * prepared.  Likewise, if we are in the process of finishing an
327 	 * already-prepared transaction, and fail after having already written the
328 	 * 2nd phase commit or rollback record to the WAL, the transaction should
329 	 * not be considered as prepared anymore.  In those cases, just remove the
330 	 * entry from shared memory.
331 	 *
332 	 * Otherwise, the entry must be left in place so that the transaction can
333 	 * be finished later, so just unlock it.
334 	 *
335 	 * If we abort during prepare, after having written the WAL record, we
336 	 * might not have transferred all locks and other state to the prepared
337 	 * transaction yet.  Likewise, if we abort during commit or rollback,
338 	 * after having written the WAL record, we might not have released all the
339 	 * resources held by the transaction yet.  In those cases, the in-memory
340 	 * state can be wrong, but it's too late to back out.
341 	 */
342 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
343 	if (!MyLockedGxact->valid)
344 		RemoveGXact(MyLockedGxact);
345 	else
346 		MyLockedGxact->locking_backend = InvalidBackendId;
347 	LWLockRelease(TwoPhaseStateLock);
348 
349 	MyLockedGxact = NULL;
350 }
351 
352 /*
353  * This is called after we have finished transferring state to the prepared
354  * PGXACT entry.
355  */
356 void
PostPrepare_Twophase(void)357 PostPrepare_Twophase(void)
358 {
359 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
360 	MyLockedGxact->locking_backend = InvalidBackendId;
361 	LWLockRelease(TwoPhaseStateLock);
362 
363 	MyLockedGxact = NULL;
364 }
365 
366 
367 /*
368  * MarkAsPreparing
369  *		Reserve the GID for the given transaction.
370  */
371 GlobalTransaction
MarkAsPreparing(TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)372 MarkAsPreparing(TransactionId xid, const char *gid,
373 				TimestampTz prepared_at, Oid owner, Oid databaseid)
374 {
375 	GlobalTransaction gxact;
376 	int			i;
377 
378 	if (strlen(gid) >= GIDSIZE)
379 		ereport(ERROR,
380 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
381 				 errmsg("transaction identifier \"%s\" is too long",
382 						gid)));
383 
384 	/* fail immediately if feature is disabled */
385 	if (max_prepared_xacts == 0)
386 		ereport(ERROR,
387 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388 				 errmsg("prepared transactions are disabled"),
389 				 errhint("Set max_prepared_transactions to a nonzero value.")));
390 
391 	/* on first call, register the exit hook */
392 	if (!twophaseExitRegistered)
393 	{
394 		before_shmem_exit(AtProcExit_Twophase, 0);
395 		twophaseExitRegistered = true;
396 	}
397 
398 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
399 
400 	/* Check for conflicting GID */
401 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
402 	{
403 		gxact = TwoPhaseState->prepXacts[i];
404 		if (strcmp(gxact->gid, gid) == 0)
405 		{
406 			ereport(ERROR,
407 					(errcode(ERRCODE_DUPLICATE_OBJECT),
408 					 errmsg("transaction identifier \"%s\" is already in use",
409 							gid)));
410 		}
411 	}
412 
413 	/* Get a free gxact from the freelist */
414 	if (TwoPhaseState->freeGXacts == NULL)
415 		ereport(ERROR,
416 				(errcode(ERRCODE_OUT_OF_MEMORY),
417 				 errmsg("maximum number of prepared transactions reached"),
418 				 errhint("Increase max_prepared_transactions (currently %d).",
419 						 max_prepared_xacts)));
420 	gxact = TwoPhaseState->freeGXacts;
421 	TwoPhaseState->freeGXacts = gxact->next;
422 
423 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
424 
425 	gxact->ondisk = false;
426 
427 	/* And insert it into the active array */
428 	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
429 	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
430 
431 	LWLockRelease(TwoPhaseStateLock);
432 
433 	return gxact;
434 }
435 
436 /*
437  * MarkAsPreparingGuts
438  *
439  * This uses a gxact struct and puts it into the active array.
440  * NOTE: this is also used when reloading a gxact after a crash; so avoid
441  * assuming that we can use very much backend context.
442  *
443  * Note: This function should be called with appropriate locks held.
444  */
445 static void
MarkAsPreparingGuts(GlobalTransaction gxact,TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)446 MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
447 					TimestampTz prepared_at, Oid owner, Oid databaseid)
448 {
449 	PGPROC	   *proc;
450 	PGXACT	   *pgxact;
451 	int			i;
452 
453 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
454 
455 	Assert(gxact != NULL);
456 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
457 	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
458 
459 	/* Initialize the PGPROC entry */
460 	MemSet(proc, 0, sizeof(PGPROC));
461 	proc->pgprocno = gxact->pgprocno;
462 	SHMQueueElemInit(&(proc->links));
463 	proc->waitStatus = STATUS_OK;
464 	if (LocalTransactionIdIsValid(MyProc->lxid))
465 	{
466 		/* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
467 		proc->lxid = MyProc->lxid;
468 		proc->backendId = MyBackendId;
469 	}
470 	else
471 	{
472 		Assert(AmStartupProcess() || !IsPostmasterEnvironment);
473 		/* GetLockConflicts() uses this to specify a wait on the XID */
474 		proc->lxid = xid;
475 		proc->backendId = InvalidBackendId;
476 	}
477 	pgxact->xid = xid;
478 	pgxact->xmin = InvalidTransactionId;
479 	proc->delayChkpt = false;
480 	pgxact->vacuumFlags = 0;
481 	proc->pid = 0;
482 	proc->databaseId = databaseid;
483 	proc->roleId = owner;
484 	proc->tempNamespaceId = InvalidOid;
485 	proc->isBackgroundWorker = false;
486 	proc->lwWaiting = false;
487 	proc->lwWaitMode = 0;
488 	proc->waitLock = NULL;
489 	proc->waitProcLock = NULL;
490 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
491 		SHMQueueInit(&(proc->myProcLocks[i]));
492 	/* subxid data must be filled later by GXactLoadSubxactData */
493 	pgxact->overflowed = false;
494 	pgxact->nxids = 0;
495 
496 	gxact->prepared_at = prepared_at;
497 	gxact->xid = xid;
498 	gxact->owner = owner;
499 	gxact->locking_backend = MyBackendId;
500 	gxact->valid = false;
501 	gxact->inredo = false;
502 	strcpy(gxact->gid, gid);
503 
504 	/*
505 	 * Remember that we have this GlobalTransaction entry locked for us. If we
506 	 * abort after this, we must release it.
507 	 */
508 	MyLockedGxact = gxact;
509 }
510 
511 /*
512  * GXactLoadSubxactData
513  *
514  * If the transaction being persisted had any subtransactions, this must
515  * be called before MarkAsPrepared() to load information into the dummy
516  * PGPROC.
517  */
518 static void
GXactLoadSubxactData(GlobalTransaction gxact,int nsubxacts,TransactionId * children)519 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
520 					 TransactionId *children)
521 {
522 	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
523 	PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
524 
525 	/* We need no extra lock since the GXACT isn't valid yet */
526 	if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
527 	{
528 		pgxact->overflowed = true;
529 		nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
530 	}
531 	if (nsubxacts > 0)
532 	{
533 		memcpy(proc->subxids.xids, children,
534 			   nsubxacts * sizeof(TransactionId));
535 		pgxact->nxids = nsubxacts;
536 	}
537 }
538 
539 /*
540  * MarkAsPrepared
541  *		Mark the GXACT as fully valid, and enter it into the global ProcArray.
542  *
543  * lock_held indicates whether caller already holds TwoPhaseStateLock.
544  */
545 static void
MarkAsPrepared(GlobalTransaction gxact,bool lock_held)546 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
547 {
548 	/* Lock here may be overkill, but I'm not convinced of that ... */
549 	if (!lock_held)
550 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
551 	Assert(!gxact->valid);
552 	gxact->valid = true;
553 	if (!lock_held)
554 		LWLockRelease(TwoPhaseStateLock);
555 
556 	/*
557 	 * Put it into the global ProcArray so TransactionIdIsInProgress considers
558 	 * the XID as still running.
559 	 */
560 	ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
561 }
562 
563 /*
564  * LockGXact
565  *		Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
566  */
567 static GlobalTransaction
LockGXact(const char * gid,Oid user)568 LockGXact(const char *gid, Oid user)
569 {
570 	int			i;
571 
572 	/* on first call, register the exit hook */
573 	if (!twophaseExitRegistered)
574 	{
575 		before_shmem_exit(AtProcExit_Twophase, 0);
576 		twophaseExitRegistered = true;
577 	}
578 
579 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
580 
581 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
582 	{
583 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
584 		PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
585 
586 		/* Ignore not-yet-valid GIDs */
587 		if (!gxact->valid)
588 			continue;
589 		if (strcmp(gxact->gid, gid) != 0)
590 			continue;
591 
592 		/* Found it, but has someone else got it locked? */
593 		if (gxact->locking_backend != InvalidBackendId)
594 			ereport(ERROR,
595 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
596 					 errmsg("prepared transaction with identifier \"%s\" is busy",
597 							gid)));
598 
599 		if (user != gxact->owner && !superuser_arg(user))
600 			ereport(ERROR,
601 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
602 					 errmsg("permission denied to finish prepared transaction"),
603 					 errhint("Must be superuser or the user that prepared the transaction.")));
604 
605 		/*
606 		 * Note: it probably would be possible to allow committing from
607 		 * another database; but at the moment NOTIFY is known not to work and
608 		 * there may be some other issues as well.  Hence disallow until
609 		 * someone gets motivated to make it work.
610 		 */
611 		if (MyDatabaseId != proc->databaseId)
612 			ereport(ERROR,
613 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
614 					 errmsg("prepared transaction belongs to another database"),
615 					 errhint("Connect to the database where the transaction was prepared to finish it.")));
616 
617 		/* OK for me to lock it */
618 		gxact->locking_backend = MyBackendId;
619 		MyLockedGxact = gxact;
620 
621 		LWLockRelease(TwoPhaseStateLock);
622 
623 		return gxact;
624 	}
625 
626 	LWLockRelease(TwoPhaseStateLock);
627 
628 	ereport(ERROR,
629 			(errcode(ERRCODE_UNDEFINED_OBJECT),
630 			 errmsg("prepared transaction with identifier \"%s\" does not exist",
631 					gid)));
632 
633 	/* NOTREACHED */
634 	return NULL;
635 }
636 
637 /*
638  * RemoveGXact
639  *		Remove the prepared transaction from the shared memory array.
640  *
641  * NB: caller should have already removed it from ProcArray
642  */
643 static void
RemoveGXact(GlobalTransaction gxact)644 RemoveGXact(GlobalTransaction gxact)
645 {
646 	int			i;
647 
648 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
649 
650 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
651 	{
652 		if (gxact == TwoPhaseState->prepXacts[i])
653 		{
654 			/* remove from the active array */
655 			TwoPhaseState->numPrepXacts--;
656 			TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
657 
658 			/* and put it back in the freelist */
659 			gxact->next = TwoPhaseState->freeGXacts;
660 			TwoPhaseState->freeGXacts = gxact;
661 
662 			return;
663 		}
664 	}
665 
666 	elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
667 }
668 
669 /*
670  * Returns an array of all prepared transactions for the user-level
671  * function pg_prepared_xact.
672  *
673  * The returned array and all its elements are copies of internal data
674  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
675  *
676  * WARNING -- we return even those transactions that are not fully prepared
677  * yet.  The caller should filter them out if he doesn't want them.
678  *
679  * The returned array is palloc'd.
680  */
681 static int
GetPreparedTransactionList(GlobalTransaction * gxacts)682 GetPreparedTransactionList(GlobalTransaction *gxacts)
683 {
684 	GlobalTransaction array;
685 	int			num;
686 	int			i;
687 
688 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
689 
690 	if (TwoPhaseState->numPrepXacts == 0)
691 	{
692 		LWLockRelease(TwoPhaseStateLock);
693 
694 		*gxacts = NULL;
695 		return 0;
696 	}
697 
698 	num = TwoPhaseState->numPrepXacts;
699 	array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
700 	*gxacts = array;
701 	for (i = 0; i < num; i++)
702 		memcpy(array + i, TwoPhaseState->prepXacts[i],
703 			   sizeof(GlobalTransactionData));
704 
705 	LWLockRelease(TwoPhaseStateLock);
706 
707 	return num;
708 }
709 
710 
/*
 * Working status for pg_prepared_xact: the snapshot taken on the first call
 * and the scan position carried from one call to the next.
 */
typedef struct
{
	GlobalTransaction array;	/* copies from GetPreparedTransactionList(),
								 * or NULL if there were none */
	int			ngxacts;		/* number of elements in array */
	int			currIdx;		/* index of the next element to examine */
} Working_State;
718 
719 /*
720  * pg_prepared_xact
721  *		Produce a view with one row per prepared transaction.
722  *
723  * This function is here so we don't have to export the
724  * GlobalTransactionData struct definition.
725  */
726 Datum
pg_prepared_xact(PG_FUNCTION_ARGS)727 pg_prepared_xact(PG_FUNCTION_ARGS)
728 {
729 	FuncCallContext *funcctx;
730 	Working_State *status;
731 
732 	if (SRF_IS_FIRSTCALL())
733 	{
734 		TupleDesc	tupdesc;
735 		MemoryContext oldcontext;
736 
737 		/* create a function context for cross-call persistence */
738 		funcctx = SRF_FIRSTCALL_INIT();
739 
740 		/*
741 		 * Switch to memory context appropriate for multiple function calls
742 		 */
743 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
744 
745 		/* build tupdesc for result tuples */
746 		/* this had better match pg_prepared_xacts view in system_views.sql */
747 		tupdesc = CreateTemplateTupleDesc(5);
748 		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
749 						   XIDOID, -1, 0);
750 		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
751 						   TEXTOID, -1, 0);
752 		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
753 						   TIMESTAMPTZOID, -1, 0);
754 		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
755 						   OIDOID, -1, 0);
756 		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
757 						   OIDOID, -1, 0);
758 
759 		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
760 
761 		/*
762 		 * Collect all the 2PC status information that we will format and send
763 		 * out as a result set.
764 		 */
765 		status = (Working_State *) palloc(sizeof(Working_State));
766 		funcctx->user_fctx = (void *) status;
767 
768 		status->ngxacts = GetPreparedTransactionList(&status->array);
769 		status->currIdx = 0;
770 
771 		MemoryContextSwitchTo(oldcontext);
772 	}
773 
774 	funcctx = SRF_PERCALL_SETUP();
775 	status = (Working_State *) funcctx->user_fctx;
776 
777 	while (status->array != NULL && status->currIdx < status->ngxacts)
778 	{
779 		GlobalTransaction gxact = &status->array[status->currIdx++];
780 		PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
781 		PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
782 		Datum		values[5];
783 		bool		nulls[5];
784 		HeapTuple	tuple;
785 		Datum		result;
786 
787 		if (!gxact->valid)
788 			continue;
789 
790 		/*
791 		 * Form tuple with appropriate data.
792 		 */
793 		MemSet(values, 0, sizeof(values));
794 		MemSet(nulls, 0, sizeof(nulls));
795 
796 		values[0] = TransactionIdGetDatum(pgxact->xid);
797 		values[1] = CStringGetTextDatum(gxact->gid);
798 		values[2] = TimestampTzGetDatum(gxact->prepared_at);
799 		values[3] = ObjectIdGetDatum(gxact->owner);
800 		values[4] = ObjectIdGetDatum(proc->databaseId);
801 
802 		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
803 		result = HeapTupleGetDatum(tuple);
804 		SRF_RETURN_NEXT(funcctx, result);
805 	}
806 
807 	SRF_RETURN_DONE(funcctx);
808 }
809 
810 /*
811  * TwoPhaseGetGXact
812  *		Get the GlobalTransaction struct for a prepared transaction
813  *		specified by XID
814  *
815  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
816  * caller had better hold it.
817  */
818 static GlobalTransaction
TwoPhaseGetGXact(TransactionId xid,bool lock_held)819 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
820 {
821 	GlobalTransaction result = NULL;
822 	int			i;
823 
824 	static TransactionId cached_xid = InvalidTransactionId;
825 	static GlobalTransaction cached_gxact = NULL;
826 
827 	Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
828 
829 	/*
830 	 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
831 	 * repeatedly for the same XID.  We can save work with a simple cache.
832 	 */
833 	if (xid == cached_xid)
834 		return cached_gxact;
835 
836 	if (!lock_held)
837 		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
838 
839 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
840 	{
841 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
842 		PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
843 
844 		if (pgxact->xid == xid)
845 		{
846 			result = gxact;
847 			break;
848 		}
849 	}
850 
851 	if (!lock_held)
852 		LWLockRelease(TwoPhaseStateLock);
853 
854 	if (result == NULL)			/* should not happen */
855 		elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
856 
857 	cached_xid = xid;
858 	cached_gxact = result;
859 
860 	return result;
861 }
862 
863 /*
864  * TwoPhaseGetXidByVirtualXID
865  *		Lookup VXID among xacts prepared since last startup.
866  *
867  * (This won't find recovered xacts.)  If more than one matches, return any
868  * and set "have_more" to true.  To witness multiple matches, a single
869  * BackendId must consume 2^32 LXIDs, with no intervening database restart.
870  */
871 TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,bool * have_more)872 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
873 						   bool *have_more)
874 {
875 	int			i;
876 	TransactionId result = InvalidTransactionId;
877 
878 	Assert(VirtualTransactionIdIsValid(vxid));
879 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
880 
881 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
882 	{
883 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
884 		PGPROC	   *proc;
885 		VirtualTransactionId proc_vxid;
886 
887 		if (!gxact->valid)
888 			continue;
889 		proc = &ProcGlobal->allProcs[gxact->pgprocno];
890 		GET_VXID_FROM_PGPROC(proc_vxid, *proc);
891 		if (VirtualTransactionIdEquals(vxid, proc_vxid))
892 		{
893 			/* Startup process sets proc->backendId to InvalidBackendId. */
894 			Assert(!gxact->inredo);
895 
896 			if (result != InvalidTransactionId)
897 			{
898 				*have_more = true;
899 				break;
900 			}
901 			result = gxact->xid;
902 		}
903 	}
904 
905 	LWLockRelease(TwoPhaseStateLock);
906 
907 	return result;
908 }
909 
910 /*
911  * TwoPhaseGetDummyBackendId
912  *		Get the dummy backend ID for prepared transaction specified by XID
913  *
914  * Dummy backend IDs are similar to real backend IDs of real backends.
915  * They start at MaxBackends + 1, and are unique across all currently active
916  * real backends and prepared transactions.  If lock_held is set to true,
917  * TwoPhaseStateLock will not be taken, so the caller had better hold it.
918  */
919 BackendId
TwoPhaseGetDummyBackendId(TransactionId xid,bool lock_held)920 TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
921 {
922 	GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
923 
924 	return gxact->dummyBackendId;
925 }
926 
927 /*
928  * TwoPhaseGetDummyProc
929  *		Get the PGPROC that represents a prepared transaction specified by XID
930  *
931  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
932  * caller had better hold it.
933  */
934 PGPROC *
TwoPhaseGetDummyProc(TransactionId xid,bool lock_held)935 TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
936 {
937 	GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
938 
939 	return &ProcGlobal->allProcs[gxact->pgprocno];
940 }
941 
942 /************************************************************************/
943 /* State file support													*/
944 /************************************************************************/
945 
/*
 * Format the state file path for "xid" into "path": the file lives in
 * TWOPHASE_DIR and is named after the XID, printed as (at least) eight
 * zero-padded upper-case hexadecimal digits.  At most MAXPGPATH bytes are
 * written, so "path" must be a buffer of at least that size.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
948 
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID string (hdr.gidlen bytes, including the terminating '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileNode[] (files to be deleted at commit)
 *	5. RelFileNode[] (files to be deleted at abort)
 *	6. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	7. TwoPhaseRecordOnDisk
 *	8. ...
 *	9. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	10. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.
 */
964 
/*
 * Header for a 2PC state file
 *
 * The header is the same struct as xl_xact_prepare.  StartPrepare stores
 * TWOPHASE_MAGIC in its "magic" field, and ReadTwoPhaseFile rejects any
 * file whose header does not carry that value.
 */
#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */

typedef xl_xact_prepare TwoPhaseFileHeader;
971 
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * A header whose rmid is TWOPHASE_RM_END_ID terminates the list of records;
 * ProcessRecords stops scanning when it sees one.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
984 
/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 * for that.
 *
 * Chunks are filled by save_state_data and handed to XLogRegisterData one
 * by one in EndPrepare.
 */
typedef struct StateFileChunk
{
	char	   *data;			/* palloc'd buffer holding the chunk's bytes */
	uint32		len;			/* bytes used in data, alignment padding
								 * included */
	struct StateFileChunk *next;	/* next chunk in chain, or NULL */
} StateFileChunk;
996 
/*
 * State data being assembled by the current backend.  StartPrepare
 * initializes the chain, save_state_data appends to it, and EndPrepare
 * writes it out and resets head, tail and num_chunks.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain */
}			records;
1005 
1006 
1007 /*
1008  * Append a block of data to records data structure.
1009  *
1010  * NB: each block is padded to a MAXALIGN multiple.  This must be
1011  * accounted for when the file is later read!
1012  *
1013  * The data is copied, so the caller is free to modify it afterwards.
1014  */
1015 static void
save_state_data(const void * data,uint32 len)1016 save_state_data(const void *data, uint32 len)
1017 {
1018 	uint32		padlen = MAXALIGN(len);
1019 
1020 	if (padlen > records.bytes_free)
1021 	{
1022 		records.tail->next = palloc0(sizeof(StateFileChunk));
1023 		records.tail = records.tail->next;
1024 		records.tail->len = 0;
1025 		records.tail->next = NULL;
1026 		records.num_chunks++;
1027 
1028 		records.bytes_free = Max(padlen, 512);
1029 		records.tail->data = palloc(records.bytes_free);
1030 	}
1031 
1032 	memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1033 	records.tail->len += padlen;
1034 	records.bytes_free -= padlen;
1035 	records.total_len += padlen;
1036 }
1037 
1038 /*
1039  * Start preparing a state file.
1040  *
1041  * Initializes data structure and inserts the 2PC file header record.
1042  */
1043 void
StartPrepare(GlobalTransaction gxact)1044 StartPrepare(GlobalTransaction gxact)
1045 {
1046 	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1047 	PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1048 	TransactionId xid = pgxact->xid;
1049 	TwoPhaseFileHeader hdr;
1050 	TransactionId *children;
1051 	RelFileNode *commitrels;
1052 	RelFileNode *abortrels;
1053 	SharedInvalidationMessage *invalmsgs;
1054 
1055 	/* Initialize linked list */
1056 	records.head = palloc0(sizeof(StateFileChunk));
1057 	records.head->len = 0;
1058 	records.head->next = NULL;
1059 
1060 	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1061 	records.head->data = palloc(records.bytes_free);
1062 
1063 	records.tail = records.head;
1064 	records.num_chunks = 1;
1065 
1066 	records.total_len = 0;
1067 
1068 	/* Create header */
1069 	hdr.magic = TWOPHASE_MAGIC;
1070 	hdr.total_len = 0;			/* EndPrepare will fill this in */
1071 	hdr.xid = xid;
1072 	hdr.database = proc->databaseId;
1073 	hdr.prepared_at = gxact->prepared_at;
1074 	hdr.owner = gxact->owner;
1075 	hdr.nsubxacts = xactGetCommittedChildren(&children);
1076 	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1077 	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1078 	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1079 														  &hdr.initfileinval);
1080 	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
1081 
1082 	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1083 	save_state_data(gxact->gid, hdr.gidlen);
1084 
1085 	/*
1086 	 * Add the additional info about subxacts, deletable files and cache
1087 	 * invalidation messages.
1088 	 */
1089 	if (hdr.nsubxacts > 0)
1090 	{
1091 		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1092 		/* While we have the child-xact data, stuff it in the gxact too */
1093 		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1094 	}
1095 	if (hdr.ncommitrels > 0)
1096 	{
1097 		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1098 		pfree(commitrels);
1099 	}
1100 	if (hdr.nabortrels > 0)
1101 	{
1102 		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1103 		pfree(abortrels);
1104 	}
1105 	if (hdr.ninvalmsgs > 0)
1106 	{
1107 		save_state_data(invalmsgs,
1108 						hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1109 		pfree(invalmsgs);
1110 	}
1111 }
1112 
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Appends the end-sentinel record, completes the file header that
 * StartPrepare placed at the start of the first chunk (total_len and the
 * replication origin fields), inserts the whole chunk chain as one
 * XLOG_XACT_PREPARE record, flushes it, and marks the gxact as prepared.
 * Afterwards it waits for synchronous replication and resets the chunk
 * list.
 *
 * Raises an ERROR, before anything has been written to WAL, if the state
 * data would be larger than MaxAllocSize.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/*
	 * Go back and fill in total_len in the file header record.  The length
	 * includes room for the CRC that follows the data in a state file, even
	 * though no CRC is part of the data assembled here.
	 */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Is this session replaying changes on behalf of a replication origin? */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}
	else
	{
		hdr->origin_lsn = InvalidXLogRecPtr;
		hdr->origin_timestamp = 0;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
	 * immediately after the WAL record is inserted could complete without
	 * fsync'ing our state file.  (This is essentially the same kind of race
	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
	 * uses delayChkpt for; see notes there.)
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	/* One data registration per chunk is needed; reserve them up front */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	MyProc->delayChkpt = true;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks
	 * MyPgXact as not running our XID (which it will do immediately after
	 * this function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyPgXact,
	 * else there is a window where the XID is not running according to
	 * TransactionIdIsInProgress, and onlookers would be entitled to assume
	 * the xact crashed.  Instead we have a window where the same XID appears
	 * twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkpt = false;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/*
	 * Forget the chunk chain.  The chunks themselves are not pfree'd here;
	 * only the list bookkeeping is reset.
	 */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1239 
1240 /*
1241  * Register a 2PC record to be written to state file.
1242  */
1243 void
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid,uint16 info,const void * data,uint32 len)1244 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1245 					   const void *data, uint32 len)
1246 {
1247 	TwoPhaseRecordOnDisk record;
1248 
1249 	record.rmid = rmid;
1250 	record.info = info;
1251 	record.len = len;
1252 	save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1253 	if (len > 0)
1254 		save_state_data(data, len);
1255 }
1256 
1257 
1258 /*
1259  * Read and validate the state file for xid.
1260  *
1261  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1262  * contents of the file, issuing an error when finding corrupted data.  If
1263  * missing_ok is true, which indicates that missing files can be safely
1264  * ignored, then return NULL.  This state can be reached when doing recovery.
1265  */
1266 static char *
ReadTwoPhaseFile(TransactionId xid,bool missing_ok)1267 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1268 {
1269 	char		path[MAXPGPATH];
1270 	char	   *buf;
1271 	TwoPhaseFileHeader *hdr;
1272 	int			fd;
1273 	struct stat stat;
1274 	uint32		crc_offset;
1275 	pg_crc32c	calc_crc,
1276 				file_crc;
1277 	int			r;
1278 
1279 	TwoPhaseFilePath(path, xid);
1280 
1281 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1282 	if (fd < 0)
1283 	{
1284 		if (missing_ok && errno == ENOENT)
1285 			return NULL;
1286 
1287 		ereport(ERROR,
1288 				(errcode_for_file_access(),
1289 				 errmsg("could not open file \"%s\": %m", path)));
1290 	}
1291 
1292 	/*
1293 	 * Check file length.  We can determine a lower bound pretty easily. We
1294 	 * set an upper bound to avoid palloc() failure on a corrupt file, though
1295 	 * we can't guarantee that we won't get an out of memory error anyway,
1296 	 * even on a valid file.
1297 	 */
1298 	if (fstat(fd, &stat))
1299 		ereport(ERROR,
1300 				(errcode_for_file_access(),
1301 				 errmsg("could not stat file \"%s\": %m", path)));
1302 
1303 	if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1304 						MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1305 						sizeof(pg_crc32c)) ||
1306 		stat.st_size > MaxAllocSize)
1307 		ereport(ERROR,
1308 				(errcode(ERRCODE_DATA_CORRUPTED),
1309 				 errmsg_plural("incorrect size of file \"%s\": %zu byte",
1310 							   "incorrect size of file \"%s\": %zu bytes",
1311 							   (Size) stat.st_size, path,
1312 							   (Size) stat.st_size)));
1313 
1314 	crc_offset = stat.st_size - sizeof(pg_crc32c);
1315 	if (crc_offset != MAXALIGN(crc_offset))
1316 		ereport(ERROR,
1317 				(errcode(ERRCODE_DATA_CORRUPTED),
1318 				 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1319 						path)));
1320 
1321 	/*
1322 	 * OK, slurp in the file.
1323 	 */
1324 	buf = (char *) palloc(stat.st_size);
1325 
1326 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1327 	r = read(fd, buf, stat.st_size);
1328 	if (r != stat.st_size)
1329 	{
1330 		if (r < 0)
1331 			ereport(ERROR,
1332 					(errcode_for_file_access(),
1333 					 errmsg("could not read file \"%s\": %m", path)));
1334 		else
1335 			ereport(ERROR,
1336 					(errmsg("could not read file \"%s\": read %d of %zu",
1337 							path, r, (Size) stat.st_size)));
1338 	}
1339 
1340 	pgstat_report_wait_end();
1341 
1342 	if (CloseTransientFile(fd) != 0)
1343 		ereport(ERROR,
1344 				(errcode_for_file_access(),
1345 				 errmsg("could not close file \"%s\": %m", path)));
1346 
1347 	hdr = (TwoPhaseFileHeader *) buf;
1348 	if (hdr->magic != TWOPHASE_MAGIC)
1349 		ereport(ERROR,
1350 				(errcode(ERRCODE_DATA_CORRUPTED),
1351 				 errmsg("invalid magic number stored in file \"%s\"",
1352 						path)));
1353 
1354 	if (hdr->total_len != stat.st_size)
1355 		ereport(ERROR,
1356 				(errcode(ERRCODE_DATA_CORRUPTED),
1357 				 errmsg("invalid size stored in file \"%s\"",
1358 						path)));
1359 
1360 	INIT_CRC32C(calc_crc);
1361 	COMP_CRC32C(calc_crc, buf, crc_offset);
1362 	FIN_CRC32C(calc_crc);
1363 
1364 	file_crc = *((pg_crc32c *) (buf + crc_offset));
1365 
1366 	if (!EQ_CRC32C(calc_crc, file_crc))
1367 		ereport(ERROR,
1368 				(errcode(ERRCODE_DATA_CORRUPTED),
1369 				 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1370 						path)));
1371 
1372 	return buf;
1373 }
1374 
1375 
1376 /*
1377  * Reads 2PC data from xlog. During checkpoint this data will be moved to
1378  * twophase files and ReadTwoPhaseFile should be used instead.
1379  *
1380  * Note clearly that this function can access WAL during normal operation,
1381  * similarly to the way WALSender or Logical Decoding would do.  While
1382  * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
1383  * particularly if this routine is called for the end-of-recovery checkpoint
1384  * in the checkpointer itself, so save the current timeline number value
1385  * and restore it once done.
1386  */
1387 static void
XlogReadTwoPhaseData(XLogRecPtr lsn,char ** buf,int * len)1388 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1389 {
1390 	XLogRecord *record;
1391 	XLogReaderState *xlogreader;
1392 	char	   *errormsg;
1393 	TimeLineID	save_currtli = ThisTimeLineID;
1394 
1395 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1396 									XL_ROUTINE(.page_read = &read_local_xlog_page,
1397 											   .segment_open = &wal_segment_open,
1398 											   .segment_close = &wal_segment_close),
1399 									NULL);
1400 	if (!xlogreader)
1401 		ereport(ERROR,
1402 				(errcode(ERRCODE_OUT_OF_MEMORY),
1403 				 errmsg("out of memory"),
1404 				 errdetail("Failed while allocating a WAL reading processor.")));
1405 
1406 	XLogBeginRead(xlogreader, lsn);
1407 	record = XLogReadRecord(xlogreader, &errormsg);
1408 
1409 	/*
1410 	 * Restore immediately the timeline where it was previously, as
1411 	 * read_local_xlog_page() could have changed it if the record was read
1412 	 * while recovery was finishing or if the timeline has jumped in-between.
1413 	 */
1414 	ThisTimeLineID = save_currtli;
1415 
1416 	if (record == NULL)
1417 		ereport(ERROR,
1418 				(errcode_for_file_access(),
1419 				 errmsg("could not read two-phase state from WAL at %X/%X",
1420 						(uint32) (lsn >> 32),
1421 						(uint32) lsn)));
1422 
1423 	if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1424 		(XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1425 		ereport(ERROR,
1426 				(errcode_for_file_access(),
1427 				 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1428 						(uint32) (lsn >> 32),
1429 						(uint32) lsn)));
1430 
1431 	if (len != NULL)
1432 		*len = XLogRecGetDataLen(xlogreader);
1433 
1434 	*buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1435 	memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1436 
1437 	XLogReaderFree(xlogreader);
1438 }
1439 
1440 
1441 /*
1442  * Confirms an xid is prepared, during recovery
1443  */
1444 bool
StandbyTransactionIdIsPrepared(TransactionId xid)1445 StandbyTransactionIdIsPrepared(TransactionId xid)
1446 {
1447 	char	   *buf;
1448 	TwoPhaseFileHeader *hdr;
1449 	bool		result;
1450 
1451 	Assert(TransactionIdIsValid(xid));
1452 
1453 	if (max_prepared_xacts <= 0)
1454 		return false;			/* nothing to do */
1455 
1456 	/* Read and validate file */
1457 	buf = ReadTwoPhaseFile(xid, true);
1458 	if (buf == NULL)
1459 		return false;
1460 
1461 	/* Check header also */
1462 	hdr = (TwoPhaseFileHeader *) buf;
1463 	result = TransactionIdEquals(hdr->xid, xid);
1464 	pfree(buf);
1465 
1466 	return result;
1467 }
1468 
1469 /*
1470  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1471  */
1472 void
FinishPreparedTransaction(const char * gid,bool isCommit)1473 FinishPreparedTransaction(const char *gid, bool isCommit)
1474 {
1475 	GlobalTransaction gxact;
1476 	PGPROC	   *proc;
1477 	PGXACT	   *pgxact;
1478 	TransactionId xid;
1479 	char	   *buf;
1480 	char	   *bufptr;
1481 	TwoPhaseFileHeader *hdr;
1482 	TransactionId latestXid;
1483 	TransactionId *children;
1484 	RelFileNode *commitrels;
1485 	RelFileNode *abortrels;
1486 	RelFileNode *delrels;
1487 	int			ndelrels;
1488 	SharedInvalidationMessage *invalmsgs;
1489 
1490 	/*
1491 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
1492 	 * try to commit the same GID at once.
1493 	 */
1494 	gxact = LockGXact(gid, GetUserId());
1495 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
1496 	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1497 	xid = pgxact->xid;
1498 
1499 	/*
1500 	 * Read and validate 2PC state data. State data will typically be stored
1501 	 * in WAL files if the LSN is after the last checkpoint record, or moved
1502 	 * to disk if for some reason they have lived for a long time.
1503 	 */
1504 	if (gxact->ondisk)
1505 		buf = ReadTwoPhaseFile(xid, false);
1506 	else
1507 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1508 
1509 
1510 	/*
1511 	 * Disassemble the header area
1512 	 */
1513 	hdr = (TwoPhaseFileHeader *) buf;
1514 	Assert(TransactionIdEquals(hdr->xid, xid));
1515 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1516 	bufptr += MAXALIGN(hdr->gidlen);
1517 	children = (TransactionId *) bufptr;
1518 	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1519 	commitrels = (RelFileNode *) bufptr;
1520 	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1521 	abortrels = (RelFileNode *) bufptr;
1522 	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1523 	invalmsgs = (SharedInvalidationMessage *) bufptr;
1524 	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1525 
1526 	/* compute latestXid among all children */
1527 	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1528 
1529 	/* Prevent cancel/die interrupt while cleaning up */
1530 	HOLD_INTERRUPTS();
1531 
1532 	/*
1533 	 * The order of operations here is critical: make the XLOG entry for
1534 	 * commit or abort, then mark the transaction committed or aborted in
1535 	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1536 	 * TransactionIdIsInProgress will stop saying the prepared xact is in
1537 	 * progress), then run the post-commit or post-abort callbacks. The
1538 	 * callbacks will release the locks the transaction held.
1539 	 */
1540 	if (isCommit)
1541 		RecordTransactionCommitPrepared(xid,
1542 										hdr->nsubxacts, children,
1543 										hdr->ncommitrels, commitrels,
1544 										hdr->ninvalmsgs, invalmsgs,
1545 										hdr->initfileinval, gid);
1546 	else
1547 		RecordTransactionAbortPrepared(xid,
1548 									   hdr->nsubxacts, children,
1549 									   hdr->nabortrels, abortrels,
1550 									   gid);
1551 
1552 	ProcArrayRemove(proc, latestXid);
1553 
1554 	/*
1555 	 * In case we fail while running the callbacks, mark the gxact invalid so
1556 	 * no one else will try to commit/rollback, and so it will be recycled if
1557 	 * we fail after this point.  It is still locked by our backend so it
1558 	 * won't go away yet.
1559 	 *
1560 	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1561 	 */
1562 	gxact->valid = false;
1563 
1564 	/*
1565 	 * We have to remove any files that were supposed to be dropped. For
1566 	 * consistency with the regular xact.c code paths, must do this before
1567 	 * releasing locks, so do it before running the callbacks.
1568 	 *
1569 	 * NB: this code knows that we couldn't be dropping any temp rels ...
1570 	 */
1571 	if (isCommit)
1572 	{
1573 		delrels = commitrels;
1574 		ndelrels = hdr->ncommitrels;
1575 	}
1576 	else
1577 	{
1578 		delrels = abortrels;
1579 		ndelrels = hdr->nabortrels;
1580 	}
1581 
1582 	/* Make sure files supposed to be dropped are dropped */
1583 	DropRelationFiles(delrels, ndelrels, false);
1584 
1585 	/*
1586 	 * Handle cache invalidation messages.
1587 	 *
1588 	 * Relcache init file invalidation requires processing both before and
1589 	 * after we send the SI messages. See AtEOXact_Inval()
1590 	 */
1591 	if (hdr->initfileinval)
1592 		RelationCacheInitFilePreInvalidate();
1593 	SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1594 	if (hdr->initfileinval)
1595 		RelationCacheInitFilePostInvalidate();
1596 
1597 	/*
1598 	 * Acquire the two-phase lock.  We want to work on the two-phase callbacks
1599 	 * while holding it to avoid potential conflicts with other transactions
1600 	 * attempting to use the same GID, so the lock is released once the shared
1601 	 * memory state is cleared.
1602 	 */
1603 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1604 
1605 	/* And now do the callbacks */
1606 	if (isCommit)
1607 		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1608 	else
1609 		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1610 
1611 	PredicateLockTwoPhaseFinish(xid, isCommit);
1612 
1613 	/* Clear shared memory state */
1614 	RemoveGXact(gxact);
1615 
1616 	/*
1617 	 * Release the lock as all callbacks are called and shared memory cleanup
1618 	 * is done.
1619 	 */
1620 	LWLockRelease(TwoPhaseStateLock);
1621 
1622 	/* Count the prepared xact as committed or aborted */
1623 	AtEOXact_PgStat(isCommit, false);
1624 
1625 	/*
1626 	 * And now we can clean up any files we may have left.
1627 	 */
1628 	if (gxact->ondisk)
1629 		RemoveTwoPhaseFile(xid, true);
1630 
1631 	MyLockedGxact = NULL;
1632 
1633 	RESUME_INTERRUPTS();
1634 
1635 	pfree(buf);
1636 }
1637 
1638 /*
1639  * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1640  */
1641 static void
ProcessRecords(char * bufptr,TransactionId xid,const TwoPhaseCallback callbacks[])1642 ProcessRecords(char *bufptr, TransactionId xid,
1643 			   const TwoPhaseCallback callbacks[])
1644 {
1645 	for (;;)
1646 	{
1647 		TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1648 
1649 		Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1650 		if (record->rmid == TWOPHASE_RM_END_ID)
1651 			break;
1652 
1653 		bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1654 
1655 		if (callbacks[record->rmid] != NULL)
1656 			callbacks[record->rmid] (xid, record->info,
1657 									 (void *) bufptr, record->len);
1658 
1659 		bufptr += MAXALIGN(record->len);
1660 	}
1661 }
1662 
1663 /*
1664  * Remove the 2PC file for the specified XID.
1665  *
1666  * If giveWarning is false, do not complain about file-not-present;
1667  * this is an expected case during WAL replay.
1668  */
1669 static void
RemoveTwoPhaseFile(TransactionId xid,bool giveWarning)1670 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1671 {
1672 	char		path[MAXPGPATH];
1673 
1674 	TwoPhaseFilePath(path, xid);
1675 	if (unlink(path))
1676 		if (errno != ENOENT || giveWarning)
1677 			ereport(WARNING,
1678 					(errcode_for_file_access(),
1679 					 errmsg("could not remove file \"%s\": %m", path)));
1680 }
1681 
1682 /*
1683  * Recreates a state file. This is used in WAL replay and during
1684  * checkpoint creation.
1685  *
1686  * Note: content and len don't include CRC.
1687  */
1688 static void
RecreateTwoPhaseFile(TransactionId xid,void * content,int len)1689 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1690 {
1691 	char		path[MAXPGPATH];
1692 	pg_crc32c	statefile_crc;
1693 	int			fd;
1694 
1695 	/* Recompute CRC */
1696 	INIT_CRC32C(statefile_crc);
1697 	COMP_CRC32C(statefile_crc, content, len);
1698 	FIN_CRC32C(statefile_crc);
1699 
1700 	TwoPhaseFilePath(path, xid);
1701 
1702 	fd = OpenTransientFile(path,
1703 						   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1704 	if (fd < 0)
1705 		ereport(ERROR,
1706 				(errcode_for_file_access(),
1707 				 errmsg("could not recreate file \"%s\": %m", path)));
1708 
1709 	/* Write content and CRC */
1710 	errno = 0;
1711 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1712 	if (write(fd, content, len) != len)
1713 	{
1714 		/* if write didn't set errno, assume problem is no disk space */
1715 		if (errno == 0)
1716 			errno = ENOSPC;
1717 		ereport(ERROR,
1718 				(errcode_for_file_access(),
1719 				 errmsg("could not write file \"%s\": %m", path)));
1720 	}
1721 	if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1722 	{
1723 		/* if write didn't set errno, assume problem is no disk space */
1724 		if (errno == 0)
1725 			errno = ENOSPC;
1726 		ereport(ERROR,
1727 				(errcode_for_file_access(),
1728 				 errmsg("could not write file \"%s\": %m", path)));
1729 	}
1730 	pgstat_report_wait_end();
1731 
1732 	/*
1733 	 * We must fsync the file because the end-of-replay checkpoint will not do
1734 	 * so, there being no GXACT in shared memory yet to tell it to.
1735 	 */
1736 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1737 	if (pg_fsync(fd) != 0)
1738 		ereport(ERROR,
1739 				(errcode_for_file_access(),
1740 				 errmsg("could not fsync file \"%s\": %m", path)));
1741 	pgstat_report_wait_end();
1742 
1743 	if (CloseTransientFile(fd) != 0)
1744 		ereport(ERROR,
1745 				(errcode_for_file_access(),
1746 				 errmsg("could not close file \"%s\": %m", path)));
1747 }
1748 
1749 /*
1750  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1751  *
1752  * We must fsync the state file of any GXACT that is valid or has been
1753  * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1754  * horizon.  (If the gxact isn't valid yet, has not been generated in
1755  * redo, or has a later LSN, this checkpoint is not responsible for
1756  * fsyncing it.)
1757  *
1758  * This is deliberately run as late as possible in the checkpoint sequence,
1759  * because GXACTs ordinarily have short lifespans, and so it is quite
1760  * possible that GXACTs that were valid at checkpoint start will no longer
1761  * exist if we wait a little bit. With typical checkpoint settings this
1762  * will be about 3 minutes for an online checkpoint, so as a result we
1763  * expect that there will be no GXACTs that need to be copied to disk.
1764  *
1765  * If a GXACT remains valid across multiple checkpoints, it will already
1766  * be on disk so we don't bother to repeat that write.
1767  */
1768 void
CheckPointTwoPhase(XLogRecPtr redo_horizon)1769 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1770 {
1771 	int			i;
1772 	int			serialized_xacts = 0;
1773 
1774 	if (max_prepared_xacts <= 0)
1775 		return;					/* nothing to do */
1776 
1777 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1778 
1779 	/*
1780 	 * We are expecting there to be zero GXACTs that need to be copied to
1781 	 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1782 	 * simplicity. This prevents any new xacts from preparing while this
1783 	 * occurs, which shouldn't be a problem since the presence of long-lived
1784 	 * prepared xacts indicates the transaction manager isn't active.
1785 	 *
1786 	 * It's also possible to move I/O out of the lock, but on every error we
1787 	 * should check whether somebody committed our transaction in different
1788 	 * backend. Let's leave this optimization for future, if somebody will
1789 	 * spot that this place cause bottleneck.
1790 	 *
1791 	 * Note that it isn't possible for there to be a GXACT with a
1792 	 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1793 	 * because of the efforts with delayChkpt.
1794 	 */
1795 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1796 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1797 	{
1798 		/*
1799 		 * Note that we are using gxact not pgxact so this works in recovery
1800 		 * also
1801 		 */
1802 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1803 
1804 		if ((gxact->valid || gxact->inredo) &&
1805 			!gxact->ondisk &&
1806 			gxact->prepare_end_lsn <= redo_horizon)
1807 		{
1808 			char	   *buf;
1809 			int			len;
1810 
1811 			XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1812 			RecreateTwoPhaseFile(gxact->xid, buf, len);
1813 			gxact->ondisk = true;
1814 			gxact->prepare_start_lsn = InvalidXLogRecPtr;
1815 			gxact->prepare_end_lsn = InvalidXLogRecPtr;
1816 			pfree(buf);
1817 			serialized_xacts++;
1818 		}
1819 	}
1820 	LWLockRelease(TwoPhaseStateLock);
1821 
1822 	/*
1823 	 * Flush unconditionally the parent directory to make any information
1824 	 * durable on disk.  Two-phase files could have been removed and those
1825 	 * removals need to be made persistent as well as any files newly created
1826 	 * previously since the last checkpoint.
1827 	 */
1828 	fsync_fname(TWOPHASE_DIR, true);
1829 
1830 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1831 
1832 	if (log_checkpoints && serialized_xacts > 0)
1833 		ereport(LOG,
1834 				(errmsg_plural("%u two-phase state file was written "
1835 							   "for a long-running prepared transaction",
1836 							   "%u two-phase state files were written "
1837 							   "for long-running prepared transactions",
1838 							   serialized_xacts,
1839 							   serialized_xacts)));
1840 }
1841 
1842 /*
1843  * restoreTwoPhaseData
1844  *
1845  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1846  * This is called once at the beginning of recovery, saving any extra
1847  * lookups in the future.  Two-phase files that are newer than the
1848  * minimum XID horizon are discarded on the way.
1849  */
1850 void
restoreTwoPhaseData(void)1851 restoreTwoPhaseData(void)
1852 {
1853 	DIR		   *cldir;
1854 	struct dirent *clde;
1855 
1856 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1857 	cldir = AllocateDir(TWOPHASE_DIR);
1858 	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1859 	{
1860 		if (strlen(clde->d_name) == 8 &&
1861 			strspn(clde->d_name, "0123456789ABCDEF") == 8)
1862 		{
1863 			TransactionId xid;
1864 			char	   *buf;
1865 
1866 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1867 
1868 			buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1869 										true, false, false);
1870 			if (buf == NULL)
1871 				continue;
1872 
1873 			PrepareRedoAdd(buf, InvalidXLogRecPtr,
1874 						   InvalidXLogRecPtr, InvalidRepOriginId);
1875 		}
1876 	}
1877 	LWLockRelease(TwoPhaseStateLock);
1878 	FreeDir(cldir);
1879 }
1880 
1881 /*
1882  * PrescanPreparedTransactions
1883  *
1884  * Scan the shared memory entries of TwoPhaseState and determine the range
1885  * of valid XIDs present.  This is run during database startup, after we
1886  * have completed reading WAL.  ShmemVariableCache->nextFullXid has been set to
1887  * one more than the highest XID for which evidence exists in WAL.
1888  *
1889  * We throw away any prepared xacts with main XID beyond nextFullXid --- if any
1890  * are present, it suggests that the DBA has done a PITR recovery to an
1891  * earlier point in time without cleaning out pg_twophase.  We dare not
1892  * try to recover such prepared xacts since they likely depend on database
1893  * state that doesn't exist now.
1894  *
1895  * However, we will advance nextFullXid beyond any subxact XIDs belonging to
1896  * valid prepared xacts.  We need to do this since subxact commit doesn't
1897  * write a WAL entry, and so there might be no evidence in WAL of those
1898  * subxact XIDs.
1899  *
1900  * On corrupted two-phase files, fail immediately.  Keeping around broken
1901  * entries and let replay continue causes harm on the system, and a new
1902  * backup should be rolled in.
1903  *
1904  * Our other responsibility is to determine and return the oldest valid XID
1905  * among the prepared xacts (if none, return ShmemVariableCache->nextFullXid).
1906  * This is needed to synchronize pg_subtrans startup properly.
1907  *
1908  * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1909  * top-level xids is stored in *xids_p. The number of entries in the array
1910  * is returned in *nxids_p.
1911  */
1912 TransactionId
PrescanPreparedTransactions(TransactionId ** xids_p,int * nxids_p)1913 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1914 {
1915 	FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
1916 	TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
1917 	TransactionId result = origNextXid;
1918 	TransactionId *xids = NULL;
1919 	int			nxids = 0;
1920 	int			allocsize = 0;
1921 	int			i;
1922 
1923 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1924 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1925 	{
1926 		TransactionId xid;
1927 		char	   *buf;
1928 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1929 
1930 		Assert(gxact->inredo);
1931 
1932 		xid = gxact->xid;
1933 
1934 		buf = ProcessTwoPhaseBuffer(xid,
1935 									gxact->prepare_start_lsn,
1936 									gxact->ondisk, false, true);
1937 
1938 		if (buf == NULL)
1939 			continue;
1940 
1941 		/*
1942 		 * OK, we think this file is valid.  Incorporate xid into the
1943 		 * running-minimum result.
1944 		 */
1945 		if (TransactionIdPrecedes(xid, result))
1946 			result = xid;
1947 
1948 		if (xids_p)
1949 		{
1950 			if (nxids == allocsize)
1951 			{
1952 				if (nxids == 0)
1953 				{
1954 					allocsize = 10;
1955 					xids = palloc(allocsize * sizeof(TransactionId));
1956 				}
1957 				else
1958 				{
1959 					allocsize = allocsize * 2;
1960 					xids = repalloc(xids, allocsize * sizeof(TransactionId));
1961 				}
1962 			}
1963 			xids[nxids++] = xid;
1964 		}
1965 
1966 		pfree(buf);
1967 	}
1968 	LWLockRelease(TwoPhaseStateLock);
1969 
1970 	if (xids_p)
1971 	{
1972 		*xids_p = xids;
1973 		*nxids_p = nxids;
1974 	}
1975 
1976 	return result;
1977 }
1978 
1979 /*
1980  * StandbyRecoverPreparedTransactions
1981  *
1982  * Scan the shared memory entries of TwoPhaseState and setup all the required
1983  * information to allow standby queries to treat prepared transactions as still
1984  * active.
1985  *
1986  * This is never called at the end of recovery - we use
1987  * RecoverPreparedTransactions() at that point.
1988  *
1989  * The lack of calls to SubTransSetParent() calls here is by design;
1990  * those calls are made by RecoverPreparedTransactions() at the end of recovery
1991  * for those xacts that need this.
1992  */
1993 void
StandbyRecoverPreparedTransactions(void)1994 StandbyRecoverPreparedTransactions(void)
1995 {
1996 	int			i;
1997 
1998 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1999 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2000 	{
2001 		TransactionId xid;
2002 		char	   *buf;
2003 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2004 
2005 		Assert(gxact->inredo);
2006 
2007 		xid = gxact->xid;
2008 
2009 		buf = ProcessTwoPhaseBuffer(xid,
2010 									gxact->prepare_start_lsn,
2011 									gxact->ondisk, false, false);
2012 		if (buf != NULL)
2013 			pfree(buf);
2014 	}
2015 	LWLockRelease(TwoPhaseStateLock);
2016 }
2017 
2018 /*
2019  * RecoverPreparedTransactions
2020  *
2021  * Scan the shared memory entries of TwoPhaseState and reload the state for
2022  * each prepared transaction (reacquire locks, etc).
2023  *
2024  * This is run at the end of recovery, but before we allow backends to write
2025  * WAL.
2026  *
2027  * At the end of recovery the way we take snapshots will change. We now need
2028  * to mark all running transactions with their full SubTransSetParent() info
2029  * to allow normal snapshots to work correctly if snapshots overflow.
2030  * We do this here because by definition prepared transactions are the only
2031  * type of write transaction still running, so this is necessary and
2032  * complete.
2033  */
2034 void
RecoverPreparedTransactions(void)2035 RecoverPreparedTransactions(void)
2036 {
2037 	int			i;
2038 
2039 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2040 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2041 	{
2042 		TransactionId xid;
2043 		char	   *buf;
2044 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2045 		char	   *bufptr;
2046 		TwoPhaseFileHeader *hdr;
2047 		TransactionId *subxids;
2048 		const char *gid;
2049 
2050 		xid = gxact->xid;
2051 
2052 		/*
2053 		 * Reconstruct subtrans state for the transaction --- needed because
2054 		 * pg_subtrans is not preserved over a restart.  Note that we are
2055 		 * linking all the subtransactions directly to the top-level XID;
2056 		 * there may originally have been a more complex hierarchy, but
2057 		 * there's no need to restore that exactly. It's possible that
2058 		 * SubTransSetParent has been set before, if the prepared transaction
2059 		 * generated xid assignment records.
2060 		 */
2061 		buf = ProcessTwoPhaseBuffer(xid,
2062 									gxact->prepare_start_lsn,
2063 									gxact->ondisk, true, false);
2064 		if (buf == NULL)
2065 			continue;
2066 
2067 		ereport(LOG,
2068 				(errmsg("recovering prepared transaction %u from shared memory", xid)));
2069 
2070 		hdr = (TwoPhaseFileHeader *) buf;
2071 		Assert(TransactionIdEquals(hdr->xid, xid));
2072 		bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2073 		gid = (const char *) bufptr;
2074 		bufptr += MAXALIGN(hdr->gidlen);
2075 		subxids = (TransactionId *) bufptr;
2076 		bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2077 		bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2078 		bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2079 		bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2080 
2081 		/*
2082 		 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2083 		 * added in redo and already has a shmem entry for it.
2084 		 */
2085 		MarkAsPreparingGuts(gxact, xid, gid,
2086 							hdr->prepared_at,
2087 							hdr->owner, hdr->database);
2088 
2089 		/* recovered, so reset the flag for entries generated by redo */
2090 		gxact->inredo = false;
2091 
2092 		GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2093 		MarkAsPrepared(gxact, true);
2094 
2095 		LWLockRelease(TwoPhaseStateLock);
2096 
2097 		/*
2098 		 * Recover other state (notably locks) using resource managers.
2099 		 */
2100 		ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2101 
2102 		/*
2103 		 * Release locks held by the standby process after we process each
2104 		 * prepared transaction. As a result, we don't need too many
2105 		 * additional locks at any one time.
2106 		 */
2107 		if (InHotStandby)
2108 			StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2109 
2110 		/*
2111 		 * We're done with recovering this transaction. Clear MyLockedGxact,
2112 		 * like we do in PrepareTransaction() during normal operation.
2113 		 */
2114 		PostPrepare_Twophase();
2115 
2116 		pfree(buf);
2117 
2118 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2119 	}
2120 
2121 	LWLockRelease(TwoPhaseStateLock);
2122 }
2123 
2124 /*
2125  * ProcessTwoPhaseBuffer
2126  *
2127  * Given a transaction id, read it either from disk or read it directly
2128  * via shmem xlog record pointer using the provided "prepare_start_lsn".
2129  *
2130  * If setParent is true, set up subtransaction parent linkages.
2131  *
2132  * If setNextXid is true, set ShmemVariableCache->nextFullXid to the newest
2133  * value scanned.
2134  */
2135 static char *
ProcessTwoPhaseBuffer(TransactionId xid,XLogRecPtr prepare_start_lsn,bool fromdisk,bool setParent,bool setNextXid)2136 ProcessTwoPhaseBuffer(TransactionId xid,
2137 					  XLogRecPtr prepare_start_lsn,
2138 					  bool fromdisk,
2139 					  bool setParent, bool setNextXid)
2140 {
2141 	FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
2142 	TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
2143 	TransactionId *subxids;
2144 	char	   *buf;
2145 	TwoPhaseFileHeader *hdr;
2146 	int			i;
2147 
2148 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2149 
2150 	if (!fromdisk)
2151 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
2152 
2153 	/* Already processed? */
2154 	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2155 	{
2156 		if (fromdisk)
2157 		{
2158 			ereport(WARNING,
2159 					(errmsg("removing stale two-phase state file for transaction %u",
2160 							xid)));
2161 			RemoveTwoPhaseFile(xid, true);
2162 		}
2163 		else
2164 		{
2165 			ereport(WARNING,
2166 					(errmsg("removing stale two-phase state from memory for transaction %u",
2167 							xid)));
2168 			PrepareRedoRemove(xid, true);
2169 		}
2170 		return NULL;
2171 	}
2172 
2173 	/* Reject XID if too new */
2174 	if (TransactionIdFollowsOrEquals(xid, origNextXid))
2175 	{
2176 		if (fromdisk)
2177 		{
2178 			ereport(WARNING,
2179 					(errmsg("removing future two-phase state file for transaction %u",
2180 							xid)));
2181 			RemoveTwoPhaseFile(xid, true);
2182 		}
2183 		else
2184 		{
2185 			ereport(WARNING,
2186 					(errmsg("removing future two-phase state from memory for transaction %u",
2187 							xid)));
2188 			PrepareRedoRemove(xid, true);
2189 		}
2190 		return NULL;
2191 	}
2192 
2193 	if (fromdisk)
2194 	{
2195 		/* Read and validate file */
2196 		buf = ReadTwoPhaseFile(xid, false);
2197 	}
2198 	else
2199 	{
2200 		/* Read xlog data */
2201 		XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2202 	}
2203 
2204 	/* Deconstruct header */
2205 	hdr = (TwoPhaseFileHeader *) buf;
2206 	if (!TransactionIdEquals(hdr->xid, xid))
2207 	{
2208 		if (fromdisk)
2209 			ereport(ERROR,
2210 					(errcode(ERRCODE_DATA_CORRUPTED),
2211 					 errmsg("corrupted two-phase state file for transaction %u",
2212 							xid)));
2213 		else
2214 			ereport(ERROR,
2215 					(errcode(ERRCODE_DATA_CORRUPTED),
2216 					 errmsg("corrupted two-phase state in memory for transaction %u",
2217 							xid)));
2218 	}
2219 
2220 	/*
2221 	 * Examine subtransaction XIDs ... they should all follow main XID, and
2222 	 * they may force us to advance nextFullXid.
2223 	 */
2224 	subxids = (TransactionId *) (buf +
2225 								 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2226 								 MAXALIGN(hdr->gidlen));
2227 	for (i = 0; i < hdr->nsubxacts; i++)
2228 	{
2229 		TransactionId subxid = subxids[i];
2230 
2231 		Assert(TransactionIdFollows(subxid, xid));
2232 
2233 		/* update nextFullXid if needed */
2234 		if (setNextXid)
2235 			AdvanceNextFullTransactionIdPastXid(subxid);
2236 
2237 		if (setParent)
2238 			SubTransSetParent(subxid, xid);
2239 	}
2240 
2241 	return buf;
2242 }
2243 
2244 
2245 /*
2246  *	RecordTransactionCommitPrepared
2247  *
2248  * This is basically the same as RecordTransactionCommit (q.v. if you change
2249  * this function): in particular, we must set the delayChkpt flag to avoid a
2250  * race condition.
2251  *
2252  * We know the transaction made at least one XLOG entry (its PREPARE),
2253  * so it is never possible to optimize out the commit record.
2254  */
2255 static void
RecordTransactionCommitPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,int ninvalmsgs,SharedInvalidationMessage * invalmsgs,bool initfileinval,const char * gid)2256 RecordTransactionCommitPrepared(TransactionId xid,
2257 								int nchildren,
2258 								TransactionId *children,
2259 								int nrels,
2260 								RelFileNode *rels,
2261 								int ninvalmsgs,
2262 								SharedInvalidationMessage *invalmsgs,
2263 								bool initfileinval,
2264 								const char *gid)
2265 {
2266 	XLogRecPtr	recptr;
2267 	TimestampTz committs = GetCurrentTimestamp();
2268 	bool		replorigin;
2269 
2270 	/*
2271 	 * Are we using the replication origins feature?  Or, in other words, are
2272 	 * we replaying remote actions?
2273 	 */
2274 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2275 				  replorigin_session_origin != DoNotReplicateId);
2276 
2277 	START_CRIT_SECTION();
2278 
2279 	/* See notes in RecordTransactionCommit */
2280 	MyProc->delayChkpt = true;
2281 
2282 	/*
2283 	 * Emit the XLOG commit record. Note that we mark 2PC commits as
2284 	 * potentially having AccessExclusiveLocks since we don't know whether or
2285 	 * not they do.
2286 	 */
2287 	recptr = XactLogCommitRecord(committs,
2288 								 nchildren, children, nrels, rels,
2289 								 ninvalmsgs, invalmsgs,
2290 								 initfileinval, false,
2291 								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2292 								 xid, gid);
2293 
2294 
2295 	if (replorigin)
2296 		/* Move LSNs forward for this replication origin */
2297 		replorigin_session_advance(replorigin_session_origin_lsn,
2298 								   XactLastRecEnd);
2299 
2300 	/*
2301 	 * Record commit timestamp.  The value comes from plain commit timestamp
2302 	 * if replorigin is not enabled, or replorigin already set a value for us
2303 	 * in replorigin_session_origin_timestamp otherwise.
2304 	 *
2305 	 * We don't need to WAL-log anything here, as the commit record written
2306 	 * above already contains the data.
2307 	 */
2308 	if (!replorigin || replorigin_session_origin_timestamp == 0)
2309 		replorigin_session_origin_timestamp = committs;
2310 
2311 	TransactionTreeSetCommitTsData(xid, nchildren, children,
2312 								   replorigin_session_origin_timestamp,
2313 								   replorigin_session_origin, false);
2314 
2315 	/*
2316 	 * We don't currently try to sleep before flush here ... nor is there any
2317 	 * support for async commit of a prepared xact (the very idea is probably
2318 	 * a contradiction)
2319 	 */
2320 
2321 	/* Flush XLOG to disk */
2322 	XLogFlush(recptr);
2323 
2324 	/* Mark the transaction committed in pg_xact */
2325 	TransactionIdCommitTree(xid, nchildren, children);
2326 
2327 	/* Checkpoint can proceed now */
2328 	MyProc->delayChkpt = false;
2329 
2330 	END_CRIT_SECTION();
2331 
2332 	/*
2333 	 * Wait for synchronous replication, if required.
2334 	 *
2335 	 * Note that at this stage we have marked clog, but still show as running
2336 	 * in the procarray and continue to hold locks.
2337 	 */
2338 	SyncRepWaitForLSN(recptr, true);
2339 }
2340 
2341 /*
2342  *	RecordTransactionAbortPrepared
2343  *
2344  * This is basically the same as RecordTransactionAbort.
2345  *
2346  * We know the transaction made at least one XLOG entry (its PREPARE),
2347  * so it is never possible to optimize out the abort record.
2348  */
2349 static void
RecordTransactionAbortPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,const char * gid)2350 RecordTransactionAbortPrepared(TransactionId xid,
2351 							   int nchildren,
2352 							   TransactionId *children,
2353 							   int nrels,
2354 							   RelFileNode *rels,
2355 							   const char *gid)
2356 {
2357 	XLogRecPtr	recptr;
2358 
2359 	/*
2360 	 * Catch the scenario where we aborted partway through
2361 	 * RecordTransactionCommitPrepared ...
2362 	 */
2363 	if (TransactionIdDidCommit(xid))
2364 		elog(PANIC, "cannot abort transaction %u, it was already committed",
2365 			 xid);
2366 
2367 	START_CRIT_SECTION();
2368 
2369 	/*
2370 	 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2371 	 * potentially having AccessExclusiveLocks since we don't know whether or
2372 	 * not they do.
2373 	 */
2374 	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2375 								nchildren, children,
2376 								nrels, rels,
2377 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2378 								xid, gid);
2379 
2380 	/* Always flush, since we're about to remove the 2PC state file */
2381 	XLogFlush(recptr);
2382 
2383 	/*
2384 	 * Mark the transaction aborted in clog.  This is not absolutely necessary
2385 	 * but we may as well do it while we are here.
2386 	 */
2387 	TransactionIdAbortTree(xid, nchildren, children);
2388 
2389 	END_CRIT_SECTION();
2390 
2391 	/*
2392 	 * Wait for synchronous replication, if required.
2393 	 *
2394 	 * Note that at this stage we have marked clog, but still show as running
2395 	 * in the procarray and continue to hold locks.
2396 	 */
2397 	SyncRepWaitForLSN(recptr, false);
2398 }
2399 
2400 /*
2401  * PrepareRedoAdd
2402  *
2403  * Store pointers to the start/end of the WAL record along with the xid in
2404  * a gxact entry in shared memory TwoPhaseState structure.  If caller
2405  * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2406  * data, the entry is marked as located on disk.
2407  */
2408 void
PrepareRedoAdd(char * buf,XLogRecPtr start_lsn,XLogRecPtr end_lsn,RepOriginId origin_id)2409 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2410 			   XLogRecPtr end_lsn, RepOriginId origin_id)
2411 {
2412 	TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2413 	char	   *bufptr;
2414 	const char *gid;
2415 	GlobalTransaction gxact;
2416 
2417 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2418 	Assert(RecoveryInProgress());
2419 
2420 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2421 	gid = (const char *) bufptr;
2422 
2423 	/*
2424 	 * Reserve the GID for the given transaction in the redo code path.
2425 	 *
2426 	 * This creates a gxact struct and puts it into the active array.
2427 	 *
2428 	 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2429 	 * shared memory. Hence, we only fill up the bare minimum contents here.
2430 	 * The gxact also gets marked with gxact->inredo set to true to indicate
2431 	 * that it got added in the redo phase
2432 	 */
2433 
2434 	/* Get a free gxact from the freelist */
2435 	if (TwoPhaseState->freeGXacts == NULL)
2436 		ereport(ERROR,
2437 				(errcode(ERRCODE_OUT_OF_MEMORY),
2438 				 errmsg("maximum number of prepared transactions reached"),
2439 				 errhint("Increase max_prepared_transactions (currently %d).",
2440 						 max_prepared_xacts)));
2441 	gxact = TwoPhaseState->freeGXacts;
2442 	TwoPhaseState->freeGXacts = gxact->next;
2443 
2444 	gxact->prepared_at = hdr->prepared_at;
2445 	gxact->prepare_start_lsn = start_lsn;
2446 	gxact->prepare_end_lsn = end_lsn;
2447 	gxact->xid = hdr->xid;
2448 	gxact->owner = hdr->owner;
2449 	gxact->locking_backend = InvalidBackendId;
2450 	gxact->valid = false;
2451 	gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2452 	gxact->inredo = true;		/* yes, added in redo */
2453 	strcpy(gxact->gid, gid);
2454 
2455 	/* And insert it into the active array */
2456 	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2457 	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2458 
2459 	if (origin_id != InvalidRepOriginId)
2460 	{
2461 		/* recover apply progress */
2462 		replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2463 						   false /* backward */ , false /* WAL */ );
2464 	}
2465 
2466 	elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2467 }
2468 
2469 /*
2470  * PrepareRedoRemove
2471  *
2472  * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2473  * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2474  *
2475  * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2476  * is updated.
2477  */
2478 void
PrepareRedoRemove(TransactionId xid,bool giveWarning)2479 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2480 {
2481 	GlobalTransaction gxact = NULL;
2482 	int			i;
2483 	bool		found = false;
2484 
2485 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2486 	Assert(RecoveryInProgress());
2487 
2488 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2489 	{
2490 		gxact = TwoPhaseState->prepXacts[i];
2491 
2492 		if (gxact->xid == xid)
2493 		{
2494 			Assert(gxact->inredo);
2495 			found = true;
2496 			break;
2497 		}
2498 	}
2499 
2500 	/*
2501 	 * Just leave if there is nothing, this is expected during WAL replay.
2502 	 */
2503 	if (!found)
2504 		return;
2505 
2506 	/*
2507 	 * And now we can clean up any files we may have left.
2508 	 */
2509 	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2510 	if (gxact->ondisk)
2511 		RemoveTwoPhaseFile(xid, giveWarning);
2512 	RemoveGXact(gxact);
2513 }
2514