1 /*-------------------------------------------------------------------------
2 *
3 * twophase.c
4 * Two-phase commit support functions.
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/access/transam/twophase.c
11 *
12 * NOTES
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
16 *
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
23 *
24 * A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25 * what keeps the XID considered running by TransactionIdIsInProgress.
26 * It is also convenient as a PGPROC to hook the gxact's locks to.
27 *
28 * Information to recover prepared transactions in case of crash is
29 * now stored in WAL for the common case. In some cases there will be
30 * an extended period between preparing a GXACT and commit/abort, in
31 * which case we need to separately record prepared transaction data
32 * in permanent storage. This includes locking information, pending
33 * notifications etc. All that state information is written to the
34 * per-transaction state file in the pg_twophase directory.
35 * All prepared transactions will be written prior to shutdown.
36 *
 * The life cycle of state data is as follows:
 *
 * * On PREPARE TRANSACTION the backend writes state data only to the WAL
 *   and stores a pointer to the start of the WAL record in
 *   gxact->prepare_start_lsn.
 * * If COMMIT occurs before a checkpoint, the backend reads the data from
 *   WAL using prepare_start_lsn.
 * * On checkpoint, state data is copied to files in the pg_twophase
 *   directory and fsynced.
 * * If COMMIT happens after a checkpoint, the backend reads state data
 *   from the files.
48 *
49 * During replay and replication, TwoPhaseState also holds information
50 * about active prepared transactions that haven't been moved to disk yet.
51 *
52 * Replay of twophase records happens by the following rules:
53 *
54 * * At the beginning of recovery, pg_twophase is scanned once, filling
55 * TwoPhaseState with entries marked with gxact->inredo and
56 * gxact->ondisk. Two-phase file data older than the XID horizon of
57 * the redo position are discarded.
58 * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 * gxact->inredo is set to true for such entries.
60 * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 * that have gxact->inredo set and are behind the redo_horizon. We
62 * save them to disk and then switch gxact->ondisk to true.
63 * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 * If gxact->ondisk is true, the corresponding entry from the disk
65 * is additionally deleted.
66 * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 * and PrescanPreparedTransactions() have been modified to go through
68 * gxact->inredo entries that have not made it to disk.
69 *
70 *-------------------------------------------------------------------------
71 */
72 #include "postgres.h"
73
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
78
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogutils.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
92 #include "funcapi.h"
93 #include "miscadmin.h"
94 #include "pg_trace.h"
95 #include "pgstat.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "storage/sinvaladt.h"
106 #include "storage/smgr.h"
107 #include "utils/builtins.h"
108 #include "utils/memutils.h"
109 #include "utils/timestamp.h"
110
/*
 * Directory where Two-phase commit files reside within PGDATA
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable, can't be changed after startup.
 *
 * It sizes the shared-memory gxact array (see TwoPhaseShmemSize), and a
 * value of zero makes MarkAsPreparing() reject PREPARE TRANSACTION.
 */
int max_prepared_xacts = 0;
118
/*
 * This struct describes one global transaction that is in prepared state
 * or attempting to become prepared.
 *
 * The lifecycle of a global transaction is:
 *
 * 1. After checking that the requested GID is not in use, set up an entry in
 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
 * and mark it as locked by my backend.
 *
 * 2. After successfully completing prepare, set valid = true and enter the
 * referenced PGPROC into the global ProcArray.
 *
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
 * valid and not locked, then mark the entry as locked by storing my current
 * backend ID into locking_backend.  This prevents concurrent attempts to
 * commit or rollback the same prepared xact.
 *
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 * the freelist.
 *
 * Note that if the preparing transaction fails between steps 1 and 2, the
 * entry must be removed so that the GID and the GlobalTransaction struct
 * can be reused.  See AtAbort_Twophase().
 *
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
 * twophase.h
 */

typedef struct GlobalTransactionData
{
    GlobalTransaction next;     /* list link for free list */
    int         pgprocno;       /* ID of associated dummy PGPROC; used as the
                                 * index into ProcGlobal->allProcs and
                                 * ProcGlobal->allPgXact */
    BackendId   dummyBackendId; /* similar to backend id for backends;
                                 * assigned once in TwoPhaseShmemInit() */
    TimestampTz prepared_at;    /* time of preparation */

    /*
     * Note that we need to keep track of two LSNs for each GXACT. We keep
     * track of the start LSN because this is the address we must use to read
     * state data back from WAL when committing a prepared GXACT. We keep
     * track of the end LSN because that is the LSN we need to wait for prior
     * to commit.
     */
    XLogRecPtr  prepare_start_lsn;  /* XLOG offset of prepare record start */
    XLogRecPtr  prepare_end_lsn;    /* XLOG offset of prepare record end */
    TransactionId xid;          /* The GXACT id */

    Oid         owner;          /* ID of user that executed the xact */
    BackendId   locking_backend;    /* backend currently working on the xact,
                                     * or InvalidBackendId when unlocked */
    bool        valid;          /* true if PGPROC entry is in proc array */
    bool        ondisk;         /* true if prepare state file is on disk */
    bool        inredo;         /* true if entry was added via xlog_redo */
    char        gid[GIDSIZE];   /* The GID assigned to the prepared xact,
                                 * stored as a NUL-terminated string */
} GlobalTransactionData;
174
/*
 * Two Phase Commit shared state.  Access to this struct is protected
 * by TwoPhaseStateLock.
 *
 * The struct lives in shared memory and is followed there by the pool of
 * GlobalTransactionData structs that the pointers below refer to (see
 * TwoPhaseShmemSize() and TwoPhaseShmemInit()).
 */
typedef struct TwoPhaseStateData
{
    /* Head of linked list of free GlobalTransactionData structs */
    GlobalTransaction freeGXacts;

    /*
     * Number of prepXacts entries currently in use.  This counts every
     * reserved entry, including ones whose gxact->valid is still false.
     */
    int         numPrepXacts;

    /* There are max_prepared_xacts items in this array */
    GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
} TwoPhaseStateData;
190
/* Pointer to the shared two-phase state; set up by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;

/*
 * Global transaction entry currently locked by us, if any.  Note that any
 * access to the entry pointed to by this variable must be protected by
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 * (since it's just local memory).
 *
 * It is set by MarkAsPreparingGuts() and LockGXact(), and reset to NULL by
 * AtAbort_Twophase() and PostPrepare_Twophase().
 */
static GlobalTransaction MyLockedGxact = NULL;

/* true once AtProcExit_Twophase has been registered with before_shmem_exit */
static bool twophaseExitRegistered = false;
202
/*
 * Forward declarations of file-local routines.
 */
static void RecordTransactionCommitPrepared(TransactionId xid,
                                            int nchildren,
                                            TransactionId *children,
                                            int nrels,
                                            RelFileNode *rels,
                                            int ninvalmsgs,
                                            SharedInvalidationMessage *invalmsgs,
                                            bool initfileinval,
                                            const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
                                           int nchildren,
                                           TransactionId *children,
                                           int nrels,
                                           RelFileNode *rels,
                                           const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
                           const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);

static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(TransactionId xid,
                                   XLogRecPtr prepare_start_lsn,
                                   bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
                                const char *gid, TimestampTz prepared_at, Oid owner,
                                Oid databaseid);
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
231
232 /*
233 * Initialization of shared memory
234 */
235 Size
TwoPhaseShmemSize(void)236 TwoPhaseShmemSize(void)
237 {
238 Size size;
239
240 /* Need the fixed struct, the array of pointers, and the GTD structs */
241 size = offsetof(TwoPhaseStateData, prepXacts);
242 size = add_size(size, mul_size(max_prepared_xacts,
243 sizeof(GlobalTransaction)));
244 size = MAXALIGN(size);
245 size = add_size(size, mul_size(max_prepared_xacts,
246 sizeof(GlobalTransactionData)));
247
248 return size;
249 }
250
251 void
TwoPhaseShmemInit(void)252 TwoPhaseShmemInit(void)
253 {
254 bool found;
255
256 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
257 TwoPhaseShmemSize(),
258 &found);
259 if (!IsUnderPostmaster)
260 {
261 GlobalTransaction gxacts;
262 int i;
263
264 Assert(!found);
265 TwoPhaseState->freeGXacts = NULL;
266 TwoPhaseState->numPrepXacts = 0;
267
268 /*
269 * Initialize the linked list of free GlobalTransactionData structs
270 */
271 gxacts = (GlobalTransaction)
272 ((char *) TwoPhaseState +
273 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
274 sizeof(GlobalTransaction) * max_prepared_xacts));
275 for (i = 0; i < max_prepared_xacts; i++)
276 {
277 /* insert into linked list */
278 gxacts[i].next = TwoPhaseState->freeGXacts;
279 TwoPhaseState->freeGXacts = &gxacts[i];
280
281 /* associate it with a PGPROC assigned by InitProcGlobal */
282 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
283
284 /*
285 * Assign a unique ID for each dummy proc, so that the range of
286 * dummy backend IDs immediately follows the range of normal
287 * backend IDs. We don't dare to assign a real backend ID to dummy
288 * procs, because prepared transactions don't take part in cache
289 * invalidation like a real backend ID would imply, but having a
290 * unique ID for them is nevertheless handy. This arrangement
291 * allows you to allocate an array of size (MaxBackends +
292 * max_prepared_xacts + 1), and have a slot for every backend and
293 * prepared transaction. Currently multixact.c uses that
294 * technique.
295 */
296 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
297 }
298 }
299 else
300 Assert(found);
301 }
302
/*
 * Exit hook to unlock the global transaction entry we're working on.
 *
 * Registered via before_shmem_exit() on the first call to MarkAsPreparing()
 * or LockGXact().  Both arguments are ignored; the work is delegated to
 * AtAbort_Twophase().
 */
