1 /*-------------------------------------------------------------------------
2 *
3 * twophase.c
4 * Two-phase commit support functions.
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/access/transam/twophase.c
11 *
12 * NOTES
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
16 *
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
23 *
24 * A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25 * what keeps the XID considered running by TransactionIdIsInProgress.
26 * It is also convenient as a PGPROC to hook the gxact's locks to.
27 *
28 * Information to recover prepared transactions in case of crash is
29 * now stored in WAL for the common case. In some cases there will be
30 * an extended period between preparing a GXACT and commit/abort, in
31 * which case we need to separately record prepared transaction data
32 * in permanent storage. This includes locking information, pending
33 * notifications etc. All that state information is written to the
34 * per-transaction state file in the pg_twophase directory.
35 * All prepared transactions will be written prior to shutdown.
36 *
 * The lifecycle of state data is as follows:
38 *
39 * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 * stores pointer to the start of the WAL record in
41 * gxact->prepare_start_lsn.
42 * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 * using prepare_start_lsn.
 *		* On checkpoint, state data is copied to files in the pg_twophase
 *		  directory and fsynced.
46 * * If COMMIT happens after checkpoint then backend reads state data from
47 * files
48 *
49 * During replay and replication, TwoPhaseState also holds information
50 * about active prepared transactions that haven't been moved to disk yet.
51 *
52 * Replay of twophase records happens by the following rules:
53 *
54 * * At the beginning of recovery, pg_twophase is scanned once, filling
55 * TwoPhaseState with entries marked with gxact->inredo and
56 * gxact->ondisk. Two-phase file data older than the XID horizon of
57 * the redo position are discarded.
58 * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 * gxact->inredo is set to true for such entries.
60 * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 * that have gxact->inredo set and are behind the redo_horizon. We
62 * save them to disk and then switch gxact->ondisk to true.
63 * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 * If gxact->ondisk is true, the corresponding entry from the disk
65 * is additionally deleted.
 *		* RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
 *		  and PrescanPreparedTransactions() also go through gxact->inredo
 *		  entries that have not yet made it to disk.
69 *
70 *-------------------------------------------------------------------------
71 */
72 #include "postgres.h"
73
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
78
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogutils.h"
89 #include "access/xlogreader.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
92 #include "funcapi.h"
93 #include "miscadmin.h"
94 #include "pg_trace.h"
95 #include "pgstat.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/predicate.h"
102 #include "storage/proc.h"
103 #include "storage/procarray.h"
104 #include "storage/sinvaladt.h"
105 #include "storage/smgr.h"
106 #include "utils/builtins.h"
107 #include "utils/memutils.h"
108 #include "utils/timestamp.h"
109
110
/*
 * Directory where two-phase commit state files reside, relative to PGDATA.
 * State file names within it are built by TwoPhaseFilePath().
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable (max_prepared_transactions), can't be changed after startup.
 * It sizes the shared prepared-transaction table; zero disables PREPARE
 * TRANSACTION entirely (see MarkAsPreparing()).
 */
int			max_prepared_xacts = 0;
118
/*
 * This struct describes one global transaction that is in prepared state
 * or attempting to become prepared.
 *
 * The lifecycle of a global transaction is:
 *
 * 1. After checking that the requested GID is not in use, set up an entry in
 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
 * and mark it as locked by my backend.
 *
 * 2. After successfully completing prepare, set valid = true and enter the
 * referenced PGPROC into the global ProcArray.
 *
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
 * valid and not locked, then mark the entry as locked by storing my current
 * backend ID into locking_backend.  This prevents concurrent attempts to
 * commit or rollback the same prepared xact.
 *
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 * the freelist.
 *
 * Note that if the preparing transaction fails between steps 1 and 2, the
 * entry must be removed so that the GID and the GlobalTransaction struct
 * can be reused.  See AtAbort_Twophase().
 *
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
 * twophase.h
 *
 * GIDSIZE includes room for the terminating NUL: MarkAsPreparing() rejects
 * any GID whose strlen() is GIDSIZE or more.  Note that the max value of
 * GIDSIZE must fit in the uint16 gidlen, specified in TwoPhaseFileHeader.
 */
#define GIDSIZE 200

typedef struct GlobalTransactionData
{
	GlobalTransaction next;		/* list link for free list; only meaningful
								 * while the struct is on freeGXacts */
	int			pgprocno;		/* ID of associated dummy PGPROC (index into
								 * ProcGlobal->allProcs/allPgXact) */
	BackendId	dummyBackendId; /* similar to backend id for backends; fixed
								 * at shmem init to MaxBackends + 1 + slot */
	TimestampTz prepared_at;	/* time of preparation */

	/*
	 * Note that we need to keep track of two LSNs for each GXACT. We keep
	 * track of the start LSN because this is the address we must use to read
	 * state data back from WAL when committing a prepared GXACT. We keep
	 * track of the end LSN because that is the LSN we need to wait for prior
	 * to commit.
	 */
	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
	TransactionId xid;			/* The GXACT id */

	Oid			owner;			/* ID of user that executed the xact */
	BackendId	locking_backend;	/* backend currently working on the xact,
									 * or InvalidBackendId if unlocked */
	bool		valid;			/* TRUE if PGPROC entry is in proc array */
	bool		ondisk;			/* TRUE if prepare state file is on disk */
	bool		inredo;			/* TRUE if entry was added via xlog_redo */
	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact
								 * (NUL-terminated) */
} GlobalTransactionData;
178
/*
 * Two Phase Commit shared state.  Access to this struct is protected
 * by TwoPhaseStateLock.
 *
 * In shared memory the GlobalTransactionData structs are laid out directly
 * after the prepXacts pointer array, at a MAXALIGN'd offset (see
 * TwoPhaseShmemSize() and TwoPhaseShmemInit()).
 */
typedef struct TwoPhaseStateData
{
	/* Head of linked list of free GlobalTransactionData structs */
	GlobalTransaction freeGXacts;

	/*
	 * Number of prepXacts entries in use.  This includes entries still being
	 * prepared (gxact->valid == false), not only fully valid ones.
	 */
	int			numPrepXacts;

	/*
	 * There are max_prepared_xacts items in this array; only the first
	 * numPrepXacts are in use.  Their order is not preserved: RemoveGXact()
	 * fills a hole by moving the last entry into it.
	 */
	GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
} TwoPhaseStateData;

/* Pointer to the shared state, set by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;
196
/*
 * Global transaction entry currently locked by us, if any.  Note that any
 * access to the entry pointed to by this variable must be protected by
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 * (since it's just local memory).
 *
 * Set by MarkAsPreparingGuts() and LockGXact(); cleared by
 * PostPrepare_Twophase() and AtAbort_Twophase().
 */
static GlobalTransaction MyLockedGxact = NULL;

/*
 * Whether AtProcExit_Twophase has been registered as a before_shmem_exit
 * callback in this process; registration happens lazily on first use.
 */
static bool twophaseExitRegistered = false;
206
/*
 * Forward declarations of file-local (static) functions.
 */
static void RecordTransactionCommitPrepared(TransactionId xid,
								int nchildren,
								TransactionId *children,
								int nrels,
								RelFileNode *rels,
								int ninvalmsgs,
								SharedInvalidationMessage *invalmsgs,
								bool initfileinval);
static void RecordTransactionAbortPrepared(TransactionId xid,
							   int nchildren,
							   TransactionId *children,
							   int nrels,
							   RelFileNode *rels);
static void ProcessRecords(char *bufptr, TransactionId xid,
			   const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);

