1 /*-------------------------------------------------------------------------
2 *
3 * procarray.c
4 * POSTGRES process array code.
5 *
6 *
7 * This module maintains arrays of the PGPROC and PGXACT structures for all
8 * active backends. Although there are several uses for this, the principal
9 * one is as a means of determining the set of currently running transactions.
10 *
11 * Because of various subtle race conditions it is critical that a backend
12 * hold the correct locks while setting or clearing its MyPgXact->xid field.
13 * See notes in src/backend/access/transam/README.
14 *
15 * The process arrays now also include structures representing prepared
16 * transactions. The xid and subxids fields of these are valid, as are the
17 * myProcLocks lists. They can be distinguished from regular backend PGPROCs
18 * at need by checking for pid == 0.
19 *
20 * During hot standby, we also keep a list of XIDs representing transactions
21 * that are known to be running in the master (or more precisely, were running
22 * as of the current point in the WAL stream). This list is kept in the
23 * KnownAssignedXids array, and is updated by watching the sequence of
24 * arriving XIDs. This is necessary because if we leave those XIDs out of
25 * snapshots taken for standby queries, then they will appear to be already
26 * complete, leading to MVCC failures. Note that in hot standby, the PGPROC
27 * array represents standby processes, which by definition are not running
28 * transactions that have XIDs.
29 *
30 * It is perhaps possible for a backend on the master to terminate without
31 * writing an abort record for its transaction. While that shouldn't really
32 * happen, it would tie up KnownAssignedXids indefinitely, so we protect
33 * ourselves by pruning the array when a valid list of running XIDs arrives.
34 *
35 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
36 * Portions Copyright (c) 1994, Regents of the University of California
37 *
38 *
39 * IDENTIFICATION
40 * src/backend/storage/ipc/procarray.c
41 *
42 *-------------------------------------------------------------------------
43 */
44 #include "postgres.h"
45
46 #include <signal.h>
47
48 #include "access/clog.h"
49 #include "access/subtrans.h"
50 #include "access/transam.h"
51 #include "access/twophase.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "miscadmin.h"
56 #include "pgstat.h"
57 #include "storage/proc.h"
58 #include "storage/procarray.h"
59 #include "storage/spin.h"
60 #include "utils/builtins.h"
61 #include "utils/rel.h"
62 #include "utils/snapmgr.h"
63
64
/*
 * Our shared memory area.
 *
 * The fixed-size header below is followed by the variable-length pgprocnos[]
 * array; ProcArrayShmemSize() and CreateSharedProcArray() size the whole
 * object as offsetof(ProcArrayStruct, pgprocnos) plus PROCARRAY_MAXPROCS
 * ints.
 */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

	/*
	 * Known assigned XIDs handling
	 */
	int			maxKnownAssignedXids;	/* allocated size of array */
	int			numKnownAssignedXids;	/* current # of valid entries */
	int			tailKnownAssignedXids;	/* index of oldest valid element */
	int			headKnownAssignedXids;	/* index of newest element, + 1 */
	slock_t		known_assigned_xids_lck;	/* protects head/tail pointers */

	/*
	 * Highest subxid that has been removed from KnownAssignedXids array to
	 * prevent overflow; or InvalidTransactionId if none.  We track this for
	 * similar reasons to tracking overflowing cached subxids in PGXACT
	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
	 * lock to read it.
	 */
	TransactionId lastOverflowedXid;

	/* oldest xmin of any replication slot */
	TransactionId replication_slot_xmin;
	/* oldest catalog xmin of any replication slot */
	TransactionId replication_slot_catalog_xmin;

	/*
	 * indexes into allPgXact[], has PROCARRAY_MAXPROCS entries.  Only the
	 * first numProcs entries are meaningful; ProcArrayAdd() keeps them in
	 * ascending order and ProcArrayRemove() stamps the vacated last slot
	 * with -1.
	 */
	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
} ProcArrayStruct;
97
/* Pointer to the shared ProcArrayStruct; assigned in CreateSharedProcArray() */
static ProcArrayStruct *procArray;

/*
 * Copies of ProcGlobal->allProcs and ProcGlobal->allPgXact, assigned in
 * CreateSharedProcArray().  Both are indexed by a PGPROC's pgprocno.
 */
static PGPROC *allProcs;
static PGXACT *allPgXact;

/*
 * Bookkeeping for tracking emulated transactions in recovery
 *
 * KnownAssignedXids and KnownAssignedXidsValid each have
 * TOTAL_MAX_CACHED_SUBXIDS entries and are only set up by
 * CreateSharedProcArray() when EnableHotStandby is true.
 */
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
static TransactionId latestObservedXid = InvalidTransactionId;

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;
116
/*
 * XidCache instrumentation.
 *
 * With XIDCACHE_DEBUG defined, each xc_*_inc() macro increments the
 * file-local counter of the same name.  Without it, every macro expands to
 * a no-op expression so call sites need no #ifdef of their own.
 */
#ifdef XIDCACHE_DEBUG

/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
static long xc_by_known_xact = 0;
static long xc_by_my_xact = 0;
static long xc_by_latest_xid = 0;
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
static long xc_by_known_assigned = 0;
static long xc_no_overflow = 0;
static long xc_slow_answer = 0;

#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
#define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
#define xc_no_overflow_inc()		(xc_no_overflow++)
#define xc_slow_answer_inc()		(xc_slow_answer++)

/* Called from ProcArrayRemove() when a real backend (pid != 0) exits */
static void DisplayXidCache(void);
#else							/* !XIDCACHE_DEBUG */

#define xc_by_recent_xmin_inc()		((void) 0)
#define xc_by_known_xact_inc()		((void) 0)
#define xc_by_my_xact_inc()			((void) 0)
#define xc_by_latest_xid_inc()		((void) 0)
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
#define xc_by_known_assigned_inc()	((void) 0)
#define xc_no_overflow_inc()		((void) 0)
#define xc_slow_answer_inc()		((void) 0)
#endif							/* XIDCACHE_DEBUG */
153
/* Primitives for KnownAssignedXids array handling for standby */
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
					 bool exclusive_lock);
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
							TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
							   TransactionId *xmin,
							   TransactionId xmax);
static TransactionId KnownAssignedXidsGetOldestXmin(void);
static void KnownAssignedXidsDisplay(int trace_level);
static void KnownAssignedXidsReset(void);

/*
 * Helpers used by ProcArrayEndTransaction(): the first does the actual
 * clearing and expects the caller to handle locking, the second is the
 * group-clear path taken when ProcArrayLock cannot be acquired immediately.
 */
static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
								PGXACT *pgxact, TransactionId latestXid);
static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
174
175 /*
176 * Report shared-memory space needed by CreateSharedProcArray.
177 */
178 Size
ProcArrayShmemSize(void)179 ProcArrayShmemSize(void)
180 {
181 Size size;
182
183 /* Size of the ProcArray structure itself */
184 #define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
185
186 size = offsetof(ProcArrayStruct, pgprocnos);
187 size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
188
189 /*
190 * During Hot Standby processing we have a data structure called
191 * KnownAssignedXids, created in shared memory. Local data structures are
192 * also created in various backends during GetSnapshotData(),
193 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
194 * main structures created in those functions must be identically sized,
195 * since we may at times copy the whole of the data structures around. We
196 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
197 *
198 * Ideally we'd only create this structure if we were actually doing hot
199 * standby in the current run, but we don't know that yet at the time
200 * shared memory is being set up.
201 */
202 #define TOTAL_MAX_CACHED_SUBXIDS \
203 ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
204
205 if (EnableHotStandby)
206 {
207 size = add_size(size,
208 mul_size(sizeof(TransactionId),
209 TOTAL_MAX_CACHED_SUBXIDS));
210 size = add_size(size,
211 mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
212 }
213
214 return size;
215 }
216
217 /*
218 * Initialize the shared PGPROC array during postmaster startup.
219 */
220 void
CreateSharedProcArray(void)221 CreateSharedProcArray(void)
222 {
223 bool found;
224
225 /* Create or attach to the ProcArray shared structure */
226 procArray = (ProcArrayStruct *)
227 ShmemInitStruct("Proc Array",
228 add_size(offsetof(ProcArrayStruct, pgprocnos),
229 mul_size(sizeof(int),
230 PROCARRAY_MAXPROCS)),
231 &found);
232
233 if (!found)
234 {
235 /*
236 * We're the first - initialize.
237 */
238 procArray->numProcs = 0;
239 procArray->maxProcs = PROCARRAY_MAXPROCS;
240 procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
241 procArray->numKnownAssignedXids = 0;
242 procArray->tailKnownAssignedXids = 0;
243 procArray->headKnownAssignedXids = 0;
244 SpinLockInit(&procArray->known_assigned_xids_lck);
245 procArray->lastOverflowedXid = InvalidTransactionId;
246 procArray->replication_slot_xmin = InvalidTransactionId;
247 procArray->replication_slot_catalog_xmin = InvalidTransactionId;
248 }
249
250 allProcs = ProcGlobal->allProcs;
251 allPgXact = ProcGlobal->allPgXact;
252
253 /* Create or attach to the KnownAssignedXids arrays too, if needed */
254 if (EnableHotStandby)
255 {
256 KnownAssignedXids = (TransactionId *)
257 ShmemInitStruct("KnownAssignedXids",
258 mul_size(sizeof(TransactionId),
259 TOTAL_MAX_CACHED_SUBXIDS),
260 &found);
261 KnownAssignedXidsValid = (bool *)
262 ShmemInitStruct("KnownAssignedXidsValid",
263 mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
264 &found);
265 }
266
267 /* Register and initialize fields of ProcLWLockTranche */
268 LWLockRegisterTranche(LWTRANCHE_PROC, "proc");
269 }
270
271 /*
272 * Add the specified PGPROC to the shared array.
273 */
274 void
ProcArrayAdd(PGPROC * proc)275 ProcArrayAdd(PGPROC *proc)
276 {
277 ProcArrayStruct *arrayP = procArray;
278 int index;
279
280 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
281
282 if (arrayP->numProcs >= arrayP->maxProcs)
283 {
284 /*
285 * Oops, no room. (This really shouldn't happen, since there is a
286 * fixed supply of PGPROC structs too, and so we should have failed
287 * earlier.)
288 */
289 LWLockRelease(ProcArrayLock);
290 ereport(FATAL,
291 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
292 errmsg("sorry, too many clients already")));
293 }
294
295 /*
296 * Keep the procs array sorted by (PGPROC *) so that we can utilize
297 * locality of references much better. This is useful while traversing the
298 * ProcArray because there is an increased likelihood of finding the next
299 * PGPROC structure in the cache.
300 *
301 * Since the occurrence of adding/removing a proc is much lower than the
302 * access to the ProcArray itself, the overhead should be marginal
303 */
304 for (index = 0; index < arrayP->numProcs; index++)
305 {
306 /*
307 * If we are the first PGPROC or if we have found our right position
308 * in the array, break
309 */
310 if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
311 break;
312 }
313
314 memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
315 (arrayP->numProcs - index) * sizeof(int));
316 arrayP->pgprocnos[index] = proc->pgprocno;
317 arrayP->numProcs++;
318
319 LWLockRelease(ProcArrayLock);
320 }
321
322 /*
323 * Remove the specified PGPROC from the shared array.
324 *
325 * When latestXid is a valid XID, we are removing a live 2PC gxact from the
326 * array, and thus causing it to appear as "not running" anymore. In this
327 * case we must advance latestCompletedXid. (This is essentially the same
328 * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
329 * the ProcArrayLock only once, and don't damage the content of the PGPROC;
330 * twophase.c depends on the latter.)
331 */
332 void
ProcArrayRemove(PGPROC * proc,TransactionId latestXid)333 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
334 {
335 ProcArrayStruct *arrayP = procArray;
336 int index;
337
338 #ifdef XIDCACHE_DEBUG
339 /* dump stats at backend shutdown, but not prepared-xact end */
340 if (proc->pid != 0)
341 DisplayXidCache();
342 #endif
343
344 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
345
346 if (TransactionIdIsValid(latestXid))
347 {
348 Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
349
350 /* Advance global latestCompletedXid while holding the lock */
351 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
352 latestXid))
353 ShmemVariableCache->latestCompletedXid = latestXid;
354 }
355 else
356 {
357 /* Shouldn't be trying to remove a live transaction here */
358 Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
359 }
360
361 for (index = 0; index < arrayP->numProcs; index++)
362 {
363 if (arrayP->pgprocnos[index] == proc->pgprocno)
364 {
365 /* Keep the PGPROC array sorted. See notes above */
366 memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
367 (arrayP->numProcs - index - 1) * sizeof(int));
368 arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
369 arrayP->numProcs--;
370 LWLockRelease(ProcArrayLock);
371 return;
372 }
373 }
374
375 /* Oops */
376 LWLockRelease(ProcArrayLock);
377
378 elog(LOG, "failed to find proc %p in ProcArray", proc);
379 }
380
381
382 /*
383 * ProcArrayEndTransaction -- mark a transaction as no longer running
384 *
385 * This is used interchangeably for commit and abort cases. The transaction
386 * commit/abort must already be reported to WAL and pg_xact.
387 *
388 * proc is currently always MyProc, but we pass it explicitly for flexibility.
389 * latestXid is the latest Xid among the transaction's main XID and
390 * subtransactions, or InvalidTransactionId if it has no XID. (We must ask
391 * the caller to pass latestXid, instead of computing it from the PGPROC's
392 * contents, because the subxid information in the PGPROC might be
393 * incomplete.)
394 */
395 void
ProcArrayEndTransaction(PGPROC * proc,TransactionId latestXid)396 ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
397 {
398 PGXACT *pgxact = &allPgXact[proc->pgprocno];
399
400 if (TransactionIdIsValid(latestXid))
401 {
402 /*
403 * We must lock ProcArrayLock while clearing our advertised XID, so
404 * that we do not exit the set of "running" transactions while someone
405 * else is taking a snapshot. See discussion in
406 * src/backend/access/transam/README.
407 */
408 Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
409
410 /*
411 * If we can immediately acquire ProcArrayLock, we clear our own XID
412 * and release the lock. If not, use group XID clearing to improve
413 * efficiency.
414 */
415 if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
416 {
417 ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
418 LWLockRelease(ProcArrayLock);
419 }
420 else
421 ProcArrayGroupClearXid(proc, latestXid);
422 }
423 else
424 {
425 /*
426 * If we have no XID, we don't need to lock, since we won't affect
427 * anyone else's calculation of a snapshot. We might change their
428 * estimate of global xmin, but that's OK.
429 */
430 Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
431
432 proc->lxid = InvalidLocalTransactionId;
433 pgxact->xmin = InvalidTransactionId;
434 /* must be cleared with xid/xmin: */
435 pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
436 pgxact->delayChkpt = false; /* be sure this is cleared in abort */
437 proc->recoveryConflictPending = false;
438
439 Assert(pgxact->nxids == 0);
440 Assert(pgxact->overflowed == false);
441 }
442 }
443
/*
 * Mark a write transaction as no longer running.
 *
 * proc, pgxact: the PGPROC and its PGXACT whose transaction fields are reset.
 * latestXid: latest XID of the ending transaction; the global
 *		latestCompletedXid is advanced to it if it is newer.
 *
 * We don't do any locking here; caller must handle that.  (The callers in
 * this file, ProcArrayEndTransaction and ProcArrayGroupClearXid, hold
 * ProcArrayLock in exclusive mode around the call.)
 */
static inline void
ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
								TransactionId latestXid)
{
	pgxact->xid = InvalidTransactionId;
	proc->lxid = InvalidLocalTransactionId;
	pgxact->xmin = InvalidTransactionId;
	/* must be cleared with xid/xmin: */
	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
	pgxact->delayChkpt = false; /* be sure this is cleared in abort */
	proc->recoveryConflictPending = false;

	/* Clear the subtransaction-XID cache too while holding the lock */
	pgxact->nxids = 0;
	pgxact->overflowed = false;

	/* Also advance global latestCompletedXid while holding the lock */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  latestXid))
		ShmemVariableCache->latestCompletedXid = latestXid;
}
470
/*
 * ProcArrayGroupClearXid -- group XID clearing
 *
 * When we cannot immediately acquire ProcArrayLock in exclusive mode at
 * commit time, add ourselves to a list of processes that need their XIDs
 * cleared.  The first process to add itself to the list will acquire
 * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
 * on behalf of all group members.  This avoids a great deal of contention
 * around ProcArrayLock when many processes are trying to commit at once,
 * since the lock need not be repeatedly handed off from one committing
 * process to the next.
 *
 * proc: the process whose XID is to be cleared; it must currently advertise
 *		a valid XID.
 * latestXid: value handed on to ProcArrayEndTransactionInternal for this
 *		process.
 *
 * The list is linked through each PGPROC's procArrayGroupNext field, using
 * pgprocno values as links, with its head in ProcGlobal->procArrayGroupFirst.
 */