static void
AtProcExit_Twophase(int code, Datum arg)
{
    /* same logic as abort */
    AtAbort_Twophase();
}
312
313 /*
314 * Abort hook to unlock the global transaction entry we're working on.
315 */
316 void
AtAbort_Twophase(void)317 AtAbort_Twophase(void)
318 {
319 if (MyLockedGxact == NULL)
320 return;
321
322 /*
323 * What to do with the locked global transaction entry? If we were in the
324 * process of preparing the transaction, but haven't written the WAL
325 * record and state file yet, the transaction must not be considered as
326 * prepared. Likewise, if we are in the process of finishing an
327 * already-prepared transaction, and fail after having already written the
328 * 2nd phase commit or rollback record to the WAL, the transaction should
329 * not be considered as prepared anymore. In those cases, just remove the
330 * entry from shared memory.
331 *
332 * Otherwise, the entry must be left in place so that the transaction can
333 * be finished later, so just unlock it.
334 *
335 * If we abort during prepare, after having written the WAL record, we
336 * might not have transferred all locks and other state to the prepared
337 * transaction yet. Likewise, if we abort during commit or rollback,
338 * after having written the WAL record, we might not have released all the
339 * resources held by the transaction yet. In those cases, the in-memory
340 * state can be wrong, but it's too late to back out.
341 */
342 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
343 if (!MyLockedGxact->valid)
344 RemoveGXact(MyLockedGxact);
345 else
346 MyLockedGxact->locking_backend = InvalidBackendId;
347 LWLockRelease(TwoPhaseStateLock);
348
349 MyLockedGxact = NULL;
350 }
351
352 /*
353 * This is called after we have finished transferring state to the prepared
354 * PGXACT entry.
355 */
356 void
PostPrepare_Twophase(void)357 PostPrepare_Twophase(void)
358 {
359 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
360 MyLockedGxact->locking_backend = InvalidBackendId;
361 LWLockRelease(TwoPhaseStateLock);
362
363 MyLockedGxact = NULL;
364 }
365
366
367 /*
368 * MarkAsPreparing
369 * Reserve the GID for the given transaction.
370 */
371 GlobalTransaction
MarkAsPreparing(TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)372 MarkAsPreparing(TransactionId xid, const char *gid,
373 TimestampTz prepared_at, Oid owner, Oid databaseid)
374 {
375 GlobalTransaction gxact;
376 int i;
377
378 if (strlen(gid) >= GIDSIZE)
379 ereport(ERROR,
380 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
381 errmsg("transaction identifier \"%s\" is too long",
382 gid)));
383
384 /* fail immediately if feature is disabled */
385 if (max_prepared_xacts == 0)
386 ereport(ERROR,
387 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388 errmsg("prepared transactions are disabled"),
389 errhint("Set max_prepared_transactions to a nonzero value.")));
390
391 /* on first call, register the exit hook */
392 if (!twophaseExitRegistered)
393 {
394 before_shmem_exit(AtProcExit_Twophase, 0);
395 twophaseExitRegistered = true;
396 }
397
398 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
399
400 /* Check for conflicting GID */
401 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
402 {
403 gxact = TwoPhaseState->prepXacts[i];
404 if (strcmp(gxact->gid, gid) == 0)
405 {
406 ereport(ERROR,
407 (errcode(ERRCODE_DUPLICATE_OBJECT),
408 errmsg("transaction identifier \"%s\" is already in use",
409 gid)));
410 }
411 }
412
413 /* Get a free gxact from the freelist */
414 if (TwoPhaseState->freeGXacts == NULL)
415 ereport(ERROR,
416 (errcode(ERRCODE_OUT_OF_MEMORY),
417 errmsg("maximum number of prepared transactions reached"),
418 errhint("Increase max_prepared_transactions (currently %d).",
419 max_prepared_xacts)));
420 gxact = TwoPhaseState->freeGXacts;
421 TwoPhaseState->freeGXacts = gxact->next;
422
423 MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
424
425 gxact->ondisk = false;
426
427 /* And insert it into the active array */
428 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
429 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
430
431 LWLockRelease(TwoPhaseStateLock);
432
433 return gxact;
434 }
435
436 /*
437 * MarkAsPreparingGuts
438 *
439 * This uses a gxact struct and puts it into the active array.
440 * NOTE: this is also used when reloading a gxact after a crash; so avoid
441 * assuming that we can use very much backend context.
442 *
443 * Note: This function should be called with appropriate locks held.
444 */
445 static void
MarkAsPreparingGuts(GlobalTransaction gxact,TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)446 MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
447 TimestampTz prepared_at, Oid owner, Oid databaseid)
448 {
449 PGPROC *proc;
450 PGXACT *pgxact;
451 int i;
452
453 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
454
455 Assert(gxact != NULL);
456 proc = &ProcGlobal->allProcs[gxact->pgprocno];
457 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
458
459 /* Initialize the PGPROC entry */
460 MemSet(proc, 0, sizeof(PGPROC));
461 proc->pgprocno = gxact->pgprocno;
462 SHMQueueElemInit(&(proc->links));
463 proc->waitStatus = STATUS_OK;
464 if (LocalTransactionIdIsValid(MyProc->lxid))
465 {
466 /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
467 proc->lxid = MyProc->lxid;
468 proc->backendId = MyBackendId;
469 }
470 else
471 {
472 Assert(AmStartupProcess() || !IsPostmasterEnvironment);
473 /* GetLockConflicts() uses this to specify a wait on the XID */
474 proc->lxid = xid;
475 proc->backendId = InvalidBackendId;
476 }
477 pgxact->xid = xid;
478 pgxact->xmin = InvalidTransactionId;
479 proc->delayChkpt = false;
480 pgxact->vacuumFlags = 0;
481 proc->pid = 0;
482 proc->databaseId = databaseid;
483 proc->roleId = owner;
484 proc->tempNamespaceId = InvalidOid;
485 proc->isBackgroundWorker = false;
486 proc->lwWaiting = false;
487 proc->lwWaitMode = 0;
488 proc->waitLock = NULL;
489 proc->waitProcLock = NULL;
490 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
491 SHMQueueInit(&(proc->myProcLocks[i]));
492 /* subxid data must be filled later by GXactLoadSubxactData */
493 pgxact->overflowed = false;
494 pgxact->nxids = 0;
495
496 gxact->prepared_at = prepared_at;
497 gxact->xid = xid;
498 gxact->owner = owner;
499 gxact->locking_backend = MyBackendId;
500 gxact->valid = false;
501 gxact->inredo = false;
502 strcpy(gxact->gid, gid);
503
504 /*
505 * Remember that we have this GlobalTransaction entry locked for us. If we
506 * abort after this, we must release it.
507 */
508 MyLockedGxact = gxact;
509 }
510
511 /*
512 * GXactLoadSubxactData
513 *
514 * If the transaction being persisted had any subtransactions, this must
515 * be called before MarkAsPrepared() to load information into the dummy
516 * PGPROC.
517 */
518 static void
GXactLoadSubxactData(GlobalTransaction gxact,int nsubxacts,TransactionId * children)519 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
520 TransactionId *children)
521 {
522 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
523 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
524
525 /* We need no extra lock since the GXACT isn't valid yet */
526 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
527 {
528 pgxact->overflowed = true;
529 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
530 }
531 if (nsubxacts > 0)
532 {
533 memcpy(proc->subxids.xids, children,
534 nsubxacts * sizeof(TransactionId));
535 pgxact->nxids = nsubxacts;
536 }
537 }
538
539 /*
540 * MarkAsPrepared
541 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
542 *
543 * lock_held indicates whether caller already holds TwoPhaseStateLock.
544 */
545 static void
MarkAsPrepared(GlobalTransaction gxact,bool lock_held)546 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
547 {
548 /* Lock here may be overkill, but I'm not convinced of that ... */
549 if (!lock_held)
550 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
551 Assert(!gxact->valid);
552 gxact->valid = true;
553 if (!lock_held)
554 LWLockRelease(TwoPhaseStateLock);
555
556 /*
557 * Put it into the global ProcArray so TransactionIdIsInProgress considers
558 * the XID as still running.
559 */
560 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
561 }
562
563 /*
564 * LockGXact
565 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
566 */
567 static GlobalTransaction
LockGXact(const char * gid,Oid user)568 LockGXact(const char *gid, Oid user)
569 {
570 int i;
571
572 /* on first call, register the exit hook */
573 if (!twophaseExitRegistered)
574 {
575 before_shmem_exit(AtProcExit_Twophase, 0);
576 twophaseExitRegistered = true;
577 }
578
579 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
580
581 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
582 {
583 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
584 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
585
586 /* Ignore not-yet-valid GIDs */
587 if (!gxact->valid)
588 continue;
589 if (strcmp(gxact->gid, gid) != 0)
590 continue;
591
592 /* Found it, but has someone else got it locked? */
593 if (gxact->locking_backend != InvalidBackendId)
594 ereport(ERROR,
595 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
596 errmsg("prepared transaction with identifier \"%s\" is busy",
597 gid)));
598
599 if (user != gxact->owner && !superuser_arg(user))
600 ereport(ERROR,
601 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
602 errmsg("permission denied to finish prepared transaction"),
603 errhint("Must be superuser or the user that prepared the transaction.")));
604
605 /*
606 * Note: it probably would be possible to allow committing from
607 * another database; but at the moment NOTIFY is known not to work and
608 * there may be some other issues as well. Hence disallow until
609 * someone gets motivated to make it work.
610 */
611 if (MyDatabaseId != proc->databaseId)
612 ereport(ERROR,
613 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
614 errmsg("prepared transaction belongs to another database"),
615 errhint("Connect to the database where the transaction was prepared to finish it.")));
616
617 /* OK for me to lock it */
618 gxact->locking_backend = MyBackendId;
619 MyLockedGxact = gxact;
620
621 LWLockRelease(TwoPhaseStateLock);
622
623 return gxact;
624 }
625
626 LWLockRelease(TwoPhaseStateLock);
627
628 ereport(ERROR,
629 (errcode(ERRCODE_UNDEFINED_OBJECT),
630 errmsg("prepared transaction with identifier \"%s\" does not exist",
631 gid)));
632
633 /* NOTREACHED */
634 return NULL;
635 }
636
637 /*
638 * RemoveGXact
639 * Remove the prepared transaction from the shared memory array.
640 *
641 * NB: caller should have already removed it from ProcArray
642 */
643 static void
RemoveGXact(GlobalTransaction gxact)644 RemoveGXact(GlobalTransaction gxact)
645 {
646 int i;
647
648 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
649
650 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
651 {
652 if (gxact == TwoPhaseState->prepXacts[i])
653 {
654 /* remove from the active array */
655 TwoPhaseState->numPrepXacts--;
656 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
657
658 /* and put it back in the freelist */
659 gxact->next = TwoPhaseState->freeGXacts;
660 TwoPhaseState->freeGXacts = gxact;
661
662 return;
663 }
664 }
665
666 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
667 }
668
669 /*
670 * Returns an array of all prepared transactions for the user-level
671 * function pg_prepared_xact.
672 *
673 * The returned array and all its elements are copies of internal data
674 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
675 *
676 * WARNING -- we return even those transactions that are not fully prepared
677 * yet. The caller should filter them out if he doesn't want them.
678 *
679 * The returned array is palloc'd.
680 */
681 static int
GetPreparedTransactionList(GlobalTransaction * gxacts)682 GetPreparedTransactionList(GlobalTransaction *gxacts)
683 {
684 GlobalTransaction array;
685 int num;
686 int i;
687
688 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
689
690 if (TwoPhaseState->numPrepXacts == 0)
691 {
692 LWLockRelease(TwoPhaseStateLock);
693
694 *gxacts = NULL;
695 return 0;
696 }
697
698 num = TwoPhaseState->numPrepXacts;
699 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
700 *gxacts = array;
701 for (i = 0; i < num; i++)
702 memcpy(array + i, TwoPhaseState->prepXacts[i],
703 sizeof(GlobalTransactionData));
704
705 LWLockRelease(TwoPhaseStateLock);
706
707 return num;
708 }
709
710
/*
 * Working status for pg_prepared_xact.
 *
 * One instance is allocated on the first call of the set-returning function
 * and kept in funcctx->user_fctx across calls.
 */