/*
 * WAL / state-file helpers.  NOTE(review): XlogReadTwoPhaseData presumably
 * reads the PREPARE record at "lsn" back from WAL (see the file header
 * notes on prepare_start_lsn); confirm against its definition.
 */
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(TransactionId xid,
					  XLogRecPtr prepare_start_lsn,
					  bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
					const char *gid, TimestampTz prepared_at, Oid owner,
					Oid databaseid);
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
233
234 /*
235 * Initialization of shared memory
236 */
237 Size
TwoPhaseShmemSize(void)238 TwoPhaseShmemSize(void)
239 {
240 Size size;
241
242 /* Need the fixed struct, the array of pointers, and the GTD structs */
243 size = offsetof(TwoPhaseStateData, prepXacts);
244 size = add_size(size, mul_size(max_prepared_xacts,
245 sizeof(GlobalTransaction)));
246 size = MAXALIGN(size);
247 size = add_size(size, mul_size(max_prepared_xacts,
248 sizeof(GlobalTransactionData)));
249
250 return size;
251 }
252
253 void
TwoPhaseShmemInit(void)254 TwoPhaseShmemInit(void)
255 {
256 bool found;
257
258 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
259 TwoPhaseShmemSize(),
260 &found);
261 if (!IsUnderPostmaster)
262 {
263 GlobalTransaction gxacts;
264 int i;
265
266 Assert(!found);
267 TwoPhaseState->freeGXacts = NULL;
268 TwoPhaseState->numPrepXacts = 0;
269
270 /*
271 * Initialize the linked list of free GlobalTransactionData structs
272 */
273 gxacts = (GlobalTransaction)
274 ((char *) TwoPhaseState +
275 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
276 sizeof(GlobalTransaction) * max_prepared_xacts));
277 for (i = 0; i < max_prepared_xacts; i++)
278 {
279 /* insert into linked list */
280 gxacts[i].next = TwoPhaseState->freeGXacts;
281 TwoPhaseState->freeGXacts = &gxacts[i];
282
283 /* associate it with a PGPROC assigned by InitProcGlobal */
284 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
285
286 /*
287 * Assign a unique ID for each dummy proc, so that the range of
288 * dummy backend IDs immediately follows the range of normal
289 * backend IDs. We don't dare to assign a real backend ID to dummy
290 * procs, because prepared transactions don't take part in cache
291 * invalidation like a real backend ID would imply, but having a
292 * unique ID for them is nevertheless handy. This arrangement
293 * allows you to allocate an array of size (MaxBackends +
294 * max_prepared_xacts + 1), and have a slot for every backend and
295 * prepared transaction. Currently multixact.c uses that
296 * technique.
297 */
298 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
299 }
300 }
301 else
302 Assert(found);
303 }
304
305 /*
306 * Exit hook to unlock the global transaction entry we're working on.
307 */
308 static void
AtProcExit_Twophase(int code,Datum arg)309 AtProcExit_Twophase(int code, Datum arg)
310 {
311 /* same logic as abort */
312 AtAbort_Twophase();
313 }
314
315 /*
316 * Abort hook to unlock the global transaction entry we're working on.
317 */
318 void
AtAbort_Twophase(void)319 AtAbort_Twophase(void)
320 {
321 if (MyLockedGxact == NULL)
322 return;
323
324 /*
325 * What to do with the locked global transaction entry? If we were in the
326 * process of preparing the transaction, but haven't written the WAL
327 * record and state file yet, the transaction must not be considered as
328 * prepared. Likewise, if we are in the process of finishing an
329 * already-prepared transaction, and fail after having already written the
330 * 2nd phase commit or rollback record to the WAL, the transaction should
331 * not be considered as prepared anymore. In those cases, just remove the
332 * entry from shared memory.
333 *
334 * Otherwise, the entry must be left in place so that the transaction can
335 * be finished later, so just unlock it.
336 *
337 * If we abort during prepare, after having written the WAL record, we
338 * might not have transferred all locks and other state to the prepared
339 * transaction yet. Likewise, if we abort during commit or rollback,
340 * after having written the WAL record, we might not have released all the
341 * resources held by the transaction yet. In those cases, the in-memory
342 * state can be wrong, but it's too late to back out.
343 */
344 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
345 if (!MyLockedGxact->valid)
346 RemoveGXact(MyLockedGxact);
347 else
348 MyLockedGxact->locking_backend = InvalidBackendId;
349 LWLockRelease(TwoPhaseStateLock);
350
351 MyLockedGxact = NULL;
352 }
353
354 /*
355 * This is called after we have finished transferring state to the prepared
356 * PGXACT entry.
357 */
358 void
PostPrepare_Twophase(void)359 PostPrepare_Twophase(void)
360 {
361 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
362 MyLockedGxact->locking_backend = InvalidBackendId;
363 LWLockRelease(TwoPhaseStateLock);
364
365 MyLockedGxact = NULL;
366 }
367
368
369 /*
370 * MarkAsPreparing
371 * Reserve the GID for the given transaction.
372 */
373 GlobalTransaction
MarkAsPreparing(TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)374 MarkAsPreparing(TransactionId xid, const char *gid,
375 TimestampTz prepared_at, Oid owner, Oid databaseid)
376 {
377 GlobalTransaction gxact;
378 int i;
379
380 if (strlen(gid) >= GIDSIZE)
381 ereport(ERROR,
382 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
383 errmsg("transaction identifier \"%s\" is too long",
384 gid)));
385
386 /* fail immediately if feature is disabled */
387 if (max_prepared_xacts == 0)
388 ereport(ERROR,
389 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
390 errmsg("prepared transactions are disabled"),
391 errhint("Set max_prepared_transactions to a nonzero value.")));
392
393 /* on first call, register the exit hook */
394 if (!twophaseExitRegistered)
395 {
396 before_shmem_exit(AtProcExit_Twophase, 0);
397 twophaseExitRegistered = true;
398 }
399
400 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
401
402 /* Check for conflicting GID */
403 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
404 {
405 gxact = TwoPhaseState->prepXacts[i];
406 if (strcmp(gxact->gid, gid) == 0)
407 {
408 ereport(ERROR,
409 (errcode(ERRCODE_DUPLICATE_OBJECT),
410 errmsg("transaction identifier \"%s\" is already in use",
411 gid)));
412 }
413 }
414
415 /* Get a free gxact from the freelist */
416 if (TwoPhaseState->freeGXacts == NULL)
417 ereport(ERROR,
418 (errcode(ERRCODE_OUT_OF_MEMORY),
419 errmsg("maximum number of prepared transactions reached"),
420 errhint("Increase max_prepared_transactions (currently %d).",
421 max_prepared_xacts)));
422 gxact = TwoPhaseState->freeGXacts;
423 TwoPhaseState->freeGXacts = gxact->next;
424
425 MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
426
427 gxact->ondisk = false;
428
429 /* And insert it into the active array */
430 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
431 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
432
433 LWLockRelease(TwoPhaseStateLock);
434
435 return gxact;
436 }
437
438 /*
439 * MarkAsPreparingGuts
440 *
441 * This uses a gxact struct and puts it into the active array.
442 * NOTE: this is also used when reloading a gxact after a crash; so avoid
443 * assuming that we can use very much backend context.
444 *
445 * Note: This function should be called with appropriate locks held.
446 */
447 static void
MarkAsPreparingGuts(GlobalTransaction gxact,TransactionId xid,const char * gid,TimestampTz prepared_at,Oid owner,Oid databaseid)448 MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
449 TimestampTz prepared_at, Oid owner, Oid databaseid)
450 {
451 PGPROC *proc;
452 PGXACT *pgxact;
453 int i;
454
455 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
456
457 Assert(gxact != NULL);
458 proc = &ProcGlobal->allProcs[gxact->pgprocno];
459 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
460
461 /* Initialize the PGPROC entry */
462 MemSet(proc, 0, sizeof(PGPROC));
463 proc->pgprocno = gxact->pgprocno;
464 SHMQueueElemInit(&(proc->links));
465 proc->waitStatus = STATUS_OK;
466 if (LocalTransactionIdIsValid(MyProc->lxid))
467 {
468 /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
469 proc->lxid = MyProc->lxid;
470 proc->backendId = MyBackendId;
471 }
472 else
473 {
474 Assert(AmStartupProcess() || !IsPostmasterEnvironment);
475 /* GetLockConflicts() uses this to specify a wait on the XID */
476 proc->lxid = xid;
477 proc->backendId = InvalidBackendId;
478 }
479 pgxact->xid = xid;
480 pgxact->xmin = InvalidTransactionId;
481 pgxact->delayChkpt = false;
482 pgxact->vacuumFlags = 0;
483 proc->pid = 0;
484 proc->databaseId = databaseid;
485 proc->roleId = owner;
486 proc->isBackgroundWorker = false;
487 proc->lwWaiting = false;
488 proc->lwWaitMode = 0;
489 proc->waitLock = NULL;
490 proc->waitProcLock = NULL;
491 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
492 SHMQueueInit(&(proc->myProcLocks[i]));
493 /* subxid data must be filled later by GXactLoadSubxactData */
494 pgxact->overflowed = false;
495 pgxact->nxids = 0;
496
497 gxact->prepared_at = prepared_at;
498 gxact->xid = xid;
499 gxact->owner = owner;
500 gxact->locking_backend = MyBackendId;
501 gxact->valid = false;
502 gxact->inredo = false;
503 strcpy(gxact->gid, gid);
504
505 /*
506 * Remember that we have this GlobalTransaction entry locked for us. If we
507 * abort after this, we must release it.
508 */
509 MyLockedGxact = gxact;
510 }
511
512 /*
513 * GXactLoadSubxactData
514 *
515 * If the transaction being persisted had any subtransactions, this must
516 * be called before MarkAsPrepared() to load information into the dummy
517 * PGPROC.
518 */
519 static void
GXactLoadSubxactData(GlobalTransaction gxact,int nsubxacts,TransactionId * children)520 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
521 TransactionId *children)
522 {
523 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
524 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
525
526 /* We need no extra lock since the GXACT isn't valid yet */
527 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
528 {
529 pgxact->overflowed = true;
530 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
531 }
532 if (nsubxacts > 0)
533 {
534 memcpy(proc->subxids.xids, children,
535 nsubxacts * sizeof(TransactionId));
536 pgxact->nxids = nsubxacts;
537 }
538 }
539
540 /*
541 * MarkAsPrepared
542 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
543 *
544 * lock_held indicates whether caller already holds TwoPhaseStateLock.
545 */
546 static void
MarkAsPrepared(GlobalTransaction gxact,bool lock_held)547 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
548 {
549 /* Lock here may be overkill, but I'm not convinced of that ... */
550 if (!lock_held)
551 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
552 Assert(!gxact->valid);
553 gxact->valid = true;
554 if (!lock_held)
555 LWLockRelease(TwoPhaseStateLock);
556
557 /*
558 * Put it into the global ProcArray so TransactionIdIsInProgress considers
559 * the XID as still running.
560 */
561 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
562 }
563
564 /*
565 * LockGXact
566 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
567 */
568 static GlobalTransaction
LockGXact(const char * gid,Oid user)569 LockGXact(const char *gid, Oid user)
570 {
571 int i;
572
573 /* on first call, register the exit hook */
574 if (!twophaseExitRegistered)
575 {
576 before_shmem_exit(AtProcExit_Twophase, 0);
577 twophaseExitRegistered = true;
578 }
579
580 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
581
582 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
583 {
584 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
585 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
586
587 /* Ignore not-yet-valid GIDs */
588 if (!gxact->valid)
589 continue;
590 if (strcmp(gxact->gid, gid) != 0)
591 continue;
592
593 /* Found it, but has someone else got it locked? */
594 if (gxact->locking_backend != InvalidBackendId)
595 ereport(ERROR,
596 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
597 errmsg("prepared transaction with identifier \"%s\" is busy",
598 gid)));
599
600 if (user != gxact->owner && !superuser_arg(user))
601 ereport(ERROR,
602 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
603 errmsg("permission denied to finish prepared transaction"),
604 errhint("Must be superuser or the user that prepared the transaction.")));
605
606 /*
607 * Note: it probably would be possible to allow committing from
608 * another database; but at the moment NOTIFY is known not to work and
609 * there may be some other issues as well. Hence disallow until
610 * someone gets motivated to make it work.
611 */
612 if (MyDatabaseId != proc->databaseId)
613 ereport(ERROR,
614 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
615 errmsg("prepared transaction belongs to another database"),
616 errhint("Connect to the database where the transaction was prepared to finish it.")));
617
618 /* OK for me to lock it */
619 gxact->locking_backend = MyBackendId;
620 MyLockedGxact = gxact;
621
622 LWLockRelease(TwoPhaseStateLock);
623
624 return gxact;
625 }
626
627 LWLockRelease(TwoPhaseStateLock);
628
629 ereport(ERROR,
630 (errcode(ERRCODE_UNDEFINED_OBJECT),
631 errmsg("prepared transaction with identifier \"%s\" does not exist",
632 gid)));
633
634 /* NOTREACHED */
635 return NULL;
636 }
637
638 /*
639 * RemoveGXact
640 * Remove the prepared transaction from the shared memory array.
641 *
642 * NB: caller should have already removed it from ProcArray
643 */
644 static void
RemoveGXact(GlobalTransaction gxact)645 RemoveGXact(GlobalTransaction gxact)
646 {
647 int i;
648
649 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
650
651 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
652 {
653 if (gxact == TwoPhaseState->prepXacts[i])
654 {
655 /* remove from the active array */
656 TwoPhaseState->numPrepXacts--;
657 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
658
659 /* and put it back in the freelist */
660 gxact->next = TwoPhaseState->freeGXacts;
661 TwoPhaseState->freeGXacts = gxact;
662
663 return;
664 }
665 }
666
667 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
668 }
669
670 /*
671 * Returns an array of all prepared transactions for the user-level
672 * function pg_prepared_xact.
673 *
674 * The returned array and all its elements are copies of internal data
675 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
676 *
677 * WARNING -- we return even those transactions that are not fully prepared
678 * yet. The caller should filter them out if he doesn't want them.
679 *
680 * The returned array is palloc'd.
681 */
682 static int
GetPreparedTransactionList(GlobalTransaction * gxacts)683 GetPreparedTransactionList(GlobalTransaction *gxacts)
684 {
685 GlobalTransaction array;
686 int num;
687 int i;
688
689 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
690
691 if (TwoPhaseState->numPrepXacts == 0)
692 {
693 LWLockRelease(TwoPhaseStateLock);
694
695 *gxacts = NULL;
696 return 0;
697 }
698
699 num = TwoPhaseState->numPrepXacts;
700 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
701 *gxacts = array;
702 for (i = 0; i < num; i++)
703 memcpy(array + i, TwoPhaseState->prepXacts[i],
704 sizeof(GlobalTransactionData));
705
706 LWLockRelease(TwoPhaseStateLock);
707
708 return num;
709 }
710
711
/*
 * Working status for pg_prepared_xact, kept in funcctx->user_fctx across
 * calls of the set-returning function.
 */