static void
ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
{
	volatile PROC_HDR *procglobal = ProcGlobal;
	uint32		nextidx;
	uint32		wakeidx;

	/* We should definitely have an XID to clear. */
	Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));

	/*
	 * Add ourselves to the list of processes needing a group XID clear.
	 *
	 * We push onto the head: record the current head as our successor, then
	 * try to install ourselves as the new head, retrying until the
	 * compare-exchange succeeds.  On exit nextidx holds the head we
	 * replaced.
	 */
	proc->procArrayGroupMember = true;
	proc->procArrayGroupMemberXid = latestXid;
	while (true)
	{
		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
		pg_atomic_write_u32(&proc->procArrayGroupNext, nextidx);

		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
										   &nextidx,
										   (uint32) proc->pgprocno))
			break;
	}

	/*
	 * If the list was not empty, the leader will clear our XID.  It is
	 * impossible to have followers without a leader because the first process
	 * that has added itself to the list will always have nextidx as
	 * INVALID_PGPROCNO.
	 */
	if (nextidx != INVALID_PGPROCNO)
	{
		int			extraWaits = 0;

		/* Sleep until the leader clears our XID. */
		pgstat_report_wait_start(WAIT_EVENT_PROCARRAY_GROUP_UPDATE);
		for (;;)
		{
			/* acts as a read barrier */
			PGSemaphoreLock(proc->sem);
			if (!proc->procArrayGroupMember)
				break;
			/* woken for some other reason; remember to give it back */
			extraWaits++;
		}
		pgstat_report_wait_end();

		Assert(pg_atomic_read_u32(&proc->procArrayGroupNext) == INVALID_PGPROCNO);

		/* Fix semaphore count for any absorbed wakeups */
		while (extraWaits-- > 0)
			PGSemaphoreUnlock(proc->sem);
		return;
	}

	/* We are the leader.  Acquire the lock on behalf of everyone. */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
	 * Now that we've got the lock, clear the list of processes waiting for
	 * group XID clearing, saving a pointer to the head of the list.  Trying
	 * to pop elements one at a time could lead to an ABA problem.
	 */
	while (true)
	{
		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
										   &nextidx,
										   INVALID_PGPROCNO))
			break;
	}

	/* Remember head of list so we can perform wakeups after dropping lock. */
	wakeidx = nextidx;

	/* Walk the list and clear all XIDs. */
	while (nextidx != INVALID_PGPROCNO)
	{
		PGPROC	   *nextproc = &allProcs[nextidx];
		PGXACT	   *pgxact = &allPgXact[nextidx];

		ProcArrayEndTransactionInternal(nextproc, pgxact, nextproc->procArrayGroupMemberXid);

		/* Move to next proc in list. */
		nextidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
	}

	/* We're done with the lock now. */
	LWLockRelease(ProcArrayLock);

	/*
	 * Now that we've released the lock, go back and wake everybody up.  We
	 * don't do this under the lock so as to keep lock hold times to a
	 * minimum.  The system calls we need to perform to wake other processes
	 * up are probably much slower than the simple memory writes we did while
	 * holding the lock.
	 */
	while (wakeidx != INVALID_PGPROCNO)
	{
		PGPROC	   *nextproc = &allProcs[wakeidx];

		/* fetch the successor before unlinking this entry from the list */
		wakeidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
		pg_atomic_write_u32(&nextproc->procArrayGroupNext, INVALID_PGPROCNO);

		/* ensure all previous writes are visible before follower continues. */
		pg_write_barrier();

		nextproc->procArrayGroupMember = false;

		/*
		 * The leader (us) is on the list as well but never went to sleep on
		 * its semaphore, so it must not be signalled.
		 */
		if (nextproc != MyProc)
			PGSemaphoreUnlock(nextproc->sem);
	}
}
595
/*
 * ProcArrayClearTransaction -- clear the transaction fields
 *
 * This is used after successfully preparing a 2-phase transaction.  We are
 * not actually reporting the transaction's XID as no longer running --- it
 * will still appear as running because the 2PC's gxact is in the ProcArray
 * too.  We just have to clear out our own PGXACT.
 *
 * proc: the process whose PGPROC/PGXACT transaction fields are reset.
 *
 * Unlike ProcArrayEndTransactionInternal, this does not touch
 * latestCompletedXid.
 */
