1 /*-------------------------------------------------------------------------
2 *
3 * twophase.c
4 * Two-phase commit support functions.
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/access/transam/twophase.c
11 *
12 * NOTES
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
16 *
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
23 *
 *		A global transaction (gxact) also has a dummy PGPROC; this is what keeps
25 * the XID considered running by TransactionIdIsInProgress. It is also
26 * convenient as a PGPROC to hook the gxact's locks to.
27 *
28 * Information to recover prepared transactions in case of crash is
29 * now stored in WAL for the common case. In some cases there will be
30 * an extended period between preparing a GXACT and commit/abort, in
31 * which case we need to separately record prepared transaction data
32 * in permanent storage. This includes locking information, pending
33 * notifications etc. All that state information is written to the
34 * per-transaction state file in the pg_twophase directory.
35 * All prepared transactions will be written prior to shutdown.
36 *
 *		The life cycle of the state data is as follows:
38 *
39 * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 * stores pointer to the start of the WAL record in
41 * gxact->prepare_start_lsn.
42 * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 * using prepare_start_lsn.
 *		* On checkpoint, the state data is copied to files in the pg_twophase
 *		  directory and fsynced.
46 * * If COMMIT happens after checkpoint then backend reads state data from
47 * files
48 *
49 * During replay and replication, TwoPhaseState also holds information
50 * about active prepared transactions that haven't been moved to disk yet.
51 *
52 * Replay of twophase records happens by the following rules:
53 *
54 * * At the beginning of recovery, pg_twophase is scanned once, filling
55 * TwoPhaseState with entries marked with gxact->inredo and
56 * gxact->ondisk. Two-phase file data older than the XID horizon of
57 * the redo position are discarded.
58 * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 * gxact->inredo is set to true for such entries.
60 * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 * that have gxact->inredo set and are behind the redo_horizon. We
62 * save them to disk and then switch gxact->ondisk to true.
63 * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 * If gxact->ondisk is true, the corresponding entry from the disk
65 * is additionally deleted.
66 * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 * and PrescanPreparedTransactions() have been modified to go through
68 * gxact->inredo entries that have not made it to disk.
69 *
70 *-------------------------------------------------------------------------
71 */
72 #include "postgres.h"
73
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
78
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogutils.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
92 #include "funcapi.h"
93 #include "miscadmin.h"
94 #include "pg_trace.h"
95 #include "pgstat.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "storage/sinvaladt.h"
106 #include "storage/smgr.h"
107 #include "utils/builtins.h"
108 #include "utils/memutils.h"
109 #include "utils/timestamp.h"
110
/*
 * Name of the subdirectory of PGDATA in which two-phase commit state files
 * are kept.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable; it cannot be changed after startup.  It is the number of
 * GlobalTransactionData slots set up in shared memory, and a value of zero
 * means that prepared transactions are disabled.
 */