typedef struct
{
	GlobalTransaction array;	/* snapshot from GetPreparedTransactionList(),
								 * or NULL if there were no entries */
	int			ngxacts;		/* number of entries in array */
	int			currIdx;		/* next array index to examine */
} Working_State;
719
720 /*
721 * pg_prepared_xact
722 * Produce a view with one row per prepared transaction.
723 *
724 * This function is here so we don't have to export the
725 * GlobalTransactionData struct definition.
726 */
727 Datum
pg_prepared_xact(PG_FUNCTION_ARGS)728 pg_prepared_xact(PG_FUNCTION_ARGS)
729 {
730 FuncCallContext *funcctx;
731 Working_State *status;
732
733 if (SRF_IS_FIRSTCALL())
734 {
735 TupleDesc tupdesc;
736 MemoryContext oldcontext;
737
738 /* create a function context for cross-call persistence */
739 funcctx = SRF_FIRSTCALL_INIT();
740
741 /*
742 * Switch to memory context appropriate for multiple function calls
743 */
744 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
745
746 /* build tupdesc for result tuples */
747 /* this had better match pg_prepared_xacts view in system_views.sql */
748 tupdesc = CreateTemplateTupleDesc(5, false);
749 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
750 XIDOID, -1, 0);
751 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
752 TEXTOID, -1, 0);
753 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
754 TIMESTAMPTZOID, -1, 0);
755 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
756 OIDOID, -1, 0);
757 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
758 OIDOID, -1, 0);
759
760 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
761
762 /*
763 * Collect all the 2PC status information that we will format and send
764 * out as a result set.
765 */
766 status = (Working_State *) palloc(sizeof(Working_State));
767 funcctx->user_fctx = (void *) status;
768
769 status->ngxacts = GetPreparedTransactionList(&status->array);
770 status->currIdx = 0;
771
772 MemoryContextSwitchTo(oldcontext);
773 }
774
775 funcctx = SRF_PERCALL_SETUP();
776 status = (Working_State *) funcctx->user_fctx;
777
778 while (status->array != NULL && status->currIdx < status->ngxacts)
779 {
780 GlobalTransaction gxact = &status->array[status->currIdx++];
781 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
782 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
783 Datum values[5];
784 bool nulls[5];
785 HeapTuple tuple;
786 Datum result;
787
788 if (!gxact->valid)
789 continue;
790
791 /*
792 * Form tuple with appropriate data.
793 */
794 MemSet(values, 0, sizeof(values));
795 MemSet(nulls, 0, sizeof(nulls));
796
797 values[0] = TransactionIdGetDatum(pgxact->xid);
798 values[1] = CStringGetTextDatum(gxact->gid);
799 values[2] = TimestampTzGetDatum(gxact->prepared_at);
800 values[3] = ObjectIdGetDatum(gxact->owner);
801 values[4] = ObjectIdGetDatum(proc->databaseId);
802
803 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
804 result = HeapTupleGetDatum(tuple);
805 SRF_RETURN_NEXT(funcctx, result);
806 }
807
808 SRF_RETURN_DONE(funcctx);
809 }
810
811 /*
812 * TwoPhaseGetGXact
813 * Get the GlobalTransaction struct for a prepared transaction
814 * specified by XID
815 */
816 static GlobalTransaction
TwoPhaseGetGXact(TransactionId xid)817 TwoPhaseGetGXact(TransactionId xid)
818 {
819 GlobalTransaction result = NULL;
820 int i;
821
822 static TransactionId cached_xid = InvalidTransactionId;
823 static GlobalTransaction cached_gxact = NULL;
824
825 /*
826 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
827 * repeatedly for the same XID. We can save work with a simple cache.
828 */
829 if (xid == cached_xid)
830 return cached_gxact;
831
832 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
833
834 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
835 {
836 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
837 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
838
839 if (pgxact->xid == xid)
840 {
841 result = gxact;
842 break;
843 }
844 }
845
846 LWLockRelease(TwoPhaseStateLock);
847
848 if (result == NULL) /* should not happen */
849 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
850
851 cached_xid = xid;
852 cached_gxact = result;
853
854 return result;
855 }
856
857 /*
858 * TwoPhaseGetXidByVirtualXID
859 * Lookup VXID among xacts prepared since last startup.
860 *
861 * (This won't find recovered xacts.) If more than one matches, return any
862 * and set "have_more" to true. To witness multiple matches, a single
863 * BackendId must consume 2^32 LXIDs, with no intervening database restart.
864 */
865 TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,bool * have_more)866 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
867 bool *have_more)
868 {
869 int i;
870 TransactionId result = InvalidTransactionId;
871
872 Assert(VirtualTransactionIdIsValid(vxid));
873 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
874
875 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
876 {
877 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
878 PGPROC *proc;
879 VirtualTransactionId proc_vxid;
880
881 if (!gxact->valid)
882 continue;
883 proc = &ProcGlobal->allProcs[gxact->pgprocno];
884 GET_VXID_FROM_PGPROC(proc_vxid, *proc);
885 if (VirtualTransactionIdEquals(vxid, proc_vxid))
886 {
887 /* Startup process sets proc->backendId to InvalidBackendId. */
888 Assert(!gxact->inredo);
889
890 if (result != InvalidTransactionId)
891 {
892 *have_more = true;
893 break;
894 }
895 result = gxact->xid;
896 }
897 }
898
899 LWLockRelease(TwoPhaseStateLock);
900
901 return result;
902 }
903
904 /*
905 * TwoPhaseGetDummyProc
906 * Get the dummy backend ID for prepared transaction specified by XID
907 *
908 * Dummy backend IDs are similar to real backend IDs of real backends.
909 * They start at MaxBackends + 1, and are unique across all currently active
910 * real backends and prepared transactions.
911 */
912 BackendId
TwoPhaseGetDummyBackendId(TransactionId xid)913 TwoPhaseGetDummyBackendId(TransactionId xid)
914 {
915 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
916
917 return gxact->dummyBackendId;
918 }
919
920 /*
921 * TwoPhaseGetDummyProc
922 * Get the PGPROC that represents a prepared transaction specified by XID
923 */
924 PGPROC *
TwoPhaseGetDummyProc(TransactionId xid)925 TwoPhaseGetDummyProc(TransactionId xid)
926 {
927 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
928
929 return &ProcGlobal->allProcs[gxact->pgprocno];
930 }
931
/************************************************************************/
/* State file support													*/
/************************************************************************/

/*
 * TwoPhaseFilePath
 *		Write into "path" (a buffer of at least MAXPGPATH bytes) the name of
 *		the state file for "xid": TWOPHASE_DIR, a slash, and the XID as eight
 *		upper-case hexadecimal digits.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
938
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	   (the GID, gidlen bytes, follows the header -- see gidlen below)
 *	2. TransactionId[] (subtransactions)
 *	3. RelFileNode[] (files to be deleted at commit)
 *	4. RelFileNode[] (files to be deleted at abort)
 *	5. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	6. TwoPhaseRecordOnDisk
 *	7. ...
 *	8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	9. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.
 *
 * This is an on-disk format: do not reorder or resize the header fields
 * below without changing TWOPHASE_MAGIC.
 */