typedef struct
{
    GlobalTransaction array;    /* copy of the gxacts, as returned by
                                 * GetPreparedTransactionList(); NULL if none */
    int         ngxacts;        /* number of elements in array */
    int         currIdx;        /* index of the next element to examine */
} Working_State;
718
719 /*
720 * pg_prepared_xact
721 * Produce a view with one row per prepared transaction.
722 *
723 * This function is here so we don't have to export the
724 * GlobalTransactionData struct definition.
725 */
726 Datum
pg_prepared_xact(PG_FUNCTION_ARGS)727 pg_prepared_xact(PG_FUNCTION_ARGS)
728 {
729 FuncCallContext *funcctx;
730 Working_State *status;
731
732 if (SRF_IS_FIRSTCALL())
733 {
734 TupleDesc tupdesc;
735 MemoryContext oldcontext;
736
737 /* create a function context for cross-call persistence */
738 funcctx = SRF_FIRSTCALL_INIT();
739
740 /*
741 * Switch to memory context appropriate for multiple function calls
742 */
743 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
744
745 /* build tupdesc for result tuples */
746 /* this had better match pg_prepared_xacts view in system_views.sql */
747 tupdesc = CreateTemplateTupleDesc(5);
748 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
749 XIDOID, -1, 0);
750 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
751 TEXTOID, -1, 0);
752 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
753 TIMESTAMPTZOID, -1, 0);
754 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
755 OIDOID, -1, 0);
756 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
757 OIDOID, -1, 0);
758
759 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
760
761 /*
762 * Collect all the 2PC status information that we will format and send
763 * out as a result set.
764 */
765 status = (Working_State *) palloc(sizeof(Working_State));
766 funcctx->user_fctx = (void *) status;
767
768 status->ngxacts = GetPreparedTransactionList(&status->array);
769 status->currIdx = 0;
770
771 MemoryContextSwitchTo(oldcontext);
772 }
773
774 funcctx = SRF_PERCALL_SETUP();
775 status = (Working_State *) funcctx->user_fctx;
776
777 while (status->array != NULL && status->currIdx < status->ngxacts)
778 {
779 GlobalTransaction gxact = &status->array[status->currIdx++];
780 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
781 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
782 Datum values[5];
783 bool nulls[5];
784 HeapTuple tuple;
785 Datum result;
786
787 if (!gxact->valid)
788 continue;
789
790 /*
791 * Form tuple with appropriate data.
792 */
793 MemSet(values, 0, sizeof(values));
794 MemSet(nulls, 0, sizeof(nulls));
795
796 values[0] = TransactionIdGetDatum(pgxact->xid);
797 values[1] = CStringGetTextDatum(gxact->gid);
798 values[2] = TimestampTzGetDatum(gxact->prepared_at);
799 values[3] = ObjectIdGetDatum(gxact->owner);
800 values[4] = ObjectIdGetDatum(proc->databaseId);
801
802 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
803 result = HeapTupleGetDatum(tuple);
804 SRF_RETURN_NEXT(funcctx, result);
805 }
806
807 SRF_RETURN_DONE(funcctx);
808 }
809
810 /*
811 * TwoPhaseGetGXact
812 * Get the GlobalTransaction struct for a prepared transaction
813 * specified by XID
814 *
815 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
816 * caller had better hold it.
817 */
818 static GlobalTransaction
TwoPhaseGetGXact(TransactionId xid,bool lock_held)819 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
820 {
821 GlobalTransaction result = NULL;
822 int i;
823
824 static TransactionId cached_xid = InvalidTransactionId;
825 static GlobalTransaction cached_gxact = NULL;
826
827 Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
828
829 /*
830 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
831 * repeatedly for the same XID. We can save work with a simple cache.
832 */
833 if (xid == cached_xid)
834 return cached_gxact;
835
836 if (!lock_held)
837 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
838
839 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
840 {
841 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
842 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
843
844 if (pgxact->xid == xid)
845 {
846 result = gxact;
847 break;
848 }
849 }
850
851 if (!lock_held)
852 LWLockRelease(TwoPhaseStateLock);
853
854 if (result == NULL) /* should not happen */
855 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
856
857 cached_xid = xid;
858 cached_gxact = result;
859
860 return result;
861 }
862
863 /*
864 * TwoPhaseGetXidByVirtualXID
865 * Lookup VXID among xacts prepared since last startup.
866 *
867 * (This won't find recovered xacts.) If more than one matches, return any
868 * and set "have_more" to true. To witness multiple matches, a single
869 * BackendId must consume 2^32 LXIDs, with no intervening database restart.
870 */
871 TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,bool * have_more)872 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
873 bool *have_more)
874 {
875 int i;
876 TransactionId result = InvalidTransactionId;
877
878 Assert(VirtualTransactionIdIsValid(vxid));
879 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
880
881 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
882 {
883 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
884 PGPROC *proc;
885 VirtualTransactionId proc_vxid;
886
887 if (!gxact->valid)
888 continue;
889 proc = &ProcGlobal->allProcs[gxact->pgprocno];
890 GET_VXID_FROM_PGPROC(proc_vxid, *proc);
891 if (VirtualTransactionIdEquals(vxid, proc_vxid))
892 {
893 /* Startup process sets proc->backendId to InvalidBackendId. */
894 Assert(!gxact->inredo);
895
896 if (result != InvalidTransactionId)
897 {
898 *have_more = true;
899 break;
900 }
901 result = gxact->xid;
902 }
903 }
904
905 LWLockRelease(TwoPhaseStateLock);
906
907 return result;
908 }
909
910 /*
911 * TwoPhaseGetDummyBackendId
912 * Get the dummy backend ID for prepared transaction specified by XID
913 *
914 * Dummy backend IDs are similar to real backend IDs of real backends.
915 * They start at MaxBackends + 1, and are unique across all currently active
916 * real backends and prepared transactions. If lock_held is set to true,
917 * TwoPhaseStateLock will not be taken, so the caller had better hold it.
918 */
919 BackendId
TwoPhaseGetDummyBackendId(TransactionId xid,bool lock_held)920 TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
921 {
922 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
923
924 return gxact->dummyBackendId;
925 }
926
927 /*
928 * TwoPhaseGetDummyProc
929 * Get the PGPROC that represents a prepared transaction specified by XID
930 *
931 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
932 * caller had better hold it.
933 */
934 PGPROC *
TwoPhaseGetDummyProc(TransactionId xid,bool lock_held)935 TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
936 {
937 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
938
939 return &ProcGlobal->allProcs[gxact->pgprocno];
940 }
941
942 /************************************************************************/
943 /* State file support */
944 /************************************************************************/
945
/*
 * Build the name of the state file for "xid" into "path": TWOPHASE_DIR
 * followed by the XID printed as eight upper-case hex digits.  The output is
 * limited to MAXPGPATH bytes, so "path" must be at least that large.
 */