void
ProcArrayClearTransaction(PGPROC *proc)
{
	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];

	/*
	 * We can skip locking ProcArrayLock here, because this action does not
	 * actually change anyone's view of the set of running XIDs: our entry is
	 * duplicate with the gxact that has already been inserted into the
	 * ProcArray.
	 */
	pgxact->xid = InvalidTransactionId;
	proc->lxid = InvalidLocalTransactionId;
	pgxact->xmin = InvalidTransactionId;
	proc->recoveryConflictPending = false;

	/* redundant, but just in case */
	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
	pgxact->delayChkpt = false;

	/* Clear the subtransaction-XID cache too */
	pgxact->nxids = 0;
	pgxact->overflowed = false;
}
628
629 /*
630 * ProcArrayInitRecovery -- initialize recovery xid mgmt environment
631 *
632 * Remember up to where the startup process initialized the CLOG and subtrans
633 * so we can ensure it's initialized gaplessly up to the point where necessary
634 * while in recovery.
635 */
636 void
ProcArrayInitRecovery(TransactionId initializedUptoXID)637 ProcArrayInitRecovery(TransactionId initializedUptoXID)
638 {
639 Assert(standbyState == STANDBY_INITIALIZED);
640 Assert(TransactionIdIsNormal(initializedUptoXID));
641
642 /*
643 * we set latestObservedXid to the xid SUBTRANS has been initialized up
644 * to, so we can extend it from that point onwards in
645 * RecordKnownAssignedTransactionIds, and when we get consistent in
646 * ProcArrayApplyRecoveryInfo().
647 */
648 latestObservedXid = initializedUptoXID;
649 TransactionIdRetreat(latestObservedXid);
650 }
651
/*
 * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
 *
 * Takes us through 3 states: Initialized, Pending and Ready.
 * Normal case is to go all the way to Ready straight away, though there
 * are atypical cases where we need to take it in steps.
 *
 * Use the data about running transactions on master to create the initial
 * state of KnownAssignedXids.  We also use these records to regularly prune
 * KnownAssignedXids because we know it is possible that some transactions
 * with FATAL errors fail to write abort records, which could cause eventual
 * overflow.
 *
 * running: the running-transactions data to apply.  The fields read here
 *		are nextXid, oldestRunningXid, latestCompletedXid, xcnt, subxcnt,
 *		xids[] and subxid_overflow.  The xids[] array is not modified; a
 *		private copy is filtered and sorted instead.
 *
 * Besides KnownAssignedXids this may update standbyState,
 * standbySnapshotPendingXmin, latestObservedXid,
 * procArray->lastOverflowedXid, and ShmemVariableCache's latestCompletedXid
 * and nextXid.
 *
 * See comments for LogStandbySnapshot().
 */
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
	TransactionId *xids;
	int			nxids;
	TransactionId nextXid;
	int			i;

	Assert(standbyState >= STANDBY_INITIALIZED);
	Assert(TransactionIdIsValid(running->nextXid));
	Assert(TransactionIdIsValid(running->oldestRunningXid));
	Assert(TransactionIdIsNormal(running->latestCompletedXid));

	/*
	 * Remove stale transactions, if any.
	 */
	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);

	/*
	 * Remove stale locks, if any.
	 *
	 * Locks are always assigned to the toplevel xid so we don't need to care
	 * about subxcnt/subxids (and by extension not about ->suboverflowed).
	 */
	StandbyReleaseOldLocks(running->xcnt, running->xids);

	/*
	 * If our snapshot is already valid, nothing else to do...
	 */
	if (standbyState == STANDBY_SNAPSHOT_READY)
		return;

	/*
	 * If our initial RunningTransactionsData had an overflowed snapshot then
	 * we knew we were missing some subxids from our snapshot. If we continue
	 * to see overflowed snapshots then we might never be able to start up, so
	 * we make another test to see if our snapshot is now valid. We know that
	 * the missing subxids are equal to or earlier than nextXid. After we
	 * initialise we continue to apply changes during recovery, so once the
	 * oldestRunningXid is later than the nextXid from the initial snapshot we
	 * know that we no longer have missing information and can mark the
	 * snapshot as valid.
	 */
	if (standbyState == STANDBY_SNAPSHOT_PENDING)
	{
		/*
		 * If the snapshot isn't overflowed or if it's empty we can reset our
		 * pending state and use this snapshot instead.
		 */
		if (!running->subxid_overflow || running->xcnt == 0)
		{
			/*
			 * If we have already collected known assigned xids, we need to
			 * throw them away before we apply the recovery snapshot.
			 */
			KnownAssignedXidsReset();
			standbyState = STANDBY_INITIALIZED;
		}
		else
		{
			if (TransactionIdPrecedes(standbySnapshotPendingXmin,
									  running->oldestRunningXid))
			{
				standbyState = STANDBY_SNAPSHOT_READY;
				elog(trace_recovery(DEBUG1),
					 "recovery snapshots are now enabled");
			}
			else
				elog(trace_recovery(DEBUG1),
					 "recovery snapshot waiting for non-overflowed snapshot or "
					 "until oldest active xid on standby is at least %u (now %u)",
					 standbySnapshotPendingXmin,
					 running->oldestRunningXid);
			/* either way, this overflowed snapshot is not applied */
			return;
		}
	}

	Assert(standbyState == STANDBY_INITIALIZED);

	/*
	 * OK, we need to initialise from the RunningTransactionsData record.
	 *
	 * NB: this can be reached at least twice, so make sure new code can deal
	 * with that.
	 */

	/*
	 * Nobody else is running yet, but take locks anyhow
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
	 * sort them first.
	 *
	 * Some of the new xids are top-level xids and some are subtransactions.
	 * We don't call SubtransSetParent because it doesn't matter yet. If we
	 * aren't overflowed then all xids will fit in snapshot and so we don't
	 * need subtrans. If we later overflow, an xid assignment record will add
	 * xids to subtrans. If RunningXacts is overflowed then we don't have
	 * enough information to correctly update subtrans anyway.
	 */

	/*
	 * Allocate a temporary array to avoid modifying the array passed as
	 * argument.
	 */
	xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));

	/*
	 * Add to the temp array any xids which have not already completed.
	 */
	nxids = 0;
	for (i = 0; i < running->xcnt + running->subxcnt; i++)
	{
		TransactionId xid = running->xids[i];

		/*
		 * The running-xacts snapshot can contain xids that were still visible
		 * in the procarray when the snapshot was taken, but were already
		 * WAL-logged as completed. They're not running anymore, so ignore
		 * them.
		 */
		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
			continue;

		xids[nxids++] = xid;
	}

	if (nxids > 0)
	{
		/* release the lock ourselves before raising the error */
		if (procArray->numKnownAssignedXids != 0)
		{
			LWLockRelease(ProcArrayLock);
			elog(ERROR, "KnownAssignedXids is not empty");
		}

		/*
		 * Sort the array so that we can add them safely into
		 * KnownAssignedXids.
		 */
		qsort(xids, nxids, sizeof(TransactionId), xidComparator);

		/*
		 * Add the sorted snapshot into KnownAssignedXids.  The running-xacts
		 * snapshot may include duplicated xids because of prepared
		 * transactions, so ignore them.  Since the array is sorted,
		 * duplicates are adjacent and comparing with the previous element is
		 * enough.
		 */
		for (i = 0; i < nxids; i++)
		{
			if (i > 0 && TransactionIdEquals(xids[i - 1], xids[i]))
			{
				elog(DEBUG1,
					 "found duplicated transaction %u for KnownAssignedXids insertion",
					 xids[i]);
				continue;
			}
			KnownAssignedXidsAdd(xids[i], xids[i], true);
		}

		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	}

	pfree(xids);

	/*
	 * latestObservedXid is at least set to the point where SUBTRANS was
	 * started up to (c.f. ProcArrayInitRecovery()) or to the biggest xid
	 * RecordKnownAssignedTransactionIds() was called for.  Initialize
	 * subtrans from thereon, up to nextXid - 1.
	 *
	 * We need to duplicate parts of RecordKnownAssignedTransactionId() here,
	 * because we've just added xids to the known assigned xids machinery that
	 * haven't gone through RecordKnownAssignedTransactionId().
	 */
	Assert(TransactionIdIsNormal(latestObservedXid));
	TransactionIdAdvance(latestObservedXid);
	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
	{
		ExtendSUBTRANS(latestObservedXid);
		TransactionIdAdvance(latestObservedXid);
	}
	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */

	/* ----------
	 * Now we've got the running xids we need to set the global values that
	 * are used to track snapshots as they evolve further.
	 *
	 * - latestCompletedXid which will be the xmax for snapshots
	 * - lastOverflowedXid which shows whether snapshots overflow
	 * - nextXid
	 *
	 * If the snapshot overflowed, then we still initialise with what we know,
	 * but the recovery snapshot isn't fully valid yet because we know there
	 * are some subxids missing. We don't know the specific subxids that are
	 * missing, so conservatively assume the last one is latestObservedXid.
	 * ----------
	 */
	if (running->subxid_overflow)
	{
		standbyState = STANDBY_SNAPSHOT_PENDING;

		standbySnapshotPendingXmin = latestObservedXid;
		procArray->lastOverflowedXid = latestObservedXid;
	}
	else
	{
		standbyState = STANDBY_SNAPSHOT_READY;

		standbySnapshotPendingXmin = InvalidTransactionId;
	}

	/*
	 * If a transaction wrote a commit record in the gap between taking and
	 * logging the snapshot then latestCompletedXid may already be higher than
	 * the value from the snapshot, so check before we use the incoming value.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  running->latestCompletedXid))
		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;

	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));

	LWLockRelease(ProcArrayLock);

	/*
	 * ShmemVariableCache->nextXid must be beyond any observed xid.
	 *
	 * We don't expect anyone else to modify nextXid, hence we don't need to
	 * hold a lock while examining it.  We still acquire the lock to modify
	 * it, though.
	 */
	nextXid = latestObservedXid;
	TransactionIdAdvance(nextXid);
	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
	{
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
		ShmemVariableCache->nextXid = nextXid;
		LWLockRelease(XidGenLock);
	}

	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));

	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	if (standbyState == STANDBY_SNAPSHOT_READY)
		elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
	else
		elog(trace_recovery(DEBUG1),
			 "recovery snapshot waiting for non-overflowed snapshot or "
			 "until oldest active xid on standby is at least %u (now %u)",
			 standbySnapshotPendingXmin,
			 running->oldestRunningXid);
}
920
921 /*
922 * ProcArrayApplyXidAssignment
923 * Process an XLOG_XACT_ASSIGNMENT WAL record
924 */
925 void
ProcArrayApplyXidAssignment(TransactionId topxid,int nsubxids,TransactionId * subxids)926 ProcArrayApplyXidAssignment(TransactionId topxid,
927 int nsubxids, TransactionId *subxids)
928 {
929 TransactionId max_xid;
930 int i;
931
932 Assert(standbyState >= STANDBY_INITIALIZED);
933
934 max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
935
936 /*
937 * Mark all the subtransactions as observed.
938 *
939 * NOTE: This will fail if the subxid contains too many previously
940 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
941 * as the code stands, because xid-assignment records should never contain
942 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
943 */
944 RecordKnownAssignedTransactionIds(max_xid);
945
946 /*
947 * Notice that we update pg_subtrans with the top-level xid, rather than
948 * the parent xid. This is a difference between normal processing and
949 * recovery, yet is still correct in all cases. The reason is that
950 * subtransaction commit is not marked in clog until commit processing, so
951 * all aborted subtransactions have already been clearly marked in clog.
952 * As a result we are able to refer directly to the top-level
953 * transaction's state rather than skipping through all the intermediate
954 * states in the subtransaction tree. This should be the first time we
955 * have attempted to SubTransSetParent().
956 */
957 for (i = 0; i < nsubxids; i++)
958 SubTransSetParent(subxids[i], topxid);
959
960 /* KnownAssignedXids isn't maintained yet, so we're done for now */
961 if (standbyState == STANDBY_INITIALIZED)
962 return;
963
964 /*
965 * Uses same locking as transaction commit
966 */
967 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
968
969 /*
970 * Remove subxids from known-assigned-xacts.
971 */
972 KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
973
974 /*
975 * Advance lastOverflowedXid to be at least the last of these subxids.
976 */
977 if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
978 procArray->lastOverflowedXid = max_xid;
979
980 LWLockRelease(ProcArrayLock);
981 }
982
983 /*
984 * TransactionIdIsInProgress -- is given transaction running in some backend
985 *
986 * Aside from some shortcuts such as checking RecentXmin and our own Xid,
987 * there are four possibilities for finding a running transaction:
988 *
989 * 1. The given Xid is a main transaction Id. We will find this out cheaply
990 * by looking at the PGXACT struct for each backend.
991 *
992 * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
993 * We can find this out cheaply too.
994 *
995 * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
996 * if the Xid is running on the master.
997 *
998 * 4. Search the SubTrans tree to find the Xid's topmost parent, and then see
999 * if that is running according to PGXACT or KnownAssignedXids. This is the
1000 * slowest way, but sadly it has to be done always if the others failed,
1001 * unless we see that the cached subxact sets are complete (none have
1002 * overflowed).
1003 *
1004 * ProcArrayLock has to be held while we do 1, 2, 3. If we save the top Xids
1005 * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
1006 * This buys back some concurrency (and we can't retrieve the main Xids from
1007 * PGXACT again anyway; see GetNewTransactionId).
1008 */
1009 bool
TransactionIdIsInProgress(TransactionId xid)1010 TransactionIdIsInProgress(TransactionId xid)
1011 {
1012 static TransactionId *xids = NULL;
1013 int nxids = 0;
1014 ProcArrayStruct *arrayP = procArray;
1015 TransactionId topxid;
1016 int i,
1017 j;
1018
1019 /*
1020 * Don't bother checking a transaction older than RecentXmin; it could not
1021 * possibly still be running. (Note: in particular, this guarantees that
1022 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
1023 * running.)
1024 */
1025 if (TransactionIdPrecedes(xid, RecentXmin))
1026 {
1027 xc_by_recent_xmin_inc();
1028 return false;
1029 }
1030
1031 /*
1032 * We may have just checked the status of this transaction, so if it is
1033 * already known to be completed, we can fall out without any access to
1034 * shared memory.
1035 */
1036 if (TransactionIdIsKnownCompleted(xid))
1037 {
1038 xc_by_known_xact_inc();
1039 return false;
1040 }
1041
1042 /*
1043 * Also, we can handle our own transaction (and subtransactions) without
1044 * any access to shared memory.
1045 */
1046 if (TransactionIdIsCurrentTransactionId(xid))
1047 {
1048 xc_by_my_xact_inc();
1049 return true;
1050 }
1051
1052 /*
1053 * If first time through, get workspace to remember main XIDs in. We
1054 * malloc it permanently to avoid repeated palloc/pfree overhead.
1055 */
1056 if (xids == NULL)
1057 {
1058 /*
1059 * In hot standby mode, reserve enough space to hold all xids in the
1060 * known-assigned list. If we later finish recovery, we no longer need
1061 * the bigger array, but we don't bother to shrink it.
1062 */
1063 int maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
1064
1065 xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
1066 if (xids == NULL)
1067 ereport(ERROR,
1068 (errcode(ERRCODE_OUT_OF_MEMORY),
1069 errmsg("out of memory")));
1070 }
1071
1072 LWLockAcquire(ProcArrayLock, LW_SHARED);
1073
1074 /*
1075 * Now that we have the lock, we can check latestCompletedXid; if the
1076 * target Xid is after that, it's surely still running.
1077 */
1078 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
1079 {
1080 LWLockRelease(ProcArrayLock);
1081 xc_by_latest_xid_inc();
1082 return true;
1083 }
1084
1085 /* No shortcuts, gotta grovel through the array */
1086 for (i = 0; i < arrayP->numProcs; i++)
1087 {
1088 int pgprocno = arrayP->pgprocnos[i];
1089 volatile PGPROC *proc = &allProcs[pgprocno];
1090 volatile PGXACT *pgxact = &allPgXact[pgprocno];
1091 TransactionId pxid;
1092
1093 /* Ignore my own proc --- dealt with it above */
1094 if (proc == MyProc)
1095 continue;
1096
1097 /* Fetch xid just once - see GetNewTransactionId */
1098 pxid = pgxact->xid;
1099
1100 if (!TransactionIdIsValid(pxid))
1101 continue;
1102
1103 /*
1104 * Step 1: check the main Xid
1105 */
1106 if (TransactionIdEquals(pxid, xid))
1107 {
1108 LWLockRelease(ProcArrayLock);
1109 xc_by_main_xid_inc();
1110 return true;
1111 }
1112
1113 /*
1114 * We can ignore main Xids that are younger than the target Xid, since
1115 * the target could not possibly be their child.
1116 */
1117 if (TransactionIdPrecedes(xid, pxid))
1118 continue;
1119
1120 /*
1121 * Step 2: check the cached child-Xids arrays
1122 */
1123 for (j = pgxact->nxids - 1; j >= 0; j--)
1124 {
1125 /* Fetch xid just once - see GetNewTransactionId */
1126 TransactionId cxid = proc->subxids.xids[j];
1127
1128 if (TransactionIdEquals(cxid, xid))
1129 {
1130 LWLockRelease(ProcArrayLock);
1131 xc_by_child_xid_inc();
1132 return true;
1133 }
1134 }
1135
1136 /*
1137 * Save the main Xid for step 4. We only need to remember main Xids
1138 * that have uncached children. (Note: there is no race condition
1139 * here because the overflowed flag cannot be cleared, only set, while
1140 * we hold ProcArrayLock. So we can't miss an Xid that we need to
1141 * worry about.)
1142 */
1143 if (pgxact->overflowed)
1144 xids[nxids++] = pxid;
1145 }
1146
1147 /*
1148 * Step 3: in hot standby mode, check the known-assigned-xids list. XIDs
1149 * in the list must be treated as running.
1150 */
1151 if (RecoveryInProgress())
1152 {
1153 /* none of the PGXACT entries should have XIDs in hot standby mode */
1154 Assert(nxids == 0);
1155
1156 if (KnownAssignedXidExists(xid))
1157 {
1158 LWLockRelease(ProcArrayLock);
1159 xc_by_known_assigned_inc();
1160 return true;
1161 }
1162
1163 /*
1164 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
1165 * too. Fetch all xids from KnownAssignedXids that are lower than
1166 * xid, since if xid is a subtransaction its parent will always have a
1167 * lower value. Note we will collect both main and subXIDs here, but
1168 * there's no help for it.
1169 */
1170 if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
1171 nxids = KnownAssignedXidsGet(xids, xid);
1172 }
1173
1174 LWLockRelease(ProcArrayLock);
1175
1176 /*
1177 * If none of the relevant caches overflowed, we know the Xid is not
1178 * running without even looking at pg_subtrans.
1179 */
1180 if (nxids == 0)
1181 {
1182 xc_no_overflow_inc();
1183 return false;
1184 }
1185
1186 /*
1187 * Step 4: have to check pg_subtrans.
1188 *
1189 * At this point, we know it's either a subtransaction of one of the Xids
1190 * in xids[], or it's not running. If it's an already-failed
1191 * subtransaction, we want to say "not running" even though its parent may
1192 * still be running. So first, check pg_xact to see if it's been aborted.
1193 */
1194 xc_slow_answer_inc();
1195
1196 if (TransactionIdDidAbort(xid))
1197 return false;
1198
1199 /*
1200 * It isn't aborted, so check whether the transaction tree it belongs to
1201 * is still running (or, more precisely, whether it was running when we
1202 * held ProcArrayLock).
1203 */
1204 topxid = SubTransGetTopmostTransaction(xid);
1205 Assert(TransactionIdIsValid(topxid));
1206 if (!TransactionIdEquals(topxid, xid))
1207 {
1208 for (i = 0; i < nxids; i++)
1209 {
1210 if (TransactionIdEquals(xids[i], topxid))
1211 return true;
1212 }
1213 }
1214
1215 return false;
1216 }
1217
1218 /*
1219 * TransactionIdIsActive -- is xid the top-level XID of an active backend?
1220 *
1221 * This differs from TransactionIdIsInProgress in that it ignores prepared
1222 * transactions, as well as transactions running on the master if we're in
1223 * hot standby. Also, we ignore subtransactions since that's not needed
1224 * for current uses.
1225 */
1226 bool
TransactionIdIsActive(TransactionId xid)1227 TransactionIdIsActive(TransactionId xid)
1228 {
1229 bool result = false;
1230 ProcArrayStruct *arrayP = procArray;
1231 int i;
1232
1233 /*
1234 * Don't bother checking a transaction older than RecentXmin; it could not
1235 * possibly still be running.
1236 */
1237 if (TransactionIdPrecedes(xid, RecentXmin))
1238 return false;
1239
1240 LWLockAcquire(ProcArrayLock, LW_SHARED);
1241
1242 for (i = 0; i < arrayP->numProcs; i++)
1243 {
1244 int pgprocno = arrayP->pgprocnos[i];
1245 volatile PGPROC *proc = &allProcs[pgprocno];
1246 volatile PGXACT *pgxact = &allPgXact[pgprocno];
1247 TransactionId pxid;
1248
1249 /* Fetch xid just once - see GetNewTransactionId */
1250 pxid = pgxact->xid;
1251
1252 if (!TransactionIdIsValid(pxid))
1253 continue;
1254
1255 if (proc->pid == 0)
1256 continue; /* ignore prepared transactions */
1257
1258 if (TransactionIdEquals(pxid, xid))
1259 {
1260 result = true;
1261 break;
1262 }
1263 }
1264
1265 LWLockRelease(ProcArrayLock);
1266
1267 return result;
1268 }
1269
1270
1271 /*
1272 * GetOldestXmin -- returns oldest transaction that was running
1273 * when any current transaction was started.
1274 *
1275 * If rel is NULL or a shared relation, all backends are considered, otherwise
1276 * only backends running in this database are considered.
1277 *
1278 * The flags are used to ignore the backends in calculation when any of the
1279 * corresponding flags is set. Typically, if you want to ignore ones with
1280 * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
1281 *
1282 * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
1283 * catalog_xmin of any replication slots that exist in the system when
1284 * calculating the oldest xmin.
1285 *
1286 * This is used by VACUUM to decide which deleted tuples must be preserved in
1287 * the passed in table. For shared relations backends in all databases must be
1288 * considered, but for non-shared relations that's not required, since only
1289 * backends in my own database could ever see the tuples in them. Also, we can
1290 * ignore concurrently running lazy VACUUMs because (a) they must be working
1291 * on other tables, and (b) they don't need to do snapshot-based lookups.
1292 *
1293 * This is also used to determine where to truncate pg_subtrans. For that
1294 * backends in all databases have to be considered, so rel = NULL has to be
1295 * passed in.
1296 *
1297 * Note: we include all currently running xids in the set of considered xids.
1298 * This ensures that if a just-started xact has not yet set its snapshot,
1299 * when it does set the snapshot it cannot set xmin less than what we compute.
1300 * See notes in src/backend/access/transam/README.
1301 *
1302 * Note: despite the above, it's possible for the calculated value to move
1303 * backwards on repeated calls. The calculated value is conservative, so that
1304 * anything older is definitely not considered as running by anyone anymore,
1305 * but the exact value calculated depends on a number of things. For example,
1306 * if rel = NULL and there are no transactions running in the current
1307 * database, GetOldestXmin() returns latestCompletedXid. If a transaction
1308 * begins after that, its xmin will include in-progress transactions in other
1309 * databases that started earlier, so another call will return a lower value.
1310 * Nonetheless it is safe to vacuum a table in the current database with the
1311 * first result. There are also replication-related effects: a walsender
1312 * process can set its xmin based on transactions that are no longer running
1313 * in the master but are still being replayed on the standby, thus possibly
1314 * making the GetOldestXmin reading go backwards. In this case there is a
1315 * possibility that we lose data that the standby would like to have, but
1316 * unless the standby uses a replication slot to make its xmin persistent
1317 * there is little we can do about that --- data is only protected if the
1318 * walsender runs continuously while queries are executed on the standby.
1319 * (The Hot Standby code deals with such cases by failing standby queries
1320 * that needed to access already-removed data, so there's no integrity bug.)
1321 * The return value is also adjusted with vacuum_defer_cleanup_age, so
1322 * increasing that setting on the fly is another easy way to make
1323 * GetOldestXmin() move backwards, with no consequences for data integrity.
1324 */
1325 TransactionId
GetOldestXmin(Relation rel,int flags)1326 GetOldestXmin(Relation rel, int flags)
1327 {
1328 ProcArrayStruct *arrayP = procArray;
1329 TransactionId result;
1330 int index;
1331 bool allDbs;
1332
1333 volatile TransactionId replication_slot_xmin = InvalidTransactionId;
1334 volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1335
1336 /*
1337 * If we're not computing a relation specific limit, or if a shared
1338 * relation has been passed in, backends in all databases have to be
1339 * considered.
1340 */
1341 allDbs = rel == NULL || rel->rd_rel->relisshared;
1342
1343 /* Cannot look for individual databases during recovery */
1344 Assert(allDbs || !RecoveryInProgress());
1345
1346 LWLockAcquire(ProcArrayLock, LW_SHARED);
1347
1348 /*
1349 * We initialize the MIN() calculation with latestCompletedXid + 1. This
1350 * is a lower bound for the XIDs that might appear in the ProcArray later,
1351 * and so protects us against overestimating the result due to future
1352 * additions.
1353 */
1354 result = ShmemVariableCache->latestCompletedXid;
1355 Assert(TransactionIdIsNormal(result));
1356 TransactionIdAdvance(result);
1357
1358 for (index = 0; index < arrayP->numProcs; index++)
1359 {
1360 int pgprocno = arrayP->pgprocnos[index];
1361 volatile PGPROC *proc = &allProcs[pgprocno];
1362 volatile PGXACT *pgxact = &allPgXact[pgprocno];
1363
1364 if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
1365 continue;
1366
1367 if (allDbs ||
1368 proc->databaseId == MyDatabaseId ||
1369 proc->databaseId == 0) /* always include WalSender */
1370 {
1371 /* Fetch xid just once - see GetNewTransactionId */
1372 TransactionId xid = pgxact->xid;
1373
1374 /* First consider the transaction's own Xid, if any */
1375 if (TransactionIdIsNormal(xid) &&
1376 TransactionIdPrecedes(xid, result))
1377 result = xid;
1378
1379 /*
1380 * Also consider the transaction's Xmin, if set.
1381 *
1382 * We must check both Xid and Xmin because a transaction might
1383 * have an Xmin but not (yet) an Xid; conversely, if it has an
1384 * Xid, that could determine some not-yet-set Xmin.
1385 */
1386 xid = pgxact->xmin; /* Fetch just once */
1387 if (TransactionIdIsNormal(xid) &&
1388 TransactionIdPrecedes(xid, result))
1389 result = xid;
1390 }
1391 }
1392
1393 /* fetch into volatile var while ProcArrayLock is held */
1394 replication_slot_xmin = procArray->replication_slot_xmin;
1395 replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1396
1397 if (RecoveryInProgress())
1398 {
1399 /*
1400 * Check to see whether KnownAssignedXids contains an xid value older
1401 * than the main procarray.
1402 */
1403 TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1404
1405 LWLockRelease(ProcArrayLock);
1406
1407 if (TransactionIdIsNormal(kaxmin) &&
1408 TransactionIdPrecedes(kaxmin, result))
1409 result = kaxmin;
1410 }
1411 else
1412 {
1413 /*
1414 * No other information needed, so release the lock immediately.
1415 */
1416 LWLockRelease(ProcArrayLock);
1417
1418 /*
1419 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
1420 * being careful not to generate a "permanent" XID.
1421 *
1422 * vacuum_defer_cleanup_age provides some additional "slop" for the
1423 * benefit of hot standby queries on standby servers. This is quick
1424 * and dirty, and perhaps not all that useful unless the master has a
1425 * predictable transaction rate, but it offers some protection when
1426 * there's no walsender connection. Note that we are assuming
1427 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
1428 * so guc.c should limit it to no more than the xidStopLimit threshold
1429 * in varsup.c. Also note that we intentionally don't apply
1430 * vacuum_defer_cleanup_age on standby servers.
1431 */
1432 result -= vacuum_defer_cleanup_age;
1433 if (!TransactionIdIsNormal(result))
1434 result = FirstNormalTransactionId;
1435 }
1436
1437 /*
1438 * Check whether there are replication slots requiring an older xmin.
1439 */
1440 if (!(flags & PROCARRAY_SLOTS_XMIN) &&
1441 TransactionIdIsValid(replication_slot_xmin) &&
1442 NormalTransactionIdPrecedes(replication_slot_xmin, result))
1443 result = replication_slot_xmin;
1444
1445 /*
1446 * After locks have been released and defer_cleanup_age has been applied,
1447 * check whether we need to back up further to make logical decoding
1448 * possible. We need to do so if we're computing the global limit (rel =
1449 * NULL) or if the passed relation is a catalog relation of some kind.
1450 */
1451 if (!(flags & PROCARRAY_SLOTS_XMIN) &&
1452 (rel == NULL ||
1453 RelationIsAccessibleInLogicalDecoding(rel)) &&
1454 TransactionIdIsValid(replication_slot_catalog_xmin) &&
1455 NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
1456 result = replication_slot_catalog_xmin;
1457
1458 return result;
1459 }
1460
1461 /*
1462 * GetMaxSnapshotXidCount -- get max size for snapshot XID array
1463 *
1464 * We have to export this for use by snapmgr.c.
1465 */
1466 int
GetMaxSnapshotXidCount(void)1467 GetMaxSnapshotXidCount(void)
1468 {
1469 return procArray->maxProcs;
1470 }
1471
1472 /*
1473 * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
1474 *
1475 * We have to export this for use by snapmgr.c.
1476 */
1477 int
GetMaxSnapshotSubxidCount(void)1478 GetMaxSnapshotSubxidCount(void)
1479 {
1480 return TOTAL_MAX_CACHED_SUBXIDS;
1481 }
1482
1483 /*
1484 * GetSnapshotData -- returns information about running transactions.
1485 *
1486 * The returned snapshot includes xmin (lowest still-running xact ID),
1487 * xmax (highest completed xact ID + 1), and a list of running xact IDs
1488 * in the range xmin <= xid < xmax. It is used as follows:
1489 * All xact IDs < xmin are considered finished.
1490 * All xact IDs >= xmax are considered still running.
1491 * For an xact ID xmin <= xid < xmax, consult list to see whether
1492 * it is considered running or not.
1493 * This ensures that the set of transactions seen as "running" by the
1494 * current xact will not change after it takes the snapshot.
1495 *
1496 * All running top-level XIDs are included in the snapshot, except for lazy
1497 * VACUUM processes. We also try to include running subtransaction XIDs,
1498 * but since PGPROC has only a limited cache area for subxact XIDs, full
1499 * information may not be available. If we find any overflowed subxid arrays,
1500 * we have to mark the snapshot's subxid data as overflowed, and extra work
1501 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1502 * in tqual.c).
1503 *
1504 * We also update the following backend-global variables:
1505 * TransactionXmin: the oldest xmin of any snapshot in use in the
1506 * current transaction (this is the same as MyPgXact->xmin).
1507 * RecentXmin: the xmin computed for the most recent snapshot. XIDs
1508 * older than this are known not running any more.
1509 * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1510 * running transactions, except those running LAZY VACUUM). This is
1511 * the same computation done by
1512 * GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM).
1513 * RecentGlobalDataXmin: the global xmin for non-catalog tables
1514 * >= RecentGlobalXmin
1515 *
1516 * Note: this function should probably not be called with an argument that's
1517 * not statically allocated (see xip allocation below).
1518 */
1519 Snapshot
GetSnapshotData(Snapshot snapshot)1520 GetSnapshotData(Snapshot snapshot)
1521 {
1522 ProcArrayStruct *arrayP = procArray;
1523 TransactionId xmin;
1524 TransactionId xmax;
1525 TransactionId globalxmin;
1526 int index;
1527 int count = 0;
1528 int subcount = 0;
1529 bool suboverflowed = false;
1530 volatile TransactionId replication_slot_xmin = InvalidTransactionId;
1531 volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1532
1533 Assert(snapshot != NULL);
1534
1535 /*
1536 * Allocating space for maxProcs xids is usually overkill; numProcs would
1537 * be sufficient. But it seems better to do the malloc while not holding
1538 * the lock, so we can't look at numProcs. Likewise, we allocate much
1539 * more subxip storage than is probably needed.
1540 *
1541 * This does open a possibility for avoiding repeated malloc/free: since
1542 * maxProcs does not change at runtime, we can simply reuse the previous
1543 * xip arrays if any. (This relies on the fact that all callers pass
1544 * static SnapshotData structs.)
1545 */
1546 if (snapshot->xip == NULL)
1547 {
1548 /*
1549 * First call for this snapshot. Snapshot is same size whether or not
1550 * we are in recovery, see later comments.
1551 */
1552 snapshot->xip = (TransactionId *)
1553 malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
1554 if (snapshot->xip == NULL)
1555 ereport(ERROR,
1556 (errcode(ERRCODE_OUT_OF_MEMORY),
1557 errmsg("out of memory")));
1558 Assert(snapshot->subxip == NULL);
1559 snapshot->subxip = (TransactionId *)
1560 malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
1561 if (snapshot->subxip == NULL)
1562 ereport(ERROR,
1563 (errcode(ERRCODE_OUT_OF_MEMORY),
1564 errmsg("out of memory")));
1565 }
1566
1567 /*
1568 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1569 * going to set MyPgXact->xmin.
1570 */
1571 LWLockAcquire(ProcArrayLock, LW_SHARED);
1572
1573 /* xmax is always latestCompletedXid + 1 */
1574 xmax = ShmemVariableCache->latestCompletedXid;
1575 Assert(TransactionIdIsNormal(xmax));
1576 TransactionIdAdvance(xmax);
1577
1578 /* initialize xmin calculation with xmax */
1579 globalxmin = xmin = xmax;
1580
1581 snapshot->takenDuringRecovery = RecoveryInProgress();
1582
1583 if (!snapshot->takenDuringRecovery)
1584 {
1585 int *pgprocnos = arrayP->pgprocnos;
1586 int numProcs;
1587
1588 /*
1589 * Spin over procArray checking xid, xmin, and subxids. The goal is
1590 * to gather all active xids, find the lowest xmin, and try to record
1591 * subxids.
1592 */
1593 numProcs = arrayP->numProcs;
1594 for (index = 0; index < numProcs; index++)
1595 {
1596 int pgprocno = pgprocnos[index];
1597 volatile PGXACT *pgxact = &allPgXact[pgprocno];
1598 TransactionId xid;
1599
1600 /*
1601 * Backend is doing logical decoding which manages xmin
1602 * separately, check below.
1603 */
1604 if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
1605 continue;
1606
1607 /* Ignore procs running LAZY VACUUM */
1608 if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1609 continue;
1610
1611 /* Update globalxmin to be the smallest valid xmin */
1612 xid = pgxact->xmin; /* fetch just once */
1613 if (TransactionIdIsNormal(xid) &&
1614 NormalTransactionIdPrecedes(xid, globalxmin))
1615 globalxmin = xid;
1616
1617 /* Fetch xid just once - see GetNewTransactionId */
1618 xid = pgxact->xid;
1619
1620 /*
1621 * If the transaction has no XID assigned, we can skip it; it
1622 * won't have sub-XIDs either. If the XID is >= xmax, we can also
1623 * skip it; such transactions will be treated as running anyway
1624 * (and any sub-XIDs will also be >= xmax).
1625 */
1626 if (!TransactionIdIsNormal(xid)
1627 || !NormalTransactionIdPrecedes(xid, xmax))
1628 continue;
1629
1630 /*
1631 * We don't include our own XIDs (if any) in the snapshot, but we
1632 * must include them in xmin.
1633 */
1634 if (NormalTransactionIdPrecedes(xid, xmin))
1635 xmin = xid;
1636 if (pgxact == MyPgXact)
1637 continue;
1638
1639 /* Add XID to snapshot. */
1640 snapshot->xip[count++] = xid;
1641
1642 /*
1643 * Save subtransaction XIDs if possible (if we've already
1644 * overflowed, there's no point). Note that the subxact XIDs must
1645 * be later than their parent, so no need to check them against
1646 * xmin. We could filter against xmax, but it seems better not to
1647 * do that much work while holding the ProcArrayLock.
1648 *
1649 * The other backend can add more subxids concurrently, but cannot
1650 * remove any. Hence it's important to fetch nxids just once.
1651 * Should be safe to use memcpy, though. (We needn't worry about
1652 * missing any xids added concurrently, because they must postdate
1653 * xmax.)
1654 *
1655 * Again, our own XIDs are not included in the snapshot.
1656 */
1657 if (!suboverflowed)
1658 {
1659 if (pgxact->overflowed)
1660 suboverflowed = true;
1661 else
1662 {
1663 int nxids = pgxact->nxids;
1664
1665 if (nxids > 0)
1666 {
1667 volatile PGPROC *proc = &allProcs[pgprocno];
1668
1669 memcpy(snapshot->subxip + subcount,
1670 (void *) proc->subxids.xids,
1671 nxids * sizeof(TransactionId));
1672 subcount += nxids;
1673 }
1674 }
1675 }
1676 }
1677 }
1678 else
1679 {
1680 /*
1681 * We're in hot standby, so get XIDs from KnownAssignedXids.
1682 *
1683 * We store all xids directly into subxip[]. Here's why:
1684 *
1685 * In recovery we don't know which xids are top-level and which are
1686 * subxacts, a design choice that greatly simplifies xid processing.
1687 *
1688 * It seems like we would want to try to put xids into xip[] only, but
1689 * that is fairly small. We would either need to make that bigger or
1690 * to increase the rate at which we WAL-log xid assignment; neither is
1691 * an appealing choice.
1692 *
1693 * We could try to store xids into xip[] first and then into subxip[]
1694 * if there are too many xids. That only works if the snapshot doesn't
1695 * overflow because we do not search subxip[] in that case. A simpler
1696 * way is to just store all xids in the subxact array because this is
1697 * by far the bigger array. We just leave the xip array empty.
1698 *
1699 * Either way we need to change the way XidInMVCCSnapshot() works
1700 * depending upon when the snapshot was taken, or change normal
1701 * snapshot processing so it matches.
1702 *
1703 * Note: It is possible for recovery to end before we finish taking
1704 * the snapshot, and for newly assigned transaction ids to be added to
1705 * the ProcArray. xmax cannot change while we hold ProcArrayLock, so
1706 * those newly added transaction ids would be filtered away, so we
1707 * need not be concerned about them.
1708 */
1709 subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
1710 xmax);
1711
1712 if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1713 suboverflowed = true;
1714 }
1715
1716
1717 /* fetch into volatile var while ProcArrayLock is held */
1718 replication_slot_xmin = procArray->replication_slot_xmin;
1719 replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1720
1721 if (!TransactionIdIsValid(MyPgXact->xmin))
1722 MyPgXact->xmin = TransactionXmin = xmin;
1723
1724 LWLockRelease(ProcArrayLock);
1725
1726 /*
1727 * Update globalxmin to include actual process xids. This is a slightly
1728 * different way of computing it than GetOldestXmin uses, but should give
1729 * the same result.
1730 */
1731 if (TransactionIdPrecedes(xmin, globalxmin))
1732 globalxmin = xmin;
1733
1734 /* Update global variables too */
1735 RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
1736 if (!TransactionIdIsNormal(RecentGlobalXmin))
1737 RecentGlobalXmin = FirstNormalTransactionId;
1738
1739 /* Check whether there's a replication slot requiring an older xmin. */
1740 if (TransactionIdIsValid(replication_slot_xmin) &&
1741 NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
1742 RecentGlobalXmin = replication_slot_xmin;
1743
1744 /* Non-catalog tables can be vacuumed if older than this xid */
1745 RecentGlobalDataXmin = RecentGlobalXmin;
1746
1747 /*
1748 * Check whether there's a replication slot requiring an older catalog
1749 * xmin.
1750 */
1751 if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
1752 NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
1753 RecentGlobalXmin = replication_slot_catalog_xmin;
1754
1755 RecentXmin = xmin;
1756
1757 snapshot->xmin = xmin;
1758 snapshot->xmax = xmax;
1759 snapshot->xcnt = count;
1760 snapshot->subxcnt = subcount;
1761 snapshot->suboverflowed = suboverflowed;
1762
1763 snapshot->curcid = GetCurrentCommandId(false);
1764
1765 /*
1766 * This is a new snapshot, so set both refcounts are zero, and mark it as
1767 * not copied in persistent memory.
1768 */
1769 snapshot->active_count = 0;
1770 snapshot->regd_count = 0;
1771 snapshot->copied = false;
1772
1773 if (old_snapshot_threshold < 0)
1774 {
1775 /*
1776 * If not using "snapshot too old" feature, fill related fields with
1777 * dummy values that don't require any locking.
1778 */
1779 snapshot->lsn = InvalidXLogRecPtr;
1780 snapshot->whenTaken = 0;
1781 }
1782 else
1783 {
1784 /*
1785 * Capture the current time and WAL stream location in case this
1786 * snapshot becomes old enough to need to fall back on the special
1787 * "old snapshot" logic.
1788 */
1789 snapshot->lsn = GetXLogInsertRecPtr();
1790 snapshot->whenTaken = GetSnapshotCurrentTimestamp();
1791 MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
1792 }
1793
1794 return snapshot;
1795 }
1796
1797 /*
1798 * ProcArrayInstallImportedXmin -- install imported xmin into MyPgXact->xmin
1799 *
1800 * This is called when installing a snapshot imported from another
1801 * transaction. To ensure that OldestXmin doesn't go backwards, we must
1802 * check that the source transaction is still running, and we'd better do
1803 * that atomically with installing the new xmin.
1804 *
1805 * Returns TRUE if successful, FALSE if source xact is no longer running.
1806 */
1807 bool
ProcArrayInstallImportedXmin(TransactionId xmin,VirtualTransactionId * sourcevxid)1808 ProcArrayInstallImportedXmin(TransactionId xmin,
1809 VirtualTransactionId *sourcevxid)
1810 {
1811 bool result = false;
1812 ProcArrayStruct *arrayP = procArray;
1813 int index;
1814
1815 Assert(TransactionIdIsNormal(xmin));
1816 if (!sourcevxid)
1817 return false;
1818
1819 /* Get lock so source xact can't end while we're doing this */
1820 LWLockAcquire(ProcArrayLock, LW_SHARED);
1821
1822 for (index = 0; index < arrayP->numProcs; index++)
1823 {
1824 int pgprocno = arrayP->pgprocnos[index];
1825 volatile PGPROC *proc = &allProcs[pgprocno];
1826 volatile PGXACT *pgxact = &allPgXact[pgprocno];
1827 TransactionId xid;
1828
1829 /* Ignore procs running LAZY VACUUM */
1830 if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1831 continue;
1832
1833 /* We are only interested in the specific virtual transaction. */
1834 if (proc->backendId != sourcevxid->backendId)
1835 continue;
1836 if (proc->lxid != sourcevxid->localTransactionId)
1837 continue;
1838
1839 /*
1840 * We check the transaction's database ID for paranoia's sake: if it's
1841 * in another DB then its xmin does not cover us. Caller should have
1842 * detected this already, so we just treat any funny cases as
1843 * "transaction not found".
1844 */
1845 if (proc->databaseId != MyDatabaseId)
1846 continue;
1847
1848 /*
1849 * Likewise, let's just make real sure its xmin does cover us.
1850 */
1851 xid = pgxact->xmin; /* fetch just once */
1852 if (!TransactionIdIsNormal(xid) ||
1853 !TransactionIdPrecedesOrEquals(xid, xmin))
1854 continue;
1855
1856 /*
1857 * We're good. Install the new xmin. As in GetSnapshotData, set
1858 * TransactionXmin too. (Note that because snapmgr.c called
1859 * GetSnapshotData first, we'll be overwriting a valid xmin here, so
1860 * we don't check that.)
1861 */
1862 MyPgXact->xmin = TransactionXmin = xmin;
1863
1864 result = true;
1865 break;
1866 }
1867
1868 LWLockRelease(ProcArrayLock);
1869
1870 return result;
1871 }
1872
1873 /*
1874 * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
1875 *
1876 * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
1877 * PGPROC of the transaction from which we imported the snapshot, rather than
1878 * an XID.
1879 *
1880 * Returns TRUE if successful, FALSE if source xact is no longer running.
1881 */
1882 bool
ProcArrayInstallRestoredXmin(TransactionId xmin,PGPROC * proc)1883 ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
1884 {
1885 bool result = false;
1886 TransactionId xid;
1887 volatile PGXACT *pgxact;
1888
1889 Assert(TransactionIdIsNormal(xmin));
1890 Assert(proc != NULL);
1891
1892 /* Get lock so source xact can't end while we're doing this */
1893 LWLockAcquire(ProcArrayLock, LW_SHARED);
1894
1895 pgxact = &allPgXact[proc->pgprocno];
1896
1897 /*
1898 * Be certain that the referenced PGPROC has an advertised xmin which is
1899 * no later than the one we're installing, so that the system-wide xmin
1900 * can't go backwards. Also, make sure it's running in the same database,
1901 * so that the per-database xmin cannot go backwards.
1902 */
1903 xid = pgxact->xmin; /* fetch just once */
1904 if (proc->databaseId == MyDatabaseId &&
1905 TransactionIdIsNormal(xid) &&
1906 TransactionIdPrecedesOrEquals(xid, xmin))
1907 {
1908 MyPgXact->xmin = TransactionXmin = xmin;
1909 result = true;
1910 }
1911
1912 LWLockRelease(ProcArrayLock);
1913
1914 return result;
1915 }
1916
1917 /*
1918 * GetRunningTransactionData -- returns information about running transactions.
1919 *
1920 * Similar to GetSnapshotData but returns more information. We include
1921 * all PGXACTs with an assigned TransactionId, even VACUUM processes and
1922 * prepared transactions.
1923 *
1924 * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
1925 * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
1926 * array until the caller has WAL-logged this snapshot, and releases the
1927 * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
1928 * lock is released.
1929 *
1930 * The returned data structure is statically allocated; caller should not
1931 * modify it, and must not assume it is valid past the next call.
1932 *
1933 * This is never executed during recovery so there is no need to look at
1934 * KnownAssignedXids.
1935 *
1936 * Dummy PGXACTs from prepared transaction are included, meaning that this
1937 * may return entries with duplicated TransactionId values coming from
1938 * transaction finishing to prepare. Nothing is done about duplicated
1939 * entries here to not hold on ProcArrayLock more than necessary.
1940 *
1941 * We don't worry about updating other counters, we want to keep this as
1942 * simple as possible and leave GetSnapshotData() as the primary code for
1943 * that bookkeeping.
1944 *
1945 * Note that if any transaction has overflowed its cached subtransactions
1946 * then there is no real need include any subtransactions. That isn't a
1947 * common enough case to worry about optimising the size of the WAL record,
1948 * and we may wish to see that data for diagnostic purposes anyway.
1949 */
1950 RunningTransactions
GetRunningTransactionData(void)1951 GetRunningTransactionData(void)
1952 {
1953 /* result workspace */
1954 static RunningTransactionsData CurrentRunningXactsData;
1955
1956 ProcArrayStruct *arrayP = procArray;
1957 RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1958 TransactionId latestCompletedXid;
1959 TransactionId oldestRunningXid;
1960 TransactionId *xids;
1961 int index;
1962 int count;
1963 int subcount;
1964 bool suboverflowed;
1965
1966 Assert(!RecoveryInProgress());
1967
1968 /*
1969 * Allocating space for maxProcs xids is usually overkill; numProcs would
1970 * be sufficient. But it seems better to do the malloc while not holding
1971 * the lock, so we can't look at numProcs. Likewise, we allocate much
1972 * more subxip storage than is probably needed.
1973 *
1974 * Should only be allocated in bgwriter, since only ever executed during
1975 * checkpoints.
1976 */
1977 if (CurrentRunningXacts->xids == NULL)
1978 {
1979 /*
1980 * First call
1981 */
1982 CurrentRunningXacts->xids = (TransactionId *)
1983 malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1984 if (CurrentRunningXacts->xids == NULL)
1985 ereport(ERROR,
1986 (errcode(ERRCODE_OUT_OF_MEMORY),
1987 errmsg("out of memory")));
1988 }
1989
1990 xids = CurrentRunningXacts->xids;
1991
1992 count = subcount = 0;
1993 suboverflowed = false;
1994
1995 /*
1996 * Ensure that no xids enter or leave the procarray while we obtain
1997 * snapshot.
1998 */
1999 LWLockAcquire(ProcArrayLock, LW_SHARED);
2000 LWLockAcquire(XidGenLock, LW_SHARED);
2001
2002 latestCompletedXid = ShmemVariableCache->latestCompletedXid;
2003
2004 oldestRunningXid = ShmemVariableCache->nextXid;
2005
2006 /*
2007 * Spin over procArray collecting all xids
2008 */
2009 for (index = 0; index < arrayP->numProcs; index++)
2010 {
2011 int pgprocno = arrayP->pgprocnos[index];
2012 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2013 TransactionId xid;
2014
2015 /* Fetch xid just once - see GetNewTransactionId */
2016 xid = pgxact->xid;
2017
2018 /*
2019 * We don't need to store transactions that don't have a TransactionId
2020 * yet because they will not show as running on a standby server.
2021 */
2022 if (!TransactionIdIsValid(xid))
2023 continue;
2024
2025 xids[count++] = xid;
2026
2027 if (TransactionIdPrecedes(xid, oldestRunningXid))
2028 oldestRunningXid = xid;
2029
2030 if (pgxact->overflowed)
2031 suboverflowed = true;
2032 }
2033
2034 /*
2035 * Spin over procArray collecting all subxids, but only if there hasn't
2036 * been a suboverflow.
2037 */
2038 if (!suboverflowed)
2039 {
2040 for (index = 0; index < arrayP->numProcs; index++)
2041 {
2042 int pgprocno = arrayP->pgprocnos[index];
2043 volatile PGPROC *proc = &allProcs[pgprocno];
2044 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2045 int nxids;
2046
2047 /*
2048 * Save subtransaction XIDs. Other backends can't add or remove
2049 * entries while we're holding XidGenLock.
2050 */
2051 nxids = pgxact->nxids;
2052 if (nxids > 0)
2053 {
2054 memcpy(&xids[count], (void *) proc->subxids.xids,
2055 nxids * sizeof(TransactionId));
2056 count += nxids;
2057 subcount += nxids;
2058
2059 /*
2060 * Top-level XID of a transaction is always less than any of
2061 * its subxids, so we don't need to check if any of the
2062 * subxids are smaller than oldestRunningXid
2063 */
2064 }
2065 }
2066 }
2067
2068 /*
2069 * It's important *not* to include the limits set by slots here because
2070 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
2071 * were to be included here the initial value could never increase because
2072 * of a circular dependency where slots only increase their limits when
2073 * running xacts increases oldestRunningXid and running xacts only
2074 * increases if slots do.
2075 */
2076
2077 CurrentRunningXacts->xcnt = count - subcount;
2078 CurrentRunningXacts->subxcnt = subcount;
2079 CurrentRunningXacts->subxid_overflow = suboverflowed;
2080 CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
2081 CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
2082 CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
2083
2084 Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
2085 Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
2086 Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
2087
2088 /* We don't release the locks here, the caller is responsible for that */
2089
2090 return CurrentRunningXacts;
2091 }
2092
2093 /*
2094 * GetOldestActiveTransactionId()
2095 *
2096 * Similar to GetSnapshotData but returns just oldestActiveXid. We include
2097 * all PGXACTs with an assigned TransactionId, even VACUUM processes.
2098 * We look at all databases, though there is no need to include WALSender
2099 * since this has no effect on hot standby conflicts.
2100 *
2101 * This is never executed during recovery so there is no need to look at
2102 * KnownAssignedXids.
2103 *
2104 * We don't worry about updating other counters, we want to keep this as
2105 * simple as possible and leave GetSnapshotData() as the primary code for
2106 * that bookkeeping.
2107 */
2108 TransactionId
GetOldestActiveTransactionId(void)2109 GetOldestActiveTransactionId(void)
2110 {
2111 ProcArrayStruct *arrayP = procArray;
2112 TransactionId oldestRunningXid;
2113 int index;
2114
2115 Assert(!RecoveryInProgress());
2116
2117 /*
2118 * Read nextXid, as the upper bound of what's still active.
2119 *
2120 * Reading a TransactionId is atomic, but we must grab the lock to make
2121 * sure that all XIDs < nextXid are already present in the proc array (or
2122 * have already completed), when we spin over it.
2123 */
2124 LWLockAcquire(XidGenLock, LW_SHARED);
2125 oldestRunningXid = ShmemVariableCache->nextXid;
2126 LWLockRelease(XidGenLock);
2127
2128 /*
2129 * Spin over procArray collecting all xids and subxids.
2130 */
2131 LWLockAcquire(ProcArrayLock, LW_SHARED);
2132 for (index = 0; index < arrayP->numProcs; index++)
2133 {
2134 int pgprocno = arrayP->pgprocnos[index];
2135 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2136 TransactionId xid;
2137
2138 /* Fetch xid just once - see GetNewTransactionId */
2139 xid = pgxact->xid;
2140
2141 if (!TransactionIdIsNormal(xid))
2142 continue;
2143
2144 if (TransactionIdPrecedes(xid, oldestRunningXid))
2145 oldestRunningXid = xid;
2146
2147 /*
2148 * Top-level XID of a transaction is always less than any of its
2149 * subxids, so we don't need to check if any of the subxids are
2150 * smaller than oldestRunningXid
2151 */
2152 }
2153 LWLockRelease(ProcArrayLock);
2154
2155 return oldestRunningXid;
2156 }
2157
2158 /*
2159 * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum
2160 *
2161 * Returns the oldest xid that we can guarantee not to have been affected by
2162 * vacuum, i.e. no rows >= that xid have been vacuumed away unless the
2163 * transaction aborted. Note that the value can (and most of the time will) be
2164 * much more conservative than what really has been affected by vacuum, but we
2165 * currently don't have better data available.
2166 *
2167 * This is useful to initialize the cutoff xid after which a new changeset
2168 * extraction replication slot can start decoding changes.
2169 *
2170 * Must be called with ProcArrayLock held either shared or exclusively,
2171 * although most callers will want to use exclusive mode since it is expected
2172 * that the caller will immediately use the xid to peg the xmin horizon.
2173 */
2174 TransactionId
GetOldestSafeDecodingTransactionId(bool catalogOnly)2175 GetOldestSafeDecodingTransactionId(bool catalogOnly)
2176 {
2177 ProcArrayStruct *arrayP = procArray;
2178 TransactionId oldestSafeXid;
2179 int index;
2180 bool recovery_in_progress = RecoveryInProgress();
2181
2182 Assert(LWLockHeldByMe(ProcArrayLock));
2183
2184 /*
2185 * Acquire XidGenLock, so no transactions can acquire an xid while we're
2186 * running. If no transaction with xid were running concurrently a new xid
2187 * could influence the RecentXmin et al.
2188 *
2189 * We initialize the computation to nextXid since that's guaranteed to be
2190 * a safe, albeit pessimal, value.
2191 */
2192 LWLockAcquire(XidGenLock, LW_SHARED);
2193 oldestSafeXid = ShmemVariableCache->nextXid;
2194
2195 /*
2196 * If there's already a slot pegging the xmin horizon, we can start with
2197 * that value, it's guaranteed to be safe since it's computed by this
2198 * routine initially and has been enforced since. We can always use the
2199 * slot's general xmin horizon, but the catalog horizon is only usable
2200 * when we only catalog data is going to be looked at.
2201 */
2202 if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2203 TransactionIdPrecedes(procArray->replication_slot_xmin,
2204 oldestSafeXid))
2205 oldestSafeXid = procArray->replication_slot_xmin;
2206
2207 if (catalogOnly &&
2208 TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2209 TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
2210 oldestSafeXid))
2211 oldestSafeXid = procArray->replication_slot_catalog_xmin;
2212
2213 /*
2214 * If we're not in recovery, we walk over the procarray and collect the
2215 * lowest xid. Since we're called with ProcArrayLock held and have
2216 * acquired XidGenLock, no entries can vanish concurrently, since
2217 * PGXACT->xid is only set with XidGenLock held and only cleared with
2218 * ProcArrayLock held.
2219 *
2220 * In recovery we can't lower the safe value besides what we've computed
2221 * above, so we'll have to wait a bit longer there. We unfortunately can
2222 * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
2223 * machinery can miss values and return an older value than is safe.
2224 */
2225 if (!recovery_in_progress)
2226 {
2227 /*
2228 * Spin over procArray collecting all min(PGXACT->xid)
2229 */
2230 for (index = 0; index < arrayP->numProcs; index++)
2231 {
2232 int pgprocno = arrayP->pgprocnos[index];
2233 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2234 TransactionId xid;
2235
2236 /* Fetch xid just once - see GetNewTransactionId */
2237 xid = pgxact->xid;
2238
2239 if (!TransactionIdIsNormal(xid))
2240 continue;
2241
2242 if (TransactionIdPrecedes(xid, oldestSafeXid))
2243 oldestSafeXid = xid;
2244 }
2245 }
2246
2247 LWLockRelease(XidGenLock);
2248
2249 return oldestSafeXid;
2250 }
2251
2252 /*
2253 * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
2254 * delaying checkpoint because they have critical actions in progress.
2255 *
2256 * Constructs an array of VXIDs of transactions that are currently in commit
2257 * critical sections, as shown by having delayChkpt set in their PGXACT.
2258 *
2259 * Returns a palloc'd array that should be freed by the caller.
2260 * *nvxids is the number of valid entries.
2261 *
2262 * Note that because backends set or clear delayChkpt without holding any lock,
2263 * the result is somewhat indeterminate, but we don't really care. Even in
2264 * a multiprocessor with delayed writes to shared memory, it should be certain
2265 * that setting of delayChkpt will propagate to shared memory when the backend
2266 * takes a lock, so we cannot fail to see a virtual xact as delayChkpt if
2267 * it's already inserted its commit record. Whether it takes a little while
2268 * for clearing of delayChkpt to propagate is unimportant for correctness.
2269 */
2270 VirtualTransactionId *
GetVirtualXIDsDelayingChkpt(int * nvxids)2271 GetVirtualXIDsDelayingChkpt(int *nvxids)
2272 {
2273 VirtualTransactionId *vxids;
2274 ProcArrayStruct *arrayP = procArray;
2275 int count = 0;
2276 int index;
2277
2278 /* allocate what's certainly enough result space */
2279 vxids = (VirtualTransactionId *)
2280 palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2281
2282 LWLockAcquire(ProcArrayLock, LW_SHARED);
2283
2284 for (index = 0; index < arrayP->numProcs; index++)
2285 {
2286 int pgprocno = arrayP->pgprocnos[index];
2287 volatile PGPROC *proc = &allProcs[pgprocno];
2288 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2289
2290 if (pgxact->delayChkpt)
2291 {
2292 VirtualTransactionId vxid;
2293
2294 GET_VXID_FROM_PGPROC(vxid, *proc);
2295 if (VirtualTransactionIdIsValid(vxid))
2296 vxids[count++] = vxid;
2297 }
2298 }
2299
2300 LWLockRelease(ProcArrayLock);
2301
2302 *nvxids = count;
2303 return vxids;
2304 }
2305
2306 /*
2307 * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
2308 *
2309 * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
2310 * of the specified VXIDs are still in critical sections of code.
2311 *
2312 * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
2313 * those numbers should be small enough for it not to be a problem.
2314 */
2315 bool
HaveVirtualXIDsDelayingChkpt(VirtualTransactionId * vxids,int nvxids)2316 HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
2317 {
2318 bool result = false;
2319 ProcArrayStruct *arrayP = procArray;
2320 int index;
2321
2322 LWLockAcquire(ProcArrayLock, LW_SHARED);
2323
2324 for (index = 0; index < arrayP->numProcs; index++)
2325 {
2326 int pgprocno = arrayP->pgprocnos[index];
2327 volatile PGPROC *proc = &allProcs[pgprocno];
2328 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2329 VirtualTransactionId vxid;
2330
2331 GET_VXID_FROM_PGPROC(vxid, *proc);
2332
2333 if (pgxact->delayChkpt && VirtualTransactionIdIsValid(vxid))
2334 {
2335 int i;
2336
2337 for (i = 0; i < nvxids; i++)
2338 {
2339 if (VirtualTransactionIdEquals(vxid, vxids[i]))
2340 {
2341 result = true;
2342 break;
2343 }
2344 }
2345 if (result)
2346 break;
2347 }
2348 }
2349
2350 LWLockRelease(ProcArrayLock);
2351
2352 return result;
2353 }
2354
2355 /*
2356 * BackendPidGetProc -- get a backend's PGPROC given its PID
2357 *
2358 * Returns NULL if not found. Note that it is up to the caller to be
2359 * sure that the question remains meaningful for long enough for the
2360 * answer to be used ...
2361 */
2362 PGPROC *
BackendPidGetProc(int pid)2363 BackendPidGetProc(int pid)
2364 {
2365 PGPROC *result;
2366
2367 if (pid == 0) /* never match dummy PGPROCs */
2368 return NULL;
2369
2370 LWLockAcquire(ProcArrayLock, LW_SHARED);
2371
2372 result = BackendPidGetProcWithLock(pid);
2373
2374 LWLockRelease(ProcArrayLock);
2375
2376 return result;
2377 }
2378
2379 /*
2380 * BackendPidGetProcWithLock -- get a backend's PGPROC given its PID
2381 *
2382 * Same as above, except caller must be holding ProcArrayLock. The found
2383 * entry, if any, can be assumed to be valid as long as the lock remains held.
2384 */
2385 PGPROC *
BackendPidGetProcWithLock(int pid)2386 BackendPidGetProcWithLock(int pid)
2387 {
2388 PGPROC *result = NULL;
2389 ProcArrayStruct *arrayP = procArray;
2390 int index;
2391
2392 if (pid == 0) /* never match dummy PGPROCs */
2393 return NULL;
2394
2395 for (index = 0; index < arrayP->numProcs; index++)
2396 {
2397 PGPROC *proc = &allProcs[arrayP->pgprocnos[index]];
2398
2399 if (proc->pid == pid)
2400 {
2401 result = proc;
2402 break;
2403 }
2404 }
2405
2406 return result;
2407 }
2408
2409 /*
2410 * BackendXidGetPid -- get a backend's pid given its XID
2411 *
2412 * Returns 0 if not found or it's a prepared transaction. Note that
2413 * it is up to the caller to be sure that the question remains
2414 * meaningful for long enough for the answer to be used ...
2415 *
2416 * Only main transaction Ids are considered. This function is mainly
2417 * useful for determining what backend owns a lock.
2418 *
2419 * Beware that not every xact has an XID assigned. However, as long as you
2420 * only call this using an XID found on disk, you're safe.
2421 */
2422 int
BackendXidGetPid(TransactionId xid)2423 BackendXidGetPid(TransactionId xid)
2424 {
2425 int result = 0;
2426 ProcArrayStruct *arrayP = procArray;
2427 int index;
2428
2429 if (xid == InvalidTransactionId) /* never match invalid xid */
2430 return 0;
2431
2432 LWLockAcquire(ProcArrayLock, LW_SHARED);
2433
2434 for (index = 0; index < arrayP->numProcs; index++)
2435 {
2436 int pgprocno = arrayP->pgprocnos[index];
2437 volatile PGPROC *proc = &allProcs[pgprocno];
2438 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2439
2440 if (pgxact->xid == xid)
2441 {
2442 result = proc->pid;
2443 break;
2444 }
2445 }
2446
2447 LWLockRelease(ProcArrayLock);
2448
2449 return result;
2450 }
2451
2452 /*
2453 * IsBackendPid -- is a given pid a running backend
2454 *
2455 * This is not called by the backend, but is called by external modules.
2456 */
2457 bool
IsBackendPid(int pid)2458 IsBackendPid(int pid)
2459 {
2460 return (BackendPidGetProc(pid) != NULL);
2461 }
2462
2463
2464 /*
2465 * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
2466 *
2467 * The array is palloc'd. The number of valid entries is returned into *nvxids.
2468 *
2469 * The arguments allow filtering the set of VXIDs returned. Our own process
2470 * is always skipped. In addition:
2471 * If limitXmin is not InvalidTransactionId, skip processes with
2472 * xmin > limitXmin.
2473 * If excludeXmin0 is true, skip processes with xmin = 0.
2474 * If allDbs is false, skip processes attached to other databases.
2475 * If excludeVacuum isn't zero, skip processes for which
2476 * (vacuumFlags & excludeVacuum) is not zero.
2477 *
2478 * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
2479 * allow skipping backends whose oldest live snapshot is no older than
2480 * some snapshot we have. Since we examine the procarray with only shared
2481 * lock, there are race conditions: a backend could set its xmin just after
2482 * we look. Indeed, on multiprocessors with weak memory ordering, the
2483 * other backend could have set its xmin *before* we look. We know however
2484 * that such a backend must have held shared ProcArrayLock overlapping our
2485 * own hold of ProcArrayLock, else we would see its xmin update. Therefore,
2486 * any snapshot the other backend is taking concurrently with our scan cannot
2487 * consider any transactions as still running that we think are committed
2488 * (since backends must hold ProcArrayLock exclusive to commit).
2489 */
2490 VirtualTransactionId *
GetCurrentVirtualXIDs(TransactionId limitXmin,bool excludeXmin0,bool allDbs,int excludeVacuum,int * nvxids)2491 GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
2492 bool allDbs, int excludeVacuum,
2493 int *nvxids)
2494 {
2495 VirtualTransactionId *vxids;
2496 ProcArrayStruct *arrayP = procArray;
2497 int count = 0;
2498 int index;
2499
2500 /* allocate what's certainly enough result space */
2501 vxids = (VirtualTransactionId *)
2502 palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2503
2504 LWLockAcquire(ProcArrayLock, LW_SHARED);
2505
2506 for (index = 0; index < arrayP->numProcs; index++)
2507 {
2508 int pgprocno = arrayP->pgprocnos[index];
2509 volatile PGPROC *proc = &allProcs[pgprocno];
2510 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2511
2512 if (proc == MyProc)
2513 continue;
2514
2515 if (excludeVacuum & pgxact->vacuumFlags)
2516 continue;
2517
2518 if (allDbs || proc->databaseId == MyDatabaseId)
2519 {
2520 /* Fetch xmin just once - might change on us */
2521 TransactionId pxmin = pgxact->xmin;
2522
2523 if (excludeXmin0 && !TransactionIdIsValid(pxmin))
2524 continue;
2525
2526 /*
2527 * InvalidTransactionId precedes all other XIDs, so a proc that
2528 * hasn't set xmin yet will not be rejected by this test.
2529 */
2530 if (!TransactionIdIsValid(limitXmin) ||
2531 TransactionIdPrecedesOrEquals(pxmin, limitXmin))
2532 {
2533 VirtualTransactionId vxid;
2534
2535 GET_VXID_FROM_PGPROC(vxid, *proc);
2536 if (VirtualTransactionIdIsValid(vxid))
2537 vxids[count++] = vxid;
2538 }
2539 }
2540 }
2541
2542 LWLockRelease(ProcArrayLock);
2543
2544 *nvxids = count;
2545 return vxids;
2546 }
2547
2548 /*
2549 * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
2550 *
2551 * Usage is limited to conflict resolution during recovery on standby servers.
2552 * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
2553 * in cases where we cannot accurately determine a value for latestRemovedXid.
2554 *
2555 * If limitXmin is InvalidTransactionId then we want to kill everybody,
2556 * so we're not worried if they have a snapshot or not, nor does it really
2557 * matter what type of lock we hold.
2558 *
2559 * All callers that are checking xmins always now supply a valid and useful
2560 * value for limitXmin. The limitXmin is always lower than the lowest
2561 * numbered KnownAssignedXid that is not already a FATAL error. This is
2562 * because we only care about cleanup records that are cleaning up tuple
2563 * versions from committed transactions. In that case they will only occur
2564 * at the point where the record is less than the lowest running xid. That
2565 * allows us to say that if any backend takes a snapshot concurrently with
2566 * us then the conflict assessment made here would never include the snapshot
2567 * that is being derived. So we take LW_SHARED on the ProcArray and allow
2568 * concurrent snapshots when limitXmin is valid. We might think about adding
2569 * Assert(limitXmin < lowest(KnownAssignedXids))
2570 * but that would not be true in the case of FATAL errors lagging in array,
2571 * but we already know those are bogus anyway, so we skip that test.
2572 *
2573 * If dbOid is valid we skip backends attached to other databases.
2574 *
2575 * Be careful to *not* pfree the result from this function. We reuse
2576 * this array sufficiently often that we use malloc for the result.
2577 */
2578 VirtualTransactionId *
GetConflictingVirtualXIDs(TransactionId limitXmin,Oid dbOid)2579 GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2580 {
2581 static VirtualTransactionId *vxids;
2582 ProcArrayStruct *arrayP = procArray;
2583 int count = 0;
2584 int index;
2585
2586 /*
2587 * If first time through, get workspace to remember main XIDs in. We
2588 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
2589 * result space, remembering room for a terminator.
2590 */
2591 if (vxids == NULL)
2592 {
2593 vxids = (VirtualTransactionId *)
2594 malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
2595 if (vxids == NULL)
2596 ereport(ERROR,
2597 (errcode(ERRCODE_OUT_OF_MEMORY),
2598 errmsg("out of memory")));
2599 }
2600
2601 LWLockAcquire(ProcArrayLock, LW_SHARED);
2602
2603 for (index = 0; index < arrayP->numProcs; index++)
2604 {
2605 int pgprocno = arrayP->pgprocnos[index];
2606 volatile PGPROC *proc = &allProcs[pgprocno];
2607 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2608
2609 /* Exclude prepared transactions */
2610 if (proc->pid == 0)
2611 continue;
2612
2613 if (!OidIsValid(dbOid) ||
2614 proc->databaseId == dbOid)
2615 {
2616 /* Fetch xmin just once - can't change on us, but good coding */
2617 TransactionId pxmin = pgxact->xmin;
2618
2619 /*
2620 * We ignore an invalid pxmin because this means that backend has
2621 * no snapshot currently. We hold a Share lock to avoid contention
2622 * with users taking snapshots. That is not a problem because the
2623 * current xmin is always at least one higher than the latest
2624 * removed xid, so any new snapshot would never conflict with the
2625 * test here.
2626 */
2627 if (!TransactionIdIsValid(limitXmin) ||
2628 (TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
2629 {
2630 VirtualTransactionId vxid;
2631
2632 GET_VXID_FROM_PGPROC(vxid, *proc);
2633 if (VirtualTransactionIdIsValid(vxid))
2634 vxids[count++] = vxid;
2635 }
2636 }
2637 }
2638
2639 LWLockRelease(ProcArrayLock);
2640
2641 /* add the terminator */
2642 vxids[count].backendId = InvalidBackendId;
2643 vxids[count].localTransactionId = InvalidLocalTransactionId;
2644
2645 return vxids;
2646 }
2647
2648 /*
2649 * CancelVirtualTransaction - used in recovery conflict processing
2650 *
2651 * Returns pid of the process signaled, or 0 if not found.
2652 */
2653 pid_t
CancelVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode)2654 CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
2655 {
2656 return SignalVirtualTransaction(vxid, sigmode, true);
2657 }
2658
2659 pid_t
SignalVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode,bool conflictPending)2660 SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
2661 bool conflictPending)
2662 {
2663 ProcArrayStruct *arrayP = procArray;
2664 int index;
2665 pid_t pid = 0;
2666
2667 LWLockAcquire(ProcArrayLock, LW_SHARED);
2668
2669 for (index = 0; index < arrayP->numProcs; index++)
2670 {
2671 int pgprocno = arrayP->pgprocnos[index];
2672 volatile PGPROC *proc = &allProcs[pgprocno];
2673 VirtualTransactionId procvxid;
2674
2675 GET_VXID_FROM_PGPROC(procvxid, *proc);
2676
2677 if (procvxid.backendId == vxid.backendId &&
2678 procvxid.localTransactionId == vxid.localTransactionId)
2679 {
2680 proc->recoveryConflictPending = conflictPending;
2681 pid = proc->pid;
2682 if (pid != 0)
2683 {
2684 /*
2685 * Kill the pid if it's still here. If not, that's what we
2686 * wanted so ignore any errors.
2687 */
2688 (void) SendProcSignal(pid, sigmode, vxid.backendId);
2689 }
2690 break;
2691 }
2692 }
2693
2694 LWLockRelease(ProcArrayLock);
2695
2696 return pid;
2697 }
2698
2699 /*
2700 * MinimumActiveBackends --- count backends (other than myself) that are
2701 * in active transactions. Return true if the count exceeds the
2702 * minimum threshold passed. This is used as a heuristic to decide if
2703 * a pre-XLOG-flush delay is worthwhile during commit.
2704 *
2705 * Do not count backends that are blocked waiting for locks, since they are
2706 * not going to get to run until someone else commits.
2707 */
2708 bool
MinimumActiveBackends(int min)2709 MinimumActiveBackends(int min)
2710 {
2711 ProcArrayStruct *arrayP = procArray;
2712 int count = 0;
2713 int index;
2714
2715 /* Quick short-circuit if no minimum is specified */
2716 if (min == 0)
2717 return true;
2718
2719 /*
2720 * Note: for speed, we don't acquire ProcArrayLock. This is a little bit
2721 * bogus, but since we are only testing fields for zero or nonzero, it
2722 * should be OK. The result is only used for heuristic purposes anyway...
2723 */
2724 for (index = 0; index < arrayP->numProcs; index++)
2725 {
2726 int pgprocno = arrayP->pgprocnos[index];
2727 volatile PGPROC *proc = &allProcs[pgprocno];
2728 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2729
2730 /*
2731 * Since we're not holding a lock, need to be prepared to deal with
2732 * garbage, as someone could have incremented numProcs but not yet
2733 * filled the structure.
2734 *
2735 * If someone just decremented numProcs, 'proc' could also point to a
2736 * PGPROC entry that's no longer in the array. It still points to a
2737 * PGPROC struct, though, because freed PGPROC entries just go to the
2738 * free list and are recycled. Its contents are nonsense in that case,
2739 * but that's acceptable for this function.
2740 */
2741 if (pgprocno == -1)
2742 continue; /* do not count deleted entries */
2743 if (proc == MyProc)
2744 continue; /* do not count myself */
2745 if (pgxact->xid == InvalidTransactionId)
2746 continue; /* do not count if no XID assigned */
2747 if (proc->pid == 0)
2748 continue; /* do not count prepared xacts */
2749 if (proc->waitLock != NULL)
2750 continue; /* do not count if blocked on a lock */
2751 count++;
2752 if (count >= min)
2753 break;
2754 }
2755
2756 return count >= min;
2757 }
2758
2759 /*
2760 * CountDBBackends --- count backends that are using specified database
2761 */
2762 int
CountDBBackends(Oid databaseid)2763 CountDBBackends(Oid databaseid)
2764 {
2765 ProcArrayStruct *arrayP = procArray;
2766 int count = 0;
2767 int index;
2768
2769 LWLockAcquire(ProcArrayLock, LW_SHARED);
2770
2771 for (index = 0; index < arrayP->numProcs; index++)
2772 {
2773 int pgprocno = arrayP->pgprocnos[index];
2774 volatile PGPROC *proc = &allProcs[pgprocno];
2775
2776 if (proc->pid == 0)
2777 continue; /* do not count prepared xacts */
2778 if (!OidIsValid(databaseid) ||
2779 proc->databaseId == databaseid)
2780 count++;
2781 }
2782
2783 LWLockRelease(ProcArrayLock);
2784
2785 return count;
2786 }
2787
2788 /*
2789 * CountDBConnections --- counts database backends ignoring any background
2790 * worker processes
2791 */
2792 int
CountDBConnections(Oid databaseid)2793 CountDBConnections(Oid databaseid)
2794 {
2795 ProcArrayStruct *arrayP = procArray;
2796 int count = 0;
2797 int index;
2798
2799 LWLockAcquire(ProcArrayLock, LW_SHARED);
2800
2801 for (index = 0; index < arrayP->numProcs; index++)
2802 {
2803 int pgprocno = arrayP->pgprocnos[index];
2804 volatile PGPROC *proc = &allProcs[pgprocno];
2805
2806 if (proc->pid == 0)
2807 continue; /* do not count prepared xacts */
2808 if (proc->isBackgroundWorker)
2809 continue; /* do not count background workers */
2810 if (!OidIsValid(databaseid) ||
2811 proc->databaseId == databaseid)
2812 count++;
2813 }
2814
2815 LWLockRelease(ProcArrayLock);
2816
2817 return count;
2818 }
2819
2820 /*
2821 * CancelDBBackends --- cancel backends that are using specified database
2822 */
2823 void
CancelDBBackends(Oid databaseid,ProcSignalReason sigmode,bool conflictPending)2824 CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
2825 {
2826 ProcArrayStruct *arrayP = procArray;
2827 int index;
2828 pid_t pid = 0;
2829
2830 /* tell all backends to die */
2831 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2832
2833 for (index = 0; index < arrayP->numProcs; index++)
2834 {
2835 int pgprocno = arrayP->pgprocnos[index];
2836 volatile PGPROC *proc = &allProcs[pgprocno];
2837
2838 if (databaseid == InvalidOid || proc->databaseId == databaseid)
2839 {
2840 VirtualTransactionId procvxid;
2841
2842 GET_VXID_FROM_PGPROC(procvxid, *proc);
2843
2844 proc->recoveryConflictPending = conflictPending;
2845 pid = proc->pid;
2846 if (pid != 0)
2847 {
2848 /*
2849 * Kill the pid if it's still here. If not, that's what we
2850 * wanted so ignore any errors.
2851 */
2852 (void) SendProcSignal(pid, sigmode, procvxid.backendId);
2853 }
2854 }
2855 }
2856
2857 LWLockRelease(ProcArrayLock);
2858 }
2859
2860 /*
2861 * CountUserBackends --- count backends that are used by specified user
2862 */
2863 int
CountUserBackends(Oid roleid)2864 CountUserBackends(Oid roleid)
2865 {
2866 ProcArrayStruct *arrayP = procArray;
2867 int count = 0;
2868 int index;
2869
2870 LWLockAcquire(ProcArrayLock, LW_SHARED);
2871
2872 for (index = 0; index < arrayP->numProcs; index++)
2873 {
2874 int pgprocno = arrayP->pgprocnos[index];
2875 volatile PGPROC *proc = &allProcs[pgprocno];
2876
2877 if (proc->pid == 0)
2878 continue; /* do not count prepared xacts */
2879 if (proc->isBackgroundWorker)
2880 continue; /* do not count background workers */
2881 if (proc->roleId == roleid)
2882 count++;
2883 }
2884
2885 LWLockRelease(ProcArrayLock);
2886
2887 return count;
2888 }
2889
2890 /*
2891 * CountOtherDBBackends -- check for other backends running in the given DB
2892 *
2893 * If there are other backends in the DB, we will wait a maximum of 5 seconds
2894 * for them to exit. Autovacuum backends are encouraged to exit early by
2895 * sending them SIGTERM, but normal user backends are just waited for.
2896 *
2897 * The current backend is always ignored; it is caller's responsibility to
2898 * check whether the current backend uses the given DB, if it's important.
2899 *
2900 * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
2901 * Also, *nbackends and *nprepared are set to the number of other backends
2902 * and prepared transactions in the DB, respectively.
2903 *
2904 * This function is used to interlock DROP DATABASE and related commands
2905 * against there being any active backends in the target DB --- dropping the
2906 * DB while active backends remain would be a Bad Thing. Note that we cannot
2907 * detect here the possibility of a newly-started backend that is trying to
2908 * connect to the doomed database, so additional interlocking is needed during
2909 * backend startup. The caller should normally hold an exclusive lock on the
2910 * target DB before calling this, which is one reason we mustn't wait
2911 * indefinitely.
2912 */
2913 bool
CountOtherDBBackends(Oid databaseId,int * nbackends,int * nprepared)2914 CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2915 {
2916 ProcArrayStruct *arrayP = procArray;
2917
2918 #define MAXAUTOVACPIDS 10 /* max autovacs to SIGTERM per iteration */
2919 int autovac_pids[MAXAUTOVACPIDS];
2920 int tries;
2921
2922 /* 50 tries with 100ms sleep between tries makes 5 sec total wait */
2923 for (tries = 0; tries < 50; tries++)
2924 {
2925 int nautovacs = 0;
2926 bool found = false;
2927 int index;
2928
2929 CHECK_FOR_INTERRUPTS();
2930
2931 *nbackends = *nprepared = 0;
2932
2933 LWLockAcquire(ProcArrayLock, LW_SHARED);
2934
2935 for (index = 0; index < arrayP->numProcs; index++)
2936 {
2937 int pgprocno = arrayP->pgprocnos[index];
2938 volatile PGPROC *proc = &allProcs[pgprocno];
2939 volatile PGXACT *pgxact = &allPgXact[pgprocno];
2940
2941 if (proc->databaseId != databaseId)
2942 continue;
2943 if (proc == MyProc)
2944 continue;
2945
2946 found = true;
2947
2948 if (proc->pid == 0)
2949 (*nprepared)++;
2950 else
2951 {
2952 (*nbackends)++;
2953 if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
2954 nautovacs < MAXAUTOVACPIDS)
2955 autovac_pids[nautovacs++] = proc->pid;
2956 }
2957 }
2958
2959 LWLockRelease(ProcArrayLock);
2960
2961 if (!found)
2962 return false; /* no conflicting backends, so done */
2963
2964 /*
2965 * Send SIGTERM to any conflicting autovacuums before sleeping. We
2966 * postpone this step until after the loop because we don't want to
2967 * hold ProcArrayLock while issuing kill(). We have no idea what might
2968 * block kill() inside the kernel...
2969 */
2970 for (index = 0; index < nautovacs; index++)
2971 (void) kill(autovac_pids[index], SIGTERM); /* ignore any error */
2972
2973 /* sleep, then try again */
2974 pg_usleep(100 * 1000L); /* 100ms */
2975 }
2976
2977 return true; /* timed out, still conflicts */
2978 }
2979
2980 /*
2981 * ProcArraySetReplicationSlotXmin
2982 *
2983 * Install limits to future computations of the xmin horizon to prevent vacuum
2984 * and HOT pruning from removing affected rows still needed by clients with
2985 * replicaton slots.
2986 */
2987 void
ProcArraySetReplicationSlotXmin(TransactionId xmin,TransactionId catalog_xmin,bool already_locked)2988 ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
2989 bool already_locked)
2990 {
2991 Assert(!already_locked || LWLockHeldByMe(ProcArrayLock));
2992
2993 if (!already_locked)
2994 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2995
2996 procArray->replication_slot_xmin = xmin;
2997 procArray->replication_slot_catalog_xmin = catalog_xmin;
2998
2999 if (!already_locked)
3000 LWLockRelease(ProcArrayLock);
3001 }
3002
3003 /*
3004 * ProcArrayGetReplicationSlotXmin
3005 *
3006 * Return the current slot xmin limits. That's useful to be able to remove
3007 * data that's older than those limits.
3008 */
3009 void
ProcArrayGetReplicationSlotXmin(TransactionId * xmin,TransactionId * catalog_xmin)3010 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
3011 TransactionId *catalog_xmin)
3012 {
3013 LWLockAcquire(ProcArrayLock, LW_SHARED);
3014
3015 if (xmin != NULL)
3016 *xmin = procArray->replication_slot_xmin;
3017
3018 if (catalog_xmin != NULL)
3019 *catalog_xmin = procArray->replication_slot_catalog_xmin;
3020
3021 LWLockRelease(ProcArrayLock);
3022 }
3023
3024
/*
 * XidCacheRemove
 *		Drop entry i from my backend's subxids cache by copying the last
 *		cached entry over it and then decrementing MyPgXact->nxids.  The
 *		relative order of the remaining entries is therefore not preserved.
 */