int			max_prepared_xacts = 0;
118
119 /*
120 * This struct describes one global transaction that is in prepared state
121 * or attempting to become prepared.
122 *
123 * The lifecycle of a global transaction is:
124 *
125 * 1. After checking that the requested GID is not in use, set up an entry in
126 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127 * and mark it as locked by my backend.
128 *
129 * 2. After successfully completing prepare, set valid = true and enter the
130 * referenced PGPROC into the global ProcArray.
131 *
132 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133 * valid and not locked, then mark the entry as locked by storing my current
134 * backend ID into locking_backend. This prevents concurrent attempts to
135 * commit or rollback the same prepared xact.
136 *
137 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139 * the freelist.
140 *
141 * Note that if the preparing transaction fails between steps 1 and 2, the
142 * entry must be removed so that the GID and the GlobalTransaction struct
143 * can be reused. See AtAbort_Twophase().
144 *
145 * typedef struct GlobalTransactionData *GlobalTransaction appears in
146 * twophase.h
147 */
148
149 typedef struct GlobalTransactionData
150 {
151 GlobalTransaction next; /* list link for free list */
152 int pgprocno; /* ID of associated dummy PGPROC */
153 BackendId dummyBackendId; /* similar to backend id for backends */
154 TimestampTz prepared_at; /* time of preparation */
155
156 /*
157 * Note that we need to keep track of two LSNs for each GXACT. We keep
158 * track of the start LSN because this is the address we must use to read
159 * state data back from WAL when committing a prepared GXACT. We keep
160 * track of the end LSN because that is the LSN we need to wait for prior
161 * to commit.
162 */
163 XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
164 XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
165 TransactionId xid; /* The GXACT id */
166
167 Oid owner; /* ID of user that executed the xact */
168 BackendId locking_backend; /* backend currently working on the xact */
169 bool valid; /* true if PGPROC entry is in proc array */
170 bool ondisk; /* true if prepare state file is on disk */
171 bool inredo; /* true if entry was added via xlog_redo */
172 char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
173 } GlobalTransactionData;
174
175 /*
176 * Two Phase Commit shared state. Access to this struct is protected
177 * by TwoPhaseStateLock.
178 */
179 typedef struct TwoPhaseStateData
180 {
181 /* Head of linked list of free GlobalTransactionData structs */
182 GlobalTransaction freeGXacts;
183
184 /* Number of valid prepXacts entries. */
185 int numPrepXacts;
186
187 /* There are max_prepared_xacts items in this array */
188 GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
189 } TwoPhaseStateData;
190
191 static TwoPhaseStateData *TwoPhaseState;
192
193 /*
194 * Global transaction entry currently locked by us, if any. Note that any
195 * access to the entry pointed to by this variable must be protected by
196 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
197 * (since it's just local memory).
198 */
199 static GlobalTransaction MyLockedGxact = NULL;
200
201 static bool twophaseExitRegistered = false;
202
203 static void RecordTransactionCommitPrepared(TransactionId xid,
204 int nchildren,
205 TransactionId *children,
206 int nrels,
207 RelFileNode *rels,
208 int ninvalmsgs,
209 SharedInvalidationMessage *invalmsgs,
210 bool initfileinval,
211 const char *gid);
212 static void RecordTransactionAbortPrepared(TransactionId xid,
213 int nchildren,
214 TransactionId *children,
215 int nrels,
216 RelFileNode *rels,
217 const char *gid);
218 static void ProcessRecords(char *bufptr, TransactionId xid,
219 const TwoPhaseCallback callbacks[]);
220 static void RemoveGXact(GlobalTransaction gxact);
221
222 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
223 static char *ProcessTwoPhaseBuffer(TransactionId xid,
224 XLogRecPtr prepare_start_lsn,
225 bool fromdisk, bool setParent, bool setNextXid);
226 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
227 const char *gid, TimestampTz prepared_at, Oid owner,
228 Oid databaseid);
229 static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
230 static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
231
232 /*
233 * Initialization of shared memory
234 */
235 Size
TwoPhaseShmemSize(void)236 TwoPhaseShmemSize(void)
237 {
238 Size size;
239
240 /* Need the fixed struct, the array of pointers, and the GTD structs */
241 size = offsetof(TwoPhaseStateData, prepXacts);
242 size = add_size(size, mul_size(max_prepared_xacts,
243 sizeof(GlobalTransaction)));
244 size = MAXALIGN(size);
245 size = add_size(size, mul_size(max_prepared_xacts,
246 sizeof(GlobalTransactionData)));
247
248 return size;
249 }
250
251 void
TwoPhaseShmemInit(void)252 TwoPhaseShmemInit(void)
253 {
254 bool found;
255
256 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
257 TwoPhaseShmemSize(),
258 &found);
259 if (!IsUnderPostmaster)
260 {
261 GlobalTransaction gxacts;
262 int i;
263
264 Assert(!found);
265 TwoPhaseState->freeGXacts = NULL;
266 TwoPhaseState->numPrepXacts = 0;
267
268 /*
269 * Initialize the linked list of free GlobalTransactionData structs
270 */
271 gxacts = (GlobalTransaction)
272 ((char *) TwoPhaseState +
273 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
274 sizeof(GlobalTransaction) * max_prepared_xacts));
275 for (i = 0; i < max_prepared_xacts; i++)
276 {
277 /* insert into linked list */
278 gxacts[i].next = TwoPhaseState->freeGXacts;
279 TwoPhaseState->freeGXacts = &gxacts[i];
280
281 /* associate it with a PGPROC assigned by InitProcGlobal */
282 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
283
284 /*
285 * Assign a unique ID for each dummy proc, so that the range of
286 * dummy backend IDs immediately follows the range of normal
287 * backend IDs. We don't dare to assign a real backend ID to dummy
288 * procs, because prepared transactions don't take part in cache
289 * invalidation like a real backend ID would imply, but having a
290 * unique ID for them is nevertheless handy. This arrangement
291 * allows you to allocate an array of size (MaxBackends +
292 * max_prepared_xacts + 1), and have a slot for every backend and
293 * prepared transaction. Currently multixact.c uses that
294 * technique.
295 */
296 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
297 }
298 }
299 else
300 Assert(found);
301 }
302
303 /*
304 * Exit hook to unlock the global transaction entry we're working on.
305 */
306 static void
AtProcExit_Twophase(int code,Datum arg)307 AtProcExit_Twophase(int code, Datum arg)
308 {
309 /* same logic as abort */
310 AtAbort_Twophase();
311 }
312
313 /*
314 * Abort hook to unlock the global transaction entry we're working on.
315 */
316 void
AtAbort_Twophase(void)317 AtAbort_Twophase(void)
318 {
319 if (MyLockedGxact == NULL)
320 return;
321
322 /*
323 * What to do with the locked global transaction entry? If we were in the
324 * process of preparing the transaction, but haven't written the WAL
325 * record and state file yet, the transaction must not be considered as
326 * prepared. Likewise, if we are in the process of finishing an
327 * already-prepared transaction, and fail after having already written the
328 * 2nd phase commit or rollback record to the WAL, the transaction should
329 * not be considered as prepared anymore. In those cases, just remove the
330 * entry from shared memory.
331 *
332 * Otherwise, the entry must be left in place so that the transaction can
333 * be finished later, so just unlock it.
334 *
335 * If we abort during prepare, after having written the WAL record, we
336 * might not have transferred all locks and other state to the prepared
337 * transaction yet. Likewise, if we abort during commit or rollback,
338 * after having written the WAL record, we might not have released all the
339 * resources held by the transaction yet. In those cases, the in-memory
340 * state can be wrong, but it's too late to back out.
341 */
342 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
343 if (!MyLockedGxact->valid)
344 RemoveGXact(MyLockedGxact);
345 else
346 MyLockedGxact->locking_backend = InvalidBackendId;
347 LWLockRelease(TwoPhaseStateLock);
348
349 MyLockedGxact = NULL;
350 }
351
352 /*
353 * This is called after we have finished transferring state to the prepared
354 * PGPROC entry.
355 */
356 void
PostPrepare_Twophase(void)357 PostPrepare_Twophase(void)
358 {
359 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
360 MyLockedGxact->locking_backend = InvalidBackendId;
361 LWLockRelease(TwoPhaseStateLock);
362
363 MyLockedGxact = NULL;
364 }
365
366
367 /*
368 * MarkAsPreparing
369 * Reserve the GID for the given transaction.
370 */
371 GlobalTransaction
MarkAsPreparing(TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)372 MarkAsPreparing(TransactionId xid, const char *gid,
373 TimestampTz prepared_at, Oid owner, Oid databaseid)
374 {
375 GlobalTransaction gxact;
376 int i;
377
378 if (strlen(gid) >= GIDSIZE)
379 ereport(ERROR,
380 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
381 errmsg("transaction identifier \"%s\" is too long",
382 gid)));
383
384 /* fail immediately if feature is disabled */
385 if (max_prepared_xacts == 0)
386 ereport(ERROR,
387 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388 errmsg("prepared transactions are disabled"),
389 errhint("Set max_prepared_transactions to a nonzero value.")));
390
391 /* on first call, register the exit hook */
392 if (!twophaseExitRegistered)
393 {
394 before_shmem_exit(AtProcExit_Twophase, 0);
395 twophaseExitRegistered = true;
396 }
397
398 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
399
400 /* Check for conflicting GID */
401 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
402 {
403 gxact = TwoPhaseState->prepXacts[i];
404 if (strcmp(gxact->gid, gid) == 0)
405 {
406 ereport(ERROR,
407 (errcode(ERRCODE_DUPLICATE_OBJECT),
408 errmsg("transaction identifier \"%s\" is already in use",
409 gid)));
410 }
411 }
412
413 /* Get a free gxact from the freelist */
414 if (TwoPhaseState->freeGXacts == NULL)
415 ereport(ERROR,
416 (errcode(ERRCODE_OUT_OF_MEMORY),
417 errmsg("maximum number of prepared transactions reached"),
418 errhint("Increase max_prepared_transactions (currently %d).",
419 max_prepared_xacts)));
420 gxact = TwoPhaseState->freeGXacts;
421 TwoPhaseState->freeGXacts = gxact->next;
422
423 MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
424
425 gxact->ondisk = false;
426
427 /* And insert it into the active array */
428 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
429 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
430
431 LWLockRelease(TwoPhaseStateLock);
432
433 return gxact;
434 }
435
436 /*
437 * MarkAsPreparingGuts
438 *
439 * This uses a gxact struct and puts it into the active array.
440 * NOTE: this is also used when reloading a gxact after a crash; so avoid
441 * assuming that we can use very much backend context.
442 *
443 * Note: This function should be called with appropriate locks held.
444 */
445 static void
MarkAsPreparingGuts(GlobalTransaction gxact,TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)446 MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
447 TimestampTz prepared_at, Oid owner, Oid databaseid)
448 {
449 PGPROC *proc;
450 int i;
451
452 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
453
454 Assert(gxact != NULL);
455 proc = &ProcGlobal->allProcs[gxact->pgprocno];
456
457 /* Initialize the PGPROC entry */
458 MemSet(proc, 0, sizeof(PGPROC));
459 proc->pgprocno = gxact->pgprocno;
460 SHMQueueElemInit(&(proc->links));
461 proc->waitStatus = PROC_WAIT_STATUS_OK;
462 if (LocalTransactionIdIsValid(MyProc->lxid))
463 {
464 /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
465 proc->lxid = MyProc->lxid;
466 proc->backendId = MyBackendId;
467 }
468 else
469 {
470 Assert(AmStartupProcess() || !IsPostmasterEnvironment);
471 /* GetLockConflicts() uses this to specify a wait on the XID */
472 proc->lxid = xid;
473 proc->backendId = InvalidBackendId;
474 }
475 proc->xid = xid;
476 Assert(proc->xmin == InvalidTransactionId);
477 proc->delayChkpt = false;
478 proc->statusFlags = 0;
479 proc->pid = 0;
480 proc->databaseId = databaseid;
481 proc->roleId = owner;
482 proc->tempNamespaceId = InvalidOid;
483 proc->isBackgroundWorker = false;
484 proc->lwWaiting = false;
485 proc->lwWaitMode = 0;
486 proc->waitLock = NULL;
487 proc->waitProcLock = NULL;
488 pg_atomic_init_u64(&proc->waitStart, 0);
489 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
490 SHMQueueInit(&(proc->myProcLocks[i]));
491 /* subxid data must be filled later by GXactLoadSubxactData */
492 proc->subxidStatus.overflowed = false;
493 proc->subxidStatus.count = 0;
494
495 gxact->prepared_at = prepared_at;
496 gxact->xid = xid;
497 gxact->owner = owner;
498 gxact->locking_backend = MyBackendId;
499 gxact->valid = false;
500 gxact->inredo = false;
501 strcpy(gxact->gid, gid);
502
503 /*
504 * Remember that we have this GlobalTransaction entry locked for us. If we
505 * abort after this, we must release it.
506 */
507 MyLockedGxact = gxact;
508 }
509
510 /*
511 * GXactLoadSubxactData
512 *
513 * If the transaction being persisted had any subtransactions, this must
514 * be called before MarkAsPrepared() to load information into the dummy
515 * PGPROC.
516 */
517 static void
GXactLoadSubxactData(GlobalTransaction gxact,int nsubxacts,TransactionId * children)518 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
519 TransactionId *children)
520 {
521 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
522
523 /* We need no extra lock since the GXACT isn't valid yet */
524 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
525 {
526 proc->subxidStatus.overflowed = true;
527 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
528 }
529 if (nsubxacts > 0)
530 {
531 memcpy(proc->subxids.xids, children,
532 nsubxacts * sizeof(TransactionId));
533 proc->subxidStatus.count = nsubxacts;
534 }
535 }
536
537 /*
538 * MarkAsPrepared
539 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
540 *
541 * lock_held indicates whether caller already holds TwoPhaseStateLock.
542 */
543 static void
MarkAsPrepared(GlobalTransaction gxact,bool lock_held)544 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
545 {
546 /* Lock here may be overkill, but I'm not convinced of that ... */
547 if (!lock_held)
548 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
549 Assert(!gxact->valid);
550 gxact->valid = true;
551 if (!lock_held)
552 LWLockRelease(TwoPhaseStateLock);
553
554 /*
555 * Put it into the global ProcArray so TransactionIdIsInProgress considers
556 * the XID as still running.
557 */
558 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
559 }
560
561 /*
562 * LockGXact
563 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
564 */
565 static GlobalTransaction
LockGXact(const char * gid,Oid user)566 LockGXact(const char *gid, Oid user)
567 {
568 int i;
569
570 /* on first call, register the exit hook */
571 if (!twophaseExitRegistered)
572 {
573 before_shmem_exit(AtProcExit_Twophase, 0);
574 twophaseExitRegistered = true;
575 }
576
577 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
578
579 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
580 {
581 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
582 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
583
584 /* Ignore not-yet-valid GIDs */
585 if (!gxact->valid)
586 continue;
587 if (strcmp(gxact->gid, gid) != 0)
588 continue;
589
590 /* Found it, but has someone else got it locked? */
591 if (gxact->locking_backend != InvalidBackendId)
592 ereport(ERROR,
593 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
594 errmsg("prepared transaction with identifier \"%s\" is busy",
595 gid)));
596
597 if (user != gxact->owner && !superuser_arg(user))
598 ereport(ERROR,
599 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
600 errmsg("permission denied to finish prepared transaction"),
601 errhint("Must be superuser or the user that prepared the transaction.")));
602
603 /*
604 * Note: it probably would be possible to allow committing from
605 * another database; but at the moment NOTIFY is known not to work and
606 * there may be some other issues as well. Hence disallow until
607 * someone gets motivated to make it work.
608 */
609 if (MyDatabaseId != proc->databaseId)
610 ereport(ERROR,
611 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
612 errmsg("prepared transaction belongs to another database"),
613 errhint("Connect to the database where the transaction was prepared to finish it.")));
614
615 /* OK for me to lock it */
616 gxact->locking_backend = MyBackendId;
617 MyLockedGxact = gxact;
618
619 LWLockRelease(TwoPhaseStateLock);
620
621 return gxact;
622 }
623
624 LWLockRelease(TwoPhaseStateLock);
625
626 ereport(ERROR,
627 (errcode(ERRCODE_UNDEFINED_OBJECT),
628 errmsg("prepared transaction with identifier \"%s\" does not exist",
629 gid)));
630
631 /* NOTREACHED */
632 return NULL;
633 }
634
635 /*
636 * RemoveGXact
637 * Remove the prepared transaction from the shared memory array.
638 *
639 * NB: caller should have already removed it from ProcArray
640 */
641 static void
RemoveGXact(GlobalTransaction gxact)642 RemoveGXact(GlobalTransaction gxact)
643 {
644 int i;
645
646 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
647
648 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
649 {
650 if (gxact == TwoPhaseState->prepXacts[i])
651 {
652 /* remove from the active array */
653 TwoPhaseState->numPrepXacts--;
654 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
655
656 /* and put it back in the freelist */
657 gxact->next = TwoPhaseState->freeGXacts;
658 TwoPhaseState->freeGXacts = gxact;
659
660 return;
661 }
662 }
663
664 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
665 }
666
667 /*
668 * Returns an array of all prepared transactions for the user-level
669 * function pg_prepared_xact.
670 *
671 * The returned array and all its elements are copies of internal data
672 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
673 *
674 * WARNING -- we return even those transactions that are not fully prepared
675 * yet. The caller should filter them out if he doesn't want them.
676 *
677 * The returned array is palloc'd.
678 */
679 static int
GetPreparedTransactionList(GlobalTransaction * gxacts)680 GetPreparedTransactionList(GlobalTransaction *gxacts)
681 {
682 GlobalTransaction array;
683 int num;
684 int i;
685
686 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
687
688 if (TwoPhaseState->numPrepXacts == 0)
689 {
690 LWLockRelease(TwoPhaseStateLock);
691
692 *gxacts = NULL;
693 return 0;
694 }
695
696 num = TwoPhaseState->numPrepXacts;
697 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
698 *gxacts = array;
699 for (i = 0; i < num; i++)
700 memcpy(array + i, TwoPhaseState->prepXacts[i],
701 sizeof(GlobalTransactionData));
702
703 LWLockRelease(TwoPhaseStateLock);
704
705 return num;
706 }
707
708
709 /* Working status for pg_prepared_xact */
710 typedef struct
711 {
712 GlobalTransaction array;
713 int ngxacts;
714 int currIdx;
715 } Working_State;
716
717 /*
718 * pg_prepared_xact
719 * Produce a view with one row per prepared transaction.
720 *
721 * This function is here so we don't have to export the
722 * GlobalTransactionData struct definition.
723 */
724 Datum
pg_prepared_xact(PG_FUNCTION_ARGS)725 pg_prepared_xact(PG_FUNCTION_ARGS)
726 {
727 FuncCallContext *funcctx;
728 Working_State *status;
729
730 if (SRF_IS_FIRSTCALL())
731 {
732 TupleDesc tupdesc;
733 MemoryContext oldcontext;
734
735 /* create a function context for cross-call persistence */
736 funcctx = SRF_FIRSTCALL_INIT();
737
738 /*
739 * Switch to memory context appropriate for multiple function calls
740 */
741 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
742
743 /* build tupdesc for result tuples */
744 /* this had better match pg_prepared_xacts view in system_views.sql */
745 tupdesc = CreateTemplateTupleDesc(5);
746 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
747 XIDOID, -1, 0);
748 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
749 TEXTOID, -1, 0);
750 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
751 TIMESTAMPTZOID, -1, 0);
752 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
753 OIDOID, -1, 0);
754 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
755 OIDOID, -1, 0);
756
757 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
758
759 /*
760 * Collect all the 2PC status information that we will format and send
761 * out as a result set.
762 */
763 status = (Working_State *) palloc(sizeof(Working_State));
764 funcctx->user_fctx = (void *) status;
765
766 status->ngxacts = GetPreparedTransactionList(&status->array);
767 status->currIdx = 0;
768
769 MemoryContextSwitchTo(oldcontext);
770 }
771
772 funcctx = SRF_PERCALL_SETUP();
773 status = (Working_State *) funcctx->user_fctx;
774
775 while (status->array != NULL && status->currIdx < status->ngxacts)
776 {
777 GlobalTransaction gxact = &status->array[status->currIdx++];
778 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
779 Datum values[5];
780 bool nulls[5];
781 HeapTuple tuple;
782 Datum result;
783
784 if (!gxact->valid)
785 continue;
786
787 /*
788 * Form tuple with appropriate data.
789 */
790 MemSet(values, 0, sizeof(values));
791 MemSet(nulls, 0, sizeof(nulls));
792
793 values[0] = TransactionIdGetDatum(proc->xid);
794 values[1] = CStringGetTextDatum(gxact->gid);
795 values[2] = TimestampTzGetDatum(gxact->prepared_at);
796 values[3] = ObjectIdGetDatum(gxact->owner);
797 values[4] = ObjectIdGetDatum(proc->databaseId);
798
799 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
800 result = HeapTupleGetDatum(tuple);
801 SRF_RETURN_NEXT(funcctx, result);
802 }
803
804 SRF_RETURN_DONE(funcctx);
805 }
806
807 /*
808 * TwoPhaseGetGXact
809 * Get the GlobalTransaction struct for a prepared transaction
810 * specified by XID
811 *
812 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
813 * caller had better hold it.
814 */
815 static GlobalTransaction
TwoPhaseGetGXact(TransactionId xid,bool lock_held)816 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
817 {
818 GlobalTransaction result = NULL;
819 int i;
820
821 static TransactionId cached_xid = InvalidTransactionId;
822 static GlobalTransaction cached_gxact = NULL;
823
824 Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
825
826 /*
827 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
828 * repeatedly for the same XID. We can save work with a simple cache.
829 */
830 if (xid == cached_xid)
831 return cached_gxact;
832
833 if (!lock_held)
834 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
835
836 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
837 {
838 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
839
840 if (gxact->xid == xid)
841 {
842 result = gxact;
843 break;
844 }
845 }
846
847 if (!lock_held)
848 LWLockRelease(TwoPhaseStateLock);
849
850 if (result == NULL) /* should not happen */
851 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
852
853 cached_xid = xid;
854 cached_gxact = result;
855
856 return result;
857 }
858
859 /*
860 * TwoPhaseGetXidByVirtualXID
861 * Lookup VXID among xacts prepared since last startup.
862 *
863 * (This won't find recovered xacts.) If more than one matches, return any
864 * and set "have_more" to true. To witness multiple matches, a single
865 * BackendId must consume 2^32 LXIDs, with no intervening database restart.
866 */
867 TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,bool * have_more)868 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
869 bool *have_more)
870 {
871 int i;
872 TransactionId result = InvalidTransactionId;
873
874 Assert(VirtualTransactionIdIsValid(vxid));
875 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
876
877 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
878 {
879 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
880 PGPROC *proc;
881 VirtualTransactionId proc_vxid;
882
883 if (!gxact->valid)
884 continue;
885 proc = &ProcGlobal->allProcs[gxact->pgprocno];
886 GET_VXID_FROM_PGPROC(proc_vxid, *proc);
887 if (VirtualTransactionIdEquals(vxid, proc_vxid))
888 {
889 /* Startup process sets proc->backendId to InvalidBackendId. */
890 Assert(!gxact->inredo);
891
892 if (result != InvalidTransactionId)
893 {
894 *have_more = true;
895 break;
896 }
897 result = gxact->xid;
898 }
899 }
900
901 LWLockRelease(TwoPhaseStateLock);
902
903 return result;
904 }
905
906 /*
907 * TwoPhaseGetDummyBackendId
908 * Get the dummy backend ID for prepared transaction specified by XID
909 *
910 * Dummy backend IDs are similar to real backend IDs of real backends.
911 * They start at MaxBackends + 1, and are unique across all currently active
912 * real backends and prepared transactions. If lock_held is set to true,
913 * TwoPhaseStateLock will not be taken, so the caller had better hold it.
914 */
915 BackendId
TwoPhaseGetDummyBackendId(TransactionId xid,bool lock_held)916 TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
917 {
918 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
919
920 return gxact->dummyBackendId;
921 }
922
923 /*
924 * TwoPhaseGetDummyProc
925 * Get the PGPROC that represents a prepared transaction specified by XID
926 *
927 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
928 * caller had better hold it.
929 */
930 PGPROC *
TwoPhaseGetDummyProc(TransactionId xid,bool lock_held)931 TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
932 {
933 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
934
935 return &ProcGlobal->allProcs[gxact->pgprocno];
936 }
937
938 /************************************************************************/
939 /* State file support */
940 /************************************************************************/
941
/*
 * Write the path of the state file for "xid" into "path", using at most
 * MAXPGPATH bytes.  The file lives in TWOPHASE_DIR and is named after the
 * XID, printed as zero-padded upper-case hex of at least eight digits.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
944
945 /*
946 * 2PC state file format:
947 *
948 * 1. TwoPhaseFileHeader
949 * 2. TransactionId[] (subtransactions)
950 * 3. RelFileNode[] (files to be deleted at commit)
951 * 4. RelFileNode[] (files to be deleted at abort)
952 * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
953 * 6. TwoPhaseRecordOnDisk
954 * 7. ...
955 * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
956 * 9. checksum (CRC-32C)
957 *
958 * Each segment except the final checksum is MAXALIGN'd.
959 */
960
961 /*
962 * Header for a 2PC state file
963 */
964 #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
965
966 typedef xl_xact_prepare TwoPhaseFileHeader;
967
968 /*
969 * Header for each record in a state file
970 *
971 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
972 * The rmgr data will be stored starting on a MAXALIGN boundary.
973 */
974 typedef struct TwoPhaseRecordOnDisk
975 {
976 uint32 len; /* length of rmgr data */
977 TwoPhaseRmgrId rmid; /* resource manager for this record */
978 uint16 info; /* flag bits for use by rmgr */
979 } TwoPhaseRecordOnDisk;
980
981 /*
982 * During prepare, the state file is assembled in memory before writing it
983 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
984 * for that.
985 */
986 typedef struct StateFileChunk
987 {
988 char *data;
989 uint32 len;
990 struct StateFileChunk *next;
991 } StateFileChunk;
992
993 static struct xllist
994 {
995 StateFileChunk *head; /* first data block in the chain */
996 StateFileChunk *tail; /* last block in chain */
997 uint32 num_chunks;
998 uint32 bytes_free; /* free bytes left in tail block */
999 uint32 total_len; /* total data bytes in chain */
1000 } records;
1001
1002
1003 /*
1004 * Append a block of data to records data structure.
1005 *
1006 * NB: each block is padded to a MAXALIGN multiple. This must be
1007 * accounted for when the file is later read!
1008 *
1009 * The data is copied, so the caller is free to modify it afterwards.
1010 */
1011 static void
save_state_data(const void * data,uint32 len)1012 save_state_data(const void *data, uint32 len)
1013 {
1014 uint32 padlen = MAXALIGN(len);
1015
1016 if (padlen > records.bytes_free)
1017 {
1018 records.tail->next = palloc0(sizeof(StateFileChunk));
1019 records.tail = records.tail->next;
1020 records.tail->len = 0;
1021 records.tail->next = NULL;
1022 records.num_chunks++;
1023
1024 records.bytes_free = Max(padlen, 512);
1025 records.tail->data = palloc(records.bytes_free);
1026 }
1027
1028 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1029 records.tail->len += padlen;
1030 records.bytes_free -= padlen;
1031 records.total_len += padlen;
1032 }
1033
1034 /*
1035 * Start preparing a state file.
1036 *
1037 * Initializes data structure and inserts the 2PC file header record.
1038 */
1039 void
StartPrepare(GlobalTransaction gxact)1040 StartPrepare(GlobalTransaction gxact)
1041 {
1042 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1043 TransactionId xid = gxact->xid;
1044 TwoPhaseFileHeader hdr;
1045 TransactionId *children;
1046 RelFileNode *commitrels;
1047 RelFileNode *abortrels;
1048 SharedInvalidationMessage *invalmsgs;
1049
1050 /* Initialize linked list */
1051 records.head = palloc0(sizeof(StateFileChunk));
1052 records.head->len = 0;
1053 records.head->next = NULL;
1054
1055 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1056 records.head->data = palloc(records.bytes_free);
1057
1058 records.tail = records.head;
1059 records.num_chunks = 1;
1060
1061 records.total_len = 0;
1062
1063 /* Create header */
1064 hdr.magic = TWOPHASE_MAGIC;
1065 hdr.total_len = 0; /* EndPrepare will fill this in */
1066 hdr.xid = xid;
1067 hdr.database = proc->databaseId;
1068 hdr.prepared_at = gxact->prepared_at;
1069 hdr.owner = gxact->owner;
1070 hdr.nsubxacts = xactGetCommittedChildren(&children);
1071 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1072 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1073 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1074 &hdr.initfileinval);
1075 hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1076
1077 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1078 save_state_data(gxact->gid, hdr.gidlen);
1079
1080 /*
1081 * Add the additional info about subxacts, deletable files and cache
1082 * invalidation messages.
1083 */
1084 if (hdr.nsubxacts > 0)
1085 {
1086 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1087 /* While we have the child-xact data, stuff it in the gxact too */
1088 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1089 }
1090 if (hdr.ncommitrels > 0)
1091 {
1092 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1093 pfree(commitrels);
1094 }
1095 if (hdr.nabortrels > 0)
1096 {
1097 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1098 pfree(abortrels);
1099 }
1100 if (hdr.ninvalmsgs > 0)
1101 {
1102 save_state_data(invalmsgs,
1103 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1104 pfree(invalmsgs);
1105 }
1106 }
1107
1108 /*
1109 * Finish preparing state data and writing it to WAL.
1110 */
1111 void
EndPrepare(GlobalTransaction gxact)1112 EndPrepare(GlobalTransaction gxact)
1113 {
1114 TwoPhaseFileHeader *hdr;
1115 StateFileChunk *record;
1116 bool replorigin;
1117
1118 /* Add the end sentinel to the list of 2PC records */
1119 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1120 NULL, 0);
1121
1122 /* Go back and fill in total_len in the file header record */
1123 hdr = (TwoPhaseFileHeader *) records.head->data;
1124 Assert(hdr->magic == TWOPHASE_MAGIC);
1125 hdr->total_len = records.total_len + sizeof(pg_crc32c);
1126
1127 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1128 replorigin_session_origin != DoNotReplicateId);
1129
1130 if (replorigin)
1131 {
1132 Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1133 hdr->origin_lsn = replorigin_session_origin_lsn;
1134 hdr->origin_timestamp = replorigin_session_origin_timestamp;
1135 }
1136 else
1137 {
1138 hdr->origin_lsn = InvalidXLogRecPtr;
1139 hdr->origin_timestamp = 0;
1140 }
1141
1142 /*
1143 * If the data size exceeds MaxAllocSize, we won't be able to read it in
1144 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1145 * where we write data to file and then re-read at commit time.
1146 */
1147 if (hdr->total_len > MaxAllocSize)
1148 ereport(ERROR,
1149 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1150 errmsg("two-phase state file maximum length exceeded")));
1151
1152 /*
1153 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1154 * cover us, so no need to calculate a separate CRC.
1155 *
1156 * We have to set delayChkpt here, too; otherwise a checkpoint starting
1157 * immediately after the WAL record is inserted could complete without
1158 * fsync'ing our state file. (This is essentially the same kind of race
1159 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1160 * uses delayChkpt for; see notes there.)
1161 *
1162 * We save the PREPARE record's location in the gxact for later use by
1163 * CheckPointTwoPhase.
1164 */
1165 XLogEnsureRecordSpace(0, records.num_chunks);
1166
1167 START_CRIT_SECTION();
1168
1169 MyProc->delayChkpt = true;
1170
1171 XLogBeginInsert();
1172 for (record = records.head; record != NULL; record = record->next)
1173 XLogRegisterData(record->data, record->len);
1174
1175 XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1176
1177 gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1178
1179 if (replorigin)
1180 {
1181 /* Move LSNs forward for this replication origin */
1182 replorigin_session_advance(replorigin_session_origin_lsn,
1183 gxact->prepare_end_lsn);
1184 }
1185
1186 XLogFlush(gxact->prepare_end_lsn);
1187
1188 /* If we crash now, we have prepared: WAL replay will fix things */
1189
1190 /* Store record's start location to read that later on Commit */
1191 gxact->prepare_start_lsn = ProcLastRecPtr;
1192
1193 /*
1194 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1195 * as not running our XID (which it will do immediately after this
1196 * function returns), others can commit/rollback the xact.
1197 *
1198 * NB: a side effect of this is to make a dummy ProcArray entry for the
1199 * prepared XID. This must happen before we clear the XID from MyProc /
1200 * ProcGlobal->xids[], else there is a window where the XID is not running
1201 * according to TransactionIdIsInProgress, and onlookers would be entitled
1202 * to assume the xact crashed. Instead we have a window where the same
1203 * XID appears twice in ProcArray, which is OK.
1204 */
1205 MarkAsPrepared(gxact, false);
1206
1207 /*
1208 * Now we can mark ourselves as out of the commit critical section: a
1209 * checkpoint starting after this will certainly see the gxact as a
1210 * candidate for fsyncing.
1211 */
1212 MyProc->delayChkpt = false;
1213
1214 /*
1215 * Remember that we have this GlobalTransaction entry locked for us. If
1216 * we crash after this point, it's too late to abort, but we must unlock
1217 * it so that the prepared transaction can be committed or rolled back.
1218 */
1219 MyLockedGxact = gxact;
1220
1221 END_CRIT_SECTION();
1222
1223 /*
1224 * Wait for synchronous replication, if required.
1225 *
1226 * Note that at this stage we have marked the prepare, but still show as
1227 * running in the procarray (twice!) and continue to hold locks.
1228 */
1229 SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1230
1231 records.tail = records.head = NULL;
1232 records.num_chunks = 0;
1233 }
1234
1235 /*
1236 * Register a 2PC record to be written to state file.
1237 */
1238 void
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid,uint16 info,const void * data,uint32 len)1239 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1240 const void *data, uint32 len)
1241 {
1242 TwoPhaseRecordOnDisk record;
1243
1244 record.rmid = rmid;
1245 record.info = info;
1246 record.len = len;
1247 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1248 if (len > 0)
1249 save_state_data(data, len);
1250 }
1251
1252
1253 /*
1254 * Read and validate the state file for xid.
1255 *
1256 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1257 * contents of the file, issuing an error when finding corrupted data. If
1258 * missing_ok is true, which indicates that missing files can be safely
1259 * ignored, then return NULL. This state can be reached when doing recovery.
1260 */
1261 static char *
ReadTwoPhaseFile(TransactionId xid,bool missing_ok)1262 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1263 {
1264 char path[MAXPGPATH];
1265 char *buf;
1266 TwoPhaseFileHeader *hdr;
1267 int fd;
1268 struct stat stat;
1269 uint32 crc_offset;
1270 pg_crc32c calc_crc,
1271 file_crc;
1272 int r;
1273
1274 TwoPhaseFilePath(path, xid);
1275
1276 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1277 if (fd < 0)
1278 {
1279 if (missing_ok && errno == ENOENT)
1280 return NULL;
1281
1282 ereport(ERROR,
1283 (errcode_for_file_access(),
1284 errmsg("could not open file \"%s\": %m", path)));
1285 }
1286
1287 /*
1288 * Check file length. We can determine a lower bound pretty easily. We
1289 * set an upper bound to avoid palloc() failure on a corrupt file, though
1290 * we can't guarantee that we won't get an out of memory error anyway,
1291 * even on a valid file.
1292 */
1293 if (fstat(fd, &stat))
1294 ereport(ERROR,
1295 (errcode_for_file_access(),
1296 errmsg("could not stat file \"%s\": %m", path)));
1297
1298 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1299 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1300 sizeof(pg_crc32c)) ||
1301 stat.st_size > MaxAllocSize)
1302 ereport(ERROR,
1303 (errcode(ERRCODE_DATA_CORRUPTED),
1304 errmsg_plural("incorrect size of file \"%s\": %lld byte",
1305 "incorrect size of file \"%s\": %lld bytes",
1306 (long long int) stat.st_size, path,
1307 (long long int) stat.st_size)));
1308
1309 crc_offset = stat.st_size - sizeof(pg_crc32c);
1310 if (crc_offset != MAXALIGN(crc_offset))
1311 ereport(ERROR,
1312 (errcode(ERRCODE_DATA_CORRUPTED),
1313 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1314 path)));
1315
1316 /*
1317 * OK, slurp in the file.
1318 */
1319 buf = (char *) palloc(stat.st_size);
1320
1321 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1322 r = read(fd, buf, stat.st_size);
1323 if (r != stat.st_size)
1324 {
1325 if (r < 0)
1326 ereport(ERROR,
1327 (errcode_for_file_access(),
1328 errmsg("could not read file \"%s\": %m", path)));
1329 else
1330 ereport(ERROR,
1331 (errmsg("could not read file \"%s\": read %d of %lld",
1332 path, r, (long long int) stat.st_size)));
1333 }
1334
1335 pgstat_report_wait_end();
1336
1337 if (CloseTransientFile(fd) != 0)
1338 ereport(ERROR,
1339 (errcode_for_file_access(),
1340 errmsg("could not close file \"%s\": %m", path)));
1341
1342 hdr = (TwoPhaseFileHeader *) buf;
1343 if (hdr->magic != TWOPHASE_MAGIC)
1344 ereport(ERROR,
1345 (errcode(ERRCODE_DATA_CORRUPTED),
1346 errmsg("invalid magic number stored in file \"%s\"",
1347 path)));
1348
1349 if (hdr->total_len != stat.st_size)
1350 ereport(ERROR,
1351 (errcode(ERRCODE_DATA_CORRUPTED),
1352 errmsg("invalid size stored in file \"%s\"",
1353 path)));
1354
1355 INIT_CRC32C(calc_crc);
1356 COMP_CRC32C(calc_crc, buf, crc_offset);
1357 FIN_CRC32C(calc_crc);
1358
1359 file_crc = *((pg_crc32c *) (buf + crc_offset));
1360
1361 if (!EQ_CRC32C(calc_crc, file_crc))
1362 ereport(ERROR,
1363 (errcode(ERRCODE_DATA_CORRUPTED),
1364 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1365 path)));
1366
1367 return buf;
1368 }
1369
1370
1371 /*
1372 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1373 * twophase files and ReadTwoPhaseFile should be used instead.
1374 *
1375 * Note clearly that this function can access WAL during normal operation,
1376 * similarly to the way WALSender or Logical Decoding would do. While
1377 * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
1378 * particularly if this routine is called for the end-of-recovery checkpoint
1379 * in the checkpointer itself, so save the current timeline number value
1380 * and restore it once done.
1381 */
1382 static void
XlogReadTwoPhaseData(XLogRecPtr lsn,char ** buf,int * len)1383 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1384 {
1385 XLogRecord *record;
1386 XLogReaderState *xlogreader;
1387 char *errormsg;
1388 TimeLineID save_currtli = ThisTimeLineID;
1389
1390 xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1391 XL_ROUTINE(.page_read = &read_local_xlog_page,
1392 .segment_open = &wal_segment_open,
1393 .segment_close = &wal_segment_close),
1394 NULL);
1395 if (!xlogreader)
1396 ereport(ERROR,
1397 (errcode(ERRCODE_OUT_OF_MEMORY),
1398 errmsg("out of memory"),
1399 errdetail("Failed while allocating a WAL reading processor.")));
1400
1401 XLogBeginRead(xlogreader, lsn);
1402 record = XLogReadRecord(xlogreader, &errormsg);
1403
1404 /*
1405 * Restore immediately the timeline where it was previously, as
1406 * read_local_xlog_page() could have changed it if the record was read
1407 * while recovery was finishing or if the timeline has jumped in-between.
1408 */
1409 ThisTimeLineID = save_currtli;
1410
1411 if (record == NULL)
1412 ereport(ERROR,
1413 (errcode_for_file_access(),
1414 errmsg("could not read two-phase state from WAL at %X/%X",
1415 LSN_FORMAT_ARGS(lsn))));
1416
1417 if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1418 (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1419 ereport(ERROR,
1420 (errcode_for_file_access(),
1421 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1422 LSN_FORMAT_ARGS(lsn))));
1423
1424 if (len != NULL)
1425 *len = XLogRecGetDataLen(xlogreader);
1426
1427 *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1428 memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1429
1430 XLogReaderFree(xlogreader);
1431 }
1432
1433
1434 /*
1435 * Confirms an xid is prepared, during recovery
1436 */
1437 bool
StandbyTransactionIdIsPrepared(TransactionId xid)1438 StandbyTransactionIdIsPrepared(TransactionId xid)
1439 {
1440 char *buf;
1441 TwoPhaseFileHeader *hdr;
1442 bool result;
1443
1444 Assert(TransactionIdIsValid(xid));
1445
1446 if (max_prepared_xacts <= 0)
1447 return false; /* nothing to do */
1448
1449 /* Read and validate file */
1450 buf = ReadTwoPhaseFile(xid, true);
1451 if (buf == NULL)
1452 return false;
1453
1454 /* Check header also */
1455 hdr = (TwoPhaseFileHeader *) buf;
1456 result = TransactionIdEquals(hdr->xid, xid);
1457 pfree(buf);
1458
1459 return result;
1460 }
1461
1462 /*
1463 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1464 */
1465 void
FinishPreparedTransaction(const char * gid,bool isCommit)1466 FinishPreparedTransaction(const char *gid, bool isCommit)
1467 {
1468 GlobalTransaction gxact;
1469 PGPROC *proc;
1470 TransactionId xid;
1471 char *buf;
1472 char *bufptr;
1473 TwoPhaseFileHeader *hdr;
1474 TransactionId latestXid;
1475 TransactionId *children;
1476 RelFileNode *commitrels;
1477 RelFileNode *abortrels;
1478 RelFileNode *delrels;
1479 int ndelrels;
1480 SharedInvalidationMessage *invalmsgs;
1481
1482 /*
1483 * Validate the GID, and lock the GXACT to ensure that two backends do not
1484 * try to commit the same GID at once.
1485 */
1486 gxact = LockGXact(gid, GetUserId());
1487 proc = &ProcGlobal->allProcs[gxact->pgprocno];
1488 xid = gxact->xid;
1489
1490 /*
1491 * Read and validate 2PC state data. State data will typically be stored
1492 * in WAL files if the LSN is after the last checkpoint record, or moved
1493 * to disk if for some reason they have lived for a long time.
1494 */
1495 if (gxact->ondisk)
1496 buf = ReadTwoPhaseFile(xid, false);
1497 else
1498 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1499
1500
1501 /*
1502 * Disassemble the header area
1503 */
1504 hdr = (TwoPhaseFileHeader *) buf;
1505 Assert(TransactionIdEquals(hdr->xid, xid));
1506 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1507 bufptr += MAXALIGN(hdr->gidlen);
1508 children = (TransactionId *) bufptr;
1509 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1510 commitrels = (RelFileNode *) bufptr;
1511 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1512 abortrels = (RelFileNode *) bufptr;
1513 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1514 invalmsgs = (SharedInvalidationMessage *) bufptr;
1515 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1516
1517 /* compute latestXid among all children */
1518 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1519
1520 /* Prevent cancel/die interrupt while cleaning up */
1521 HOLD_INTERRUPTS();
1522
1523 /*
1524 * The order of operations here is critical: make the XLOG entry for
1525 * commit or abort, then mark the transaction committed or aborted in
1526 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1527 * TransactionIdIsInProgress will stop saying the prepared xact is in
1528 * progress), then run the post-commit or post-abort callbacks. The
1529 * callbacks will release the locks the transaction held.
1530 */
1531 if (isCommit)
1532 RecordTransactionCommitPrepared(xid,
1533 hdr->nsubxacts, children,
1534 hdr->ncommitrels, commitrels,
1535 hdr->ninvalmsgs, invalmsgs,
1536 hdr->initfileinval, gid);
1537 else
1538 RecordTransactionAbortPrepared(xid,
1539 hdr->nsubxacts, children,
1540 hdr->nabortrels, abortrels,
1541 gid);
1542
1543 ProcArrayRemove(proc, latestXid);
1544
1545 /*
1546 * In case we fail while running the callbacks, mark the gxact invalid so
1547 * no one else will try to commit/rollback, and so it will be recycled if
1548 * we fail after this point. It is still locked by our backend so it
1549 * won't go away yet.
1550 *
1551 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1552 */
1553 gxact->valid = false;
1554
1555 /*
1556 * We have to remove any files that were supposed to be dropped. For
1557 * consistency with the regular xact.c code paths, must do this before
1558 * releasing locks, so do it before running the callbacks.
1559 *
1560 * NB: this code knows that we couldn't be dropping any temp rels ...
1561 */
1562 if (isCommit)
1563 {
1564 delrels = commitrels;
1565 ndelrels = hdr->ncommitrels;
1566 }
1567 else
1568 {
1569 delrels = abortrels;
1570 ndelrels = hdr->nabortrels;
1571 }
1572
1573 /* Make sure files supposed to be dropped are dropped */
1574 DropRelationFiles(delrels, ndelrels, false);
1575
1576 /*
1577 * Handle cache invalidation messages.
1578 *
1579 * Relcache init file invalidation requires processing both before and
1580 * after we send the SI messages. See AtEOXact_Inval()
1581 */
1582 if (hdr->initfileinval)
1583 RelationCacheInitFilePreInvalidate();
1584 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1585 if (hdr->initfileinval)
1586 RelationCacheInitFilePostInvalidate();
1587
1588 /*
1589 * Acquire the two-phase lock. We want to work on the two-phase callbacks
1590 * while holding it to avoid potential conflicts with other transactions
1591 * attempting to use the same GID, so the lock is released once the shared
1592 * memory state is cleared.
1593 */
1594 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1595
1596 /* And now do the callbacks */
1597 if (isCommit)
1598 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1599 else
1600 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1601
1602 PredicateLockTwoPhaseFinish(xid, isCommit);
1603
1604 /* Clear shared memory state */
1605 RemoveGXact(gxact);
1606
1607 /*
1608 * Release the lock as all callbacks are called and shared memory cleanup
1609 * is done.
1610 */
1611 LWLockRelease(TwoPhaseStateLock);
1612
1613 /* Count the prepared xact as committed or aborted */
1614 AtEOXact_PgStat(isCommit, false);
1615
1616 /*
1617 * And now we can clean up any files we may have left.
1618 */
1619 if (gxact->ondisk)
1620 RemoveTwoPhaseFile(xid, true);
1621
1622 MyLockedGxact = NULL;
1623
1624 RESUME_INTERRUPTS();
1625
1626 pfree(buf);
1627 }
1628
1629 /*
1630 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1631 */
1632 static void
ProcessRecords(char * bufptr,TransactionId xid,const TwoPhaseCallback callbacks[])1633 ProcessRecords(char *bufptr, TransactionId xid,
1634 const TwoPhaseCallback callbacks[])
1635 {
1636 for (;;)
1637 {
1638 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1639
1640 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1641 if (record->rmid == TWOPHASE_RM_END_ID)
1642 break;
1643
1644 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1645
1646 if (callbacks[record->rmid] != NULL)
1647 callbacks[record->rmid] (xid, record->info,
1648 (void *) bufptr, record->len);
1649
1650 bufptr += MAXALIGN(record->len);
1651 }
1652 }
1653
1654 /*
1655 * Remove the 2PC file for the specified XID.
1656 *
1657 * If giveWarning is false, do not complain about file-not-present;
1658 * this is an expected case during WAL replay.
1659 */
1660 static void
RemoveTwoPhaseFile(TransactionId xid,bool giveWarning)1661 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1662 {
1663 char path[MAXPGPATH];
1664
1665 TwoPhaseFilePath(path, xid);
1666 if (unlink(path))
1667 if (errno != ENOENT || giveWarning)
1668 ereport(WARNING,
1669 (errcode_for_file_access(),
1670 errmsg("could not remove file \"%s\": %m", path)));
1671 }
1672
1673 /*
1674 * Recreates a state file. This is used in WAL replay and during
1675 * checkpoint creation.
1676 *
1677 * Note: content and len don't include CRC.
1678 */
1679 static void
RecreateTwoPhaseFile(TransactionId xid,void * content,int len)1680 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1681 {
1682 char path[MAXPGPATH];
1683 pg_crc32c statefile_crc;
1684 int fd;
1685
1686 /* Recompute CRC */
1687 INIT_CRC32C(statefile_crc);
1688 COMP_CRC32C(statefile_crc, content, len);
1689 FIN_CRC32C(statefile_crc);
1690
1691 TwoPhaseFilePath(path, xid);
1692
1693 fd = OpenTransientFile(path,
1694 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1695 if (fd < 0)
1696 ereport(ERROR,
1697 (errcode_for_file_access(),
1698 errmsg("could not recreate file \"%s\": %m", path)));
1699
1700 /* Write content and CRC */
1701 errno = 0;
1702 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1703 if (write(fd, content, len) != len)
1704 {
1705 /* if write didn't set errno, assume problem is no disk space */
1706 if (errno == 0)
1707 errno = ENOSPC;
1708 ereport(ERROR,
1709 (errcode_for_file_access(),
1710 errmsg("could not write file \"%s\": %m", path)));
1711 }
1712 if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1713 {
1714 /* if write didn't set errno, assume problem is no disk space */
1715 if (errno == 0)
1716 errno = ENOSPC;
1717 ereport(ERROR,
1718 (errcode_for_file_access(),
1719 errmsg("could not write file \"%s\": %m", path)));
1720 }
1721 pgstat_report_wait_end();
1722
1723 /*
1724 * We must fsync the file because the end-of-replay checkpoint will not do
1725 * so, there being no GXACT in shared memory yet to tell it to.
1726 */
1727 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1728 if (pg_fsync(fd) != 0)
1729 ereport(ERROR,
1730 (errcode_for_file_access(),
1731 errmsg("could not fsync file \"%s\": %m", path)));
1732 pgstat_report_wait_end();
1733
1734 if (CloseTransientFile(fd) != 0)
1735 ereport(ERROR,
1736 (errcode_for_file_access(),
1737 errmsg("could not close file \"%s\": %m", path)));
1738 }
1739
1740 /*
1741 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1742 *
1743 * We must fsync the state file of any GXACT that is valid or has been
1744 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1745 * horizon. (If the gxact isn't valid yet, has not been generated in
1746 * redo, or has a later LSN, this checkpoint is not responsible for
1747 * fsyncing it.)
1748 *
1749 * This is deliberately run as late as possible in the checkpoint sequence,
1750 * because GXACTs ordinarily have short lifespans, and so it is quite
1751 * possible that GXACTs that were valid at checkpoint start will no longer
1752 * exist if we wait a little bit. With typical checkpoint settings this
1753 * will be about 3 minutes for an online checkpoint, so as a result we
1754 * expect that there will be no GXACTs that need to be copied to disk.
1755 *
1756 * If a GXACT remains valid across multiple checkpoints, it will already
1757 * be on disk so we don't bother to repeat that write.
1758 */
1759 void
CheckPointTwoPhase(XLogRecPtr redo_horizon)1760 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1761 {
1762 int i;
1763 int serialized_xacts = 0;
1764
1765 if (max_prepared_xacts <= 0)
1766 return; /* nothing to do */
1767
1768 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1769
1770 /*
1771 * We are expecting there to be zero GXACTs that need to be copied to
1772 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1773 * simplicity. This prevents any new xacts from preparing while this
1774 * occurs, which shouldn't be a problem since the presence of long-lived
1775 * prepared xacts indicates the transaction manager isn't active.
1776 *
1777 * It's also possible to move I/O out of the lock, but on every error we
1778 * should check whether somebody committed our transaction in different
1779 * backend. Let's leave this optimization for future, if somebody will
1780 * spot that this place cause bottleneck.
1781 *
1782 * Note that it isn't possible for there to be a GXACT with a
1783 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1784 * because of the efforts with delayChkpt.
1785 */
1786 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1787 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1788 {
1789 /*
1790 * Note that we are using gxact not PGPROC so this works in recovery
1791 * also
1792 */
1793 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1794
1795 if ((gxact->valid || gxact->inredo) &&
1796 !gxact->ondisk &&
1797 gxact->prepare_end_lsn <= redo_horizon)
1798 {
1799 char *buf;
1800 int len;
1801
1802 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1803 RecreateTwoPhaseFile(gxact->xid, buf, len);
1804 gxact->ondisk = true;
1805 gxact->prepare_start_lsn = InvalidXLogRecPtr;
1806 gxact->prepare_end_lsn = InvalidXLogRecPtr;
1807 pfree(buf);
1808 serialized_xacts++;
1809 }
1810 }
1811 LWLockRelease(TwoPhaseStateLock);
1812
1813 /*
1814 * Flush unconditionally the parent directory to make any information
1815 * durable on disk. Two-phase files could have been removed and those
1816 * removals need to be made persistent as well as any files newly created
1817 * previously since the last checkpoint.
1818 */
1819 fsync_fname(TWOPHASE_DIR, true);
1820
1821 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1822
1823 if (log_checkpoints && serialized_xacts > 0)
1824 ereport(LOG,
1825 (errmsg_plural("%u two-phase state file was written "
1826 "for a long-running prepared transaction",
1827 "%u two-phase state files were written "
1828 "for long-running prepared transactions",
1829 serialized_xacts,
1830 serialized_xacts)));
1831 }
1832
1833 /*
1834 * restoreTwoPhaseData
1835 *
1836 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1837 * This is called once at the beginning of recovery, saving any extra
1838 * lookups in the future. Two-phase files that are newer than the
1839 * minimum XID horizon are discarded on the way.
1840 */
1841 void
restoreTwoPhaseData(void)1842 restoreTwoPhaseData(void)
1843 {
1844 DIR *cldir;
1845 struct dirent *clde;
1846
1847 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1848 cldir = AllocateDir(TWOPHASE_DIR);
1849 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1850 {
1851 if (strlen(clde->d_name) == 8 &&
1852 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1853 {
1854 TransactionId xid;
1855 char *buf;
1856
1857 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1858
1859 buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1860 true, false, false);
1861 if (buf == NULL)
1862 continue;
1863
1864 PrepareRedoAdd(buf, InvalidXLogRecPtr,
1865 InvalidXLogRecPtr, InvalidRepOriginId);
1866 }
1867 }
1868 LWLockRelease(TwoPhaseStateLock);
1869 FreeDir(cldir);
1870 }
1871
1872 /*
1873 * PrescanPreparedTransactions
1874 *
1875 * Scan the shared memory entries of TwoPhaseState and determine the range
1876 * of valid XIDs present. This is run during database startup, after we
1877 * have completed reading WAL. ShmemVariableCache->nextXid has been set to
1878 * one more than the highest XID for which evidence exists in WAL.
1879 *
1880 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1881 * are present, it suggests that the DBA has done a PITR recovery to an
1882 * earlier point in time without cleaning out pg_twophase. We dare not
1883 * try to recover such prepared xacts since they likely depend on database
1884 * state that doesn't exist now.
1885 *
1886 * However, we will advance nextXid beyond any subxact XIDs belonging to
1887 * valid prepared xacts. We need to do this since subxact commit doesn't
1888 * write a WAL entry, and so there might be no evidence in WAL of those
1889 * subxact XIDs.
1890 *
1891 * On corrupted two-phase files, fail immediately. Keeping around broken
1892 * entries and let replay continue causes harm on the system, and a new
1893 * backup should be rolled in.
1894 *
1895 * Our other responsibility is to determine and return the oldest valid XID
1896 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1897 * This is needed to synchronize pg_subtrans startup properly.
1898 *
1899 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1900 * top-level xids is stored in *xids_p. The number of entries in the array
1901 * is returned in *nxids_p.
1902 */
1903 TransactionId
PrescanPreparedTransactions(TransactionId ** xids_p,int * nxids_p)1904 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1905 {
1906 FullTransactionId nextXid = ShmemVariableCache->nextXid;
1907 TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1908 TransactionId result = origNextXid;
1909 TransactionId *xids = NULL;
1910 int nxids = 0;
1911 int allocsize = 0;
1912 int i;
1913
1914 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1915 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1916 {
1917 TransactionId xid;
1918 char *buf;
1919 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1920
1921 Assert(gxact->inredo);
1922
1923 xid = gxact->xid;
1924
1925 buf = ProcessTwoPhaseBuffer(xid,
1926 gxact->prepare_start_lsn,
1927 gxact->ondisk, false, true);
1928
1929 if (buf == NULL)
1930 continue;
1931
1932 /*
1933 * OK, we think this file is valid. Incorporate xid into the
1934 * running-minimum result.
1935 */
1936 if (TransactionIdPrecedes(xid, result))
1937 result = xid;
1938
1939 if (xids_p)
1940 {
1941 if (nxids == allocsize)
1942 {
1943 if (nxids == 0)
1944 {
1945 allocsize = 10;
1946 xids = palloc(allocsize * sizeof(TransactionId));
1947 }
1948 else
1949 {
1950 allocsize = allocsize * 2;
1951 xids = repalloc(xids, allocsize * sizeof(TransactionId));
1952 }
1953 }
1954 xids[nxids++] = xid;
1955 }
1956
1957 pfree(buf);
1958 }
1959 LWLockRelease(TwoPhaseStateLock);
1960
1961 if (xids_p)
1962 {
1963 *xids_p = xids;
1964 *nxids_p = nxids;
1965 }
1966
1967 return result;
1968 }
1969
1970 /*
1971 * StandbyRecoverPreparedTransactions
1972 *
1973 * Scan the shared memory entries of TwoPhaseState and setup all the required
1974 * information to allow standby queries to treat prepared transactions as still
1975 * active.
1976 *
1977 * This is never called at the end of recovery - we use
1978 * RecoverPreparedTransactions() at that point.
1979 *
1980 * The lack of calls to SubTransSetParent() calls here is by design;
1981 * those calls are made by RecoverPreparedTransactions() at the end of recovery
1982 * for those xacts that need this.
1983 */
1984 void
StandbyRecoverPreparedTransactions(void)1985 StandbyRecoverPreparedTransactions(void)
1986 {
1987 int i;
1988
1989 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1990 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1991 {
1992 TransactionId xid;
1993 char *buf;
1994 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1995
1996 Assert(gxact->inredo);
1997
1998 xid = gxact->xid;
1999
2000 buf = ProcessTwoPhaseBuffer(xid,
2001 gxact->prepare_start_lsn,
2002 gxact->ondisk, false, false);
2003 if (buf != NULL)
2004 pfree(buf);
2005 }
2006 LWLockRelease(TwoPhaseStateLock);
2007 }
2008
2009 /*
2010 * RecoverPreparedTransactions
2011 *
2012 * Scan the shared memory entries of TwoPhaseState and reload the state for
2013 * each prepared transaction (reacquire locks, etc).
2014 *
2015 * This is run at the end of recovery, but before we allow backends to write
2016 * WAL.
2017 *
2018 * At the end of recovery the way we take snapshots will change. We now need
2019 * to mark all running transactions with their full SubTransSetParent() info
2020 * to allow normal snapshots to work correctly if snapshots overflow.
2021 * We do this here because by definition prepared transactions are the only
2022 * type of write transaction still running, so this is necessary and
2023 * complete.
2024 */
2025 void
RecoverPreparedTransactions(void)2026 RecoverPreparedTransactions(void)
2027 {
2028 int i;
2029
2030 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2031 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2032 {
2033 TransactionId xid;
2034 char *buf;
2035 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2036 char *bufptr;
2037 TwoPhaseFileHeader *hdr;
2038 TransactionId *subxids;
2039 const char *gid;
2040
2041 xid = gxact->xid;
2042
2043 /*
2044 * Reconstruct subtrans state for the transaction --- needed because
2045 * pg_subtrans is not preserved over a restart. Note that we are
2046 * linking all the subtransactions directly to the top-level XID;
2047 * there may originally have been a more complex hierarchy, but
2048 * there's no need to restore that exactly. It's possible that
2049 * SubTransSetParent has been set before, if the prepared transaction
2050 * generated xid assignment records.
2051 */
2052 buf = ProcessTwoPhaseBuffer(xid,
2053 gxact->prepare_start_lsn,
2054 gxact->ondisk, true, false);
2055 if (buf == NULL)
2056 continue;
2057
2058 ereport(LOG,
2059 (errmsg("recovering prepared transaction %u from shared memory", xid)));
2060
2061 hdr = (TwoPhaseFileHeader *) buf;
2062 Assert(TransactionIdEquals(hdr->xid, xid));
2063 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2064 gid = (const char *) bufptr;
2065 bufptr += MAXALIGN(hdr->gidlen);
2066 subxids = (TransactionId *) bufptr;
2067 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2068 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2069 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2070 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2071
2072 /*
2073 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2074 * added in redo and already has a shmem entry for it.
2075 */
2076 MarkAsPreparingGuts(gxact, xid, gid,
2077 hdr->prepared_at,
2078 hdr->owner, hdr->database);
2079
2080 /* recovered, so reset the flag for entries generated by redo */
2081 gxact->inredo = false;
2082
2083 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2084 MarkAsPrepared(gxact, true);
2085
2086 LWLockRelease(TwoPhaseStateLock);
2087
2088 /*
2089 * Recover other state (notably locks) using resource managers.
2090 */
2091 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2092
2093 /*
2094 * Release locks held by the standby process after we process each
2095 * prepared transaction. As a result, we don't need too many
2096 * additional locks at any one time.
2097 */
2098 if (InHotStandby)
2099 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2100
2101 /*
2102 * We're done with recovering this transaction. Clear MyLockedGxact,
2103 * like we do in PrepareTransaction() during normal operation.
2104 */
2105 PostPrepare_Twophase();
2106
2107 pfree(buf);
2108
2109 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2110 }
2111
2112 LWLockRelease(TwoPhaseStateLock);
2113 }
2114
2115 /*
2116 * ProcessTwoPhaseBuffer
2117 *
2118 * Given a transaction id, read it either from disk or read it directly
2119 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2120 *
2121 * If setParent is true, set up subtransaction parent linkages.
2122 *
2123 * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2124 * value scanned.
2125 */
2126 static char *
ProcessTwoPhaseBuffer(TransactionId xid,XLogRecPtr prepare_start_lsn,bool fromdisk,bool setParent,bool setNextXid)2127 ProcessTwoPhaseBuffer(TransactionId xid,
2128 XLogRecPtr prepare_start_lsn,
2129 bool fromdisk,
2130 bool setParent, bool setNextXid)
2131 {
2132 FullTransactionId nextXid = ShmemVariableCache->nextXid;
2133 TransactionId origNextXid = XidFromFullTransactionId(nextXid);
2134 TransactionId *subxids;
2135 char *buf;
2136 TwoPhaseFileHeader *hdr;
2137 int i;
2138
2139 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2140
2141 if (!fromdisk)
2142 Assert(prepare_start_lsn != InvalidXLogRecPtr);
2143
2144 /* Already processed? */
2145 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2146 {
2147 if (fromdisk)
2148 {
2149 ereport(WARNING,
2150 (errmsg("removing stale two-phase state file for transaction %u",
2151 xid)));
2152 RemoveTwoPhaseFile(xid, true);
2153 }
2154 else
2155 {
2156 ereport(WARNING,
2157 (errmsg("removing stale two-phase state from memory for transaction %u",
2158 xid)));
2159 PrepareRedoRemove(xid, true);
2160 }
2161 return NULL;
2162 }
2163
2164 /* Reject XID if too new */
2165 if (TransactionIdFollowsOrEquals(xid, origNextXid))
2166 {
2167 if (fromdisk)
2168 {
2169 ereport(WARNING,
2170 (errmsg("removing future two-phase state file for transaction %u",
2171 xid)));
2172 RemoveTwoPhaseFile(xid, true);
2173 }
2174 else
2175 {
2176 ereport(WARNING,
2177 (errmsg("removing future two-phase state from memory for transaction %u",
2178 xid)));
2179 PrepareRedoRemove(xid, true);
2180 }
2181 return NULL;
2182 }
2183
2184 if (fromdisk)
2185 {
2186 /* Read and validate file */
2187 buf = ReadTwoPhaseFile(xid, false);
2188 }
2189 else
2190 {
2191 /* Read xlog data */
2192 XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2193 }
2194
2195 /* Deconstruct header */
2196 hdr = (TwoPhaseFileHeader *) buf;
2197 if (!TransactionIdEquals(hdr->xid, xid))
2198 {
2199 if (fromdisk)
2200 ereport(ERROR,
2201 (errcode(ERRCODE_DATA_CORRUPTED),
2202 errmsg("corrupted two-phase state file for transaction %u",
2203 xid)));
2204 else
2205 ereport(ERROR,
2206 (errcode(ERRCODE_DATA_CORRUPTED),
2207 errmsg("corrupted two-phase state in memory for transaction %u",
2208 xid)));
2209 }
2210
2211 /*
2212 * Examine subtransaction XIDs ... they should all follow main XID, and
2213 * they may force us to advance nextXid.
2214 */
2215 subxids = (TransactionId *) (buf +
2216 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2217 MAXALIGN(hdr->gidlen));
2218 for (i = 0; i < hdr->nsubxacts; i++)
2219 {
2220 TransactionId subxid = subxids[i];
2221
2222 Assert(TransactionIdFollows(subxid, xid));
2223
2224 /* update nextXid if needed */
2225 if (setNextXid)
2226 AdvanceNextFullTransactionIdPastXid(subxid);
2227
2228 if (setParent)
2229 SubTransSetParent(subxid, xid);
2230 }
2231
2232 return buf;
2233 }
2234
2235
2236 /*
2237 * RecordTransactionCommitPrepared
2238 *
2239 * This is basically the same as RecordTransactionCommit (q.v. if you change
2240 * this function): in particular, we must set the delayChkpt flag to avoid a
2241 * race condition.
2242 *
2243 * We know the transaction made at least one XLOG entry (its PREPARE),
2244 * so it is never possible to optimize out the commit record.
2245 */
2246 static void
RecordTransactionCommitPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,int ninvalmsgs,SharedInvalidationMessage * invalmsgs,bool initfileinval,const char * gid)2247 RecordTransactionCommitPrepared(TransactionId xid,
2248 int nchildren,
2249 TransactionId *children,
2250 int nrels,
2251 RelFileNode *rels,
2252 int ninvalmsgs,
2253 SharedInvalidationMessage *invalmsgs,
2254 bool initfileinval,
2255 const char *gid)
2256 {
2257 XLogRecPtr recptr;
2258 TimestampTz committs = GetCurrentTimestamp();
2259 bool replorigin;
2260
2261 /*
2262 * Are we using the replication origins feature? Or, in other words, are
2263 * we replaying remote actions?
2264 */
2265 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2266 replorigin_session_origin != DoNotReplicateId);
2267
2268 START_CRIT_SECTION();
2269
2270 /* See notes in RecordTransactionCommit */
2271 MyProc->delayChkpt = true;
2272
2273 /*
2274 * Emit the XLOG commit record. Note that we mark 2PC commits as
2275 * potentially having AccessExclusiveLocks since we don't know whether or
2276 * not they do.
2277 */
2278 recptr = XactLogCommitRecord(committs,
2279 nchildren, children, nrels, rels,
2280 ninvalmsgs, invalmsgs,
2281 initfileinval,
2282 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2283 xid, gid);
2284
2285
2286 if (replorigin)
2287 /* Move LSNs forward for this replication origin */
2288 replorigin_session_advance(replorigin_session_origin_lsn,
2289 XactLastRecEnd);
2290
2291 /*
2292 * Record commit timestamp. The value comes from plain commit timestamp
2293 * if replorigin is not enabled, or replorigin already set a value for us
2294 * in replorigin_session_origin_timestamp otherwise.
2295 *
2296 * We don't need to WAL-log anything here, as the commit record written
2297 * above already contains the data.
2298 */
2299 if (!replorigin || replorigin_session_origin_timestamp == 0)
2300 replorigin_session_origin_timestamp = committs;
2301
2302 TransactionTreeSetCommitTsData(xid, nchildren, children,
2303 replorigin_session_origin_timestamp,
2304 replorigin_session_origin);
2305
2306 /*
2307 * We don't currently try to sleep before flush here ... nor is there any
2308 * support for async commit of a prepared xact (the very idea is probably
2309 * a contradiction)
2310 */
2311
2312 /* Flush XLOG to disk */
2313 XLogFlush(recptr);
2314
2315 /* Mark the transaction committed in pg_xact */
2316 TransactionIdCommitTree(xid, nchildren, children);
2317
2318 /* Checkpoint can proceed now */
2319 MyProc->delayChkpt = false;
2320
2321 END_CRIT_SECTION();
2322
2323 /*
2324 * Wait for synchronous replication, if required.
2325 *
2326 * Note that at this stage we have marked clog, but still show as running
2327 * in the procarray and continue to hold locks.
2328 */
2329 SyncRepWaitForLSN(recptr, true);
2330 }
2331
2332 /*
2333 * RecordTransactionAbortPrepared
2334 *
2335 * This is basically the same as RecordTransactionAbort.
2336 *
2337 * We know the transaction made at least one XLOG entry (its PREPARE),
2338 * so it is never possible to optimize out the abort record.
2339 */
2340 static void
RecordTransactionAbortPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,const char * gid)2341 RecordTransactionAbortPrepared(TransactionId xid,
2342 int nchildren,
2343 TransactionId *children,
2344 int nrels,
2345 RelFileNode *rels,
2346 const char *gid)
2347 {
2348 XLogRecPtr recptr;
2349 bool replorigin;
2350
2351 /*
2352 * Are we using the replication origins feature? Or, in other words, are
2353 * we replaying remote actions?
2354 */
2355 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2356 replorigin_session_origin != DoNotReplicateId);
2357
2358 /*
2359 * Catch the scenario where we aborted partway through
2360 * RecordTransactionCommitPrepared ...
2361 */
2362 if (TransactionIdDidCommit(xid))
2363 elog(PANIC, "cannot abort transaction %u, it was already committed",
2364 xid);
2365
2366 START_CRIT_SECTION();
2367
2368 /*
2369 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2370 * potentially having AccessExclusiveLocks since we don't know whether or
2371 * not they do.
2372 */
2373 recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2374 nchildren, children,
2375 nrels, rels,
2376 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2377 xid, gid);
2378
2379 if (replorigin)
2380 /* Move LSNs forward for this replication origin */
2381 replorigin_session_advance(replorigin_session_origin_lsn,
2382 XactLastRecEnd);
2383
2384 /* Always flush, since we're about to remove the 2PC state file */
2385 XLogFlush(recptr);
2386
2387 /*
2388 * Mark the transaction aborted in clog. This is not absolutely necessary
2389 * but we may as well do it while we are here.
2390 */
2391 TransactionIdAbortTree(xid, nchildren, children);
2392
2393 END_CRIT_SECTION();
2394
2395 /*
2396 * Wait for synchronous replication, if required.
2397 *
2398 * Note that at this stage we have marked clog, but still show as running
2399 * in the procarray and continue to hold locks.
2400 */
2401 SyncRepWaitForLSN(recptr, false);
2402 }
2403
2404 /*
2405 * PrepareRedoAdd
2406 *
2407 * Store pointers to the start/end of the WAL record along with the xid in
2408 * a gxact entry in shared memory TwoPhaseState structure. If caller
2409 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2410 * data, the entry is marked as located on disk.
2411 */
2412 void
PrepareRedoAdd(char * buf,XLogRecPtr start_lsn,XLogRecPtr end_lsn,RepOriginId origin_id)2413 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2414 XLogRecPtr end_lsn, RepOriginId origin_id)
2415 {
2416 TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2417 char *bufptr;
2418 const char *gid;
2419 GlobalTransaction gxact;
2420
2421 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2422 Assert(RecoveryInProgress());
2423
2424 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2425 gid = (const char *) bufptr;
2426
2427 /*
2428 * Reserve the GID for the given transaction in the redo code path.
2429 *
2430 * This creates a gxact struct and puts it into the active array.
2431 *
2432 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2433 * shared memory. Hence, we only fill up the bare minimum contents here.
2434 * The gxact also gets marked with gxact->inredo set to true to indicate
2435 * that it got added in the redo phase
2436 */
2437
2438 /* Get a free gxact from the freelist */
2439 if (TwoPhaseState->freeGXacts == NULL)
2440 ereport(ERROR,
2441 (errcode(ERRCODE_OUT_OF_MEMORY),
2442 errmsg("maximum number of prepared transactions reached"),
2443 errhint("Increase max_prepared_transactions (currently %d).",
2444 max_prepared_xacts)));
2445 gxact = TwoPhaseState->freeGXacts;
2446 TwoPhaseState->freeGXacts = gxact->next;
2447
2448 gxact->prepared_at = hdr->prepared_at;
2449 gxact->prepare_start_lsn = start_lsn;
2450 gxact->prepare_end_lsn = end_lsn;
2451 gxact->xid = hdr->xid;
2452 gxact->owner = hdr->owner;
2453 gxact->locking_backend = InvalidBackendId;
2454 gxact->valid = false;
2455 gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2456 gxact->inredo = true; /* yes, added in redo */
2457 strcpy(gxact->gid, gid);
2458
2459 /* And insert it into the active array */
2460 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2461 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2462
2463 if (origin_id != InvalidRepOriginId)
2464 {
2465 /* recover apply progress */
2466 replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2467 false /* backward */ , false /* WAL */ );
2468 }
2469
2470 elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2471 }
2472
2473 /*
2474 * PrepareRedoRemove
2475 *
2476 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2477 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2478 *
2479 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2480 * is updated.
2481 */
2482 void
PrepareRedoRemove(TransactionId xid,bool giveWarning)2483 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2484 {
2485 GlobalTransaction gxact = NULL;
2486 int i;
2487 bool found = false;
2488
2489 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2490 Assert(RecoveryInProgress());
2491
2492 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2493 {
2494 gxact = TwoPhaseState->prepXacts[i];
2495
2496 if (gxact->xid == xid)
2497 {
2498 Assert(gxact->inredo);
2499 found = true;
2500 break;
2501 }
2502 }
2503
2504 /*
2505 * Just leave if there is nothing, this is expected during WAL replay.
2506 */
2507 if (!found)
2508 return;
2509
2510 /*
2511 * And now we can clean up any files we may have left.
2512 */
2513 elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2514 if (gxact->ondisk)
2515 RemoveTwoPhaseFile(xid, giveWarning);
2516 RemoveGXact(gxact);
2517 }
2518