/*
 * Header for a 2PC state file
 */
#define TWOPHASE_MAGIC	0x57F94533	/* format identifier */

typedef struct TwoPhaseFileHeader
{
	uint32		magic;			/* format identifier */
	uint32		total_len;		/* actual file length */
	TransactionId xid;			/* original transaction XID */
	Oid			database;		/* OID of database it was in */
	TimestampTz prepared_at;	/* time of preparation */
	Oid			owner;			/* user running the transaction */
	int32		nsubxacts;		/* number of following subxact XIDs */
	int32		ncommitrels;	/* number of delete-on-commit rels */
	int32		nabortrels;		/* number of delete-on-abort rels */
	int32		ninvalmsgs;		/* number of cache invalidation messages */
	bool		initfileinval;	/* does relcache init file need invalidation? */
	uint16		gidlen;			/* length of the GID - GID follows the header;
								 * uint16, hence GIDSIZE must fit in it */
} TwoPhaseFileHeader;
975
976 /*
977 * Header for each record in a state file
978 *
979 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
980 * The rmgr data will be stored starting on a MAXALIGN boundary.
981 */
982 typedef struct TwoPhaseRecordOnDisk
983 {
984 uint32 len; /* length of rmgr data */
985 TwoPhaseRmgrId rmid; /* resource manager for this record */
986 uint16 info; /* flag bits for use by rmgr */
987 } TwoPhaseRecordOnDisk;
988
989 /*
990 * During prepare, the state file is assembled in memory before writing it
991 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
992 * for that.
993 */
994 typedef struct StateFileChunk
995 {
996 char *data;
997 uint32 len;
998 struct StateFileChunk *next;
999 } StateFileChunk;
1000
/*
 * The chain of chunks holding the 2PC state data currently being assembled.
 * StartPrepare (re)initializes it, save_state_data appends to it, and
 * EndPrepare hands every chunk to XLogRegisterData and then resets head/tail.
 * Being file-static, it holds the data of one prepare at a time.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain, padding
								 * included */
}			records;
1009
1010
1011 /*
1012 * Append a block of data to records data structure.
1013 *
1014 * NB: each block is padded to a MAXALIGN multiple. This must be
1015 * accounted for when the file is later read!
1016 *
1017 * The data is copied, so the caller is free to modify it afterwards.
1018 */
1019 static void
save_state_data(const void * data,uint32 len)1020 save_state_data(const void *data, uint32 len)
1021 {
1022 uint32 padlen = MAXALIGN(len);
1023
1024 if (padlen > records.bytes_free)
1025 {
1026 records.tail->next = palloc0(sizeof(StateFileChunk));
1027 records.tail = records.tail->next;
1028 records.tail->len = 0;
1029 records.tail->next = NULL;
1030 records.num_chunks++;
1031
1032 records.bytes_free = Max(padlen, 512);
1033 records.tail->data = palloc(records.bytes_free);
1034 }
1035
1036 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1037 records.tail->len += padlen;
1038 records.bytes_free -= padlen;
1039 records.total_len += padlen;
1040 }
1041
1042 /*
1043 * Start preparing a state file.
1044 *
1045 * Initializes data structure and inserts the 2PC file header record.
1046 */
1047 void
StartPrepare(GlobalTransaction gxact)1048 StartPrepare(GlobalTransaction gxact)
1049 {
1050 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1051 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1052 TransactionId xid = pgxact->xid;
1053 TwoPhaseFileHeader hdr;
1054 TransactionId *children;
1055 RelFileNode *commitrels;
1056 RelFileNode *abortrels;
1057 SharedInvalidationMessage *invalmsgs;
1058
1059 /* Initialize linked list */
1060 records.head = palloc0(sizeof(StateFileChunk));
1061 records.head->len = 0;
1062 records.head->next = NULL;
1063
1064 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1065 records.head->data = palloc(records.bytes_free);
1066
1067 records.tail = records.head;
1068 records.num_chunks = 1;
1069
1070 records.total_len = 0;
1071
1072 /* Create header */
1073 hdr.magic = TWOPHASE_MAGIC;
1074 hdr.total_len = 0; /* EndPrepare will fill this in */
1075 hdr.xid = xid;
1076 hdr.database = proc->databaseId;
1077 hdr.prepared_at = gxact->prepared_at;
1078 hdr.owner = gxact->owner;
1079 hdr.nsubxacts = xactGetCommittedChildren(&children);
1080 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1081 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1082 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1083 &hdr.initfileinval);
1084 hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1085
1086 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1087 save_state_data(gxact->gid, hdr.gidlen);
1088
1089 /*
1090 * Add the additional info about subxacts, deletable files and cache
1091 * invalidation messages.
1092 */
1093 if (hdr.nsubxacts > 0)
1094 {
1095 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1096 /* While we have the child-xact data, stuff it in the gxact too */
1097 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1098 }
1099 if (hdr.ncommitrels > 0)
1100 {
1101 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1102 pfree(commitrels);
1103 }
1104 if (hdr.nabortrels > 0)
1105 {
1106 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1107 pfree(abortrels);
1108 }
1109 if (hdr.ninvalmsgs > 0)
1110 {
1111 save_state_data(invalmsgs,
1112 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1113 pfree(invalmsgs);
1114 }
1115 }
1116
1117 /*
1118 * Finish preparing state data and writing it to WAL.
1119 */
1120 void
EndPrepare(GlobalTransaction gxact)1121 EndPrepare(GlobalTransaction gxact)
1122 {
1123 TwoPhaseFileHeader *hdr;
1124 StateFileChunk *record;
1125
1126 /* Add the end sentinel to the list of 2PC records */
1127 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1128 NULL, 0);
1129
1130 /* Go back and fill in total_len in the file header record */
1131 hdr = (TwoPhaseFileHeader *) records.head->data;
1132 Assert(hdr->magic == TWOPHASE_MAGIC);
1133 hdr->total_len = records.total_len + sizeof(pg_crc32c);
1134
1135 /*
1136 * If the data size exceeds MaxAllocSize, we won't be able to read it in
1137 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1138 * where we write data to file and then re-read at commit time.
1139 */
1140 if (hdr->total_len > MaxAllocSize)
1141 ereport(ERROR,
1142 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1143 errmsg("two-phase state file maximum length exceeded")));
1144
1145 /*
1146 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1147 * cover us, so no need to calculate a separate CRC.
1148 *
1149 * We have to set delayChkpt here, too; otherwise a checkpoint starting
1150 * immediately after the WAL record is inserted could complete without
1151 * fsync'ing our state file. (This is essentially the same kind of race
1152 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1153 * uses delayChkpt for; see notes there.)
1154 *
1155 * We save the PREPARE record's location in the gxact for later use by
1156 * CheckPointTwoPhase.
1157 */
1158 XLogEnsureRecordSpace(0, records.num_chunks);
1159
1160 START_CRIT_SECTION();
1161
1162 MyPgXact->delayChkpt = true;
1163
1164 XLogBeginInsert();
1165 for (record = records.head; record != NULL; record = record->next)
1166 XLogRegisterData(record->data, record->len);
1167 gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1168 XLogFlush(gxact->prepare_end_lsn);
1169
1170 /* If we crash now, we have prepared: WAL replay will fix things */
1171
1172 /* Store record's start location to read that later on Commit */
1173 gxact->prepare_start_lsn = ProcLastRecPtr;
1174
1175 /*
1176 * Mark the prepared transaction as valid. As soon as xact.c marks
1177 * MyPgXact as not running our XID (which it will do immediately after
1178 * this function returns), others can commit/rollback the xact.
1179 *
1180 * NB: a side effect of this is to make a dummy ProcArray entry for the
1181 * prepared XID. This must happen before we clear the XID from MyPgXact,
1182 * else there is a window where the XID is not running according to
1183 * TransactionIdIsInProgress, and onlookers would be entitled to assume
1184 * the xact crashed. Instead we have a window where the same XID appears
1185 * twice in ProcArray, which is OK.
1186 */
1187 MarkAsPrepared(gxact, false);
1188
1189 /*
1190 * Now we can mark ourselves as out of the commit critical section: a
1191 * checkpoint starting after this will certainly see the gxact as a
1192 * candidate for fsyncing.
1193 */
1194 MyPgXact->delayChkpt = false;
1195
1196 /*
1197 * Remember that we have this GlobalTransaction entry locked for us. If
1198 * we crash after this point, it's too late to abort, but we must unlock
1199 * it so that the prepared transaction can be committed or rolled back.
1200 */
1201 MyLockedGxact = gxact;
1202
1203 END_CRIT_SECTION();
1204
1205 /*
1206 * Wait for synchronous replication, if required.
1207 *
1208 * Note that at this stage we have marked the prepare, but still show as
1209 * running in the procarray (twice!) and continue to hold locks.
1210 */
1211 SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1212
1213 records.tail = records.head = NULL;
1214 records.num_chunks = 0;
1215 }
1216
1217 /*
1218 * Register a 2PC record to be written to state file.
1219 */
1220 void
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid,uint16 info,const void * data,uint32 len)1221 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1222 const void *data, uint32 len)
1223 {
1224 TwoPhaseRecordOnDisk record;
1225
1226 record.rmid = rmid;
1227 record.info = info;
1228 record.len = len;
1229 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1230 if (len > 0)
1231 save_state_data(data, len);
1232 }
1233
1234
1235 /*
1236 * Read and validate the state file for xid.
1237 *
1238 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1239 * contents of the file. Otherwise return NULL.
1240 */
1241 static char *
ReadTwoPhaseFile(TransactionId xid,bool give_warnings)1242 ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1243 {
1244 char path[MAXPGPATH];
1245 char *buf;
1246 TwoPhaseFileHeader *hdr;
1247 int fd;
1248 struct stat stat;
1249 uint32 crc_offset;
1250 pg_crc32c calc_crc,
1251 file_crc;
1252
1253 TwoPhaseFilePath(path, xid);
1254
1255 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1256 if (fd < 0)
1257 {
1258 if (give_warnings)
1259 ereport(WARNING,
1260 (errcode_for_file_access(),
1261 errmsg("could not open two-phase state file \"%s\": %m",
1262 path)));
1263 return NULL;
1264 }
1265
1266 /*
1267 * Check file length. We can determine a lower bound pretty easily. We
1268 * set an upper bound to avoid palloc() failure on a corrupt file, though
1269 * we can't guarantee that we won't get an out of memory error anyway,
1270 * even on a valid file.
1271 */
1272 if (fstat(fd, &stat))
1273 {
1274 int save_errno = errno;
1275
1276 CloseTransientFile(fd);
1277 if (give_warnings)
1278 {
1279 errno = save_errno;
1280 ereport(WARNING,
1281 (errcode_for_file_access(),
1282 errmsg("could not stat two-phase state file \"%s\": %m",
1283 path)));
1284 }
1285 return NULL;
1286 }
1287
1288 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1289 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1290 sizeof(pg_crc32c)) ||
1291 stat.st_size > MaxAllocSize)
1292 {
1293 CloseTransientFile(fd);
1294 return NULL;
1295 }
1296
1297 crc_offset = stat.st_size - sizeof(pg_crc32c);
1298 if (crc_offset != MAXALIGN(crc_offset))
1299 {
1300 CloseTransientFile(fd);
1301 return NULL;
1302 }
1303
1304 /*
1305 * OK, slurp in the file.
1306 */
1307 buf = (char *) palloc(stat.st_size);
1308
1309 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1310 if (read(fd, buf, stat.st_size) != stat.st_size)
1311 {
1312 int save_errno = errno;
1313
1314 pgstat_report_wait_end();
1315 CloseTransientFile(fd);
1316 if (give_warnings)
1317 {
1318 errno = save_errno;
1319 ereport(WARNING,
1320 (errcode_for_file_access(),
1321 errmsg("could not read two-phase state file \"%s\": %m",
1322 path)));
1323 }
1324 pfree(buf);
1325 return NULL;
1326 }
1327
1328 pgstat_report_wait_end();
1329 CloseTransientFile(fd);
1330
1331 hdr = (TwoPhaseFileHeader *) buf;
1332 if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1333 {
1334 pfree(buf);
1335 return NULL;
1336 }
1337
1338 INIT_CRC32C(calc_crc);
1339 COMP_CRC32C(calc_crc, buf, crc_offset);
1340 FIN_CRC32C(calc_crc);
1341
1342 file_crc = *((pg_crc32c *) (buf + crc_offset));
1343
1344 if (!EQ_CRC32C(calc_crc, file_crc))
1345 {
1346 pfree(buf);
1347 return NULL;
1348 }
1349
1350 return buf;
1351 }
1352
1353
1354 /*
1355 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1356 * twophase files and ReadTwoPhaseFile should be used instead.
1357 *
1358 * Note clearly that this function can access WAL during normal operation,
1359 * similarly to the way WALSender or Logical Decoding would do. While
1360 * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
1361 * particularly if this routine is called for the end-of-recovery checkpoint
1362 * in the checkpointer itself, so save the current timeline number value
1363 * and restore it once done.
1364 */
1365 static void
XlogReadTwoPhaseData(XLogRecPtr lsn,char ** buf,int * len)1366 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1367 {
1368 XLogRecord *record;
1369 XLogReaderState *xlogreader;
1370 char *errormsg;
1371 TimeLineID save_currtli = ThisTimeLineID;
1372
1373 xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
1374 if (!xlogreader)
1375 ereport(ERROR,
1376 (errcode(ERRCODE_OUT_OF_MEMORY),
1377 errmsg("out of memory"),
1378 errdetail("Failed while allocating a WAL reading processor.")));
1379
1380 record = XLogReadRecord(xlogreader, lsn, &errormsg);
1381
1382 /*
1383 * Restore immediately the timeline where it was previously, as
1384 * read_local_xlog_page() could have changed it if the record was read
1385 * while recovery was finishing or if the timeline has jumped in-between.
1386 */
1387 ThisTimeLineID = save_currtli;
1388
1389 if (record == NULL)
1390 ereport(ERROR,
1391 (errcode_for_file_access(),
1392 errmsg("could not read two-phase state from WAL at %X/%X",
1393 (uint32) (lsn >> 32),
1394 (uint32) lsn)));
1395
1396 if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1397 (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1398 ereport(ERROR,
1399 (errcode_for_file_access(),
1400 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1401 (uint32) (lsn >> 32),
1402 (uint32) lsn)));
1403
1404 if (len != NULL)
1405 *len = XLogRecGetDataLen(xlogreader);
1406
1407 *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1408 memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1409
1410 XLogReaderFree(xlogreader);
1411 }
1412
1413
1414 /*
1415 * Confirms an xid is prepared, during recovery
1416 */
1417 bool
StandbyTransactionIdIsPrepared(TransactionId xid)1418 StandbyTransactionIdIsPrepared(TransactionId xid)
1419 {
1420 char *buf;
1421 TwoPhaseFileHeader *hdr;
1422 bool result;
1423
1424 Assert(TransactionIdIsValid(xid));
1425
1426 if (max_prepared_xacts <= 0)
1427 return false; /* nothing to do */
1428
1429 /* Read and validate file */
1430 buf = ReadTwoPhaseFile(xid, false);
1431 if (buf == NULL)
1432 return false;
1433
1434 /* Check header also */
1435 hdr = (TwoPhaseFileHeader *) buf;
1436 result = TransactionIdEquals(hdr->xid, xid);
1437 pfree(buf);
1438
1439 return result;
1440 }
1441
1442 /*
1443 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1444 */
1445 void
FinishPreparedTransaction(const char * gid,bool isCommit)1446 FinishPreparedTransaction(const char *gid, bool isCommit)
1447 {
1448 GlobalTransaction gxact;
1449 PGPROC *proc;
1450 PGXACT *pgxact;
1451 TransactionId xid;
1452 char *buf;
1453 char *bufptr;
1454 TwoPhaseFileHeader *hdr;
1455 TransactionId latestXid;
1456 TransactionId *children;
1457 RelFileNode *commitrels;
1458 RelFileNode *abortrels;
1459 RelFileNode *delrels;
1460 int ndelrels;
1461 SharedInvalidationMessage *invalmsgs;
1462
1463 /*
1464 * Validate the GID, and lock the GXACT to ensure that two backends do not
1465 * try to commit the same GID at once.
1466 */
1467 gxact = LockGXact(gid, GetUserId());
1468 proc = &ProcGlobal->allProcs[gxact->pgprocno];
1469 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1470 xid = pgxact->xid;
1471
1472 /*
1473 * Read and validate 2PC state data. State data will typically be stored
1474 * in WAL files if the LSN is after the last checkpoint record, or moved
1475 * to disk if for some reason they have lived for a long time.
1476 */
1477 if (gxact->ondisk)
1478 buf = ReadTwoPhaseFile(xid, true);
1479 else
1480 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1481
1482
1483 /*
1484 * Disassemble the header area
1485 */
1486 hdr = (TwoPhaseFileHeader *) buf;
1487 Assert(TransactionIdEquals(hdr->xid, xid));
1488 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1489 bufptr += MAXALIGN(hdr->gidlen);
1490 children = (TransactionId *) bufptr;
1491 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1492 commitrels = (RelFileNode *) bufptr;
1493 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1494 abortrels = (RelFileNode *) bufptr;
1495 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1496 invalmsgs = (SharedInvalidationMessage *) bufptr;
1497 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1498
1499 /* compute latestXid among all children */
1500 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1501
1502 /* Prevent cancel/die interrupt while cleaning up */
1503 HOLD_INTERRUPTS();
1504
1505 /*
1506 * The order of operations here is critical: make the XLOG entry for
1507 * commit or abort, then mark the transaction committed or aborted in
1508 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1509 * TransactionIdIsInProgress will stop saying the prepared xact is in
1510 * progress), then run the post-commit or post-abort callbacks. The
1511 * callbacks will release the locks the transaction held.
1512 */
1513 if (isCommit)
1514 RecordTransactionCommitPrepared(xid,
1515 hdr->nsubxacts, children,
1516 hdr->ncommitrels, commitrels,
1517 hdr->ninvalmsgs, invalmsgs,
1518 hdr->initfileinval);
1519 else
1520 RecordTransactionAbortPrepared(xid,
1521 hdr->nsubxacts, children,
1522 hdr->nabortrels, abortrels);
1523
1524 ProcArrayRemove(proc, latestXid);
1525
1526 /*
1527 * In case we fail while running the callbacks, mark the gxact invalid so
1528 * no one else will try to commit/rollback, and so it will be recycled if
1529 * we fail after this point. It is still locked by our backend so it
1530 * won't go away yet.
1531 *
1532 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1533 */
1534 gxact->valid = false;
1535
1536 /*
1537 * We have to remove any files that were supposed to be dropped. For
1538 * consistency with the regular xact.c code paths, must do this before
1539 * releasing locks, so do it before running the callbacks.
1540 *
1541 * NB: this code knows that we couldn't be dropping any temp rels ...
1542 */
1543 if (isCommit)
1544 {
1545 delrels = commitrels;
1546 ndelrels = hdr->ncommitrels;
1547 }
1548 else
1549 {
1550 delrels = abortrels;
1551 ndelrels = hdr->nabortrels;
1552 }
1553
1554 /* Make sure files supposed to be dropped are dropped */
1555 DropRelationFiles(delrels, ndelrels, false);
1556
1557 /*
1558 * Handle cache invalidation messages.
1559 *
1560 * Relcache init file invalidation requires processing both before and
1561 * after we send the SI messages. See AtEOXact_Inval()
1562 */
1563 if (hdr->initfileinval)
1564 RelationCacheInitFilePreInvalidate();
1565 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1566 if (hdr->initfileinval)
1567 RelationCacheInitFilePostInvalidate();
1568
1569 /* And now do the callbacks */
1570 if (isCommit)
1571 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1572 else
1573 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1574
1575 PredicateLockTwoPhaseFinish(xid, isCommit);
1576
1577 /* Count the prepared xact as committed or aborted */
1578 AtEOXact_PgStat(isCommit, false);
1579
1580 /*
1581 * And now we can clean up any files we may have left.
1582 */
1583 if (gxact->ondisk)
1584 RemoveTwoPhaseFile(xid, true);
1585
1586 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1587 RemoveGXact(gxact);
1588 LWLockRelease(TwoPhaseStateLock);
1589 MyLockedGxact = NULL;
1590
1591 RESUME_INTERRUPTS();
1592
1593 pfree(buf);
1594 }
1595
1596 /*
1597 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1598 */
1599 static void
ProcessRecords(char * bufptr,TransactionId xid,const TwoPhaseCallback callbacks[])1600 ProcessRecords(char *bufptr, TransactionId xid,
1601 const TwoPhaseCallback callbacks[])
1602 {
1603 for (;;)
1604 {
1605 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1606
1607 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1608 if (record->rmid == TWOPHASE_RM_END_ID)
1609 break;
1610
1611 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1612
1613 if (callbacks[record->rmid] != NULL)
1614 callbacks[record->rmid] (xid, record->info,
1615 (void *) bufptr, record->len);
1616
1617 bufptr += MAXALIGN(record->len);
1618 }
1619 }
1620
1621 /*
1622 * Remove the 2PC file for the specified XID.
1623 *
1624 * If giveWarning is false, do not complain about file-not-present;
1625 * this is an expected case during WAL replay.
1626 */
1627 static void
RemoveTwoPhaseFile(TransactionId xid,bool giveWarning)1628 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1629 {
1630 char path[MAXPGPATH];
1631
1632 TwoPhaseFilePath(path, xid);
1633 if (unlink(path))
1634 if (errno != ENOENT || giveWarning)
1635 ereport(WARNING,
1636 (errcode_for_file_access(),
1637 errmsg("could not remove two-phase state file \"%s\": %m",
1638 path)));
1639 }
1640
1641 /*
1642 * Recreates a state file. This is used in WAL replay and during
1643 * checkpoint creation.
1644 *
1645 * Note: content and len don't include CRC.
1646 */
1647 static void
RecreateTwoPhaseFile(TransactionId xid,void * content,int len)1648 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1649 {
1650 char path[MAXPGPATH];
1651 pg_crc32c statefile_crc;
1652 int fd;
1653
1654 /* Recompute CRC */
1655 INIT_CRC32C(statefile_crc);
1656 COMP_CRC32C(statefile_crc, content, len);
1657 FIN_CRC32C(statefile_crc);
1658
1659 TwoPhaseFilePath(path, xid);
1660
1661 fd = OpenTransientFile(path,
1662 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1663 S_IRUSR | S_IWUSR);
1664 if (fd < 0)
1665 ereport(ERROR,
1666 (errcode_for_file_access(),
1667 errmsg("could not recreate two-phase state file \"%s\": %m",
1668 path)));
1669
1670 /* Write content and CRC */
1671 errno = 0;
1672 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1673 if (write(fd, content, len) != len)
1674 {
1675 int save_errno = errno;
1676
1677 pgstat_report_wait_end();
1678 CloseTransientFile(fd);
1679
1680 /* if write didn't set errno, assume problem is no disk space */
1681 errno = save_errno ? save_errno : ENOSPC;
1682 ereport(ERROR,
1683 (errcode_for_file_access(),
1684 errmsg("could not write two-phase state file: %m")));
1685 }
1686 if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1687 {
1688 int save_errno = errno;
1689
1690 pgstat_report_wait_end();
1691 CloseTransientFile(fd);
1692
1693 /* if write didn't set errno, assume problem is no disk space */
1694 errno = save_errno ? save_errno : ENOSPC;
1695 ereport(ERROR,
1696 (errcode_for_file_access(),
1697 errmsg("could not write two-phase state file: %m")));
1698 }
1699 pgstat_report_wait_end();
1700
1701 /*
1702 * We must fsync the file because the end-of-replay checkpoint will not do
1703 * so, there being no GXACT in shared memory yet to tell it to.
1704 */
1705 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1706 if (pg_fsync(fd) != 0)
1707 {
1708 int save_errno = errno;
1709
1710 CloseTransientFile(fd);
1711 errno = save_errno;
1712 ereport(ERROR,
1713 (errcode_for_file_access(),
1714 errmsg("could not fsync two-phase state file: %m")));
1715 }
1716 pgstat_report_wait_end();
1717
1718 if (CloseTransientFile(fd) != 0)
1719 ereport(ERROR,
1720 (errcode_for_file_access(),
1721 errmsg("could not close two-phase state file: %m")));
1722 }
1723
1724 /*
1725 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1726 *
1727 * We must fsync the state file of any GXACT that is valid or has been
1728 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1729 * horizon. (If the gxact isn't valid yet, has not been generated in
1730 * redo, or has a later LSN, this checkpoint is not responsible for
1731 * fsyncing it.)
1732 *
1733 * This is deliberately run as late as possible in the checkpoint sequence,
1734 * because GXACTs ordinarily have short lifespans, and so it is quite
1735 * possible that GXACTs that were valid at checkpoint start will no longer
1736 * exist if we wait a little bit. With typical checkpoint settings this
1737 * will be about 3 minutes for an online checkpoint, so as a result we
1738 * we expect that there will be no GXACTs that need to be copied to disk.
1739 *
1740 * If a GXACT remains valid across multiple checkpoints, it will already
1741 * be on disk so we don't bother to repeat that write.
1742 */
1743 void
CheckPointTwoPhase(XLogRecPtr redo_horizon)1744 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1745 {
1746 int i;
1747 int serialized_xacts = 0;
1748
1749 if (max_prepared_xacts <= 0)
1750 return; /* nothing to do */
1751
1752 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1753
1754 /*
1755 * We are expecting there to be zero GXACTs that need to be copied to
1756 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1757 * simplicity. This prevents any new xacts from preparing while this
1758 * occurs, which shouldn't be a problem since the presence of long-lived
1759 * prepared xacts indicates the transaction manager isn't active.
1760 *
1761 * It's also possible to move I/O out of the lock, but on every error we
1762 * should check whether somebody committed our transaction in different
1763 * backend. Let's leave this optimization for future, if somebody will
1764 * spot that this place cause bottleneck.
1765 *
1766 * Note that it isn't possible for there to be a GXACT with a
1767 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1768 * because of the efforts with delayChkpt.
1769 */
1770 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1771 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1772 {
1773 /*
1774 * Note that we are using gxact not pgxact so this works in recovery
1775 * also
1776 */
1777 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1778
1779 if ((gxact->valid || gxact->inredo) &&
1780 !gxact->ondisk &&
1781 gxact->prepare_end_lsn <= redo_horizon)
1782 {
1783 char *buf;
1784 int len;
1785
1786 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1787 RecreateTwoPhaseFile(gxact->xid, buf, len);
1788 gxact->ondisk = true;
1789 gxact->prepare_start_lsn = InvalidXLogRecPtr;
1790 gxact->prepare_end_lsn = InvalidXLogRecPtr;
1791 pfree(buf);
1792 serialized_xacts++;
1793 }
1794 }
1795 LWLockRelease(TwoPhaseStateLock);
1796
1797 /*
1798 * Flush unconditionally the parent directory to make any information
1799 * durable on disk. Two-phase files could have been removed and those
1800 * removals need to be made persistent as well as any files newly created
1801 * previously since the last checkpoint.
1802 */
1803 fsync_fname(TWOPHASE_DIR, true);
1804
1805 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1806
1807 if (log_checkpoints && serialized_xacts > 0)
1808 ereport(LOG,
1809 (errmsg_plural("%u two-phase state file was written "
1810 "for a long-running prepared transaction",
1811 "%u two-phase state files were written "
1812 "for long-running prepared transactions",
1813 serialized_xacts,
1814 serialized_xacts)));
1815 }
1816
1817 /*
1818 * restoreTwoPhaseData
1819 *
1820 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1821 * This is called once at the beginning of recovery, saving any extra
1822 * lookups in the future. Two-phase files that are newer than the
1823 * minimum XID horizon are discarded on the way.
1824 */
1825 void
restoreTwoPhaseData(void)1826 restoreTwoPhaseData(void)
1827 {
1828 DIR *cldir;
1829 struct dirent *clde;
1830
1831 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1832 cldir = AllocateDir(TWOPHASE_DIR);
1833 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1834 {
1835 if (strlen(clde->d_name) == 8 &&
1836 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1837 {
1838 TransactionId xid;
1839 char *buf;
1840
1841 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1842
1843 buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1844 true, false, false);
1845 if (buf == NULL)
1846 continue;
1847
1848 PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
1849 }
1850 }
1851 LWLockRelease(TwoPhaseStateLock);
1852 FreeDir(cldir);
1853 }
1854
1855 /*
1856 * PrescanPreparedTransactions
1857 *
1858 * Scan the shared memory entries of TwoPhaseState and determine the range
1859 * of valid XIDs present. This is run during database startup, after we
1860 * have completed reading WAL. ShmemVariableCache->nextXid has been set to
1861 * one more than the highest XID for which evidence exists in WAL.
1862 *
1863 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1864 * are present, it suggests that the DBA has done a PITR recovery to an
1865 * earlier point in time without cleaning out pg_twophase. We dare not
1866 * try to recover such prepared xacts since they likely depend on database
1867 * state that doesn't exist now.
1868 *
1869 * However, we will advance nextXid beyond any subxact XIDs belonging to
1870 * valid prepared xacts. We need to do this since subxact commit doesn't
1871 * write a WAL entry, and so there might be no evidence in WAL of those
1872 * subxact XIDs.
1873 *
1874 * Our other responsibility is to determine and return the oldest valid XID
1875 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1876 * This is needed to synchronize pg_subtrans startup properly.
1877 *
1878 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1879 * top-level xids is stored in *xids_p. The number of entries in the array
1880 * is returned in *nxids_p.
1881 */
1882 TransactionId
PrescanPreparedTransactions(TransactionId ** xids_p,int * nxids_p)1883 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1884 {
1885 TransactionId origNextXid = ShmemVariableCache->nextXid;
1886 TransactionId result = origNextXid;
1887 TransactionId *xids = NULL;
1888 int nxids = 0;
1889 int allocsize = 0;
1890 int i;
1891
1892 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1893 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1894 {
1895 TransactionId xid;
1896 char *buf;
1897 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1898
1899 Assert(gxact->inredo);
1900
1901 xid = gxact->xid;
1902
1903 buf = ProcessTwoPhaseBuffer(xid,
1904 gxact->prepare_start_lsn,
1905 gxact->ondisk, false, true);
1906
1907 if (buf == NULL)
1908 continue;
1909
1910 /*
1911 * OK, we think this file is valid. Incorporate xid into the
1912 * running-minimum result.
1913 */
1914 if (TransactionIdPrecedes(xid, result))
1915 result = xid;
1916
1917 if (xids_p)
1918 {
1919 if (nxids == allocsize)
1920 {
1921 if (nxids == 0)
1922 {
1923 allocsize = 10;
1924 xids = palloc(allocsize * sizeof(TransactionId));
1925 }
1926 else
1927 {
1928 allocsize = allocsize * 2;
1929 xids = repalloc(xids, allocsize * sizeof(TransactionId));
1930 }
1931 }
1932 xids[nxids++] = xid;
1933 }
1934
1935 pfree(buf);
1936 }
1937 LWLockRelease(TwoPhaseStateLock);
1938
1939 if (xids_p)
1940 {
1941 *xids_p = xids;
1942 *nxids_p = nxids;
1943 }
1944
1945 return result;
1946 }
1947
1948 /*
1949 * StandbyRecoverPreparedTransactions
1950 *
1951 * Scan the shared memory entries of TwoPhaseState and setup all the required
1952 * information to allow standby queries to treat prepared transactions as still
1953 * active.
1954 *
1955 * This is never called at the end of recovery - we use
1956 * RecoverPreparedTransactions() at that point.
1957 *
1958 * The lack of calls to SubTransSetParent() calls here is by design;
1959 * those calls are made by RecoverPreparedTransactions() at the end of recovery
1960 * for those xacts that need this.
1961 */
1962 void
StandbyRecoverPreparedTransactions(void)1963 StandbyRecoverPreparedTransactions(void)
1964 {
1965 int i;
1966
1967 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1968 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1969 {
1970 TransactionId xid;
1971 char *buf;
1972 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1973
1974 Assert(gxact->inredo);
1975
1976 xid = gxact->xid;
1977
1978 buf = ProcessTwoPhaseBuffer(xid,
1979 gxact->prepare_start_lsn,
1980 gxact->ondisk, false, false);
1981 if (buf != NULL)
1982 pfree(buf);
1983 }
1984 LWLockRelease(TwoPhaseStateLock);
1985 }
1986
1987 /*
1988 * RecoverPreparedTransactions
1989 *
1990 * Scan the shared memory entries of TwoPhaseState and reload the state for
1991 * each prepared transaction (reacquire locks, etc).
1992 *
1993 * This is run at the end of recovery, but before we allow backends to write
1994 * WAL.
1995 *
1996 * At the end of recovery the way we take snapshots will change. We now need
1997 * to mark all running transactions with their full SubTransSetParent() info
1998 * to allow normal snapshots to work correctly if snapshots overflow.
1999 * We do this here because by definition prepared transactions are the only
2000 * type of write transaction still running, so this is necessary and
2001 * complete.
2002 */
2003 void
RecoverPreparedTransactions(void)2004 RecoverPreparedTransactions(void)
2005 {
2006 int i;
2007
2008 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2009 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2010 {
2011 TransactionId xid;
2012 char *buf;
2013 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2014 char *bufptr;
2015 TwoPhaseFileHeader *hdr;
2016 TransactionId *subxids;
2017 const char *gid;
2018
2019 xid = gxact->xid;
2020
2021 /*
2022 * Reconstruct subtrans state for the transaction --- needed because
2023 * pg_subtrans is not preserved over a restart. Note that we are
2024 * linking all the subtransactions directly to the top-level XID;
2025 * there may originally have been a more complex hierarchy, but
2026 * there's no need to restore that exactly. It's possible that
2027 * SubTransSetParent has been set before, if the prepared transaction
2028 * generated xid assignment records.
2029 */
2030 buf = ProcessTwoPhaseBuffer(xid,
2031 gxact->prepare_start_lsn,
2032 gxact->ondisk, true, false);
2033 if (buf == NULL)
2034 continue;
2035
2036 ereport(LOG,
2037 (errmsg("recovering prepared transaction %u from shared memory", xid)));
2038
2039 hdr = (TwoPhaseFileHeader *) buf;
2040 Assert(TransactionIdEquals(hdr->xid, xid));
2041 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2042 gid = (const char *) bufptr;
2043 bufptr += MAXALIGN(hdr->gidlen);
2044 subxids = (TransactionId *) bufptr;
2045 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2046 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2047 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2048 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2049
2050 /*
2051 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2052 * added in redo and already has a shmem entry for it.
2053 */
2054 MarkAsPreparingGuts(gxact, xid, gid,
2055 hdr->prepared_at,
2056 hdr->owner, hdr->database);
2057
2058 /* recovered, so reset the flag for entries generated by redo */
2059 gxact->inredo = false;
2060
2061 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2062 MarkAsPrepared(gxact, true);
2063
2064 LWLockRelease(TwoPhaseStateLock);
2065
2066 /*
2067 * Recover other state (notably locks) using resource managers.
2068 */
2069 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2070
2071 /*
2072 * Release locks held by the standby process after we process each
2073 * prepared transaction. As a result, we don't need too many
2074 * additional locks at any one time.
2075 */
2076 if (InHotStandby)
2077 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2078
2079 /*
2080 * We're done with recovering this transaction. Clear MyLockedGxact,
2081 * like we do in PrepareTransaction() during normal operation.
2082 */
2083 PostPrepare_Twophase();
2084
2085 pfree(buf);
2086
2087 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2088 }
2089
2090 LWLockRelease(TwoPhaseStateLock);
2091 }
2092
2093 /*
2094 * ProcessTwoPhaseBuffer
2095 *
2096 * Given a transaction id, read it either from disk or read it directly
2097 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2098 *
2099 * If setParent is true, set up subtransaction parent linkages.
2100 *
2101 * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2102 * value scanned.
2103 */
2104 static char *
ProcessTwoPhaseBuffer(TransactionId xid,XLogRecPtr prepare_start_lsn,bool fromdisk,bool setParent,bool setNextXid)2105 ProcessTwoPhaseBuffer(TransactionId xid,
2106 XLogRecPtr prepare_start_lsn,
2107 bool fromdisk,
2108 bool setParent, bool setNextXid)
2109 {
2110 TransactionId origNextXid = ShmemVariableCache->nextXid;
2111 TransactionId *subxids;
2112 char *buf;
2113 TwoPhaseFileHeader *hdr;
2114 int i;
2115
2116 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2117
2118 if (!fromdisk)
2119 Assert(prepare_start_lsn != InvalidXLogRecPtr);
2120
2121 /* Already processed? */
2122 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2123 {
2124 if (fromdisk)
2125 {
2126 ereport(WARNING,
2127 (errmsg("removing stale two-phase state file for transaction %u",
2128 xid)));
2129 RemoveTwoPhaseFile(xid, true);
2130 }
2131 else
2132 {
2133 ereport(WARNING,
2134 (errmsg("removing stale two-phase state from memory for transaction %u",
2135 xid)));
2136 PrepareRedoRemove(xid, true);
2137 }
2138 return NULL;
2139 }
2140
2141 /* Reject XID if too new */
2142 if (TransactionIdFollowsOrEquals(xid, origNextXid))
2143 {
2144 if (fromdisk)
2145 {
2146 ereport(WARNING,
2147 (errmsg("removing future two-phase state file for transaction %u",
2148 xid)));
2149 RemoveTwoPhaseFile(xid, true);
2150 }
2151 else
2152 {
2153 ereport(WARNING,
2154 (errmsg("removing future two-phase state from memory for transaction %u",
2155 xid)));
2156 PrepareRedoRemove(xid, true);
2157 }
2158 return NULL;
2159 }
2160
2161 if (fromdisk)
2162 {
2163 /* Read and validate file */
2164 buf = ReadTwoPhaseFile(xid, true);
2165 if (buf == NULL)
2166 {
2167 ereport(WARNING,
2168 (errmsg("removing corrupt two-phase state file for transaction %u",
2169 xid)));
2170 RemoveTwoPhaseFile(xid, true);
2171 return NULL;
2172 }
2173 }
2174 else
2175 {
2176 /* Read xlog data */
2177 XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2178 }
2179
2180 /* Deconstruct header */
2181 hdr = (TwoPhaseFileHeader *) buf;
2182 if (!TransactionIdEquals(hdr->xid, xid))
2183 {
2184 if (fromdisk)
2185 {
2186 ereport(WARNING,
2187 (errmsg("removing corrupt two-phase state file for transaction %u",
2188 xid)));
2189 RemoveTwoPhaseFile(xid, true);
2190 }
2191 else
2192 {
2193 ereport(WARNING,
2194 (errmsg("removing corrupt two-phase state from memory for transaction %u",
2195 xid)));
2196 PrepareRedoRemove(xid, true);
2197 }
2198 pfree(buf);
2199 return NULL;
2200 }
2201
2202 /*
2203 * Examine subtransaction XIDs ... they should all follow main XID, and
2204 * they may force us to advance nextXid.
2205 */
2206 subxids = (TransactionId *) (buf +
2207 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2208 MAXALIGN(hdr->gidlen));
2209 for (i = 0; i < hdr->nsubxacts; i++)
2210 {
2211 TransactionId subxid = subxids[i];
2212
2213 Assert(TransactionIdFollows(subxid, xid));
2214
2215 /* update nextXid if needed */
2216 if (setNextXid &&
2217 TransactionIdFollowsOrEquals(subxid,
2218 ShmemVariableCache->nextXid))
2219 {
2220 /*
2221 * We don't expect anyone else to modify nextXid, hence we don't
2222 * need to hold a lock while examining it. We still acquire the
2223 * lock to modify it, though, so we recheck.
2224 */
2225 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
2226 if (TransactionIdFollowsOrEquals(subxid,
2227 ShmemVariableCache->nextXid))
2228 {
2229 ShmemVariableCache->nextXid = subxid;
2230 TransactionIdAdvance(ShmemVariableCache->nextXid);
2231 }
2232 LWLockRelease(XidGenLock);
2233 }
2234
2235 if (setParent)
2236 SubTransSetParent(subxid, xid);
2237 }
2238
2239 return buf;
2240 }
2241
2242
/*
 * RecordTransactionCommitPrepared
 *
 * Write and flush the WAL commit record for prepared transaction "xid" and
 * its subtransactions "children", record the commit timestamp, mark the whole
 * tree committed in pg_xact, and then wait for synchronous replication.
 *
 * This is basically the same as RecordTransactionCommit (q.v. if you change
 * this function): in particular, we must set the delayChkpt flag to avoid a
 * race condition.
 *
 * We know the transaction made at least one XLOG entry (its PREPARE),
 * so it is never possible to optimize out the commit record.
 *
 * nrels/rels, ninvalmsgs/invalmsgs and initfileinval are passed straight
 * through to XactLogCommitRecord().
 *
 * Side effect: when no replication origin is active, or the origin has not
 * supplied a timestamp, replorigin_session_origin_timestamp is overwritten
 * with this commit's timestamp.
 */
