1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *		Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/access/transam/twophase.c
11  *
12  * NOTES
13  *		Each global transaction is associated with a global transaction
14  *		identifier (GID). The client assigns a GID to a postgres
15  *		transaction with the PREPARE TRANSACTION command.
16  *
17  *		We keep all active global transactions in a shared memory array.
18  *		When the PREPARE TRANSACTION command is issued, the GID is
19  *		reserved for the transaction in the array. This is done before
20  *		a WAL entry is made, because the reservation checks for duplicate
21  *		GIDs and aborts the transaction if there already is a global
22  *		transaction in prepared state with the same GID.
23  *
24  *		A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25  *		what keeps the XID considered running by TransactionIdIsInProgress.
26  *		It is also convenient as a PGPROC to hook the gxact's locks to.
27  *
28  *		Information to recover prepared transactions in case of crash is
29  *		now stored in WAL for the common case. In some cases there will be
30  *		an extended period between preparing a GXACT and commit/abort, in
31  *		which case we need to separately record prepared transaction data
32  *		in permanent storage. This includes locking information, pending
33  *		notifications etc. All that state information is written to the
34  *		per-transaction state file in the pg_twophase directory.
35  *		All prepared transactions will be written prior to shutdown.
36  *
 *		The life cycle of the state data is as follows:
 *
 *		* On PREPARE TRANSACTION, the backend writes the state data only to
 *		  the WAL and stores a pointer to the start of the WAL record in
 *		  gxact->prepare_start_lsn.
 *		* If COMMIT occurs before a checkpoint, the backend reads the data
 *		  from the WAL using prepare_start_lsn.
 *		* On checkpoint, the state data is copied to files in the pg_twophase
 *		  directory and fsynced.
 *		* If COMMIT happens after a checkpoint, the backend reads the state
 *		  data from those files.
48  *
49  *		During replay and replication, TwoPhaseState also holds information
50  *		about active prepared transactions that haven't been moved to disk yet.
51  *
52  *		Replay of twophase records happens by the following rules:
53  *
54  *		* At the beginning of recovery, pg_twophase is scanned once, filling
55  *		  TwoPhaseState with entries marked with gxact->inredo and
56  *		  gxact->ondisk.  Two-phase file data older than the XID horizon of
57  *		  the redo position are discarded.
58  *		* On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59  *		  gxact->inredo is set to true for such entries.
60  *		* On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61  *		  that have gxact->inredo set and are behind the redo_horizon. We
62  *		  save them to disk and then switch gxact->ondisk to true.
63  *		* On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64  *		  If gxact->ondisk is true, the corresponding entry from the disk
65  *		  is additionally deleted.
66  *		* RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67  *		  and PrescanPreparedTransactions() have been modified to go through
68  *		  gxact->inredo entries that have not made it to disk.
69  *
70  *-------------------------------------------------------------------------
71  */
72 #include "postgres.h"
73 
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
78 
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogutils.h"
89 #include "access/xlogreader.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
92 #include "funcapi.h"
93 #include "miscadmin.h"
94 #include "pg_trace.h"
95 #include "pgstat.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/predicate.h"
102 #include "storage/proc.h"
103 #include "storage/procarray.h"
104 #include "storage/sinvaladt.h"
105 #include "storage/smgr.h"
106 #include "utils/builtins.h"
107 #include "utils/memutils.h"
108 #include "utils/timestamp.h"
109 
110 
111 /*
112  * Directory where Two-phase commit files reside within PGDATA
113  */
#define TWOPHASE_DIR "pg_twophase"	/* state-file directory, relative to PGDATA */

/*
 * GUC variable (max_prepared_transactions).  It cannot be changed once the
 * server has started; zero disables prepared transactions altogether.
 */
int			max_prepared_xacts = 0;
118 
119 /*
120  * This struct describes one global transaction that is in prepared state
121  * or attempting to become prepared.
122  *
123  * The lifecycle of a global transaction is:
124  *
125  * 1. After checking that the requested GID is not in use, set up an entry in
126  * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127  * and mark it as locked by my backend.
128  *
129  * 2. After successfully completing prepare, set valid = true and enter the
130  * referenced PGPROC into the global ProcArray.
131  *
132  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133  * valid and not locked, then mark the entry as locked by storing my current
134  * backend ID into locking_backend.  This prevents concurrent attempts to
135  * commit or rollback the same prepared xact.
136  *
137  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139  * the freelist.
140  *
141  * Note that if the preparing transaction fails between steps 1 and 2, the
142  * entry must be removed so that the GID and the GlobalTransaction struct
143  * can be reused.  See AtAbort_Twophase().
144  *
145  * typedef struct GlobalTransactionData *GlobalTransaction appears in
146  * twophase.h
147  *
148  * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
149  * specified in TwoPhaseFileHeader.
150  */
/*
 * Size of the gid[] buffer, including the terminating NUL.  MarkAsPreparing
 * rejects any GID with strlen(gid) >= GIDSIZE.
 */
#define GIDSIZE 200

typedef struct GlobalTransactionData
{
	GlobalTransaction next;		/* list link for free list */
	int			pgprocno;		/* ID of associated dummy PGPROC; also used
								 * to index ProcGlobal->allPgXact */
	BackendId	dummyBackendId; /* similar to backend id for backends; set
								 * to MaxBackends + 1 + slot index at shmem
								 * init */
	TimestampTz prepared_at;	/* time of preparation */

	/*
	 * Note that we need to keep track of two LSNs for each GXACT. We keep
	 * track of the start LSN because this is the address we must use to read
	 * state data back from WAL when committing a prepared GXACT. We keep
	 * track of the end LSN because that is the LSN we need to wait for prior
	 * to commit.
	 */
	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
	TransactionId xid;			/* The GXACT id */

	Oid			owner;			/* ID of user that executed the xact */
	BackendId	locking_backend;	/* backend currently working on the xact,
									 * or InvalidBackendId if none */
	bool		valid;			/* TRUE if PGPROC entry is in proc array;
								 * FALSE while the xact is still preparing */
	bool		ondisk;			/* TRUE if prepare state file is on disk */
	bool		inredo;			/* TRUE if entry was added via xlog_redo */
	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */
}			GlobalTransactionData;
178 
179 /*
180  * Two Phase Commit shared state.  Access to this struct is protected
181  * by TwoPhaseStateLock.
182  */
typedef struct TwoPhaseStateData
{
	/* Head of linked list of free GlobalTransactionData structs */
	GlobalTransaction freeGXacts;

	/*
	 * Number of in-use prepXacts entries.  This includes entries that are
	 * still being prepared (gxact->valid == false), not only valid ones.
	 */
	int			numPrepXacts;

	/*
	 * There are max_prepared_xacts items in this array.  Only the first
	 * numPrepXacts are in use, and they are kept in no particular order
	 * (RemoveGXact fills a hole by moving the last entry into it).
	 */
	GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
} TwoPhaseStateData;

/* Shared-memory struct attached by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;
196 
197 /*
198  * Global transaction entry currently locked by us, if any.  Note that any
199  * access to the entry pointed to by this variable must be protected by
200  * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
201  * (since it's just local memory).
202  */
203 static GlobalTransaction MyLockedGxact = NULL;
204 
205 static bool twophaseExitRegistered = false;
206 
207 static void RecordTransactionCommitPrepared(TransactionId xid,
208 								int nchildren,
209 								TransactionId *children,
210 								int nrels,
211 								RelFileNode *rels,
212 								int ninvalmsgs,
213 								SharedInvalidationMessage *invalmsgs,
214 								bool initfileinval);
215 static void RecordTransactionAbortPrepared(TransactionId xid,
216 							   int nchildren,
217 							   TransactionId *children,
218 							   int nrels,
219 							   RelFileNode *rels);
220 static void ProcessRecords(char *bufptr, TransactionId xid,
221 			   const TwoPhaseCallback callbacks[]);
222 static void RemoveGXact(GlobalTransaction gxact);
223 
224 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
225 static char *ProcessTwoPhaseBuffer(TransactionId xid,
226 					  XLogRecPtr prepare_start_lsn,
227 					  bool fromdisk, bool setParent, bool setNextXid);
228 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
229 					const char *gid, TimestampTz prepared_at, Oid owner,
230 					Oid databaseid);
231 static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
232 static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
233 
234 /*
235  * Initialization of shared memory
236  */
237 Size
TwoPhaseShmemSize(void)238 TwoPhaseShmemSize(void)
239 {
240 	Size		size;
241 
242 	/* Need the fixed struct, the array of pointers, and the GTD structs */
243 	size = offsetof(TwoPhaseStateData, prepXacts);
244 	size = add_size(size, mul_size(max_prepared_xacts,
245 								   sizeof(GlobalTransaction)));
246 	size = MAXALIGN(size);
247 	size = add_size(size, mul_size(max_prepared_xacts,
248 								   sizeof(GlobalTransactionData)));
249 
250 	return size;
251 }
252 
253 void
TwoPhaseShmemInit(void)254 TwoPhaseShmemInit(void)
255 {
256 	bool		found;
257 
258 	TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
259 									TwoPhaseShmemSize(),
260 									&found);
261 	if (!IsUnderPostmaster)
262 	{
263 		GlobalTransaction gxacts;
264 		int			i;
265 
266 		Assert(!found);
267 		TwoPhaseState->freeGXacts = NULL;
268 		TwoPhaseState->numPrepXacts = 0;
269 
270 		/*
271 		 * Initialize the linked list of free GlobalTransactionData structs
272 		 */
273 		gxacts = (GlobalTransaction)
274 			((char *) TwoPhaseState +
275 			 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
276 					  sizeof(GlobalTransaction) * max_prepared_xacts));
277 		for (i = 0; i < max_prepared_xacts; i++)
278 		{
279 			/* insert into linked list */
280 			gxacts[i].next = TwoPhaseState->freeGXacts;
281 			TwoPhaseState->freeGXacts = &gxacts[i];
282 
283 			/* associate it with a PGPROC assigned by InitProcGlobal */
284 			gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
285 
286 			/*
287 			 * Assign a unique ID for each dummy proc, so that the range of
288 			 * dummy backend IDs immediately follows the range of normal
289 			 * backend IDs. We don't dare to assign a real backend ID to dummy
290 			 * procs, because prepared transactions don't take part in cache
291 			 * invalidation like a real backend ID would imply, but having a
292 			 * unique ID for them is nevertheless handy. This arrangement
293 			 * allows you to allocate an array of size (MaxBackends +
294 			 * max_prepared_xacts + 1), and have a slot for every backend and
295 			 * prepared transaction. Currently multixact.c uses that
296 			 * technique.
297 			 */
298 			gxacts[i].dummyBackendId = MaxBackends + 1 + i;
299 		}
300 	}
301 	else
302 		Assert(found);
303 }
304 
305 /*
306  * Exit hook to unlock the global transaction entry we're working on.
307  */
308 static void
AtProcExit_Twophase(int code,Datum arg)309 AtProcExit_Twophase(int code, Datum arg)
310 {
311 	/* same logic as abort */
312 	AtAbort_Twophase();
313 }
314 
315 /*
316  * Abort hook to unlock the global transaction entry we're working on.
317  */
318 void
AtAbort_Twophase(void)319 AtAbort_Twophase(void)
320 {
321 	if (MyLockedGxact == NULL)
322 		return;
323 
324 	/*
325 	 * What to do with the locked global transaction entry?  If we were in the
326 	 * process of preparing the transaction, but haven't written the WAL
327 	 * record and state file yet, the transaction must not be considered as
328 	 * prepared.  Likewise, if we are in the process of finishing an
329 	 * already-prepared transaction, and fail after having already written the
330 	 * 2nd phase commit or rollback record to the WAL, the transaction should
331 	 * not be considered as prepared anymore.  In those cases, just remove the
332 	 * entry from shared memory.
333 	 *
334 	 * Otherwise, the entry must be left in place so that the transaction can
335 	 * be finished later, so just unlock it.
336 	 *
337 	 * If we abort during prepare, after having written the WAL record, we
338 	 * might not have transferred all locks and other state to the prepared
339 	 * transaction yet.  Likewise, if we abort during commit or rollback,
340 	 * after having written the WAL record, we might not have released all the
341 	 * resources held by the transaction yet.  In those cases, the in-memory
342 	 * state can be wrong, but it's too late to back out.
343 	 */
344 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
345 	if (!MyLockedGxact->valid)
346 		RemoveGXact(MyLockedGxact);
347 	else
348 		MyLockedGxact->locking_backend = InvalidBackendId;
349 	LWLockRelease(TwoPhaseStateLock);
350 
351 	MyLockedGxact = NULL;
352 }
353 
354 /*
355  * This is called after we have finished transferring state to the prepared
356  * PGXACT entry.
357  */
358 void
PostPrepare_Twophase(void)359 PostPrepare_Twophase(void)
360 {
361 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
362 	MyLockedGxact->locking_backend = InvalidBackendId;
363 	LWLockRelease(TwoPhaseStateLock);
364 
365 	MyLockedGxact = NULL;
366 }
367 
368 
369 /*
370  * MarkAsPreparing
371  *		Reserve the GID for the given transaction.
372  */
373 GlobalTransaction
MarkAsPreparing(TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)374 MarkAsPreparing(TransactionId xid, const char *gid,
375 				TimestampTz prepared_at, Oid owner, Oid databaseid)
376 {
377 	GlobalTransaction gxact;
378 	int			i;
379 
380 	if (strlen(gid) >= GIDSIZE)
381 		ereport(ERROR,
382 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
383 				 errmsg("transaction identifier \"%s\" is too long",
384 						gid)));
385 
386 	/* fail immediately if feature is disabled */
387 	if (max_prepared_xacts == 0)
388 		ereport(ERROR,
389 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
390 				 errmsg("prepared transactions are disabled"),
391 				 errhint("Set max_prepared_transactions to a nonzero value.")));
392 
393 	/* on first call, register the exit hook */
394 	if (!twophaseExitRegistered)
395 	{
396 		before_shmem_exit(AtProcExit_Twophase, 0);
397 		twophaseExitRegistered = true;
398 	}
399 
400 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
401 
402 	/* Check for conflicting GID */
403 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
404 	{
405 		gxact = TwoPhaseState->prepXacts[i];
406 		if (strcmp(gxact->gid, gid) == 0)
407 		{
408 			ereport(ERROR,
409 					(errcode(ERRCODE_DUPLICATE_OBJECT),
410 					 errmsg("transaction identifier \"%s\" is already in use",
411 							gid)));
412 		}
413 	}
414 
415 	/* Get a free gxact from the freelist */
416 	if (TwoPhaseState->freeGXacts == NULL)
417 		ereport(ERROR,
418 				(errcode(ERRCODE_OUT_OF_MEMORY),
419 				 errmsg("maximum number of prepared transactions reached"),
420 				 errhint("Increase max_prepared_transactions (currently %d).",
421 						 max_prepared_xacts)));
422 	gxact = TwoPhaseState->freeGXacts;
423 	TwoPhaseState->freeGXacts = gxact->next;
424 
425 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
426 
427 	gxact->ondisk = false;
428 
429 	/* And insert it into the active array */
430 	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
431 	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
432 
433 	LWLockRelease(TwoPhaseStateLock);
434 
435 	return gxact;
436 }
437 
438 /*
439  * MarkAsPreparingGuts
440  *
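 * This initializes a gxact struct and its dummy PGPROC/PGXACT entries and
 * marks the gxact as locked by this backend.  Inserting the gxact into the
 * active array is the caller's responsibility.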
442  * NOTE: this is also used when reloading a gxact after a crash; so avoid
443  * assuming that we can use very much backend context.
444  *
445  * Note: This function should be called with appropriate locks held.
446  */
static void
MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
					TimestampTz prepared_at, Oid owner, Oid databaseid)
{
	PGPROC	   *proc;
	PGXACT	   *pgxact;
	int			i;

	/* Caller must hold TwoPhaseStateLock exclusively */
	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));

	Assert(gxact != NULL);
	/* The dummy PGPROC/PGXACT pair assigned to this gxact slot */
	proc = &ProcGlobal->allProcs[gxact->pgprocno];
	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];

	/* Initialize the PGPROC entry */
	MemSet(proc, 0, sizeof(PGPROC));
	proc->pgprocno = gxact->pgprocno;
	SHMQueueElemInit(&(proc->links));
	proc->waitStatus = STATUS_OK;
	if (LocalTransactionIdIsValid(MyProc->lxid))
	{
		/* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
		proc->lxid = MyProc->lxid;
		proc->backendId = MyBackendId;
	}
	else
	{
		/*
		 * No local transaction ID of our own to clone; per the Assert this
		 * only happens in the startup process or outside a postmaster
		 * environment.
		 */
		Assert(AmStartupProcess() || !IsPostmasterEnvironment);
		/* GetLockConflicts() uses this to specify a wait on the XID */
		proc->lxid = xid;
		proc->backendId = InvalidBackendId;
	}
	pgxact->xid = xid;
	pgxact->xmin = InvalidTransactionId;
	pgxact->delayChkpt = false;
	pgxact->vacuumFlags = 0;
	proc->pid = 0;
	proc->databaseId = databaseid;
	proc->roleId = owner;
	proc->isBackgroundWorker = false;
	proc->lwWaiting = false;
	proc->lwWaitMode = 0;
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		SHMQueueInit(&(proc->myProcLocks[i]));
	/* subxid data must be filled later by GXactLoadSubxactData */
	pgxact->overflowed = false;
	pgxact->nxids = 0;

	gxact->prepared_at = prepared_at;
	gxact->xid = xid;
	gxact->owner = owner;
	/* the entry starts out locked by us ... */
	gxact->locking_backend = MyBackendId;
	/* ... and not valid; MarkAsPrepared() sets valid later */
	gxact->valid = false;
	gxact->inredo = false;
	/*
	 * NOTE(review): strcpy relies on strlen(gid) < GIDSIZE.  MarkAsPreparing
	 * checks this; other callers presumably guarantee it too -- confirm.
	 */
	strcpy(gxact->gid, gid);

	/*
	 * Remember that we have this GlobalTransaction entry locked for us. If we
	 * abort after this, we must release it.
	 */
	MyLockedGxact = gxact;
}
511 
512 /*
513  * GXactLoadSubxactData
514  *
515  * If the transaction being persisted had any subtransactions, this must
516  * be called before MarkAsPrepared() to load information into the dummy
517  * PGPROC.
518  */
519 static void
GXactLoadSubxactData(GlobalTransaction gxact,int nsubxacts,TransactionId * children)520 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
521 					 TransactionId *children)
522 {
523 	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
524 	PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
525 
526 	/* We need no extra lock since the GXACT isn't valid yet */
527 	if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
528 	{
529 		pgxact->overflowed = true;
530 		nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
531 	}
532 	if (nsubxacts > 0)
533 	{
534 		memcpy(proc->subxids.xids, children,
535 			   nsubxacts * sizeof(TransactionId));
536 		pgxact->nxids = nsubxacts;
537 	}
538 }
539 
540 /*
541  * MarkAsPrepared
542  *		Mark the GXACT as fully valid, and enter it into the global ProcArray.
543  *
544  * lock_held indicates whether caller already holds TwoPhaseStateLock.
545  */
546 static void
MarkAsPrepared(GlobalTransaction gxact,bool lock_held)547 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
548 {
549 	/* Lock here may be overkill, but I'm not convinced of that ... */
550 	if (!lock_held)
551 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
552 	Assert(!gxact->valid);
553 	gxact->valid = true;
554 	if (!lock_held)
555 		LWLockRelease(TwoPhaseStateLock);
556 
557 	/*
558 	 * Put it into the global ProcArray so TransactionIdIsInProgress considers
559 	 * the XID as still running.
560 	 */
561 	ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
562 }
563 
564 /*
565  * LockGXact
 *		Locate the prepared transaction and mark it busy for COMMIT PREPARED
 *		or ROLLBACK PREPARED.
567  */
568 static GlobalTransaction
LockGXact(const char * gid,Oid user)569 LockGXact(const char *gid, Oid user)
570 {
571 	int			i;
572 
573 	/* on first call, register the exit hook */
574 	if (!twophaseExitRegistered)
575 	{
576 		before_shmem_exit(AtProcExit_Twophase, 0);
577 		twophaseExitRegistered = true;
578 	}
579 
580 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
581 
582 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
583 	{
584 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
585 		PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
586 
587 		/* Ignore not-yet-valid GIDs */
588 		if (!gxact->valid)
589 			continue;
590 		if (strcmp(gxact->gid, gid) != 0)
591 			continue;
592 
593 		/* Found it, but has someone else got it locked? */
594 		if (gxact->locking_backend != InvalidBackendId)
595 			ereport(ERROR,
596 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
597 					 errmsg("prepared transaction with identifier \"%s\" is busy",
598 							gid)));
599 
600 		if (user != gxact->owner && !superuser_arg(user))
601 			ereport(ERROR,
602 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
603 					 errmsg("permission denied to finish prepared transaction"),
604 					 errhint("Must be superuser or the user that prepared the transaction.")));
605 
606 		/*
607 		 * Note: it probably would be possible to allow committing from
608 		 * another database; but at the moment NOTIFY is known not to work and
609 		 * there may be some other issues as well.  Hence disallow until
610 		 * someone gets motivated to make it work.
611 		 */
612 		if (MyDatabaseId != proc->databaseId)
613 			ereport(ERROR,
614 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
615 					 errmsg("prepared transaction belongs to another database"),
616 					 errhint("Connect to the database where the transaction was prepared to finish it.")));
617 
618 		/* OK for me to lock it */
619 		gxact->locking_backend = MyBackendId;
620 		MyLockedGxact = gxact;
621 
622 		LWLockRelease(TwoPhaseStateLock);
623 
624 		return gxact;
625 	}
626 
627 	LWLockRelease(TwoPhaseStateLock);
628 
629 	ereport(ERROR,
630 			(errcode(ERRCODE_UNDEFINED_OBJECT),
631 			 errmsg("prepared transaction with identifier \"%s\" does not exist",
632 					gid)));
633 
634 	/* NOTREACHED */
635 	return NULL;
636 }
637 
638 /*
639  * RemoveGXact
640  *		Remove the prepared transaction from the shared memory array.
641  *
642  * NB: caller should have already removed it from ProcArray
643  */
644 static void
RemoveGXact(GlobalTransaction gxact)645 RemoveGXact(GlobalTransaction gxact)
646 {
647 	int			i;
648 
649 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
650 
651 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
652 	{
653 		if (gxact == TwoPhaseState->prepXacts[i])
654 		{
655 			/* remove from the active array */
656 			TwoPhaseState->numPrepXacts--;
657 			TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
658 
659 			/* and put it back in the freelist */
660 			gxact->next = TwoPhaseState->freeGXacts;
661 			TwoPhaseState->freeGXacts = gxact;
662 
663 			return;
664 		}
665 	}
666 
667 	elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
668 }
669 
670 /*
671  * Returns an array of all prepared transactions for the user-level
672  * function pg_prepared_xact.
673  *
674  * The returned array and all its elements are copies of internal data
675  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
676  *
 * WARNING -- we return even those transactions that are not fully prepared
 * yet (gxact->valid is false).  Callers that do not want them must filter
 * them out.
679  *
680  * The returned array is palloc'd.
681  */
682 static int
GetPreparedTransactionList(GlobalTransaction * gxacts)683 GetPreparedTransactionList(GlobalTransaction *gxacts)
684 {
685 	GlobalTransaction array;
686 	int			num;
687 	int			i;
688 
689 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
690 
691 	if (TwoPhaseState->numPrepXacts == 0)
692 	{
693 		LWLockRelease(TwoPhaseStateLock);
694 
695 		*gxacts = NULL;
696 		return 0;
697 	}
698 
699 	num = TwoPhaseState->numPrepXacts;
700 	array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
701 	*gxacts = array;
702 	for (i = 0; i < num; i++)
703 		memcpy(array + i, TwoPhaseState->prepXacts[i],
704 			   sizeof(GlobalTransactionData));
705 
706 	LWLockRelease(TwoPhaseStateLock);
707 
708 	return num;
709 }
710 
711 
712 /* Working status for pg_prepared_xact */
713 typedef struct
714 {
715 	GlobalTransaction array;
716 	int			ngxacts;
717 	int			currIdx;
718 } Working_State;
719 
720 /*
721  * pg_prepared_xact
722  *		Produce a view with one row per prepared transaction.
723  *
724  * This function is here so we don't have to export the
725  * GlobalTransactionData struct definition.
726  */
727 Datum
pg_prepared_xact(PG_FUNCTION_ARGS)728 pg_prepared_xact(PG_FUNCTION_ARGS)
729 {
730 	FuncCallContext *funcctx;
731 	Working_State *status;
732 
733 	if (SRF_IS_FIRSTCALL())
734 	{
735 		TupleDesc	tupdesc;
736 		MemoryContext oldcontext;
737 
738 		/* create a function context for cross-call persistence */
739 		funcctx = SRF_FIRSTCALL_INIT();
740 
741 		/*
742 		 * Switch to memory context appropriate for multiple function calls
743 		 */
744 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
745 
746 		/* build tupdesc for result tuples */
747 		/* this had better match pg_prepared_xacts view in system_views.sql */
748 		tupdesc = CreateTemplateTupleDesc(5, false);
749 		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
750 						   XIDOID, -1, 0);
751 		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
752 						   TEXTOID, -1, 0);
753 		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
754 						   TIMESTAMPTZOID, -1, 0);
755 		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
756 						   OIDOID, -1, 0);
757 		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
758 						   OIDOID, -1, 0);
759 
760 		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
761 
762 		/*
763 		 * Collect all the 2PC status information that we will format and send
764 		 * out as a result set.
765 		 */
766 		status = (Working_State *) palloc(sizeof(Working_State));
767 		funcctx->user_fctx = (void *) status;
768 
769 		status->ngxacts = GetPreparedTransactionList(&status->array);
770 		status->currIdx = 0;
771 
772 		MemoryContextSwitchTo(oldcontext);
773 	}
774 
775 	funcctx = SRF_PERCALL_SETUP();
776 	status = (Working_State *) funcctx->user_fctx;
777 
778 	while (status->array != NULL && status->currIdx < status->ngxacts)
779 	{
780 		GlobalTransaction gxact = &status->array[status->currIdx++];
781 		PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
782 		PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
783 		Datum		values[5];
784 		bool		nulls[5];
785 		HeapTuple	tuple;
786 		Datum		result;
787 
788 		if (!gxact->valid)
789 			continue;
790 
791 		/*
792 		 * Form tuple with appropriate data.
793 		 */
794 		MemSet(values, 0, sizeof(values));
795 		MemSet(nulls, 0, sizeof(nulls));
796 
797 		values[0] = TransactionIdGetDatum(pgxact->xid);
798 		values[1] = CStringGetTextDatum(gxact->gid);
799 		values[2] = TimestampTzGetDatum(gxact->prepared_at);
800 		values[3] = ObjectIdGetDatum(gxact->owner);
801 		values[4] = ObjectIdGetDatum(proc->databaseId);
802 
803 		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
804 		result = HeapTupleGetDatum(tuple);
805 		SRF_RETURN_NEXT(funcctx, result);
806 	}
807 
808 	SRF_RETURN_DONE(funcctx);
809 }
810 
811 /*
812  * TwoPhaseGetGXact
813  *		Get the GlobalTransaction struct for a prepared transaction
814  *		specified by XID
815  */
816 static GlobalTransaction
TwoPhaseGetGXact(TransactionId xid)817 TwoPhaseGetGXact(TransactionId xid)
818 {
819 	GlobalTransaction result = NULL;
820 	int			i;
821 
822 	static TransactionId cached_xid = InvalidTransactionId;
823 	static GlobalTransaction cached_gxact = NULL;
824 
825 	/*
826 	 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
827 	 * repeatedly for the same XID.  We can save work with a simple cache.
828 	 */
829 	if (xid == cached_xid)
830 		return cached_gxact;
831 
832 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
833 
834 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
835 	{
836 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
837 		PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
838 
839 		if (pgxact->xid == xid)
840 		{
841 			result = gxact;
842 			break;
843 		}
844 	}
845 
846 	LWLockRelease(TwoPhaseStateLock);
847 
848 	if (result == NULL)			/* should not happen */
849 		elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
850 
851 	cached_xid = xid;
852 	cached_gxact = result;
853 
854 	return result;
855 }
856 
857 /*
858  * TwoPhaseGetXidByVirtualXID
859  *		Lookup VXID among xacts prepared since last startup.
860  *
861  * (This won't find recovered xacts.)  If more than one matches, return any
862  * and set "have_more" to true.  To witness multiple matches, a single
863  * BackendId must consume 2^32 LXIDs, with no intervening database restart.
864  */
865 TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,bool * have_more)866 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
867 						   bool *have_more)
868 {
869 	int			i;
870 	TransactionId result = InvalidTransactionId;
871 
872 	Assert(VirtualTransactionIdIsValid(vxid));
873 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
874 
875 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
876 	{
877 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
878 		PGPROC	   *proc;
879 		VirtualTransactionId proc_vxid;
880 
881 		if (!gxact->valid)
882 			continue;
883 		proc = &ProcGlobal->allProcs[gxact->pgprocno];
884 		GET_VXID_FROM_PGPROC(proc_vxid, *proc);
885 		if (VirtualTransactionIdEquals(vxid, proc_vxid))
886 		{
887 			/* Startup process sets proc->backendId to InvalidBackendId. */
888 			Assert(!gxact->inredo);
889 
890 			if (result != InvalidTransactionId)
891 			{
892 				*have_more = true;
893 				break;
894 			}
895 			result = gxact->xid;
896 		}
897 	}
898 
899 	LWLockRelease(TwoPhaseStateLock);
900 
901 	return result;
902 }
903 
904 /*
 * TwoPhaseGetDummyBackendId
906  *		Get the dummy backend ID for prepared transaction specified by XID
907  *
908  * Dummy backend IDs are similar to real backend IDs of real backends.
909  * They start at MaxBackends + 1, and are unique across all currently active
910  * real backends and prepared transactions.
911  */
912 BackendId
TwoPhaseGetDummyBackendId(TransactionId xid)913 TwoPhaseGetDummyBackendId(TransactionId xid)
914 {
915 	GlobalTransaction gxact = TwoPhaseGetGXact(xid);
916 
917 	return gxact->dummyBackendId;
918 }
919 
920 /*
921  * TwoPhaseGetDummyProc
922  *		Get the PGPROC that represents a prepared transaction specified by XID
923  */
924 PGPROC *
TwoPhaseGetDummyProc(TransactionId xid)925 TwoPhaseGetDummyProc(TransactionId xid)
926 {
927 	GlobalTransaction gxact = TwoPhaseGetGXact(xid);
928 
929 	return &ProcGlobal->allProcs[gxact->pgprocno];
930 }
931 
932 /************************************************************************/
933 /* State file support													*/
934 /************************************************************************/
935 
/*
 * Format into "path" (a buffer of MAXPGPATH bytes) the name of the state
 * file for "xid": TWOPHASE_DIR, a slash, then the XID as eight upper-case
 * hexadecimal digits.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
938 
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID (hdr->gidlen bytes, including the terminating '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileNode[] (files to be deleted at commit)
 *	5. RelFileNode[] (files to be deleted at abort)
 *	6. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	7. TwoPhaseRecordOnDisk
 *	8. ...
 *	9. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	10. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.  The PREPARE WAL
 * record carries the same layout minus the trailing checksum.
 */
954 
/*
 * Header for a 2PC state file
 *
 * StartPrepare copies this struct byte-for-byte into the state data, so its
 * layout is part of both the state-file and the PREPARE WAL record format.
 * ReadTwoPhaseFile rejects data whose magic is not TWOPHASE_MAGIC.
 */
#define TWOPHASE_MAGIC	0x57F94533	/* format identifier */

typedef struct TwoPhaseFileHeader
{
	uint32		magic;			/* format identifier */
	uint32		total_len;		/* actual file length, including the
								 * trailing CRC */
	TransactionId xid;			/* original transaction XID */
	Oid			database;		/* OID of database it was in */
	TimestampTz prepared_at;	/* time of preparation */
	Oid			owner;			/* user running the transaction */
	int32		nsubxacts;		/* number of following subxact XIDs */
	int32		ncommitrels;	/* number of delete-on-commit rels */
	int32		nabortrels;		/* number of delete-on-abort rels */
	int32		ninvalmsgs;		/* number of cache invalidation messages */
	bool		initfileinval;	/* does relcache init file need invalidation? */
	uint16		gidlen;			/* length of the GID including its '\0' -
								 * GID follows the header */
} TwoPhaseFileHeader;
975 
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * The sequence of records is terminated by one whose rmid is
 * TWOPHASE_RM_END_ID; EndPrepare appends that sentinel with len 0.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr; handed to the
								 * rmgr's callback by ProcessRecords */
} TwoPhaseRecordOnDisk;
988 
/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 * for that.
 *
 * save_state_data fills the chunks; EndPrepare registers each chunk's used
 * bytes as one piece of WAL record data.
 */
typedef struct StateFileChunk
{
	char	   *data;			/* palloc'd buffer for this chunk */
	uint32		len;			/* bytes used in data, MAXALIGN padding
								 * included */
	struct StateFileChunk *next;	/* next chunk in the chain, or NULL */
} StateFileChunk;
1000 
/*
 * Chunk chain for the transaction currently being prepared: StartPrepare
 * (re)initializes it, save_state_data appends to it, and EndPrepare writes it
 * to WAL and then sets head/tail back to NULL.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain, including
								 * MAXALIGN padding */
}			records;
1009 
1010 
1011 /*
1012  * Append a block of data to records data structure.
1013  *
1014  * NB: each block is padded to a MAXALIGN multiple.  This must be
1015  * accounted for when the file is later read!
1016  *
1017  * The data is copied, so the caller is free to modify it afterwards.
1018  */
1019 static void
save_state_data(const void * data,uint32 len)1020 save_state_data(const void *data, uint32 len)
1021 {
1022 	uint32		padlen = MAXALIGN(len);
1023 
1024 	if (padlen > records.bytes_free)
1025 	{
1026 		records.tail->next = palloc0(sizeof(StateFileChunk));
1027 		records.tail = records.tail->next;
1028 		records.tail->len = 0;
1029 		records.tail->next = NULL;
1030 		records.num_chunks++;
1031 
1032 		records.bytes_free = Max(padlen, 512);
1033 		records.tail->data = palloc(records.bytes_free);
1034 	}
1035 
1036 	memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1037 	records.tail->len += padlen;
1038 	records.bytes_free -= padlen;
1039 	records.total_len += padlen;
1040 }
1041 
1042 /*
1043  * Start preparing a state file.
1044  *
1045  * Initializes data structure and inserts the 2PC file header record.
1046  */
1047 void
StartPrepare(GlobalTransaction gxact)1048 StartPrepare(GlobalTransaction gxact)
1049 {
1050 	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1051 	PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1052 	TransactionId xid = pgxact->xid;
1053 	TwoPhaseFileHeader hdr;
1054 	TransactionId *children;
1055 	RelFileNode *commitrels;
1056 	RelFileNode *abortrels;
1057 	SharedInvalidationMessage *invalmsgs;
1058 
1059 	/* Initialize linked list */
1060 	records.head = palloc0(sizeof(StateFileChunk));
1061 	records.head->len = 0;
1062 	records.head->next = NULL;
1063 
1064 	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1065 	records.head->data = palloc(records.bytes_free);
1066 
1067 	records.tail = records.head;
1068 	records.num_chunks = 1;
1069 
1070 	records.total_len = 0;
1071 
1072 	/* Create header */
1073 	hdr.magic = TWOPHASE_MAGIC;
1074 	hdr.total_len = 0;			/* EndPrepare will fill this in */
1075 	hdr.xid = xid;
1076 	hdr.database = proc->databaseId;
1077 	hdr.prepared_at = gxact->prepared_at;
1078 	hdr.owner = gxact->owner;
1079 	hdr.nsubxacts = xactGetCommittedChildren(&children);
1080 	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1081 	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1082 	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1083 														  &hdr.initfileinval);
1084 	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
1085 
1086 	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1087 	save_state_data(gxact->gid, hdr.gidlen);
1088 
1089 	/*
1090 	 * Add the additional info about subxacts, deletable files and cache
1091 	 * invalidation messages.
1092 	 */
1093 	if (hdr.nsubxacts > 0)
1094 	{
1095 		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1096 		/* While we have the child-xact data, stuff it in the gxact too */
1097 		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1098 	}
1099 	if (hdr.ncommitrels > 0)
1100 	{
1101 		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1102 		pfree(commitrels);
1103 	}
1104 	if (hdr.nabortrels > 0)
1105 	{
1106 		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1107 		pfree(abortrels);
1108 	}
1109 	if (hdr.ninvalmsgs > 0)
1110 	{
1111 		save_state_data(invalmsgs,
1112 						hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1113 		pfree(invalmsgs);
1114 	}
1115 }
1116 
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Appends the end-sentinel record and patches total_len into the header that
 * StartPrepare placed at the front of the chunk chain.  Then it emits the
 * whole chain as one XLOG_XACT_PREPARE WAL record and flushes it.  After
 * that, the gxact is marked prepared, remembered in MyLockedGxact, and we
 * wait for synchronous replication of the record.
 *
 * Raises ERROR, before anything is written to WAL, if the state data would
 * exceed MaxAllocSize.  The WAL insertion and the bookkeeping after it run
 * between START_CRIT_SECTION() and END_CRIT_SECTION().
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/* Go back and fill in total_len in the file header record */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	/* total_len also counts the CRC that is appended when written to disk */
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
	 * immediately after the WAL record is inserted could complete without
	 * fsync'ing our state file.  (This is essentially the same kind of race
	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
	 * uses delayChkpt for; see notes there.)
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	/*
	 * Presumably reserves room for one XLogRegisterData() call per chunk;
	 * done here, before entering the critical section.
	 */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	MyPgXact->delayChkpt = true;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);
	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks
	 * MyPgXact as not running our XID (which it will do immediately after
	 * this function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyPgXact,
	 * else there is a window where the XID is not running according to
	 * TransactionIdIsInProgress, and onlookers would be entitled to assume
	 * the xact crashed.  Instead we have a window where the same XID appears
	 * twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyPgXact->delayChkpt = false;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/* Forget the chunk chain; the chunks themselves are not pfree'd here */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1216 
1217 /*
1218  * Register a 2PC record to be written to state file.
1219  */
1220 void
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid,uint16 info,const void * data,uint32 len)1221 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1222 					   const void *data, uint32 len)
1223 {
1224 	TwoPhaseRecordOnDisk record;
1225 
1226 	record.rmid = rmid;
1227 	record.info = info;
1228 	record.len = len;
1229 	save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1230 	if (len > 0)
1231 		save_state_data(data, len);
1232 }
1233 
1234 
1235 /*
1236  * Read and validate the state file for xid.
1237  *
1238  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1239  * contents of the file.  Otherwise return NULL.
1240  */
1241 static char *
ReadTwoPhaseFile(TransactionId xid,bool give_warnings)1242 ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1243 {
1244 	char		path[MAXPGPATH];
1245 	char	   *buf;
1246 	TwoPhaseFileHeader *hdr;
1247 	int			fd;
1248 	struct stat stat;
1249 	uint32		crc_offset;
1250 	pg_crc32c	calc_crc,
1251 				file_crc;
1252 
1253 	TwoPhaseFilePath(path, xid);
1254 
1255 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1256 	if (fd < 0)
1257 	{
1258 		if (give_warnings)
1259 			ereport(WARNING,
1260 					(errcode_for_file_access(),
1261 					 errmsg("could not open two-phase state file \"%s\": %m",
1262 							path)));
1263 		return NULL;
1264 	}
1265 
1266 	/*
1267 	 * Check file length.  We can determine a lower bound pretty easily. We
1268 	 * set an upper bound to avoid palloc() failure on a corrupt file, though
1269 	 * we can't guarantee that we won't get an out of memory error anyway,
1270 	 * even on a valid file.
1271 	 */
1272 	if (fstat(fd, &stat))
1273 	{
1274 		int			save_errno = errno;
1275 
1276 		CloseTransientFile(fd);
1277 		if (give_warnings)
1278 		{
1279 			errno = save_errno;
1280 			ereport(WARNING,
1281 					(errcode_for_file_access(),
1282 					 errmsg("could not stat two-phase state file \"%s\": %m",
1283 							path)));
1284 		}
1285 		return NULL;
1286 	}
1287 
1288 	if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1289 						MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1290 						sizeof(pg_crc32c)) ||
1291 		stat.st_size > MaxAllocSize)
1292 	{
1293 		CloseTransientFile(fd);
1294 		return NULL;
1295 	}
1296 
1297 	crc_offset = stat.st_size - sizeof(pg_crc32c);
1298 	if (crc_offset != MAXALIGN(crc_offset))
1299 	{
1300 		CloseTransientFile(fd);
1301 		return NULL;
1302 	}
1303 
1304 	/*
1305 	 * OK, slurp in the file.
1306 	 */
1307 	buf = (char *) palloc(stat.st_size);
1308 
1309 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1310 	if (read(fd, buf, stat.st_size) != stat.st_size)
1311 	{
1312 		int			save_errno = errno;
1313 
1314 		pgstat_report_wait_end();
1315 		CloseTransientFile(fd);
1316 		if (give_warnings)
1317 		{
1318 			errno = save_errno;
1319 			ereport(WARNING,
1320 					(errcode_for_file_access(),
1321 					 errmsg("could not read two-phase state file \"%s\": %m",
1322 							path)));
1323 		}
1324 		pfree(buf);
1325 		return NULL;
1326 	}
1327 
1328 	pgstat_report_wait_end();
1329 	CloseTransientFile(fd);
1330 
1331 	hdr = (TwoPhaseFileHeader *) buf;
1332 	if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1333 	{
1334 		pfree(buf);
1335 		return NULL;
1336 	}
1337 
1338 	INIT_CRC32C(calc_crc);
1339 	COMP_CRC32C(calc_crc, buf, crc_offset);
1340 	FIN_CRC32C(calc_crc);
1341 
1342 	file_crc = *((pg_crc32c *) (buf + crc_offset));
1343 
1344 	if (!EQ_CRC32C(calc_crc, file_crc))
1345 	{
1346 		pfree(buf);
1347 		return NULL;
1348 	}
1349 
1350 	return buf;
1351 }
1352 
1353 
1354 /*
1355  * Reads 2PC data from xlog. During checkpoint this data will be moved to
1356  * twophase files and ReadTwoPhaseFile should be used instead.
1357  *
1358  * Note clearly that this function can access WAL during normal operation,
1359  * similarly to the way WALSender or Logical Decoding would do.  While
1360  * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
1361  * particularly if this routine is called for the end-of-recovery checkpoint
1362  * in the checkpointer itself, so save the current timeline number value
1363  * and restore it once done.
1364  */
1365 static void
XlogReadTwoPhaseData(XLogRecPtr lsn,char ** buf,int * len)1366 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1367 {
1368 	XLogRecord *record;
1369 	XLogReaderState *xlogreader;
1370 	char	   *errormsg;
1371 	TimeLineID	save_currtli = ThisTimeLineID;
1372 
1373 	xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
1374 	if (!xlogreader)
1375 		ereport(ERROR,
1376 				(errcode(ERRCODE_OUT_OF_MEMORY),
1377 				 errmsg("out of memory"),
1378 				 errdetail("Failed while allocating a WAL reading processor.")));
1379 
1380 	record = XLogReadRecord(xlogreader, lsn, &errormsg);
1381 
1382 	/*
1383 	 * Restore immediately the timeline where it was previously, as
1384 	 * read_local_xlog_page() could have changed it if the record was read
1385 	 * while recovery was finishing or if the timeline has jumped in-between.
1386 	 */
1387 	ThisTimeLineID = save_currtli;
1388 
1389 	if (record == NULL)
1390 		ereport(ERROR,
1391 				(errcode_for_file_access(),
1392 				 errmsg("could not read two-phase state from WAL at %X/%X",
1393 						(uint32) (lsn >> 32),
1394 						(uint32) lsn)));
1395 
1396 	if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1397 		(XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1398 		ereport(ERROR,
1399 				(errcode_for_file_access(),
1400 				 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1401 						(uint32) (lsn >> 32),
1402 						(uint32) lsn)));
1403 
1404 	if (len != NULL)
1405 		*len = XLogRecGetDataLen(xlogreader);
1406 
1407 	*buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1408 	memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1409 
1410 	XLogReaderFree(xlogreader);
1411 }
1412 
1413 
1414 /*
1415  * Confirms an xid is prepared, during recovery
1416  */
1417 bool
StandbyTransactionIdIsPrepared(TransactionId xid)1418 StandbyTransactionIdIsPrepared(TransactionId xid)
1419 {
1420 	char	   *buf;
1421 	TwoPhaseFileHeader *hdr;
1422 	bool		result;
1423 
1424 	Assert(TransactionIdIsValid(xid));
1425 
1426 	if (max_prepared_xacts <= 0)
1427 		return false;			/* nothing to do */
1428 
1429 	/* Read and validate file */
1430 	buf = ReadTwoPhaseFile(xid, false);
1431 	if (buf == NULL)
1432 		return false;
1433 
1434 	/* Check header also */
1435 	hdr = (TwoPhaseFileHeader *) buf;
1436 	result = TransactionIdEquals(hdr->xid, xid);
1437 	pfree(buf);
1438 
1439 	return result;
1440 }
1441 
1442 /*
1443  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1444  */
1445 void
FinishPreparedTransaction(const char * gid,bool isCommit)1446 FinishPreparedTransaction(const char *gid, bool isCommit)
1447 {
1448 	GlobalTransaction gxact;
1449 	PGPROC	   *proc;
1450 	PGXACT	   *pgxact;
1451 	TransactionId xid;
1452 	char	   *buf;
1453 	char	   *bufptr;
1454 	TwoPhaseFileHeader *hdr;
1455 	TransactionId latestXid;
1456 	TransactionId *children;
1457 	RelFileNode *commitrels;
1458 	RelFileNode *abortrels;
1459 	RelFileNode *delrels;
1460 	int			ndelrels;
1461 	SharedInvalidationMessage *invalmsgs;
1462 
1463 	/*
1464 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
1465 	 * try to commit the same GID at once.
1466 	 */
1467 	gxact = LockGXact(gid, GetUserId());
1468 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
1469 	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1470 	xid = pgxact->xid;
1471 
1472 	/*
1473 	 * Read and validate 2PC state data. State data will typically be stored
1474 	 * in WAL files if the LSN is after the last checkpoint record, or moved
1475 	 * to disk if for some reason they have lived for a long time.
1476 	 */
1477 	if (gxact->ondisk)
1478 		buf = ReadTwoPhaseFile(xid, true);
1479 	else
1480 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1481 
1482 
1483 	/*
1484 	 * Disassemble the header area
1485 	 */
1486 	hdr = (TwoPhaseFileHeader *) buf;
1487 	Assert(TransactionIdEquals(hdr->xid, xid));
1488 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1489 	bufptr += MAXALIGN(hdr->gidlen);
1490 	children = (TransactionId *) bufptr;
1491 	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1492 	commitrels = (RelFileNode *) bufptr;
1493 	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1494 	abortrels = (RelFileNode *) bufptr;
1495 	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1496 	invalmsgs = (SharedInvalidationMessage *) bufptr;
1497 	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1498 
1499 	/* compute latestXid among all children */
1500 	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1501 
1502 	/* Prevent cancel/die interrupt while cleaning up */
1503 	HOLD_INTERRUPTS();
1504 
1505 	/*
1506 	 * The order of operations here is critical: make the XLOG entry for
1507 	 * commit or abort, then mark the transaction committed or aborted in
1508 	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1509 	 * TransactionIdIsInProgress will stop saying the prepared xact is in
1510 	 * progress), then run the post-commit or post-abort callbacks. The
1511 	 * callbacks will release the locks the transaction held.
1512 	 */
1513 	if (isCommit)
1514 		RecordTransactionCommitPrepared(xid,
1515 										hdr->nsubxacts, children,
1516 										hdr->ncommitrels, commitrels,
1517 										hdr->ninvalmsgs, invalmsgs,
1518 										hdr->initfileinval);
1519 	else
1520 		RecordTransactionAbortPrepared(xid,
1521 									   hdr->nsubxacts, children,
1522 									   hdr->nabortrels, abortrels);
1523 
1524 	ProcArrayRemove(proc, latestXid);
1525 
1526 	/*
1527 	 * In case we fail while running the callbacks, mark the gxact invalid so
1528 	 * no one else will try to commit/rollback, and so it will be recycled if
1529 	 * we fail after this point.  It is still locked by our backend so it
1530 	 * won't go away yet.
1531 	 *
1532 	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1533 	 */
1534 	gxact->valid = false;
1535 
1536 	/*
1537 	 * We have to remove any files that were supposed to be dropped. For
1538 	 * consistency with the regular xact.c code paths, must do this before
1539 	 * releasing locks, so do it before running the callbacks.
1540 	 *
1541 	 * NB: this code knows that we couldn't be dropping any temp rels ...
1542 	 */
1543 	if (isCommit)
1544 	{
1545 		delrels = commitrels;
1546 		ndelrels = hdr->ncommitrels;
1547 	}
1548 	else
1549 	{
1550 		delrels = abortrels;
1551 		ndelrels = hdr->nabortrels;
1552 	}
1553 
1554 	/* Make sure files supposed to be dropped are dropped */
1555 	DropRelationFiles(delrels, ndelrels, false);
1556 
1557 	/*
1558 	 * Handle cache invalidation messages.
1559 	 *
1560 	 * Relcache init file invalidation requires processing both before and
1561 	 * after we send the SI messages. See AtEOXact_Inval()
1562 	 */
1563 	if (hdr->initfileinval)
1564 		RelationCacheInitFilePreInvalidate();
1565 	SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1566 	if (hdr->initfileinval)
1567 		RelationCacheInitFilePostInvalidate();
1568 
1569 	/* And now do the callbacks */
1570 	if (isCommit)
1571 		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1572 	else
1573 		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1574 
1575 	PredicateLockTwoPhaseFinish(xid, isCommit);
1576 
1577 	/* Count the prepared xact as committed or aborted */
1578 	AtEOXact_PgStat(isCommit, false);
1579 
1580 	/*
1581 	 * And now we can clean up any files we may have left.
1582 	 */
1583 	if (gxact->ondisk)
1584 		RemoveTwoPhaseFile(xid, true);
1585 
1586 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1587 	RemoveGXact(gxact);
1588 	LWLockRelease(TwoPhaseStateLock);
1589 	MyLockedGxact = NULL;
1590 
1591 	RESUME_INTERRUPTS();
1592 
1593 	pfree(buf);
1594 }
1595 
1596 /*
1597  * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1598  */
1599 static void
ProcessRecords(char * bufptr,TransactionId xid,const TwoPhaseCallback callbacks[])1600 ProcessRecords(char *bufptr, TransactionId xid,
1601 			   const TwoPhaseCallback callbacks[])
1602 {
1603 	for (;;)
1604 	{
1605 		TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1606 
1607 		Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1608 		if (record->rmid == TWOPHASE_RM_END_ID)
1609 			break;
1610 
1611 		bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1612 
1613 		if (callbacks[record->rmid] != NULL)
1614 			callbacks[record->rmid] (xid, record->info,
1615 									 (void *) bufptr, record->len);
1616 
1617 		bufptr += MAXALIGN(record->len);
1618 	}
1619 }
1620 
1621 /*
1622  * Remove the 2PC file for the specified XID.
1623  *
1624  * If giveWarning is false, do not complain about file-not-present;
1625  * this is an expected case during WAL replay.
1626  */
1627 static void
RemoveTwoPhaseFile(TransactionId xid,bool giveWarning)1628 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1629 {
1630 	char		path[MAXPGPATH];
1631 
1632 	TwoPhaseFilePath(path, xid);
1633 	if (unlink(path))
1634 		if (errno != ENOENT || giveWarning)
1635 			ereport(WARNING,
1636 					(errcode_for_file_access(),
1637 					 errmsg("could not remove two-phase state file \"%s\": %m",
1638 							path)));
1639 }
1640 
1641 /*
1642  * Recreates a state file. This is used in WAL replay and during
1643  * checkpoint creation.
1644  *
1645  * Note: content and len don't include CRC.
1646  */
1647 static void
RecreateTwoPhaseFile(TransactionId xid,void * content,int len)1648 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1649 {
1650 	char		path[MAXPGPATH];
1651 	pg_crc32c	statefile_crc;
1652 	int			fd;
1653 
1654 	/* Recompute CRC */
1655 	INIT_CRC32C(statefile_crc);
1656 	COMP_CRC32C(statefile_crc, content, len);
1657 	FIN_CRC32C(statefile_crc);
1658 
1659 	TwoPhaseFilePath(path, xid);
1660 
1661 	fd = OpenTransientFile(path,
1662 						   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1663 						   S_IRUSR | S_IWUSR);
1664 	if (fd < 0)
1665 		ereport(ERROR,
1666 				(errcode_for_file_access(),
1667 				 errmsg("could not recreate two-phase state file \"%s\": %m",
1668 						path)));
1669 
1670 	/* Write content and CRC */
1671 	errno = 0;
1672 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1673 	if (write(fd, content, len) != len)
1674 	{
1675 		int			save_errno = errno;
1676 
1677 		pgstat_report_wait_end();
1678 		CloseTransientFile(fd);
1679 
1680 		/* if write didn't set errno, assume problem is no disk space */
1681 		errno = save_errno ? save_errno : ENOSPC;
1682 		ereport(ERROR,
1683 				(errcode_for_file_access(),
1684 				 errmsg("could not write two-phase state file: %m")));
1685 	}
1686 	if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1687 	{
1688 		int			save_errno = errno;
1689 
1690 		pgstat_report_wait_end();
1691 		CloseTransientFile(fd);
1692 
1693 		/* if write didn't set errno, assume problem is no disk space */
1694 		errno = save_errno ? save_errno : ENOSPC;
1695 		ereport(ERROR,
1696 				(errcode_for_file_access(),
1697 				 errmsg("could not write two-phase state file: %m")));
1698 	}
1699 	pgstat_report_wait_end();
1700 
1701 	/*
1702 	 * We must fsync the file because the end-of-replay checkpoint will not do
1703 	 * so, there being no GXACT in shared memory yet to tell it to.
1704 	 */
1705 	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1706 	if (pg_fsync(fd) != 0)
1707 	{
1708 		int			save_errno = errno;
1709 
1710 		CloseTransientFile(fd);
1711 		errno = save_errno;
1712 		ereport(ERROR,
1713 				(errcode_for_file_access(),
1714 				 errmsg("could not fsync two-phase state file: %m")));
1715 	}
1716 	pgstat_report_wait_end();
1717 
1718 	if (CloseTransientFile(fd) != 0)
1719 		ereport(ERROR,
1720 				(errcode_for_file_access(),
1721 				 errmsg("could not close two-phase state file: %m")));
1722 }
1723 
1724 /*
1725  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1726  *
1727  * We must fsync the state file of any GXACT that is valid or has been
1728  * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1729  * horizon.  (If the gxact isn't valid yet, has not been generated in
1730  * redo, or has a later LSN, this checkpoint is not responsible for
1731  * fsyncing it.)
1732  *
1733  * This is deliberately run as late as possible in the checkpoint sequence,
1734  * because GXACTs ordinarily have short lifespans, and so it is quite
1735  * possible that GXACTs that were valid at checkpoint start will no longer
1736  * exist if we wait a little bit. With typical checkpoint settings this
1737  * will be about 3 minutes for an online checkpoint, so as a result we
1738  * we expect that there will be no GXACTs that need to be copied to disk.
1739  *
1740  * If a GXACT remains valid across multiple checkpoints, it will already
1741  * be on disk so we don't bother to repeat that write.
1742  */
1743 void
CheckPointTwoPhase(XLogRecPtr redo_horizon)1744 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1745 {
1746 	int			i;
1747 	int			serialized_xacts = 0;
1748 
1749 	if (max_prepared_xacts <= 0)
1750 		return;					/* nothing to do */
1751 
1752 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1753 
1754 	/*
1755 	 * We are expecting there to be zero GXACTs that need to be copied to
1756 	 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1757 	 * simplicity. This prevents any new xacts from preparing while this
1758 	 * occurs, which shouldn't be a problem since the presence of long-lived
1759 	 * prepared xacts indicates the transaction manager isn't active.
1760 	 *
1761 	 * It's also possible to move I/O out of the lock, but on every error we
1762 	 * should check whether somebody committed our transaction in different
1763 	 * backend. Let's leave this optimization for future, if somebody will
1764 	 * spot that this place cause bottleneck.
1765 	 *
1766 	 * Note that it isn't possible for there to be a GXACT with a
1767 	 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1768 	 * because of the efforts with delayChkpt.
1769 	 */
1770 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1771 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1772 	{
1773 		/*
1774 		 * Note that we are using gxact not pgxact so this works in recovery
1775 		 * also
1776 		 */
1777 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1778 
1779 		if ((gxact->valid || gxact->inredo) &&
1780 			!gxact->ondisk &&
1781 			gxact->prepare_end_lsn <= redo_horizon)
1782 		{
1783 			char	   *buf;
1784 			int			len;
1785 
1786 			XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1787 			RecreateTwoPhaseFile(gxact->xid, buf, len);
1788 			gxact->ondisk = true;
1789 			gxact->prepare_start_lsn = InvalidXLogRecPtr;
1790 			gxact->prepare_end_lsn = InvalidXLogRecPtr;
1791 			pfree(buf);
1792 			serialized_xacts++;
1793 		}
1794 	}
1795 	LWLockRelease(TwoPhaseStateLock);
1796 
1797 	/*
1798 	 * Flush unconditionally the parent directory to make any information
1799 	 * durable on disk.  Two-phase files could have been removed and those
1800 	 * removals need to be made persistent as well as any files newly created
1801 	 * previously since the last checkpoint.
1802 	 */
1803 	fsync_fname(TWOPHASE_DIR, true);
1804 
1805 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1806 
1807 	if (log_checkpoints && serialized_xacts > 0)
1808 		ereport(LOG,
1809 				(errmsg_plural("%u two-phase state file was written "
1810 							   "for a long-running prepared transaction",
1811 							   "%u two-phase state files were written "
1812 							   "for long-running prepared transactions",
1813 							   serialized_xacts,
1814 							   serialized_xacts)));
1815 }
1816 
1817 /*
1818  * restoreTwoPhaseData
1819  *
1820  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1821  * This is called once at the beginning of recovery, saving any extra
1822  * lookups in the future.  Two-phase files that are newer than the
1823  * minimum XID horizon are discarded on the way.
1824  */
1825 void
restoreTwoPhaseData(void)1826 restoreTwoPhaseData(void)
1827 {
1828 	DIR		   *cldir;
1829 	struct dirent *clde;
1830 
1831 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1832 	cldir = AllocateDir(TWOPHASE_DIR);
1833 	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1834 	{
1835 		if (strlen(clde->d_name) == 8 &&
1836 			strspn(clde->d_name, "0123456789ABCDEF") == 8)
1837 		{
1838 			TransactionId xid;
1839 			char	   *buf;
1840 
1841 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1842 
1843 			buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1844 										true, false, false);
1845 			if (buf == NULL)
1846 				continue;
1847 
1848 			PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
1849 		}
1850 	}
1851 	LWLockRelease(TwoPhaseStateLock);
1852 	FreeDir(cldir);
1853 }
1854 
1855 /*
1856  * PrescanPreparedTransactions
1857  *
1858  * Scan the shared memory entries of TwoPhaseState and determine the range
1859  * of valid XIDs present.  This is run during database startup, after we
1860  * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
1861  * one more than the highest XID for which evidence exists in WAL.
1862  *
1863  * We throw away any prepared xacts with main XID beyond nextXid --- if any
1864  * are present, it suggests that the DBA has done a PITR recovery to an
1865  * earlier point in time without cleaning out pg_twophase.  We dare not
1866  * try to recover such prepared xacts since they likely depend on database
1867  * state that doesn't exist now.
1868  *
1869  * However, we will advance nextXid beyond any subxact XIDs belonging to
1870  * valid prepared xacts.  We need to do this since subxact commit doesn't
1871  * write a WAL entry, and so there might be no evidence in WAL of those
1872  * subxact XIDs.
1873  *
1874  * Our other responsibility is to determine and return the oldest valid XID
1875  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1876  * This is needed to synchronize pg_subtrans startup properly.
1877  *
1878  * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1879  * top-level xids is stored in *xids_p. The number of entries in the array
1880  * is returned in *nxids_p.
1881  */
1882 TransactionId
PrescanPreparedTransactions(TransactionId ** xids_p,int * nxids_p)1883 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1884 {
1885 	TransactionId origNextXid = ShmemVariableCache->nextXid;
1886 	TransactionId result = origNextXid;
1887 	TransactionId *xids = NULL;
1888 	int			nxids = 0;
1889 	int			allocsize = 0;
1890 	int			i;
1891 
1892 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1893 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1894 	{
1895 		TransactionId xid;
1896 		char	   *buf;
1897 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1898 
1899 		Assert(gxact->inredo);
1900 
1901 		xid = gxact->xid;
1902 
1903 		buf = ProcessTwoPhaseBuffer(xid,
1904 									gxact->prepare_start_lsn,
1905 									gxact->ondisk, false, true);
1906 
1907 		if (buf == NULL)
1908 			continue;
1909 
1910 		/*
1911 		 * OK, we think this file is valid.  Incorporate xid into the
1912 		 * running-minimum result.
1913 		 */
1914 		if (TransactionIdPrecedes(xid, result))
1915 			result = xid;
1916 
1917 		if (xids_p)
1918 		{
1919 			if (nxids == allocsize)
1920 			{
1921 				if (nxids == 0)
1922 				{
1923 					allocsize = 10;
1924 					xids = palloc(allocsize * sizeof(TransactionId));
1925 				}
1926 				else
1927 				{
1928 					allocsize = allocsize * 2;
1929 					xids = repalloc(xids, allocsize * sizeof(TransactionId));
1930 				}
1931 			}
1932 			xids[nxids++] = xid;
1933 		}
1934 
1935 		pfree(buf);
1936 	}
1937 	LWLockRelease(TwoPhaseStateLock);
1938 
1939 	if (xids_p)
1940 	{
1941 		*xids_p = xids;
1942 		*nxids_p = nxids;
1943 	}
1944 
1945 	return result;
1946 }
1947 
1948 /*
1949  * StandbyRecoverPreparedTransactions
1950  *
1951  * Scan the shared memory entries of TwoPhaseState and setup all the required
1952  * information to allow standby queries to treat prepared transactions as still
1953  * active.
1954  *
1955  * This is never called at the end of recovery - we use
1956  * RecoverPreparedTransactions() at that point.
1957  *
1958  * The lack of calls to SubTransSetParent() calls here is by design;
1959  * those calls are made by RecoverPreparedTransactions() at the end of recovery
1960  * for those xacts that need this.
1961  */
1962 void
StandbyRecoverPreparedTransactions(void)1963 StandbyRecoverPreparedTransactions(void)
1964 {
1965 	int			i;
1966 
1967 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1968 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1969 	{
1970 		TransactionId xid;
1971 		char	   *buf;
1972 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1973 
1974 		Assert(gxact->inredo);
1975 
1976 		xid = gxact->xid;
1977 
1978 		buf = ProcessTwoPhaseBuffer(xid,
1979 									gxact->prepare_start_lsn,
1980 									gxact->ondisk, false, false);
1981 		if (buf != NULL)
1982 			pfree(buf);
1983 	}
1984 	LWLockRelease(TwoPhaseStateLock);
1985 }
1986 
1987 /*
1988  * RecoverPreparedTransactions
1989  *
1990  * Scan the shared memory entries of TwoPhaseState and reload the state for
1991  * each prepared transaction (reacquire locks, etc).
1992  *
1993  * This is run at the end of recovery, but before we allow backends to write
1994  * WAL.
1995  *
1996  * At the end of recovery the way we take snapshots will change. We now need
1997  * to mark all running transactions with their full SubTransSetParent() info
1998  * to allow normal snapshots to work correctly if snapshots overflow.
1999  * We do this here because by definition prepared transactions are the only
2000  * type of write transaction still running, so this is necessary and
2001  * complete.
2002  */
2003 void
RecoverPreparedTransactions(void)2004 RecoverPreparedTransactions(void)
2005 {
2006 	int			i;
2007 
2008 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2009 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2010 	{
2011 		TransactionId xid;
2012 		char	   *buf;
2013 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2014 		char	   *bufptr;
2015 		TwoPhaseFileHeader *hdr;
2016 		TransactionId *subxids;
2017 		const char *gid;
2018 
2019 		xid = gxact->xid;
2020 
2021 		/*
2022 		 * Reconstruct subtrans state for the transaction --- needed because
2023 		 * pg_subtrans is not preserved over a restart.  Note that we are
2024 		 * linking all the subtransactions directly to the top-level XID;
2025 		 * there may originally have been a more complex hierarchy, but
2026 		 * there's no need to restore that exactly. It's possible that
2027 		 * SubTransSetParent has been set before, if the prepared transaction
2028 		 * generated xid assignment records.
2029 		 */
2030 		buf = ProcessTwoPhaseBuffer(xid,
2031 									gxact->prepare_start_lsn,
2032 									gxact->ondisk, true, false);
2033 		if (buf == NULL)
2034 			continue;
2035 
2036 		ereport(LOG,
2037 				(errmsg("recovering prepared transaction %u from shared memory", xid)));
2038 
2039 		hdr = (TwoPhaseFileHeader *) buf;
2040 		Assert(TransactionIdEquals(hdr->xid, xid));
2041 		bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2042 		gid = (const char *) bufptr;
2043 		bufptr += MAXALIGN(hdr->gidlen);
2044 		subxids = (TransactionId *) bufptr;
2045 		bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2046 		bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2047 		bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2048 		bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2049 
2050 		/*
2051 		 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2052 		 * added in redo and already has a shmem entry for it.
2053 		 */
2054 		MarkAsPreparingGuts(gxact, xid, gid,
2055 							hdr->prepared_at,
2056 							hdr->owner, hdr->database);
2057 
2058 		/* recovered, so reset the flag for entries generated by redo */
2059 		gxact->inredo = false;
2060 
2061 		GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2062 		MarkAsPrepared(gxact, true);
2063 
2064 		LWLockRelease(TwoPhaseStateLock);
2065 
2066 		/*
2067 		 * Recover other state (notably locks) using resource managers.
2068 		 */
2069 		ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2070 
2071 		/*
2072 		 * Release locks held by the standby process after we process each
2073 		 * prepared transaction. As a result, we don't need too many
2074 		 * additional locks at any one time.
2075 		 */
2076 		if (InHotStandby)
2077 			StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2078 
2079 		/*
2080 		 * We're done with recovering this transaction. Clear MyLockedGxact,
2081 		 * like we do in PrepareTransaction() during normal operation.
2082 		 */
2083 		PostPrepare_Twophase();
2084 
2085 		pfree(buf);
2086 
2087 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2088 	}
2089 
2090 	LWLockRelease(TwoPhaseStateLock);
2091 }
2092 
2093 /*
2094  * ProcessTwoPhaseBuffer
2095  *
2096  * Given a transaction id, read it either from disk or read it directly
2097  * via shmem xlog record pointer using the provided "prepare_start_lsn".
2098  *
2099  * If setParent is true, set up subtransaction parent linkages.
2100  *
2101  * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2102  * value scanned.
2103  */
2104 static char *
ProcessTwoPhaseBuffer(TransactionId xid,XLogRecPtr prepare_start_lsn,bool fromdisk,bool setParent,bool setNextXid)2105 ProcessTwoPhaseBuffer(TransactionId xid,
2106 					  XLogRecPtr prepare_start_lsn,
2107 					  bool fromdisk,
2108 					  bool setParent, bool setNextXid)
2109 {
2110 	TransactionId origNextXid = ShmemVariableCache->nextXid;
2111 	TransactionId *subxids;
2112 	char	   *buf;
2113 	TwoPhaseFileHeader *hdr;
2114 	int			i;
2115 
2116 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2117 
2118 	if (!fromdisk)
2119 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
2120 
2121 	/* Already processed? */
2122 	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2123 	{
2124 		if (fromdisk)
2125 		{
2126 			ereport(WARNING,
2127 					(errmsg("removing stale two-phase state file for transaction %u",
2128 							xid)));
2129 			RemoveTwoPhaseFile(xid, true);
2130 		}
2131 		else
2132 		{
2133 			ereport(WARNING,
2134 					(errmsg("removing stale two-phase state from memory for transaction %u",
2135 							xid)));
2136 			PrepareRedoRemove(xid, true);
2137 		}
2138 		return NULL;
2139 	}
2140 
2141 	/* Reject XID if too new */
2142 	if (TransactionIdFollowsOrEquals(xid, origNextXid))
2143 	{
2144 		if (fromdisk)
2145 		{
2146 			ereport(WARNING,
2147 					(errmsg("removing future two-phase state file for transaction %u",
2148 							xid)));
2149 			RemoveTwoPhaseFile(xid, true);
2150 		}
2151 		else
2152 		{
2153 			ereport(WARNING,
2154 					(errmsg("removing future two-phase state from memory for transaction %u",
2155 							xid)));
2156 			PrepareRedoRemove(xid, true);
2157 		}
2158 		return NULL;
2159 	}
2160 
2161 	if (fromdisk)
2162 	{
2163 		/* Read and validate file */
2164 		buf = ReadTwoPhaseFile(xid, true);
2165 		if (buf == NULL)
2166 		{
2167 			ereport(WARNING,
2168 					(errmsg("removing corrupt two-phase state file for transaction %u",
2169 							xid)));
2170 			RemoveTwoPhaseFile(xid, true);
2171 			return NULL;
2172 		}
2173 	}
2174 	else
2175 	{
2176 		/* Read xlog data */
2177 		XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2178 	}
2179 
2180 	/* Deconstruct header */
2181 	hdr = (TwoPhaseFileHeader *) buf;
2182 	if (!TransactionIdEquals(hdr->xid, xid))
2183 	{
2184 		if (fromdisk)
2185 		{
2186 			ereport(WARNING,
2187 					(errmsg("removing corrupt two-phase state file for transaction %u",
2188 							xid)));
2189 			RemoveTwoPhaseFile(xid, true);
2190 		}
2191 		else
2192 		{
2193 			ereport(WARNING,
2194 					(errmsg("removing corrupt two-phase state from memory for transaction %u",
2195 							xid)));
2196 			PrepareRedoRemove(xid, true);
2197 		}
2198 		pfree(buf);
2199 		return NULL;
2200 	}
2201 
2202 	/*
2203 	 * Examine subtransaction XIDs ... they should all follow main XID, and
2204 	 * they may force us to advance nextXid.
2205 	 */
2206 	subxids = (TransactionId *) (buf +
2207 								 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2208 								 MAXALIGN(hdr->gidlen));
2209 	for (i = 0; i < hdr->nsubxacts; i++)
2210 	{
2211 		TransactionId subxid = subxids[i];
2212 
2213 		Assert(TransactionIdFollows(subxid, xid));
2214 
2215 		/* update nextXid if needed */
2216 		if (setNextXid &&
2217 			TransactionIdFollowsOrEquals(subxid,
2218 										 ShmemVariableCache->nextXid))
2219 		{
2220 			/*
2221 			 * We don't expect anyone else to modify nextXid, hence we don't
2222 			 * need to hold a lock while examining it.  We still acquire the
2223 			 * lock to modify it, though, so we recheck.
2224 			 */
2225 			LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
2226 			if (TransactionIdFollowsOrEquals(subxid,
2227 											 ShmemVariableCache->nextXid))
2228 			{
2229 				ShmemVariableCache->nextXid = subxid;
2230 				TransactionIdAdvance(ShmemVariableCache->nextXid);
2231 			}
2232 			LWLockRelease(XidGenLock);
2233 		}
2234 
2235 		if (setParent)
2236 			SubTransSetParent(subxid, xid);
2237 	}
2238 
2239 	return buf;
2240 }
2241 
2242 
2243 /*
2244  *	RecordTransactionCommitPrepared
2245  *
2246  * This is basically the same as RecordTransactionCommit (q.v. if you change
2247  * this function): in particular, we must set the delayChkpt flag to avoid a
2248  * race condition.
2249  *
2250  * We know the transaction made at least one XLOG entry (its PREPARE),
2251  * so it is never possible to optimize out the commit record.
2252  */
2253 static void
RecordTransactionCommitPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,int ninvalmsgs,SharedInvalidationMessage * invalmsgs,bool initfileinval)2254 RecordTransactionCommitPrepared(TransactionId xid,
2255 								int nchildren,
2256 								TransactionId *children,
2257 								int nrels,
2258 								RelFileNode *rels,
2259 								int ninvalmsgs,
2260 								SharedInvalidationMessage *invalmsgs,
2261 								bool initfileinval)
2262 {
2263 	XLogRecPtr	recptr;
2264 	TimestampTz committs = GetCurrentTimestamp();
2265 	bool		replorigin;
2266 
2267 	/*
2268 	 * Are we using the replication origins feature?  Or, in other words, are
2269 	 * we replaying remote actions?
2270 	 */
2271 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2272 				  replorigin_session_origin != DoNotReplicateId);
2273 
2274 	START_CRIT_SECTION();
2275 
2276 	/* See notes in RecordTransactionCommit */
2277 	MyPgXact->delayChkpt = true;
2278 
2279 	/*
2280 	 * Emit the XLOG commit record. Note that we mark 2PC commits as
2281 	 * potentially having AccessExclusiveLocks since we don't know whether or
2282 	 * not they do.
2283 	 */
2284 	recptr = XactLogCommitRecord(committs,
2285 								 nchildren, children, nrels, rels,
2286 								 ninvalmsgs, invalmsgs,
2287 								 initfileinval, false,
2288 								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2289 								 xid);
2290 
2291 
2292 	if (replorigin)
2293 		/* Move LSNs forward for this replication origin */
2294 		replorigin_session_advance(replorigin_session_origin_lsn,
2295 								   XactLastRecEnd);
2296 
2297 	/*
2298 	 * Record commit timestamp.  The value comes from plain commit timestamp
2299 	 * if replorigin is not enabled, or replorigin already set a value for us
2300 	 * in replorigin_session_origin_timestamp otherwise.
2301 	 *
2302 	 * We don't need to WAL-log anything here, as the commit record written
2303 	 * above already contains the data.
2304 	 */
2305 	if (!replorigin || replorigin_session_origin_timestamp == 0)
2306 		replorigin_session_origin_timestamp = committs;
2307 
2308 	TransactionTreeSetCommitTsData(xid, nchildren, children,
2309 								   replorigin_session_origin_timestamp,
2310 								   replorigin_session_origin, false);
2311 
2312 	/*
2313 	 * We don't currently try to sleep before flush here ... nor is there any
2314 	 * support for async commit of a prepared xact (the very idea is probably
2315 	 * a contradiction)
2316 	 */
2317 
2318 	/* Flush XLOG to disk */
2319 	XLogFlush(recptr);
2320 
2321 	/* Mark the transaction committed in pg_xact */
2322 	TransactionIdCommitTree(xid, nchildren, children);
2323 
2324 	/* Checkpoint can proceed now */
2325 	MyPgXact->delayChkpt = false;
2326 
2327 	END_CRIT_SECTION();
2328 
2329 	/*
2330 	 * Wait for synchronous replication, if required.
2331 	 *
2332 	 * Note that at this stage we have marked clog, but still show as running
2333 	 * in the procarray and continue to hold locks.
2334 	 */
2335 	SyncRepWaitForLSN(recptr, true);
2336 }
2337 
2338 /*
2339  *	RecordTransactionAbortPrepared
2340  *
2341  * This is basically the same as RecordTransactionAbort.
2342  *
2343  * We know the transaction made at least one XLOG entry (its PREPARE),
2344  * so it is never possible to optimize out the abort record.
2345  */
2346 static void
RecordTransactionAbortPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels)2347 RecordTransactionAbortPrepared(TransactionId xid,
2348 							   int nchildren,
2349 							   TransactionId *children,
2350 							   int nrels,
2351 							   RelFileNode *rels)
2352 {
2353 	XLogRecPtr	recptr;
2354 
2355 	/*
2356 	 * Catch the scenario where we aborted partway through
2357 	 * RecordTransactionCommitPrepared ...
2358 	 */
2359 	if (TransactionIdDidCommit(xid))
2360 		elog(PANIC, "cannot abort transaction %u, it was already committed",
2361 			 xid);
2362 
2363 	START_CRIT_SECTION();
2364 
2365 	/*
2366 	 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2367 	 * potentially having AccessExclusiveLocks since we don't know whether or
2368 	 * not they do.
2369 	 */
2370 	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2371 								nchildren, children,
2372 								nrels, rels,
2373 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2374 								xid);
2375 
2376 	/* Always flush, since we're about to remove the 2PC state file */
2377 	XLogFlush(recptr);
2378 
2379 	/*
2380 	 * Mark the transaction aborted in clog.  This is not absolutely necessary
2381 	 * but we may as well do it while we are here.
2382 	 */
2383 	TransactionIdAbortTree(xid, nchildren, children);
2384 
2385 	END_CRIT_SECTION();
2386 
2387 	/*
2388 	 * Wait for synchronous replication, if required.
2389 	 *
2390 	 * Note that at this stage we have marked clog, but still show as running
2391 	 * in the procarray and continue to hold locks.
2392 	 */
2393 	SyncRepWaitForLSN(recptr, false);
2394 }
2395 
2396 /*
2397  * PrepareRedoAdd
2398  *
2399  * Store pointers to the start/end of the WAL record along with the xid in
2400  * a gxact entry in shared memory TwoPhaseState structure.  If caller
2401  * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2402  * data, the entry is marked as located on disk.
2403  */
2404 void
PrepareRedoAdd(char * buf,XLogRecPtr start_lsn,XLogRecPtr end_lsn)2405 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
2406 {
2407 	TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2408 	char	   *bufptr;
2409 	const char *gid;
2410 	GlobalTransaction gxact;
2411 
2412 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2413 	Assert(RecoveryInProgress());
2414 
2415 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2416 	gid = (const char *) bufptr;
2417 
2418 	/*
2419 	 * Reserve the GID for the given transaction in the redo code path.
2420 	 *
2421 	 * This creates a gxact struct and puts it into the active array.
2422 	 *
2423 	 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2424 	 * shared memory. Hence, we only fill up the bare minimum contents here.
2425 	 * The gxact also gets marked with gxact->inredo set to true to indicate
2426 	 * that it got added in the redo phase
2427 	 */
2428 
2429 	/* Get a free gxact from the freelist */
2430 	if (TwoPhaseState->freeGXacts == NULL)
2431 		ereport(ERROR,
2432 				(errcode(ERRCODE_OUT_OF_MEMORY),
2433 				 errmsg("maximum number of prepared transactions reached"),
2434 				 errhint("Increase max_prepared_transactions (currently %d).",
2435 						 max_prepared_xacts)));
2436 	gxact = TwoPhaseState->freeGXacts;
2437 	TwoPhaseState->freeGXacts = gxact->next;
2438 
2439 	gxact->prepared_at = hdr->prepared_at;
2440 	gxact->prepare_start_lsn = start_lsn;
2441 	gxact->prepare_end_lsn = end_lsn;
2442 	gxact->xid = hdr->xid;
2443 	gxact->owner = hdr->owner;
2444 	gxact->locking_backend = InvalidBackendId;
2445 	gxact->valid = false;
2446 	gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2447 	gxact->inredo = true;		/* yes, added in redo */
2448 	strcpy(gxact->gid, gid);
2449 
2450 	/* And insert it into the active array */
2451 	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2452 	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2453 
2454 	elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2455 }
2456 
2457 /*
2458  * PrepareRedoRemove
2459  *
2460  * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2461  * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2462  *
2463  * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2464  * is updated.
2465  */
2466 void
PrepareRedoRemove(TransactionId xid,bool giveWarning)2467 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2468 {
2469 	GlobalTransaction gxact = NULL;
2470 	int			i;
2471 	bool		found = false;
2472 
2473 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2474 	Assert(RecoveryInProgress());
2475 
2476 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2477 	{
2478 		gxact = TwoPhaseState->prepXacts[i];
2479 
2480 		if (gxact->xid == xid)
2481 		{
2482 			Assert(gxact->inredo);
2483 			found = true;
2484 			break;
2485 		}
2486 	}
2487 
2488 	/*
2489 	 * Just leave if there is nothing, this is expected during WAL replay.
2490 	 */
2491 	if (!found)
2492 		return;
2493 
2494 	/*
2495 	 * And now we can clean up any files we may have left.
2496 	 */
2497 	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2498 	if (gxact->ondisk)
2499 		RemoveTwoPhaseFile(xid, giveWarning);
2500 	RemoveGXact(gxact);
2501 
2502 	return;
2503 }
2504