#define XidCacheRemove(i) \
	do { \
		MyProc->subxids.xids[(i)] = \
			MyProc->subxids.xids[MyPgXact->nxids - 1]; \
		MyPgXact->nxids--; \
	} while (0)
3030
3031 /*
3032 * XidCacheRemoveRunningXids
3033 *
3034 * Remove a bunch of TransactionIds from the list of known-running
3035 * subtransactions for my backend. Both the specified xid and those in
3036 * the xids[] array (of length nxids) are removed from the subxids cache.
3037 * latestXid must be the latest XID among the group.
3038 */
3039 void
XidCacheRemoveRunningXids(TransactionId xid,int nxids,const TransactionId * xids,TransactionId latestXid)3040 XidCacheRemoveRunningXids(TransactionId xid,
3041 int nxids, const TransactionId *xids,
3042 TransactionId latestXid)
3043 {
3044 int i,
3045 j;
3046
3047 Assert(TransactionIdIsValid(xid));
3048
3049 /*
3050 * We must hold ProcArrayLock exclusively in order to remove transactions
3051 * from the PGPROC array. (See src/backend/access/transam/README.) It's
3052 * possible this could be relaxed since we know this routine is only used
3053 * to abort subtransactions, but pending closer analysis we'd best be
3054 * conservative.
3055 */
3056 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3057
3058 /*
3059 * Under normal circumstances xid and xids[] will be in increasing order,
3060 * as will be the entries in subxids. Scan backwards to avoid O(N^2)
3061 * behavior when removing a lot of xids.
3062 */
3063 for (i = nxids - 1; i >= 0; i--)
3064 {
3065 TransactionId anxid = xids[i];
3066
3067 for (j = MyPgXact->nxids - 1; j >= 0; j--)
3068 {
3069 if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
3070 {
3071 XidCacheRemove(j);
3072 break;
3073 }
3074 }
3075
3076 /*
3077 * Ordinarily we should have found it, unless the cache has
3078 * overflowed. However it's also possible for this routine to be
3079 * invoked multiple times for the same subtransaction, in case of an
3080 * error during AbortSubTransaction. So instead of Assert, emit a
3081 * debug warning.
3082 */
3083 if (j < 0 && !MyPgXact->overflowed)
3084 elog(WARNING, "did not find subXID %u in MyProc", anxid);
3085 }
3086
3087 for (j = MyPgXact->nxids - 1; j >= 0; j--)
3088 {
3089 if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
3090 {
3091 XidCacheRemove(j);
3092 break;
3093 }
3094 }
3095 /* Ordinarily we should have found it, unless the cache has overflowed */
3096 if (j < 0 && !MyPgXact->overflowed)
3097 elog(WARNING, "did not find subXID %u in MyProc", xid);
3098
3099 /* Also advance global latestCompletedXid while holding the lock */
3100 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3101 latestXid))
3102 ShmemVariableCache->latestCompletedXid = latestXid;
3103
3104 LWLockRelease(ProcArrayLock);
3105 }
3106
3107 #ifdef XIDCACHE_DEBUG
3108
3109 /*
3110 * Print stats about effectiveness of XID cache
3111 */
3112 static void
DisplayXidCache(void)3113 DisplayXidCache(void)
3114 {
3115 fprintf(stderr,
3116 "XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
3117 xc_by_recent_xmin,
3118 xc_by_known_xact,
3119 xc_by_my_xact,
3120 xc_by_latest_xid,
3121 xc_by_main_xid,
3122 xc_by_child_xid,
3123 xc_by_known_assigned,
3124 xc_no_overflow,
3125 xc_slow_answer);
3126 }
3127 #endif /* XIDCACHE_DEBUG */
3128
3129
3130 /* ----------------------------------------------
3131 * KnownAssignedTransactions sub-module
3132 * ----------------------------------------------
3133 */
3134
3135 /*
3136 * In Hot Standby mode, we maintain a list of transactions that are (or were)
3137 * running in the master at the current point in WAL. These XIDs must be
3138 * treated as running by standby transactions, even though they are not in
3139 * the standby server's PGXACT array.
3140 *
3141 * We record all XIDs that we know have been assigned. That includes all the
3142 * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
3143 * been assigned. We can deduce the existence of unobserved XIDs because we
3144 * know XIDs are assigned in sequence, with no gaps. The KnownAssignedXids
3145 * list expands as new XIDs are observed or inferred, and contracts when
3146 * transaction completion records arrive.
3147 *
3148 * During hot standby we do not fret too much about the distinction between
3149 * top-level XIDs and subtransaction XIDs. We store both together in the
3150 * KnownAssignedXids list. In backends, this is copied into snapshots in
3151 * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
3152 * doesn't care about the distinction either. Subtransaction XIDs are
3153 * effectively treated as top-level XIDs and in the typical case pg_subtrans
3154 * links are *not* maintained (which does not affect visibility).
3155 *
3156 * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
3157 * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
3158 * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
3159 * least every PGPROC_MAX_CACHED_SUBXIDS. When we receive one of these
3160 * records, we mark the subXIDs as children of the top XID in pg_subtrans,
3161 * and then remove them from KnownAssignedXids. This prevents overflow of
3162 * KnownAssignedXids and snapshots, at the cost that status checks for these
3163 * subXIDs will take a slower path through TransactionIdIsInProgress().
3164 * This means that KnownAssignedXids is not necessarily complete for subXIDs,
3165 * though it should be complete for top-level XIDs; this is the same situation
3166 * that holds with respect to the PGPROC entries in normal running.
3167 *
3168 * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
3169 * that, similarly to tracking overflow of a PGPROC's subxids array. We do
3170 * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
3171 * As long as that is within the range of interesting XIDs, we have to assume
3172 * that subXIDs are missing from snapshots. (Note that subXID overflow occurs
3173 * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
3174 * subXID arrives - that is not an error.)
3175 *
3176 * Should a backend on primary somehow disappear before it can write an abort
3177 * record, then we just leave those XIDs in KnownAssignedXids. They actually
3178 * aborted but we think they were running; the distinction is irrelevant
3179 * because either way any changes done by the transaction are not visible to
3180 * backends in the standby. We prune KnownAssignedXids when
3181 * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
3182 * array due to such dead XIDs.
3183 */
3184
3185 /*
3186 * RecordKnownAssignedTransactionIds
3187 * Record the given XID in KnownAssignedXids, as well as any preceding
3188 * unobserved XIDs.
3189 *
3190 * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
3191 * associated with a transaction. Must be called for each record after we
3192 * have executed StartupCLOG() et al, since we must ExtendCLOG() etc..
3193 *
3194 * Called during recovery in analogy with and in place of GetNewTransactionId()
3195 */
3196 void
RecordKnownAssignedTransactionIds(TransactionId xid)3197 RecordKnownAssignedTransactionIds(TransactionId xid)
3198 {
3199 Assert(standbyState >= STANDBY_INITIALIZED);
3200 Assert(TransactionIdIsValid(xid));
3201 Assert(TransactionIdIsValid(latestObservedXid));
3202
3203 elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
3204 xid, latestObservedXid);
3205
3206 /*
3207 * When a newly observed xid arrives, it is frequently the case that it is
3208 * *not* the next xid in sequence. When this occurs, we must treat the
3209 * intervening xids as running also.
3210 */
3211 if (TransactionIdFollows(xid, latestObservedXid))
3212 {
3213 TransactionId next_expected_xid;
3214
3215 /*
3216 * Extend subtrans like we do in GetNewTransactionId() during normal
3217 * operation using individual extend steps. Note that we do not need
3218 * to extend clog since its extensions are WAL logged.
3219 *
3220 * This part has to be done regardless of standbyState since we
3221 * immediately start assigning subtransactions to their toplevel
3222 * transactions.
3223 */
3224 next_expected_xid = latestObservedXid;
3225 while (TransactionIdPrecedes(next_expected_xid, xid))
3226 {
3227 TransactionIdAdvance(next_expected_xid);
3228 ExtendSUBTRANS(next_expected_xid);
3229 }
3230 Assert(next_expected_xid == xid);
3231
3232 /*
3233 * If the KnownAssignedXids machinery isn't up yet, there's nothing
3234 * more to do since we don't track assigned xids yet.
3235 */
3236 if (standbyState <= STANDBY_INITIALIZED)
3237 {
3238 latestObservedXid = xid;
3239 return;
3240 }
3241
3242 /*
3243 * Add (latestObservedXid, xid] onto the KnownAssignedXids array.
3244 */
3245 next_expected_xid = latestObservedXid;
3246 TransactionIdAdvance(next_expected_xid);
3247 KnownAssignedXidsAdd(next_expected_xid, xid, false);
3248
3249 /*
3250 * Now we can advance latestObservedXid
3251 */
3252 latestObservedXid = xid;
3253
3254 /* ShmemVariableCache->nextXid must be beyond any observed xid */
3255 next_expected_xid = latestObservedXid;
3256 TransactionIdAdvance(next_expected_xid);
3257 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
3258 if (TransactionIdFollows(next_expected_xid, ShmemVariableCache->nextXid))
3259 ShmemVariableCache->nextXid = next_expected_xid;
3260 LWLockRelease(XidGenLock);
3261 }
3262 }
3263
3264 /*
3265 * ExpireTreeKnownAssignedTransactionIds
3266 * Remove the given XIDs from KnownAssignedXids.
3267 *
3268 * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
3269 */
3270 void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid,int nsubxids,TransactionId * subxids,TransactionId max_xid)3271 ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
3272 TransactionId *subxids, TransactionId max_xid)
3273 {
3274 Assert(standbyState >= STANDBY_INITIALIZED);
3275
3276 /*
3277 * Uses same locking as transaction commit
3278 */
3279 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3280
3281 KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
3282
3283 /* As in ProcArrayEndTransaction, advance latestCompletedXid */
3284 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3285 max_xid))
3286 ShmemVariableCache->latestCompletedXid = max_xid;
3287
3288 LWLockRelease(ProcArrayLock);
3289 }
3290
3291 /*
3292 * ExpireAllKnownAssignedTransactionIds
3293 * Remove all entries in KnownAssignedXids and reset lastOverflowedXid.
3294 */
3295 void
ExpireAllKnownAssignedTransactionIds(void)3296 ExpireAllKnownAssignedTransactionIds(void)
3297 {
3298 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3299 KnownAssignedXidsRemovePreceding(InvalidTransactionId);
3300
3301 /*
3302 * Reset lastOverflowedXid. Currently, lastOverflowedXid has no use after
3303 * the call of this function. But do this for unification with what
3304 * ExpireOldKnownAssignedTransactionIds() do.
3305 */
3306 procArray->lastOverflowedXid = InvalidTransactionId;
3307 LWLockRelease(ProcArrayLock);
3308 }
3309
3310 /*
3311 * ExpireOldKnownAssignedTransactionIds
3312 * Remove KnownAssignedXids entries preceding the given XID and
3313 * potentially reset lastOverflowedXid.
3314 */
3315 void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)3316 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
3317 {
3318 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3319
3320 /*
3321 * Reset lastOverflowedXid if we know all transactions that have been
3322 * possibly running are being gone. Not doing so could cause an incorrect
3323 * lastOverflowedXid value, which makes extra snapshots be marked as
3324 * suboverflowed.
3325 */
3326 if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
3327 procArray->lastOverflowedXid = InvalidTransactionId;
3328 KnownAssignedXidsRemovePreceding(xid);
3329 LWLockRelease(ProcArrayLock);
3330 }
3331
3332
3333 /*
3334 * Private module functions to manipulate KnownAssignedXids
3335 *
3336 * There are 5 main uses of the KnownAssignedXids data structure:
3337 *
3338 * * backends taking snapshots - all valid XIDs need to be copied out
3339 * * backends seeking to determine presence of a specific XID
3340 * * startup process adding new known-assigned XIDs
3341 * * startup process removing specific XIDs as transactions end
3342 * * startup process pruning array when special WAL records arrive
3343 *
3344 * This data structure is known to be a hot spot during Hot Standby, so we
3345 * go to some lengths to make these operations as efficient and as concurrent
3346 * as possible.
3347 *
3348 * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
3349 * order, to be exact --- to allow binary search for specific XIDs. Note:
3350 * in general TransactionIdPrecedes would not provide a total order, but
3351 * we know that the entries present at any instant should not extend across
3352 * a large enough fraction of XID space to wrap around (the master would
3353 * shut down for fear of XID wrap long before that happens). So it's OK to
3354 * use TransactionIdPrecedes as a binary-search comparator.
3355 *
3356 * It's cheap to maintain the sortedness during insertions, since new known
3357 * XIDs are always reported in XID order; we just append them at the right.
3358 *
3359 * To keep individual deletions cheap, we need to allow gaps in the array.
3360 * This is implemented by marking array elements as valid or invalid using
3361 * the parallel boolean array KnownAssignedXidsValid[]. A deletion is done
3362 * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
3363 * XID entry itself. This preserves the property that the XID entries are
3364 * sorted, so we can do binary searches easily. Periodically we compress
3365 * out the unused entries; that's much cheaper than having to compress the
3366 * array immediately on every deletion.
3367 *
3368 * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
3369 * are those with indexes tail <= i < head; items outside this subscript range
3370 * have unspecified contents. When head reaches the end of the array, we
3371 * force compression of unused entries rather than wrapping around, since
3372 * allowing wraparound would greatly complicate the search logic. We maintain
3373 * an explicit tail pointer so that pruning of old XIDs can be done without
3374 * immediately moving the array contents. In most cases only a small fraction
3375 * of the array contains valid entries at any instant.
3376 *
3377 * Although only the startup process can ever change the KnownAssignedXids
3378 * data structure, we still need interlocking so that standby backends will
3379 * not observe invalid intermediate states. The convention is that backends
3380 * must hold shared ProcArrayLock to examine the array. To remove XIDs from
3381 * the array, the startup process must hold ProcArrayLock exclusively, for
3382 * the usual transactional reasons (compare commit/abort of a transaction
3383 * during normal running). Compressing unused entries out of the array
3384 * likewise requires exclusive lock. To add XIDs to the array, we just insert
3385 * them into slots to the right of the head pointer and then advance the head
3386 * pointer. This wouldn't require any lock at all, except that on machines
3387 * with weak memory ordering we need to be careful that other processors
3388 * see the array element changes before they see the head pointer change.
3389 * We handle this by using a spinlock to protect reads and writes of the
3390 * head/tail pointers. (We could dispense with the spinlock if we were to
3391 * create suitable memory access barrier primitives and use those instead.)
3392 * The spinlock must be taken to read or write the head/tail pointers unless
3393 * the caller holds ProcArrayLock exclusively.
3394 *
3395 * Algorithmic analysis:
3396 *
3397 * If we have a maximum of M slots, with N XIDs currently spread across
3398 * S elements then we have N <= S <= M always.
3399 *
3400 * * Adding a new XID is O(1) and needs little locking (unless compression
3401 * must happen)
3402 * * Compressing the array is O(S) and requires exclusive lock
3403 * * Removing an XID is O(logS) and requires exclusive lock
3404 * * Taking a snapshot is O(S) and requires shared lock
3405 * * Checking for an XID is O(logS) and requires shared lock
3406 *
3407 * In comparison, using a hash table for KnownAssignedXids would mean that
3408 * taking snapshots would be O(M). If we can maintain S << M then the
3409 * sorted array technique will deliver significantly faster snapshots.
3410 * If we try to keep S too small then we will spend too much time compressing,
3411 * so there is an optimal point for any workload mix. We use a heuristic to
3412 * decide when to compress the array, though trimming also helps reduce
3413 * frequency of compressing. The heuristic requires us to track the number of
3414 * currently valid XIDs in the array.
3415 */
3416
3417
3418 /*
3419 * Compress KnownAssignedXids by shifting valid data down to the start of the
3420 * array, removing any gaps.
3421 *
3422 * A compression step is forced if "force" is true, otherwise we do it
3423 * only if a heuristic indicates it's a good time to do it.
3424 *
3425 * Caller must hold ProcArrayLock in exclusive mode.
3426 */
3427 static void
KnownAssignedXidsCompress(bool force)3428 KnownAssignedXidsCompress(bool force)
3429 {
3430 /* use volatile pointer to prevent code rearrangement */
3431 volatile ProcArrayStruct *pArray = procArray;
3432 int head,
3433 tail;
3434 int compress_index;
3435 int i;
3436
3437 /* no spinlock required since we hold ProcArrayLock exclusively */
3438 head = pArray->headKnownAssignedXids;
3439 tail = pArray->tailKnownAssignedXids;
3440
3441 if (!force)
3442 {
3443 /*
3444 * If we can choose how much to compress, use a heuristic to avoid
3445 * compressing too often or not often enough.
3446 *
3447 * Heuristic is if we have a large enough current spread and less than
3448 * 50% of the elements are currently in use, then compress. This
3449 * should ensure we compress fairly infrequently. We could compress
3450 * less often though the virtual array would spread out more and
3451 * snapshots would become more expensive.
3452 */
3453 int nelements = head - tail;
3454
3455 if (nelements < 4 * PROCARRAY_MAXPROCS ||
3456 nelements < 2 * pArray->numKnownAssignedXids)
3457 return;
3458 }
3459
3460 /*
3461 * We compress the array by reading the valid values from tail to head,
3462 * re-aligning data to 0th element.
3463 */
3464 compress_index = 0;
3465 for (i = tail; i < head; i++)
3466 {
3467 if (KnownAssignedXidsValid[i])
3468 {
3469 KnownAssignedXids[compress_index] = KnownAssignedXids[i];
3470 KnownAssignedXidsValid[compress_index] = true;
3471 compress_index++;
3472 }
3473 }
3474
3475 pArray->tailKnownAssignedXids = 0;
3476 pArray->headKnownAssignedXids = compress_index;
3477 }
3478
3479 /*
3480 * Add xids into KnownAssignedXids at the head of the array.
3481 *
3482 * xids from from_xid to to_xid, inclusive, are added to the array.
3483 *
3484 * If exclusive_lock is true then caller already holds ProcArrayLock in
3485 * exclusive mode, so we need no extra locking here. Else caller holds no
3486 * lock, so we need to be sure we maintain sufficient interlocks against
3487 * concurrent readers. (Only the startup process ever calls this, so no need
3488 * to worry about concurrent writers.)
3489 */
3490 static void
KnownAssignedXidsAdd(TransactionId from_xid,TransactionId to_xid,bool exclusive_lock)3491 KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
3492 bool exclusive_lock)
3493 {
3494 /* use volatile pointer to prevent code rearrangement */
3495 volatile ProcArrayStruct *pArray = procArray;
3496 TransactionId next_xid;
3497 int head,
3498 tail;
3499 int nxids;
3500 int i;
3501
3502 Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
3503
3504 /*
3505 * Calculate how many array slots we'll need. Normally this is cheap; in
3506 * the unusual case where the XIDs cross the wrap point, we do it the hard
3507 * way.
3508 */
3509 if (to_xid >= from_xid)
3510 nxids = to_xid - from_xid + 1;
3511 else
3512 {
3513 nxids = 1;
3514 next_xid = from_xid;
3515 while (TransactionIdPrecedes(next_xid, to_xid))
3516 {
3517 nxids++;
3518 TransactionIdAdvance(next_xid);
3519 }
3520 }
3521
3522 /*
3523 * Since only the startup process modifies the head/tail pointers, we
3524 * don't need a lock to read them here.
3525 */
3526 head = pArray->headKnownAssignedXids;
3527 tail = pArray->tailKnownAssignedXids;
3528
3529 Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
3530 Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
3531
3532 /*
3533 * Verify that insertions occur in TransactionId sequence. Note that even
3534 * if the last existing element is marked invalid, it must still have a
3535 * correctly sequenced XID value.
3536 */
3537 if (head > tail &&
3538 TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
3539 {
3540 KnownAssignedXidsDisplay(LOG);
3541 elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
3542 }
3543
3544 /*
3545 * If our xids won't fit in the remaining space, compress out free space
3546 */
3547 if (head + nxids > pArray->maxKnownAssignedXids)
3548 {
3549 /* must hold lock to compress */
3550 if (!exclusive_lock)
3551 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3552
3553 KnownAssignedXidsCompress(true);
3554
3555 head = pArray->headKnownAssignedXids;
3556 /* note: we no longer care about the tail pointer */
3557
3558 if (!exclusive_lock)
3559 LWLockRelease(ProcArrayLock);
3560
3561 /*
3562 * If it still won't fit then we're out of memory
3563 */
3564 if (head + nxids > pArray->maxKnownAssignedXids)
3565 elog(ERROR, "too many KnownAssignedXids");
3566 }
3567
3568 /* Now we can insert the xids into the space starting at head */
3569 next_xid = from_xid;
3570 for (i = 0; i < nxids; i++)
3571 {
3572 KnownAssignedXids[head] = next_xid;
3573 KnownAssignedXidsValid[head] = true;
3574 TransactionIdAdvance(next_xid);
3575 head++;
3576 }
3577
3578 /* Adjust count of number of valid entries */
3579 pArray->numKnownAssignedXids += nxids;
3580
3581 /*
3582 * Now update the head pointer. We use a spinlock to protect this
3583 * pointer, not because the update is likely to be non-atomic, but to
3584 * ensure that other processors see the above array updates before they
3585 * see the head pointer change.
3586 *
3587 * If we're holding ProcArrayLock exclusively, there's no need to take the
3588 * spinlock.
3589 */
3590 if (exclusive_lock)
3591 pArray->headKnownAssignedXids = head;
3592 else
3593 {
3594 SpinLockAcquire(&pArray->known_assigned_xids_lck);
3595 pArray->headKnownAssignedXids = head;
3596 SpinLockRelease(&pArray->known_assigned_xids_lck);
3597 }
3598 }
3599
3600 /*
3601 * KnownAssignedXidsSearch
3602 *
3603 * Searches KnownAssignedXids for a specific xid and optionally removes it.
3604 * Returns true if it was found, false if not.
3605 *
3606 * Caller must hold ProcArrayLock in shared or exclusive mode.
3607 * Exclusive lock must be held for remove = true.
3608 */
3609 static bool
KnownAssignedXidsSearch(TransactionId xid,bool remove)3610 KnownAssignedXidsSearch(TransactionId xid, bool remove)
3611 {
3612 /* use volatile pointer to prevent code rearrangement */
3613 volatile ProcArrayStruct *pArray = procArray;
3614 int first,
3615 last;
3616 int head;
3617 int tail;
3618 int result_index = -1;
3619
3620 if (remove)
3621 {
3622 /* we hold ProcArrayLock exclusively, so no need for spinlock */
3623 tail = pArray->tailKnownAssignedXids;
3624 head = pArray->headKnownAssignedXids;
3625 }
3626 else
3627 {
3628 /* take spinlock to ensure we see up-to-date array contents */
3629 SpinLockAcquire(&pArray->known_assigned_xids_lck);
3630 tail = pArray->tailKnownAssignedXids;
3631 head = pArray->headKnownAssignedXids;
3632 SpinLockRelease(&pArray->known_assigned_xids_lck);
3633 }
3634
3635 /*
3636 * Standard binary search. Note we can ignore the KnownAssignedXidsValid
3637 * array here, since even invalid entries will contain sorted XIDs.
3638 */
3639 first = tail;
3640 last = head - 1;
3641 while (first <= last)
3642 {
3643 int mid_index;
3644 TransactionId mid_xid;
3645
3646 mid_index = (first + last) / 2;
3647 mid_xid = KnownAssignedXids[mid_index];
3648
3649 if (xid == mid_xid)
3650 {
3651 result_index = mid_index;
3652 break;
3653 }
3654 else if (TransactionIdPrecedes(xid, mid_xid))
3655 last = mid_index - 1;
3656 else
3657 first = mid_index + 1;
3658 }
3659
3660 if (result_index < 0)
3661 return false; /* not in array */
3662
3663 if (!KnownAssignedXidsValid[result_index])
3664 return false; /* in array, but invalid */
3665
3666 if (remove)
3667 {
3668 KnownAssignedXidsValid[result_index] = false;
3669
3670 pArray->numKnownAssignedXids--;
3671 Assert(pArray->numKnownAssignedXids >= 0);
3672
3673 /*
3674 * If we're removing the tail element then advance tail pointer over
3675 * any invalid elements. This will speed future searches.
3676 */
3677 if (result_index == tail)
3678 {
3679 tail++;
3680 while (tail < head && !KnownAssignedXidsValid[tail])
3681 tail++;
3682 if (tail >= head)
3683 {
3684 /* Array is empty, so we can reset both pointers */
3685 pArray->headKnownAssignedXids = 0;
3686 pArray->tailKnownAssignedXids = 0;
3687 }
3688 else
3689 {
3690 pArray->tailKnownAssignedXids = tail;
3691 }
3692 }
3693 }
3694
3695 return true;
3696 }
3697
3698 /*
3699 * Is the specified XID present in KnownAssignedXids[]?
3700 *
3701 * Caller must hold ProcArrayLock in shared or exclusive mode.
3702 */
3703 static bool
KnownAssignedXidExists(TransactionId xid)3704 KnownAssignedXidExists(TransactionId xid)
3705 {
3706 Assert(TransactionIdIsValid(xid));
3707
3708 return KnownAssignedXidsSearch(xid, false);
3709 }
3710
3711 /*
3712 * Remove the specified XID from KnownAssignedXids[].
3713 *
3714 * Caller must hold ProcArrayLock in exclusive mode.
3715 */
3716 static void
KnownAssignedXidsRemove(TransactionId xid)3717 KnownAssignedXidsRemove(TransactionId xid)
3718 {
3719 Assert(TransactionIdIsValid(xid));
3720
3721 elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
3722
3723 /*
3724 * Note: we cannot consider it an error to remove an XID that's not
3725 * present. We intentionally remove subxact IDs while processing
3726 * XLOG_XACT_ASSIGNMENT, to avoid array overflow. Then those XIDs will be
3727 * removed again when the top-level xact commits or aborts.
3728 *
3729 * It might be possible to track such XIDs to distinguish this case from
3730 * actual errors, but it would be complicated and probably not worth it.
3731 * So, just ignore the search result.
3732 */
3733 (void) KnownAssignedXidsSearch(xid, true);
3734 }
3735
3736 /*
3737 * KnownAssignedXidsRemoveTree
3738 * Remove xid (if it's not InvalidTransactionId) and all the subxids.
3739 *
3740 * Caller must hold ProcArrayLock in exclusive mode.
3741 */
3742 static void
KnownAssignedXidsRemoveTree(TransactionId xid,int nsubxids,TransactionId * subxids)3743 KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
3744 TransactionId *subxids)
3745 {
3746 int i;
3747
3748 if (TransactionIdIsValid(xid))
3749 KnownAssignedXidsRemove(xid);
3750
3751 for (i = 0; i < nsubxids; i++)
3752 KnownAssignedXidsRemove(subxids[i]);
3753
3754 /* Opportunistically compress the array */
3755 KnownAssignedXidsCompress(false);
3756 }
3757
3758 /*
3759 * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
3760 * then clear the whole table.
3761 *
3762 * Caller must hold ProcArrayLock in exclusive mode.
3763 */
3764 static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)3765 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
3766 {
3767 /* use volatile pointer to prevent code rearrangement */
3768 volatile ProcArrayStruct *pArray = procArray;
3769 int count = 0;
3770 int head,
3771 tail,
3772 i;
3773
3774 if (!TransactionIdIsValid(removeXid))
3775 {
3776 elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
3777 pArray->numKnownAssignedXids = 0;
3778 pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
3779 return;
3780 }
3781
3782 elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
3783
3784 /*
3785 * Mark entries invalid starting at the tail. Since array is sorted, we
3786 * can stop as soon as we reach an entry >= removeXid.
3787 */
3788 tail = pArray->tailKnownAssignedXids;
3789 head = pArray->headKnownAssignedXids;
3790
3791 for (i = tail; i < head; i++)
3792 {
3793 if (KnownAssignedXidsValid[i])
3794 {
3795 TransactionId knownXid = KnownAssignedXids[i];
3796
3797 if (TransactionIdFollowsOrEquals(knownXid, removeXid))
3798 break;
3799
3800 if (!StandbyTransactionIdIsPrepared(knownXid))
3801 {
3802 KnownAssignedXidsValid[i] = false;
3803 count++;
3804 }
3805 }
3806 }
3807
3808 pArray->numKnownAssignedXids -= count;
3809 Assert(pArray->numKnownAssignedXids >= 0);
3810
3811 /*
3812 * Advance the tail pointer if we've marked the tail item invalid.
3813 */
3814 for (i = tail; i < head; i++)
3815 {
3816 if (KnownAssignedXidsValid[i])
3817 break;
3818 }
3819 if (i >= head)
3820 {
3821 /* Array is empty, so we can reset both pointers */
3822 pArray->headKnownAssignedXids = 0;
3823 pArray->tailKnownAssignedXids = 0;
3824 }
3825 else
3826 {
3827 pArray->tailKnownAssignedXids = i;
3828 }
3829
3830 /* Opportunistically compress the array */
3831 KnownAssignedXidsCompress(false);
3832 }
3833
3834 /*
3835 * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
3836 * We filter out anything >= xmax.
3837 *
3838 * Returns the number of XIDs stored into xarray[]. Caller is responsible
3839 * that array is large enough.
3840 *
3841 * Caller must hold ProcArrayLock in (at least) shared mode.
3842 */
3843 static int
KnownAssignedXidsGet(TransactionId * xarray,TransactionId xmax)3844 KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
3845 {
3846 TransactionId xtmp = InvalidTransactionId;
3847
3848 return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
3849 }
3850
3851 /*
3852 * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
3853 * we reduce *xmin to the lowest xid value seen if not already lower.
3854 *
3855 * Caller must hold ProcArrayLock in (at least) shared mode.
3856 */
3857 static int
KnownAssignedXidsGetAndSetXmin(TransactionId * xarray,TransactionId * xmin,TransactionId xmax)3858 KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
3859 TransactionId xmax)
3860 {
3861 int count = 0;
3862 int head,
3863 tail;
3864 int i;
3865
3866 /*
3867 * Fetch head just once, since it may change while we loop. We can stop
3868 * once we reach the initially seen head, since we are certain that an xid
3869 * cannot enter and then leave the array while we hold ProcArrayLock. We
3870 * might miss newly-added xids, but they should be >= xmax so irrelevant
3871 * anyway.
3872 *
3873 * Must take spinlock to ensure we see up-to-date array contents.
3874 */
3875 SpinLockAcquire(&procArray->known_assigned_xids_lck);
3876 tail = procArray->tailKnownAssignedXids;
3877 head = procArray->headKnownAssignedXids;
3878 SpinLockRelease(&procArray->known_assigned_xids_lck);
3879
3880 for (i = tail; i < head; i++)
3881 {
3882 /* Skip any gaps in the array */
3883 if (KnownAssignedXidsValid[i])
3884 {
3885 TransactionId knownXid = KnownAssignedXids[i];
3886
3887 /*
3888 * Update xmin if required. Only the first XID need be checked,
3889 * since the array is sorted.
3890 */
3891 if (count == 0 &&
3892 TransactionIdPrecedes(knownXid, *xmin))
3893 *xmin = knownXid;
3894
3895 /*
3896 * Filter out anything >= xmax, again relying on sorted property
3897 * of array.
3898 */
3899 if (TransactionIdIsValid(xmax) &&
3900 TransactionIdFollowsOrEquals(knownXid, xmax))
3901 break;
3902
3903 /* Add knownXid into output array */
3904 xarray[count++] = knownXid;
3905 }
3906 }
3907
3908 return count;
3909 }
3910
3911 /*
3912 * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
3913 * if nothing there.
3914 */
3915 static TransactionId
KnownAssignedXidsGetOldestXmin(void)3916 KnownAssignedXidsGetOldestXmin(void)
3917 {
3918 int head,
3919 tail;
3920 int i;
3921
3922 /*
3923 * Fetch head just once, since it may change while we loop.
3924 */
3925 SpinLockAcquire(&procArray->known_assigned_xids_lck);
3926 tail = procArray->tailKnownAssignedXids;
3927 head = procArray->headKnownAssignedXids;
3928 SpinLockRelease(&procArray->known_assigned_xids_lck);
3929
3930 for (i = tail; i < head; i++)
3931 {
3932 /* Skip any gaps in the array */
3933 if (KnownAssignedXidsValid[i])
3934 return KnownAssignedXids[i];
3935 }
3936
3937 return InvalidTransactionId;
3938 }
3939
3940 /*
3941 * Display KnownAssignedXids to provide debug trail
3942 *
3943 * Currently this is only called within startup process, so we need no
3944 * special locking.
3945 *
3946 * Note this is pretty expensive, and much of the expense will be incurred
3947 * even if the elog message will get discarded. It's not currently called
3948 * in any performance-critical places, however, so no need to be tenser.
3949 */
3950 static void
KnownAssignedXidsDisplay(int trace_level)3951 KnownAssignedXidsDisplay(int trace_level)
3952 {
3953 /* use volatile pointer to prevent code rearrangement */
3954 volatile ProcArrayStruct *pArray = procArray;
3955 StringInfoData buf;
3956 int head,
3957 tail,
3958 i;
3959 int nxids = 0;
3960
3961 tail = pArray->tailKnownAssignedXids;
3962 head = pArray->headKnownAssignedXids;
3963
3964 initStringInfo(&buf);
3965
3966 for (i = tail; i < head; i++)
3967 {
3968 if (KnownAssignedXidsValid[i])
3969 {
3970 nxids++;
3971 appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3972 }
3973 }
3974
3975 elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3976 nxids,
3977 pArray->numKnownAssignedXids,
3978 pArray->tailKnownAssignedXids,
3979 pArray->headKnownAssignedXids,
3980 buf.data);
3981
3982 pfree(buf.data);
3983 }
3984
3985 /*
3986 * KnownAssignedXidsReset
3987 * Resets KnownAssignedXids to be empty
3988 */
3989 static void
KnownAssignedXidsReset(void)3990 KnownAssignedXidsReset(void)
3991 {
3992 /* use volatile pointer to prevent code rearrangement */
3993 volatile ProcArrayStruct *pArray = procArray;
3994
3995 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3996
3997 pArray->numKnownAssignedXids = 0;
3998 pArray->tailKnownAssignedXids = 0;
3999 pArray->headKnownAssignedXids = 0;
4000
4001 LWLockRelease(ProcArrayLock);
4002 }
4003