static void
RecordTransactionCommitPrepared(TransactionId xid,
								int nchildren,
								TransactionId *children,
								int nrels,
								RelFileNode *rels,
								int ninvalmsgs,
								SharedInvalidationMessage *invalmsgs,
								bool initfileinval)
{
	XLogRecPtr	recptr;
	TimestampTz committs = GetCurrentTimestamp();
	bool		replorigin;

	/*
	 * Are we using the replication origins feature? Or, in other words, are
	 * we replaying remote actions?
	 */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	START_CRIT_SECTION();

	/* See notes in RecordTransactionCommit */
	MyPgXact->delayChkpt = true;

	/*
	 * Emit the XLOG commit record. Note that we mark 2PC commits as
	 * potentially having AccessExclusiveLocks since we don't know whether or
	 * not they do.
	 */
	recptr = XactLogCommitRecord(committs,
								 nchildren, children, nrels, rels,
								 ninvalmsgs, invalmsgs,
								 initfileinval, false,
								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
								 xid);


	if (replorigin)
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   XactLastRecEnd);

	/*
	 * Record commit timestamp. The value comes from plain commit timestamp
	 * if replorigin is not enabled, or replorigin already set a value for us
	 * in replorigin_session_origin_timestamp otherwise.
	 *
	 * We don't need to WAL-log anything here, as the commit record written
	 * above already contains the data.
	 */
	if (!replorigin || replorigin_session_origin_timestamp == 0)
		replorigin_session_origin_timestamp = committs;

	TransactionTreeSetCommitTsData(xid, nchildren, children,
								   replorigin_session_origin_timestamp,
								   replorigin_session_origin, false);

	/*
	 * We don't currently try to sleep before flush here ... nor is there any
	 * support for async commit of a prepared xact (the very idea is probably
	 * a contradiction)
	 */

	/* Flush XLOG to disk */
	XLogFlush(recptr);

	/* Mark the transaction committed in pg_xact */
	TransactionIdCommitTree(xid, nchildren, children);

	/* Checkpoint can proceed now */
	MyPgXact->delayChkpt = false;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked clog, but still show as running
	 * in the procarray and continue to hold locks.
	 */
	SyncRepWaitForLSN(recptr, true);
}
2337
/*
 * RecordTransactionAbortPrepared
 *
 * Write and flush the WAL abort record for prepared transaction "xid" and
 * its subtransactions "children", mark the tree aborted in clog, and then
 * wait for synchronous replication.  nrels/rels are passed straight through
 * to XactLogAbortRecord().
 *
 * This is basically the same as RecordTransactionAbort.
 *
 * We know the transaction made at least one XLOG entry (its PREPARE),
 * so it is never possible to optimize out the abort record.
 *
 * PANICs if xid is already marked committed in clog.
 */