#define TwoPhaseFilePath(path, xid) \
    snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
948
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID (NUL-terminated string; hdr.gidlen bytes, counting the '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileNode[] (files to be deleted at commit)
 *	5. RelFileNode[] (files to be deleted at abort)
 *	6. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	7. TwoPhaseRecordOnDisk
 *	8. ...
 *	9. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	10. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.
 */
964
/*
 * Header for a 2PC state file
 *
 * The header is the very same struct as xl_xact_prepare, so state data can
 * be interpreted identically whether it was read back from a PREPARE WAL
 * record or from a state file.  The magic number is stored in the header and
 * verified whenever a state file is read.
 */
#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */

typedef xl_xact_prepare TwoPhaseFileHeader;
971
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * The sequence of records is terminated by a header whose rmid is
 * TWOPHASE_RM_END_ID; that sentinel carries no rmgr data (len == 0).
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
984
/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 * for that.
 *
 * Chunks form a singly linked list; data is only ever appended to the last
 * chunk of the chain.
 */
typedef struct StateFileChunk
{
	char	   *data;			/* palloc'd buffer holding this chunk's bytes */
	uint32		len;			/* bytes used so far, padding included */
	struct StateFileChunk *next;	/* following chunk, or NULL if last */
} StateFileChunk;
996
/*
 * The chain of state data being assembled by the current PREPARE.
 *
 * There is a single file-level instance; it is initialized by StartPrepare,
 * appended to by save_state_data, and reset again at the end of EndPrepare.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain */
}			records;
1005
1006
1007 /*
1008 * Append a block of data to records data structure.
1009 *
1010 * NB: each block is padded to a MAXALIGN multiple. This must be
1011 * accounted for when the file is later read!
1012 *
1013 * The data is copied, so the caller is free to modify it afterwards.
1014 */
1015 static void
save_state_data(const void * data,uint32 len)1016 save_state_data(const void *data, uint32 len)
1017 {
1018 uint32 padlen = MAXALIGN(len);
1019
1020 if (padlen > records.bytes_free)
1021 {
1022 records.tail->next = palloc0(sizeof(StateFileChunk));
1023 records.tail = records.tail->next;
1024 records.tail->len = 0;
1025 records.tail->next = NULL;
1026 records.num_chunks++;
1027
1028 records.bytes_free = Max(padlen, 512);
1029 records.tail->data = palloc(records.bytes_free);
1030 }
1031
1032 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1033 records.tail->len += padlen;
1034 records.bytes_free -= padlen;
1035 records.total_len += padlen;
1036 }
1037
1038 /*
1039 * Start preparing a state file.
1040 *
1041 * Initializes data structure and inserts the 2PC file header record.
1042 */
1043 void
StartPrepare(GlobalTransaction gxact)1044 StartPrepare(GlobalTransaction gxact)
1045 {
1046 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1047 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1048 TransactionId xid = pgxact->xid;
1049 TwoPhaseFileHeader hdr;
1050 TransactionId *children;
1051 RelFileNode *commitrels;
1052 RelFileNode *abortrels;
1053 SharedInvalidationMessage *invalmsgs;
1054
1055 /* Initialize linked list */
1056 records.head = palloc0(sizeof(StateFileChunk));
1057 records.head->len = 0;
1058 records.head->next = NULL;
1059
1060 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1061 records.head->data = palloc(records.bytes_free);
1062
1063 records.tail = records.head;
1064 records.num_chunks = 1;
1065
1066 records.total_len = 0;
1067
1068 /* Create header */
1069 hdr.magic = TWOPHASE_MAGIC;
1070 hdr.total_len = 0; /* EndPrepare will fill this in */
1071 hdr.xid = xid;
1072 hdr.database = proc->databaseId;
1073 hdr.prepared_at = gxact->prepared_at;
1074 hdr.owner = gxact->owner;
1075 hdr.nsubxacts = xactGetCommittedChildren(&children);
1076 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1077 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1078 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1079 &hdr.initfileinval);
1080 hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1081
1082 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1083 save_state_data(gxact->gid, hdr.gidlen);
1084
1085 /*
1086 * Add the additional info about subxacts, deletable files and cache
1087 * invalidation messages.
1088 */
1089 if (hdr.nsubxacts > 0)
1090 {
1091 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1092 /* While we have the child-xact data, stuff it in the gxact too */
1093 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1094 }
1095 if (hdr.ncommitrels > 0)
1096 {
1097 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1098 pfree(commitrels);
1099 }
1100 if (hdr.nabortrels > 0)
1101 {
1102 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1103 pfree(abortrels);
1104 }
1105 if (hdr.ninvalmsgs > 0)
1106 {
1107 save_state_data(invalmsgs,
1108 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1109 pfree(invalmsgs);
1110 }
1111 }
1112
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Appends the end-sentinel record to the chain, goes back to fill in the
 * header fields that were not known earlier (total_len and the replication
 * origin data), inserts the whole chain as a single XLOG_XACT_PREPARE
 * record, flushes it, and marks the gxact as prepared.
 *
 * On return the gxact's prepare_start_lsn/prepare_end_lsn are set, the gxact
 * is remembered in MyLockedGxact, and the records chain has been reset to
 * empty.  An ERROR is raised, before anything is written to WAL, if the
 * state data would be too large to read back later.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/*
	 * Go back and fill in total_len in the file header record.  The header
	 * is the first thing stored in the head chunk, and the stored length
	 * includes room for the CRC that a state file carries at its end.
	 */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Is this session replaying on behalf of a real replication origin? */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}
	else
	{
		hdr->origin_lsn = InvalidXLogRecPtr;
		hdr->origin_timestamp = 0;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
	 * immediately after the WAL record is inserted could complete without
	 * fsync'ing our state file.  (This is essentially the same kind of race
	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
	 * uses delayChkpt for; see notes there.)
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	/* One data segment is registered per chunk, so reserve that many */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	MyProc->delayChkpt = true;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks
	 * MyPgXact as not running our XID (which it will do immediately after
	 * this function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyPgXact,
	 * else there is a window where the XID is not running according to
	 * TransactionIdIsInProgress, and onlookers would be entitled to assume
	 * the xact crashed.  Instead we have a window where the same XID appears
	 * twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkpt = false;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/*
	 * Forget the chain.  Only the list bookkeeping is reset here; the chunks
	 * themselves are not pfree'd (presumably they are reclaimed along with
	 * the memory context they were allocated in -- confirm before relying on
	 * it).
	 */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1239
1240 /*
1241 * Register a 2PC record to be written to state file.
1242 */
1243 void
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid,uint16 info,const void * data,uint32 len)1244 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1245 const void *data, uint32 len)
1246 {
1247 TwoPhaseRecordOnDisk record;
1248
1249 record.rmid = rmid;
1250 record.info = info;
1251 record.len = len;
1252 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1253 if (len > 0)
1254 save_state_data(data, len);
1255 }
1256
1257
1258 /*
1259 * Read and validate the state file for xid.
1260 *
1261 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1262 * contents of the file, issuing an error when finding corrupted data. If
1263 * missing_ok is true, which indicates that missing files can be safely
1264 * ignored, then return NULL. This state can be reached when doing recovery.
1265 */
1266 static char *
ReadTwoPhaseFile(TransactionId xid,bool missing_ok)1267 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1268 {
1269 char path[MAXPGPATH];
1270 char *buf;
1271 TwoPhaseFileHeader *hdr;
1272 int fd;
1273 struct stat stat;
1274 uint32 crc_offset;
1275 pg_crc32c calc_crc,
1276 file_crc;
1277 int r;
1278
1279 TwoPhaseFilePath(path, xid);
1280
1281 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1282 if (fd < 0)
1283 {
1284 if (missing_ok && errno == ENOENT)
1285 return NULL;
1286
1287 ereport(ERROR,
1288 (errcode_for_file_access(),
1289 errmsg("could not open file \"%s\": %m", path)));
1290 }
1291
1292 /*
1293 * Check file length. We can determine a lower bound pretty easily. We
1294 * set an upper bound to avoid palloc() failure on a corrupt file, though
1295 * we can't guarantee that we won't get an out of memory error anyway,
1296 * even on a valid file.
1297 */
1298 if (fstat(fd, &stat))
1299 ereport(ERROR,
1300 (errcode_for_file_access(),
1301 errmsg("could not stat file \"%s\": %m", path)));
1302
1303 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1304 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1305 sizeof(pg_crc32c)) ||
1306 stat.st_size > MaxAllocSize)
1307 ereport(ERROR,
1308 (errcode(ERRCODE_DATA_CORRUPTED),
1309 errmsg_plural("incorrect size of file \"%s\": %zu byte",
1310 "incorrect size of file \"%s\": %zu bytes",
1311 (Size) stat.st_size, path,
1312 (Size) stat.st_size)));
1313
1314 crc_offset = stat.st_size - sizeof(pg_crc32c);
1315 if (crc_offset != MAXALIGN(crc_offset))
1316 ereport(ERROR,
1317 (errcode(ERRCODE_DATA_CORRUPTED),
1318 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1319 path)));
1320
1321 /*
1322 * OK, slurp in the file.
1323 */
1324 buf = (char *) palloc(stat.st_size);
1325
1326 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1327 r = read(fd, buf, stat.st_size);
1328 if (r != stat.st_size)
1329 {
1330 if (r < 0)
1331 ereport(ERROR,
1332 (errcode_for_file_access(),
1333 errmsg("could not read file \"%s\": %m", path)));
1334 else
1335 ereport(ERROR,
1336 (errmsg("could not read file \"%s\": read %d of %zu",
1337 path, r, (Size) stat.st_size)));
1338 }
1339
1340 pgstat_report_wait_end();
1341
1342 if (CloseTransientFile(fd) != 0)
1343 ereport(ERROR,
1344 (errcode_for_file_access(),
1345 errmsg("could not close file \"%s\": %m", path)));
1346
1347 hdr = (TwoPhaseFileHeader *) buf;
1348 if (hdr->magic != TWOPHASE_MAGIC)
1349 ereport(ERROR,
1350 (errcode(ERRCODE_DATA_CORRUPTED),
1351 errmsg("invalid magic number stored in file \"%s\"",
1352 path)));
1353
1354 if (hdr->total_len != stat.st_size)
1355 ereport(ERROR,
1356 (errcode(ERRCODE_DATA_CORRUPTED),
1357 errmsg("invalid size stored in file \"%s\"",
1358 path)));
1359
1360 INIT_CRC32C(calc_crc);
1361 COMP_CRC32C(calc_crc, buf, crc_offset);
1362 FIN_CRC32C(calc_crc);
1363
1364 file_crc = *((pg_crc32c *) (buf + crc_offset));
1365
1366 if (!EQ_CRC32C(calc_crc, file_crc))
1367 ereport(ERROR,
1368 (errcode(ERRCODE_DATA_CORRUPTED),
1369 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1370 path)));
1371
1372 return buf;
1373 }
1374
1375
1376 /*
1377 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1378 * twophase files and ReadTwoPhaseFile should be used instead.
1379 *
1380 * Note clearly that this function can access WAL during normal operation,
1381 * similarly to the way WALSender or Logical Decoding would do. While
1382 * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
1383 * particularly if this routine is called for the end-of-recovery checkpoint
1384 * in the checkpointer itself, so save the current timeline number value
1385 * and restore it once done.
1386 */
1387 static void
XlogReadTwoPhaseData(XLogRecPtr lsn,char ** buf,int * len)1388 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1389 {
1390 XLogRecord *record;
1391 XLogReaderState *xlogreader;
1392 char *errormsg;
1393 TimeLineID save_currtli = ThisTimeLineID;
1394
1395 xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1396 XL_ROUTINE(.page_read = &read_local_xlog_page,
1397 .segment_open = &wal_segment_open,
1398 .segment_close = &wal_segment_close),
1399 NULL);
1400 if (!xlogreader)
1401 ereport(ERROR,
1402 (errcode(ERRCODE_OUT_OF_MEMORY),
1403 errmsg("out of memory"),
1404 errdetail("Failed while allocating a WAL reading processor.")));
1405
1406 XLogBeginRead(xlogreader, lsn);
1407 record = XLogReadRecord(xlogreader, &errormsg);
1408
1409 /*
1410 * Restore immediately the timeline where it was previously, as
1411 * read_local_xlog_page() could have changed it if the record was read
1412 * while recovery was finishing or if the timeline has jumped in-between.
1413 */
1414 ThisTimeLineID = save_currtli;
1415
1416 if (record == NULL)
1417 ereport(ERROR,
1418 (errcode_for_file_access(),
1419 errmsg("could not read two-phase state from WAL at %X/%X",
1420 (uint32) (lsn >> 32),
1421 (uint32) lsn)));
1422
1423 if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1424 (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1425 ereport(ERROR,
1426 (errcode_for_file_access(),
1427 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1428 (uint32) (lsn >> 32),
1429 (uint32) lsn)));
1430
1431 if (len != NULL)
1432 *len = XLogRecGetDataLen(xlogreader);
1433
1434 *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1435 memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1436
1437 XLogReaderFree(xlogreader);
1438 }
1439
1440
1441 /*
1442 * Confirms an xid is prepared, during recovery
1443 */
1444 bool
StandbyTransactionIdIsPrepared(TransactionId xid)1445 StandbyTransactionIdIsPrepared(TransactionId xid)
1446 {
1447 char *buf;
1448 TwoPhaseFileHeader *hdr;
1449 bool result;
1450
1451 Assert(TransactionIdIsValid(xid));
1452
1453 if (max_prepared_xacts <= 0)
1454 return false; /* nothing to do */
1455
1456 /* Read and validate file */
1457 buf = ReadTwoPhaseFile(xid, true);
1458 if (buf == NULL)
1459 return false;
1460
1461 /* Check header also */
1462 hdr = (TwoPhaseFileHeader *) buf;
1463 result = TransactionIdEquals(hdr->xid, xid);
1464 pfree(buf);
1465
1466 return result;
1467 }
1468
1469 /*
1470 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1471 */
1472 void
FinishPreparedTransaction(const char * gid,bool isCommit)1473 FinishPreparedTransaction(const char *gid, bool isCommit)
1474 {
1475 GlobalTransaction gxact;
1476 PGPROC *proc;
1477 PGXACT *pgxact;
1478 TransactionId xid;
1479 char *buf;
1480 char *bufptr;
1481 TwoPhaseFileHeader *hdr;
1482 TransactionId latestXid;
1483 TransactionId *children;
1484 RelFileNode *commitrels;
1485 RelFileNode *abortrels;
1486 RelFileNode *delrels;
1487 int ndelrels;
1488 SharedInvalidationMessage *invalmsgs;
1489
1490 /*
1491 * Validate the GID, and lock the GXACT to ensure that two backends do not
1492 * try to commit the same GID at once.
1493 */
1494 gxact = LockGXact(gid, GetUserId());
1495 proc = &ProcGlobal->allProcs[gxact->pgprocno];
1496 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1497 xid = pgxact->xid;
1498
1499 /*
1500 * Read and validate 2PC state data. State data will typically be stored
1501 * in WAL files if the LSN is after the last checkpoint record, or moved
1502 * to disk if for some reason they have lived for a long time.
1503 */
1504 if (gxact->ondisk)
1505 buf = ReadTwoPhaseFile(xid, false);
1506 else
1507 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1508
1509
1510 /*
1511 * Disassemble the header area
1512 */
1513 hdr = (TwoPhaseFileHeader *) buf;
1514 Assert(TransactionIdEquals(hdr->xid, xid));
1515 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1516 bufptr += MAXALIGN(hdr->gidlen);
1517 children = (TransactionId *) bufptr;
1518 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1519 commitrels = (RelFileNode *) bufptr;
1520 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1521 abortrels = (RelFileNode *) bufptr;
1522 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1523 invalmsgs = (SharedInvalidationMessage *) bufptr;
1524 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1525
1526 /* compute latestXid among all children */
1527 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1528
1529 /* Prevent cancel/die interrupt while cleaning up */
1530 HOLD_INTERRUPTS();
1531
1532 /*
1533 * The order of operations here is critical: make the XLOG entry for
1534 * commit or abort, then mark the transaction committed or aborted in
1535 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1536 * TransactionIdIsInProgress will stop saying the prepared xact is in
1537 * progress), then run the post-commit or post-abort callbacks. The
1538 * callbacks will release the locks the transaction held.
1539 */
1540 if (isCommit)
1541 RecordTransactionCommitPrepared(xid,
1542 hdr->nsubxacts, children,
1543 hdr->ncommitrels, commitrels,
1544 hdr->ninvalmsgs, invalmsgs,
1545 hdr->initfileinval, gid);
1546 else
1547 RecordTransactionAbortPrepared(xid,
1548 hdr->nsubxacts, children,
1549 hdr->nabortrels, abortrels,
1550 gid);
1551
1552 ProcArrayRemove(proc, latestXid);
1553
1554 /*
1555 * In case we fail while running the callbacks, mark the gxact invalid so
1556 * no one else will try to commit/rollback, and so it will be recycled if
1557 * we fail after this point. It is still locked by our backend so it
1558 * won't go away yet.
1559 *
1560 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1561 */
1562 gxact->valid = false;
1563
1564 /*
1565 * We have to remove any files that were supposed to be dropped. For
1566 * consistency with the regular xact.c code paths, must do this before
1567 * releasing locks, so do it before running the callbacks.
1568 *
1569 * NB: this code knows that we couldn't be dropping any temp rels ...
1570 */
1571 if (isCommit)
1572 {
1573 delrels = commitrels;
1574 ndelrels = hdr->ncommitrels;
1575 }
1576 else
1577 {
1578 delrels = abortrels;
1579 ndelrels = hdr->nabortrels;
1580 }
1581
1582 /* Make sure files supposed to be dropped are dropped */
1583 DropRelationFiles(delrels, ndelrels, false);
1584
1585 /*
1586 * Handle cache invalidation messages.
1587 *
1588 * Relcache init file invalidation requires processing both before and
1589 * after we send the SI messages. See AtEOXact_Inval()
1590 */
1591 if (hdr->initfileinval)
1592 RelationCacheInitFilePreInvalidate();
1593 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1594 if (hdr->initfileinval)
1595 RelationCacheInitFilePostInvalidate();
1596
1597 /*
1598 * Acquire the two-phase lock. We want to work on the two-phase callbacks
1599 * while holding it to avoid potential conflicts with other transactions
1600 * attempting to use the same GID, so the lock is released once the shared
1601 * memory state is cleared.
1602 */
1603 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1604
1605 /* And now do the callbacks */
1606 if (isCommit)
1607 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1608 else
1609 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1610
1611 PredicateLockTwoPhaseFinish(xid, isCommit);
1612
1613 /* Clear shared memory state */
1614 RemoveGXact(gxact);
1615
1616 /*
1617 * Release the lock as all callbacks are called and shared memory cleanup
1618 * is done.
1619 */
1620 LWLockRelease(TwoPhaseStateLock);
1621
1622 /* Count the prepared xact as committed or aborted */
1623 AtEOXact_PgStat(isCommit, false);
1624
1625 /*
1626 * And now we can clean up any files we may have left.
1627 */
1628 if (gxact->ondisk)
1629 RemoveTwoPhaseFile(xid, true);
1630
1631 MyLockedGxact = NULL;
1632
1633 RESUME_INTERRUPTS();
1634
1635 pfree(buf);
1636 }
1637
1638 /*
1639 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1640 */
1641 static void
ProcessRecords(char * bufptr,TransactionId xid,const TwoPhaseCallback callbacks[])1642 ProcessRecords(char *bufptr, TransactionId xid,
1643 const TwoPhaseCallback callbacks[])
1644 {
1645 for (;;)
1646 {
1647 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1648
1649 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1650 if (record->rmid == TWOPHASE_RM_END_ID)
1651 break;
1652
1653 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1654
1655 if (callbacks[record->rmid] != NULL)
1656 callbacks[record->rmid] (xid, record->info,
1657 (void *) bufptr, record->len);
1658
1659 bufptr += MAXALIGN(record->len);
1660 }
1661 }
1662
1663 /*
1664 * Remove the 2PC file for the specified XID.
1665 *
1666 * If giveWarning is false, do not complain about file-not-present;
1667 * this is an expected case during WAL replay.
1668 */
1669 static void
RemoveTwoPhaseFile(TransactionId xid,bool giveWarning)1670 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1671 {
1672 char path[MAXPGPATH];
1673
1674 TwoPhaseFilePath(path, xid);
1675 if (unlink(path))
1676 if (errno != ENOENT || giveWarning)
1677 ereport(WARNING,
1678 (errcode_for_file_access(),
1679 errmsg("could not remove file \"%s\": %m", path)));
1680 }
1681
1682 /*
1683 * Recreates a state file. This is used in WAL replay and during
1684 * checkpoint creation.
1685 *
1686 * Note: content and len don't include CRC.
1687 */
1688 static void
RecreateTwoPhaseFile(TransactionId xid,void * content,int len)1689 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1690 {
1691 char path[MAXPGPATH];
1692 pg_crc32c statefile_crc;
1693 int fd;
1694
1695 /* Recompute CRC */
1696 INIT_CRC32C(statefile_crc);
1697 COMP_CRC32C(statefile_crc, content, len);
1698 FIN_CRC32C(statefile_crc);
1699
1700 TwoPhaseFilePath(path, xid);
1701
1702 fd = OpenTransientFile(path,
1703 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1704 if (fd < 0)
1705 ereport(ERROR,
1706 (errcode_for_file_access(),
1707 errmsg("could not recreate file \"%s\": %m", path)));
1708
1709 /* Write content and CRC */
1710 errno = 0;
1711 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1712 if (write(fd, content, len) != len)
1713 {
1714 /* if write didn't set errno, assume problem is no disk space */
1715 if (errno == 0)
1716 errno = ENOSPC;
1717 ereport(ERROR,
1718 (errcode_for_file_access(),
1719 errmsg("could not write file \"%s\": %m", path)));
1720 }
1721 if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1722 {
1723 /* if write didn't set errno, assume problem is no disk space */
1724 if (errno == 0)
1725 errno = ENOSPC;
1726 ereport(ERROR,
1727 (errcode_for_file_access(),
1728 errmsg("could not write file \"%s\": %m", path)));
1729 }
1730 pgstat_report_wait_end();
1731
1732 /*
1733 * We must fsync the file because the end-of-replay checkpoint will not do
1734 * so, there being no GXACT in shared memory yet to tell it to.
1735 */
1736 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1737 if (pg_fsync(fd) != 0)
1738 ereport(ERROR,
1739 (errcode_for_file_access(),
1740 errmsg("could not fsync file \"%s\": %m", path)));
1741 pgstat_report_wait_end();
1742
1743 if (CloseTransientFile(fd) != 0)
1744 ereport(ERROR,
1745 (errcode_for_file_access(),
1746 errmsg("could not close file \"%s\": %m", path)));
1747 }
1748
1749 /*
1750 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1751 *
1752 * We must fsync the state file of any GXACT that is valid or has been
1753 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1754 * horizon. (If the gxact isn't valid yet, has not been generated in
1755 * redo, or has a later LSN, this checkpoint is not responsible for
1756 * fsyncing it.)
1757 *
1758 * This is deliberately run as late as possible in the checkpoint sequence,
1759 * because GXACTs ordinarily have short lifespans, and so it is quite
1760 * possible that GXACTs that were valid at checkpoint start will no longer
1761 * exist if we wait a little bit. With typical checkpoint settings this
1762 * will be about 3 minutes for an online checkpoint, so as a result we
1763 * expect that there will be no GXACTs that need to be copied to disk.
1764 *
1765 * If a GXACT remains valid across multiple checkpoints, it will already
1766 * be on disk so we don't bother to repeat that write.
1767 */
1768 void
CheckPointTwoPhase(XLogRecPtr redo_horizon)1769 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1770 {
1771 int i;
1772 int serialized_xacts = 0;
1773
1774 if (max_prepared_xacts <= 0)
1775 return; /* nothing to do */
1776
1777 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1778
1779 /*
1780 * We are expecting there to be zero GXACTs that need to be copied to
1781 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1782 * simplicity. This prevents any new xacts from preparing while this
1783 * occurs, which shouldn't be a problem since the presence of long-lived
1784 * prepared xacts indicates the transaction manager isn't active.
1785 *
1786 * It's also possible to move I/O out of the lock, but on every error we
1787 * should check whether somebody committed our transaction in different
1788 * backend. Let's leave this optimization for future, if somebody will
1789 * spot that this place cause bottleneck.
1790 *
1791 * Note that it isn't possible for there to be a GXACT with a
1792 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1793 * because of the efforts with delayChkpt.
1794 */
1795 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1796 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1797 {
1798 /*
1799 * Note that we are using gxact not pgxact so this works in recovery
1800 * also
1801 */
1802 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1803
1804 if ((gxact->valid || gxact->inredo) &&
1805 !gxact->ondisk &&
1806 gxact->prepare_end_lsn <= redo_horizon)
1807 {
1808 char *buf;
1809 int len;
1810
1811 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1812 RecreateTwoPhaseFile(gxact->xid, buf, len);
1813 gxact->ondisk = true;
1814 gxact->prepare_start_lsn = InvalidXLogRecPtr;
1815 gxact->prepare_end_lsn = InvalidXLogRecPtr;
1816 pfree(buf);
1817 serialized_xacts++;
1818 }
1819 }
1820 LWLockRelease(TwoPhaseStateLock);
1821
1822 /*
1823 * Flush unconditionally the parent directory to make any information
1824 * durable on disk. Two-phase files could have been removed and those
1825 * removals need to be made persistent as well as any files newly created
1826 * previously since the last checkpoint.
1827 */
1828 fsync_fname(TWOPHASE_DIR, true);
1829
1830 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1831
1832 if (log_checkpoints && serialized_xacts > 0)
1833 ereport(LOG,
1834 (errmsg_plural("%u two-phase state file was written "
1835 "for a long-running prepared transaction",
1836 "%u two-phase state files were written "
1837 "for long-running prepared transactions",
1838 serialized_xacts,
1839 serialized_xacts)));
1840 }
1841
1842 /*
1843 * restoreTwoPhaseData
1844 *
1845 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1846 * This is called once at the beginning of recovery, saving any extra
1847 * lookups in the future. Two-phase files that are newer than the
1848 * minimum XID horizon are discarded on the way.
1849 */
1850 void
restoreTwoPhaseData(void)1851 restoreTwoPhaseData(void)
1852 {
1853 DIR *cldir;
1854 struct dirent *clde;
1855
1856 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1857 cldir = AllocateDir(TWOPHASE_DIR);
1858 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1859 {
1860 if (strlen(clde->d_name) == 8 &&
1861 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1862 {
1863 TransactionId xid;
1864 char *buf;
1865
1866 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1867
1868 buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1869 true, false, false);
1870 if (buf == NULL)
1871 continue;
1872
1873 PrepareRedoAdd(buf, InvalidXLogRecPtr,
1874 InvalidXLogRecPtr, InvalidRepOriginId);
1875 }
1876 }
1877 LWLockRelease(TwoPhaseStateLock);
1878 FreeDir(cldir);
1879 }
1880
1881 /*
1882 * PrescanPreparedTransactions
1883 *
1884 * Scan the shared memory entries of TwoPhaseState and determine the range
1885 * of valid XIDs present. This is run during database startup, after we
1886 * have completed reading WAL. ShmemVariableCache->nextFullXid has been set to
1887 * one more than the highest XID for which evidence exists in WAL.
1888 *
1889 * We throw away any prepared xacts with main XID beyond nextFullXid --- if any
1890 * are present, it suggests that the DBA has done a PITR recovery to an
1891 * earlier point in time without cleaning out pg_twophase. We dare not
1892 * try to recover such prepared xacts since they likely depend on database
1893 * state that doesn't exist now.
1894 *
1895 * However, we will advance nextFullXid beyond any subxact XIDs belonging to
1896 * valid prepared xacts. We need to do this since subxact commit doesn't
1897 * write a WAL entry, and so there might be no evidence in WAL of those
1898 * subxact XIDs.
1899 *
1900 * On corrupted two-phase files, fail immediately. Keeping around broken
1901 * entries and let replay continue causes harm on the system, and a new
1902 * backup should be rolled in.
1903 *
1904 * Our other responsibility is to determine and return the oldest valid XID
1905 * among the prepared xacts (if none, return ShmemVariableCache->nextFullXid).
1906 * This is needed to synchronize pg_subtrans startup properly.
1907 *
1908 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1909 * top-level xids is stored in *xids_p. The number of entries in the array
1910 * is returned in *nxids_p.
1911 */
1912 TransactionId
PrescanPreparedTransactions(TransactionId ** xids_p,int * nxids_p)1913 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1914 {
1915 FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
1916 TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
1917 TransactionId result = origNextXid;
1918 TransactionId *xids = NULL;
1919 int nxids = 0;
1920 int allocsize = 0;
1921 int i;
1922
1923 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1924 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1925 {
1926 TransactionId xid;
1927 char *buf;
1928 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1929
1930 Assert(gxact->inredo);
1931
1932 xid = gxact->xid;
1933
1934 buf = ProcessTwoPhaseBuffer(xid,
1935 gxact->prepare_start_lsn,
1936 gxact->ondisk, false, true);
1937
1938 if (buf == NULL)
1939 continue;
1940
1941 /*
1942 * OK, we think this file is valid. Incorporate xid into the
1943 * running-minimum result.
1944 */
1945 if (TransactionIdPrecedes(xid, result))
1946 result = xid;
1947
1948 if (xids_p)
1949 {
1950 if (nxids == allocsize)
1951 {
1952 if (nxids == 0)
1953 {
1954 allocsize = 10;
1955 xids = palloc(allocsize * sizeof(TransactionId));
1956 }
1957 else
1958 {
1959 allocsize = allocsize * 2;
1960 xids = repalloc(xids, allocsize * sizeof(TransactionId));
1961 }
1962 }
1963 xids[nxids++] = xid;
1964 }
1965
1966 pfree(buf);
1967 }
1968 LWLockRelease(TwoPhaseStateLock);
1969
1970 if (xids_p)
1971 {
1972 *xids_p = xids;
1973 *nxids_p = nxids;
1974 }
1975
1976 return result;
1977 }
1978
1979 /*
1980 * StandbyRecoverPreparedTransactions
1981 *
1982 * Scan the shared memory entries of TwoPhaseState and setup all the required
1983 * information to allow standby queries to treat prepared transactions as still
1984 * active.
1985 *
1986 * This is never called at the end of recovery - we use
1987 * RecoverPreparedTransactions() at that point.
1988 *
1989 * The lack of calls to SubTransSetParent() calls here is by design;
1990 * those calls are made by RecoverPreparedTransactions() at the end of recovery
1991 * for those xacts that need this.
1992 */
1993 void
StandbyRecoverPreparedTransactions(void)1994 StandbyRecoverPreparedTransactions(void)
1995 {
1996 int i;
1997
1998 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1999 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2000 {
2001 TransactionId xid;
2002 char *buf;
2003 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2004
2005 Assert(gxact->inredo);
2006
2007 xid = gxact->xid;
2008
2009 buf = ProcessTwoPhaseBuffer(xid,
2010 gxact->prepare_start_lsn,
2011 gxact->ondisk, false, false);
2012 if (buf != NULL)
2013 pfree(buf);
2014 }
2015 LWLockRelease(TwoPhaseStateLock);
2016 }
2017
2018 /*
2019 * RecoverPreparedTransactions
2020 *
2021 * Scan the shared memory entries of TwoPhaseState and reload the state for
2022 * each prepared transaction (reacquire locks, etc).
2023 *
2024 * This is run at the end of recovery, but before we allow backends to write
2025 * WAL.
2026 *
2027 * At the end of recovery the way we take snapshots will change. We now need
2028 * to mark all running transactions with their full SubTransSetParent() info
2029 * to allow normal snapshots to work correctly if snapshots overflow.
2030 * We do this here because by definition prepared transactions are the only
2031 * type of write transaction still running, so this is necessary and
2032 * complete.
2033 */
2034 void
RecoverPreparedTransactions(void)2035 RecoverPreparedTransactions(void)
2036 {
2037 int i;
2038
2039 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2040 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2041 {
2042 TransactionId xid;
2043 char *buf;
2044 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2045 char *bufptr;
2046 TwoPhaseFileHeader *hdr;
2047 TransactionId *subxids;
2048 const char *gid;
2049
2050 xid = gxact->xid;
2051
2052 /*
2053 * Reconstruct subtrans state for the transaction --- needed because
2054 * pg_subtrans is not preserved over a restart. Note that we are
2055 * linking all the subtransactions directly to the top-level XID;
2056 * there may originally have been a more complex hierarchy, but
2057 * there's no need to restore that exactly. It's possible that
2058 * SubTransSetParent has been set before, if the prepared transaction
2059 * generated xid assignment records.
2060 */
2061 buf = ProcessTwoPhaseBuffer(xid,
2062 gxact->prepare_start_lsn,
2063 gxact->ondisk, true, false);
2064 if (buf == NULL)
2065 continue;
2066
2067 ereport(LOG,
2068 (errmsg("recovering prepared transaction %u from shared memory", xid)));
2069
2070 hdr = (TwoPhaseFileHeader *) buf;
2071 Assert(TransactionIdEquals(hdr->xid, xid));
2072 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2073 gid = (const char *) bufptr;
2074 bufptr += MAXALIGN(hdr->gidlen);
2075 subxids = (TransactionId *) bufptr;
2076 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2077 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2078 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2079 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2080
2081 /*
2082 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2083 * added in redo and already has a shmem entry for it.
2084 */
2085 MarkAsPreparingGuts(gxact, xid, gid,
2086 hdr->prepared_at,
2087 hdr->owner, hdr->database);
2088
2089 /* recovered, so reset the flag for entries generated by redo */
2090 gxact->inredo = false;
2091
2092 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2093 MarkAsPrepared(gxact, true);
2094
2095 LWLockRelease(TwoPhaseStateLock);
2096
2097 /*
2098 * Recover other state (notably locks) using resource managers.
2099 */
2100 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2101
2102 /*
2103 * Release locks held by the standby process after we process each
2104 * prepared transaction. As a result, we don't need too many
2105 * additional locks at any one time.
2106 */
2107 if (InHotStandby)
2108 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2109
2110 /*
2111 * We're done with recovering this transaction. Clear MyLockedGxact,
2112 * like we do in PrepareTransaction() during normal operation.
2113 */
2114 PostPrepare_Twophase();
2115
2116 pfree(buf);
2117
2118 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2119 }
2120
2121 LWLockRelease(TwoPhaseStateLock);
2122 }
2123
2124 /*
2125 * ProcessTwoPhaseBuffer
2126 *
2127 * Given a transaction id, read it either from disk or read it directly
2128 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2129 *
2130 * If setParent is true, set up subtransaction parent linkages.
2131 *
2132 * If setNextXid is true, set ShmemVariableCache->nextFullXid to the newest
2133 * value scanned.
2134 */
2135 static char *
ProcessTwoPhaseBuffer(TransactionId xid,XLogRecPtr prepare_start_lsn,bool fromdisk,bool setParent,bool setNextXid)2136 ProcessTwoPhaseBuffer(TransactionId xid,
2137 XLogRecPtr prepare_start_lsn,
2138 bool fromdisk,
2139 bool setParent, bool setNextXid)
2140 {
2141 FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
2142 TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
2143 TransactionId *subxids;
2144 char *buf;
2145 TwoPhaseFileHeader *hdr;
2146 int i;
2147
2148 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2149
2150 if (!fromdisk)
2151 Assert(prepare_start_lsn != InvalidXLogRecPtr);
2152
2153 /* Already processed? */
2154 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2155 {
2156 if (fromdisk)
2157 {
2158 ereport(WARNING,
2159 (errmsg("removing stale two-phase state file for transaction %u",
2160 xid)));
2161 RemoveTwoPhaseFile(xid, true);
2162 }
2163 else
2164 {
2165 ereport(WARNING,
2166 (errmsg("removing stale two-phase state from memory for transaction %u",
2167 xid)));
2168 PrepareRedoRemove(xid, true);
2169 }
2170 return NULL;
2171 }
2172
2173 /* Reject XID if too new */
2174 if (TransactionIdFollowsOrEquals(xid, origNextXid))
2175 {
2176 if (fromdisk)
2177 {
2178 ereport(WARNING,
2179 (errmsg("removing future two-phase state file for transaction %u",
2180 xid)));
2181 RemoveTwoPhaseFile(xid, true);
2182 }
2183 else
2184 {
2185 ereport(WARNING,
2186 (errmsg("removing future two-phase state from memory for transaction %u",
2187 xid)));
2188 PrepareRedoRemove(xid, true);
2189 }
2190 return NULL;
2191 }
2192
2193 if (fromdisk)
2194 {
2195 /* Read and validate file */
2196 buf = ReadTwoPhaseFile(xid, false);
2197 }
2198 else
2199 {
2200 /* Read xlog data */
2201 XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2202 }
2203
2204 /* Deconstruct header */
2205 hdr = (TwoPhaseFileHeader *) buf;
2206 if (!TransactionIdEquals(hdr->xid, xid))
2207 {
2208 if (fromdisk)
2209 ereport(ERROR,
2210 (errcode(ERRCODE_DATA_CORRUPTED),
2211 errmsg("corrupted two-phase state file for transaction %u",
2212 xid)));
2213 else
2214 ereport(ERROR,
2215 (errcode(ERRCODE_DATA_CORRUPTED),
2216 errmsg("corrupted two-phase state in memory for transaction %u",
2217 xid)));
2218 }
2219
2220 /*
2221 * Examine subtransaction XIDs ... they should all follow main XID, and
2222 * they may force us to advance nextFullXid.
2223 */
2224 subxids = (TransactionId *) (buf +
2225 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2226 MAXALIGN(hdr->gidlen));
2227 for (i = 0; i < hdr->nsubxacts; i++)
2228 {
2229 TransactionId subxid = subxids[i];
2230
2231 Assert(TransactionIdFollows(subxid, xid));
2232
2233 /* update nextFullXid if needed */
2234 if (setNextXid)
2235 AdvanceNextFullTransactionIdPastXid(subxid);
2236
2237 if (setParent)
2238 SubTransSetParent(subxid, xid);
2239 }
2240
2241 return buf;
2242 }
2243
2244
2245 /*
2246 * RecordTransactionCommitPrepared
2247 *
2248 * This is basically the same as RecordTransactionCommit (q.v. if you change
2249 * this function): in particular, we must set the delayChkpt flag to avoid a
2250 * race condition.
2251 *
2252 * We know the transaction made at least one XLOG entry (its PREPARE),
2253 * so it is never possible to optimize out the commit record.
2254 */
2255 static void
RecordTransactionCommitPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,int ninvalmsgs,SharedInvalidationMessage * invalmsgs,bool initfileinval,const char * gid)2256 RecordTransactionCommitPrepared(TransactionId xid,
2257 int nchildren,
2258 TransactionId *children,
2259 int nrels,
2260 RelFileNode *rels,
2261 int ninvalmsgs,
2262 SharedInvalidationMessage *invalmsgs,
2263 bool initfileinval,
2264 const char *gid)
2265 {
2266 XLogRecPtr recptr;
2267 TimestampTz committs = GetCurrentTimestamp();
2268 bool replorigin;
2269
2270 /*
2271 * Are we using the replication origins feature? Or, in other words, are
2272 * we replaying remote actions?
2273 */
2274 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2275 replorigin_session_origin != DoNotReplicateId);
2276
2277 START_CRIT_SECTION();
2278
2279 /* See notes in RecordTransactionCommit */
2280 MyProc->delayChkpt = true;
2281
2282 /*
2283 * Emit the XLOG commit record. Note that we mark 2PC commits as
2284 * potentially having AccessExclusiveLocks since we don't know whether or
2285 * not they do.
2286 */
2287 recptr = XactLogCommitRecord(committs,
2288 nchildren, children, nrels, rels,
2289 ninvalmsgs, invalmsgs,
2290 initfileinval, false,
2291 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2292 xid, gid);
2293
2294
2295 if (replorigin)
2296 /* Move LSNs forward for this replication origin */
2297 replorigin_session_advance(replorigin_session_origin_lsn,
2298 XactLastRecEnd);
2299
2300 /*
2301 * Record commit timestamp. The value comes from plain commit timestamp
2302 * if replorigin is not enabled, or replorigin already set a value for us
2303 * in replorigin_session_origin_timestamp otherwise.
2304 *
2305 * We don't need to WAL-log anything here, as the commit record written
2306 * above already contains the data.
2307 */
2308 if (!replorigin || replorigin_session_origin_timestamp == 0)
2309 replorigin_session_origin_timestamp = committs;
2310
2311 TransactionTreeSetCommitTsData(xid, nchildren, children,
2312 replorigin_session_origin_timestamp,
2313 replorigin_session_origin, false);
2314
2315 /*
2316 * We don't currently try to sleep before flush here ... nor is there any
2317 * support for async commit of a prepared xact (the very idea is probably
2318 * a contradiction)
2319 */
2320
2321 /* Flush XLOG to disk */
2322 XLogFlush(recptr);
2323
2324 /* Mark the transaction committed in pg_xact */
2325 TransactionIdCommitTree(xid, nchildren, children);
2326
2327 /* Checkpoint can proceed now */
2328 MyProc->delayChkpt = false;
2329
2330 END_CRIT_SECTION();
2331
2332 /*
2333 * Wait for synchronous replication, if required.
2334 *
2335 * Note that at this stage we have marked clog, but still show as running
2336 * in the procarray and continue to hold locks.
2337 */
2338 SyncRepWaitForLSN(recptr, true);
2339 }
2340
2341 /*
2342 * RecordTransactionAbortPrepared
2343 *
2344 * This is basically the same as RecordTransactionAbort.
2345 *
2346 * We know the transaction made at least one XLOG entry (its PREPARE),
2347 * so it is never possible to optimize out the abort record.
2348 */
2349 static void
RecordTransactionAbortPrepared(TransactionId xid,int nchildren,TransactionId * children,int nrels,RelFileNode * rels,const char * gid)2350 RecordTransactionAbortPrepared(TransactionId xid,
2351 int nchildren,
2352 TransactionId *children,
2353 int nrels,
2354 RelFileNode *rels,
2355 const char *gid)
2356 {
2357 XLogRecPtr recptr;
2358
2359 /*
2360 * Catch the scenario where we aborted partway through
2361 * RecordTransactionCommitPrepared ...
2362 */
2363 if (TransactionIdDidCommit(xid))
2364 elog(PANIC, "cannot abort transaction %u, it was already committed",
2365 xid);
2366
2367 START_CRIT_SECTION();
2368
2369 /*
2370 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2371 * potentially having AccessExclusiveLocks since we don't know whether or
2372 * not they do.
2373 */
2374 recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2375 nchildren, children,
2376 nrels, rels,
2377 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2378 xid, gid);
2379
2380 /* Always flush, since we're about to remove the 2PC state file */
2381 XLogFlush(recptr);
2382
2383 /*
2384 * Mark the transaction aborted in clog. This is not absolutely necessary
2385 * but we may as well do it while we are here.
2386 */
2387 TransactionIdAbortTree(xid, nchildren, children);
2388
2389 END_CRIT_SECTION();
2390
2391 /*
2392 * Wait for synchronous replication, if required.
2393 *
2394 * Note that at this stage we have marked clog, but still show as running
2395 * in the procarray and continue to hold locks.
2396 */
2397 SyncRepWaitForLSN(recptr, false);
2398 }
2399
2400 /*
2401 * PrepareRedoAdd
2402 *
2403 * Store pointers to the start/end of the WAL record along with the xid in
2404 * a gxact entry in shared memory TwoPhaseState structure. If caller
2405 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2406 * data, the entry is marked as located on disk.
2407 */
2408 void
PrepareRedoAdd(char * buf,XLogRecPtr start_lsn,XLogRecPtr end_lsn,RepOriginId origin_id)2409 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2410 XLogRecPtr end_lsn, RepOriginId origin_id)
2411 {
2412 TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2413 char *bufptr;
2414 const char *gid;
2415 GlobalTransaction gxact;
2416
2417 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2418 Assert(RecoveryInProgress());
2419
2420 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2421 gid = (const char *) bufptr;
2422
2423 /*
2424 * Reserve the GID for the given transaction in the redo code path.
2425 *
2426 * This creates a gxact struct and puts it into the active array.
2427 *
2428 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2429 * shared memory. Hence, we only fill up the bare minimum contents here.
2430 * The gxact also gets marked with gxact->inredo set to true to indicate
2431 * that it got added in the redo phase
2432 */
2433
2434 /* Get a free gxact from the freelist */
2435 if (TwoPhaseState->freeGXacts == NULL)
2436 ereport(ERROR,
2437 (errcode(ERRCODE_OUT_OF_MEMORY),
2438 errmsg("maximum number of prepared transactions reached"),
2439 errhint("Increase max_prepared_transactions (currently %d).",
2440 max_prepared_xacts)));
2441 gxact = TwoPhaseState->freeGXacts;
2442 TwoPhaseState->freeGXacts = gxact->next;
2443
2444 gxact->prepared_at = hdr->prepared_at;
2445 gxact->prepare_start_lsn = start_lsn;
2446 gxact->prepare_end_lsn = end_lsn;
2447 gxact->xid = hdr->xid;
2448 gxact->owner = hdr->owner;
2449 gxact->locking_backend = InvalidBackendId;
2450 gxact->valid = false;
2451 gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2452 gxact->inredo = true; /* yes, added in redo */
2453 strcpy(gxact->gid, gid);
2454
2455 /* And insert it into the active array */
2456 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2457 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2458
2459 if (origin_id != InvalidRepOriginId)
2460 {
2461 /* recover apply progress */
2462 replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2463 false /* backward */ , false /* WAL */ );
2464 }
2465
2466 elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2467 }
2468
2469 /*
2470 * PrepareRedoRemove
2471 *
2472 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2473 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2474 *
2475 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2476 * is updated.
2477 */
2478 void
PrepareRedoRemove(TransactionId xid,bool giveWarning)2479 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2480 {
2481 GlobalTransaction gxact = NULL;
2482 int i;
2483 bool found = false;
2484
2485 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2486 Assert(RecoveryInProgress());
2487
2488 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2489 {
2490 gxact = TwoPhaseState->prepXacts[i];
2491
2492 if (gxact->xid == xid)
2493 {
2494 Assert(gxact->inredo);
2495 found = true;
2496 break;
2497 }
2498 }
2499
2500 /*
2501 * Just leave if there is nothing, this is expected during WAL replay.
2502 */
2503 if (!found)
2504 return;
2505
2506 /*
2507 * And now we can clean up any files we may have left.
2508 */
2509 elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2510 if (gxact->ondisk)
2511 RemoveTwoPhaseFile(xid, giveWarning);
2512 RemoveGXact(gxact);
2513 }
2514