static void
RecordTransactionAbortPrepared(TransactionId xid,
							   int nchildren,
							   TransactionId *children,
							   int nrels,
							   RelFileNode *rels)
{
	XLogRecPtr	recptr;

	/*
	 * Catch the scenario where we aborted partway through
	 * RecordTransactionCommitPrepared ...
	 */
	if (TransactionIdDidCommit(xid))
		elog(PANIC, "cannot abort transaction %u, it was already committed",
			 xid);

	START_CRIT_SECTION();

	/*
	 * Emit the XLOG abort record. Note that we mark 2PC aborts as
	 * potentially having AccessExclusiveLocks since we don't know whether or
	 * not they do.
	 */
	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
								nchildren, children,
								nrels, rels,
								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
								xid);

	/* Always flush, since we're about to remove the 2PC state file */
	XLogFlush(recptr);

	/*
	 * Mark the transaction aborted in clog. This is not absolutely necessary
	 * but we may as well do it while we are here.
	 */
	TransactionIdAbortTree(xid, nchildren, children);

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked clog, but still show as running
	 * in the procarray and continue to hold locks.
	 */
	SyncRepWaitForLSN(recptr, false);
}
2395
2396 /*
2397 * PrepareRedoAdd
2398 *
2399 * Store pointers to the start/end of the WAL record along with the xid in
2400 * a gxact entry in shared memory TwoPhaseState structure. If caller
2401 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2402 * data, the entry is marked as located on disk.
2403 */
2404 void
PrepareRedoAdd(char * buf,XLogRecPtr start_lsn,XLogRecPtr end_lsn)2405 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
2406 {
2407 TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2408 char *bufptr;
2409 const char *gid;
2410 GlobalTransaction gxact;
2411
2412 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2413 Assert(RecoveryInProgress());
2414
2415 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2416 gid = (const char *) bufptr;
2417
2418 /*
2419 * Reserve the GID for the given transaction in the redo code path.
2420 *
2421 * This creates a gxact struct and puts it into the active array.
2422 *
2423 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2424 * shared memory. Hence, we only fill up the bare minimum contents here.
2425 * The gxact also gets marked with gxact->inredo set to true to indicate
2426 * that it got added in the redo phase
2427 */
2428
2429 /* Get a free gxact from the freelist */
2430 if (TwoPhaseState->freeGXacts == NULL)
2431 ereport(ERROR,
2432 (errcode(ERRCODE_OUT_OF_MEMORY),
2433 errmsg("maximum number of prepared transactions reached"),
2434 errhint("Increase max_prepared_transactions (currently %d).",
2435 max_prepared_xacts)));
2436 gxact = TwoPhaseState->freeGXacts;
2437 TwoPhaseState->freeGXacts = gxact->next;
2438
2439 gxact->prepared_at = hdr->prepared_at;
2440 gxact->prepare_start_lsn = start_lsn;
2441 gxact->prepare_end_lsn = end_lsn;
2442 gxact->xid = hdr->xid;
2443 gxact->owner = hdr->owner;
2444 gxact->locking_backend = InvalidBackendId;
2445 gxact->valid = false;
2446 gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2447 gxact->inredo = true; /* yes, added in redo */
2448 strcpy(gxact->gid, gid);
2449
2450 /* And insert it into the active array */
2451 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2452 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2453
2454 elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2455 }
2456
2457 /*
2458 * PrepareRedoRemove
2459 *
2460 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2461 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2462 *
2463 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2464 * is updated.
2465 */
2466 void
PrepareRedoRemove(TransactionId xid,bool giveWarning)2467 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2468 {
2469 GlobalTransaction gxact = NULL;
2470 int i;
2471 bool found = false;
2472
2473 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2474 Assert(RecoveryInProgress());
2475
2476 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2477 {
2478 gxact = TwoPhaseState->prepXacts[i];
2479
2480 if (gxact->xid == xid)
2481 {
2482 Assert(gxact->inredo);
2483 found = true;
2484 break;
2485 }
2486 }
2487
2488 /*
2489 * Just leave if there is nothing, this is expected during WAL replay.
2490 */
2491 if (!found)
2492 return;
2493
2494 /*
2495 * And now we can clean up any files we may have left.
2496 */
2497 elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2498 if (gxact->ondisk)
2499 RemoveTwoPhaseFile(xid, giveWarning);
2500 RemoveGXact(gxact);
2501
2502 return;
2503 }
2504