1 /*-------------------------------------------------------------------------
2  *
3  * procarray.c
4  *	  POSTGRES process array code.
5  *
6  *
7  * This module maintains arrays of the PGPROC and PGXACT structures for all
8  * active backends.  Although there are several uses for this, the principal
9  * one is as a means of determining the set of currently running transactions.
10  *
11  * Because of various subtle race conditions it is critical that a backend
12  * hold the correct locks while setting or clearing its MyPgXact->xid field.
13  * See notes in src/backend/access/transam/README.
14  *
15  * The process arrays now also include structures representing prepared
16  * transactions.  The xid and subxids fields of these are valid, as are the
17  * myProcLocks lists.  They can be distinguished from regular backend PGPROCs
18  * at need by checking for pid == 0.
19  *
20  * During hot standby, we also keep a list of XIDs representing transactions
21  * that are known to be running in the master (or more precisely, were running
22  * as of the current point in the WAL stream).  This list is kept in the
23  * KnownAssignedXids array, and is updated by watching the sequence of
24  * arriving XIDs.  This is necessary because if we leave those XIDs out of
25  * snapshots taken for standby queries, then they will appear to be already
26  * complete, leading to MVCC failures.  Note that in hot standby, the PGPROC
27  * array represents standby processes, which by definition are not running
28  * transactions that have XIDs.
29  *
30  * It is perhaps possible for a backend on the master to terminate without
31  * writing an abort record for its transaction.  While that shouldn't really
32  * happen, it would tie up KnownAssignedXids indefinitely, so we protect
33  * ourselves by pruning the array when a valid list of running XIDs arrives.
34  *
35  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
36  * Portions Copyright (c) 1994, Regents of the University of California
37  *
38  *
39  * IDENTIFICATION
40  *	  src/backend/storage/ipc/procarray.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 
48 #include "access/clog.h"
49 #include "access/subtrans.h"
50 #include "access/transam.h"
51 #include "access/twophase.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "miscadmin.h"
56 #include "storage/proc.h"
57 #include "storage/procarray.h"
58 #include "storage/spin.h"
59 #include "utils/builtins.h"
60 #include "utils/rel.h"
61 #include "utils/snapmgr.h"
62 
63 
/*
 * Our shared memory area.
 *
 * One instance lives in shared memory; it is created or attached to by
 * CreateSharedProcArray() and reached through the file-local "procArray"
 * pointer.  The trailing pgprocnos[] array is sized at allocation time.
 */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

	/*
	 * Known assigned XIDs handling.  These fields describe the state of the
	 * separately allocated KnownAssignedXids arrays used during hot standby.
	 */
	int			maxKnownAssignedXids;	/* allocated size of array */
	int			numKnownAssignedXids;	/* current # of valid entries */
	int			tailKnownAssignedXids;	/* index of oldest valid element */
	int			headKnownAssignedXids;	/* index of newest element, + 1 */
	slock_t		known_assigned_xids_lck;		/* protects head/tail pointers */

	/*
	 * Highest subxid that has been removed from KnownAssignedXids array to
	 * prevent overflow; or InvalidTransactionId if none.  We track this for
	 * similar reasons to tracking overflowing cached subxids in PGXACT
	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
	 * lock to read it.
	 */
	TransactionId lastOverflowedXid;

	/* oldest xmin of any replication slot */
	TransactionId replication_slot_xmin;
	/* oldest catalog xmin of any replication slot */
	TransactionId replication_slot_catalog_xmin;

	/*
	 * Indexes into allPgXact[], has PROCARRAY_MAXPROCS entries.  Each valid
	 * entry is the pgprocno of a registered PGPROC; only the first numProcs
	 * entries are valid, and ProcArrayAdd() keeps them in ascending order.
	 */
	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
} ProcArrayStruct;
96 
/* Pointer to the shared ProcArrayStruct; set by CreateSharedProcArray(). */
static ProcArrayStruct *procArray;

/*
 * Local copies of ProcGlobal->allProcs and ProcGlobal->allPgXact, set by
 * CreateSharedProcArray().  Both arrays are indexed by pgprocno.
 */
static PGPROC *allProcs;
static PGXACT *allPgXact;

/*
 * Bookkeeping for tracking emulated transactions in recovery.
 *
 * KnownAssignedXids and KnownAssignedXidsValid are only set up by
 * CreateSharedProcArray() when EnableHotStandby is true.  latestObservedXid
 * is seeded by ProcArrayInitRecovery().
 */
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
static TransactionId latestObservedXid = InvalidTransactionId;

/* LWLock tranche for backend locks; registered in CreateSharedProcArray() */
static LWLockTranche ProcLWLockTranche;

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;
118 
#ifndef XIDCACHE_DEBUG

/*
 * XidCache measurement is compiled out: every counter bump expands to a
 * no-op expression so call sites need no conditional compilation.
 */
#define xc_by_recent_xmin_inc()		((void) 0)
#define xc_by_known_xact_inc()		((void) 0)
#define xc_by_my_xact_inc()			((void) 0)
#define xc_by_latest_xid_inc()		((void) 0)
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
#define xc_by_known_assigned_inc()	((void) 0)
#define xc_no_overflow_inc()		((void) 0)
#define xc_slow_answer_inc()		((void) 0)

#else							/* XIDCACHE_DEBUG */

/* Per-backend event counters used for XidCache measurement */
static long xc_by_recent_xmin = 0;
static long xc_by_known_xact = 0;
static long xc_by_my_xact = 0;
static long xc_by_latest_xid = 0;
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
static long xc_by_known_assigned = 0;
static long xc_no_overflow = 0;
static long xc_slow_answer = 0;

/* Each macro bumps the counter of the same name */
#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
#define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
#define xc_no_overflow_inc()		(xc_no_overflow++)
#define xc_slow_answer_inc()		(xc_slow_answer++)

/* Called from ProcArrayRemove() at backend shutdown */
static void DisplayXidCache(void);

#endif   /* XIDCACHE_DEBUG */
155 
/*
 * Primitives for KnownAssignedXids array handling for standby.
 *
 * Forward declarations only; the definitions appear later in this file.
 */
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
					 bool exclusive_lock);
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
							TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
							   TransactionId *xmin,
							   TransactionId xmax);
static TransactionId KnownAssignedXidsGetOldestXmin(void);
static void KnownAssignedXidsDisplay(int trace_level);
static void KnownAssignedXidsReset(void);

/* Helpers used by ProcArrayEndTransaction() to clear a backend's XID */
static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
								PGXACT *pgxact, TransactionId latestXid);
static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
176 
177 /*
178  * Report shared-memory space needed by CreateSharedProcArray.
179  */
180 Size
ProcArrayShmemSize(void)181 ProcArrayShmemSize(void)
182 {
183 	Size		size;
184 
185 	/* Size of the ProcArray structure itself */
186 #define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
187 
188 	size = offsetof(ProcArrayStruct, pgprocnos);
189 	size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
190 
191 	/*
192 	 * During Hot Standby processing we have a data structure called
193 	 * KnownAssignedXids, created in shared memory. Local data structures are
194 	 * also created in various backends during GetSnapshotData(),
195 	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
196 	 * main structures created in those functions must be identically sized,
197 	 * since we may at times copy the whole of the data structures around. We
198 	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
199 	 *
200 	 * Ideally we'd only create this structure if we were actually doing hot
201 	 * standby in the current run, but we don't know that yet at the time
202 	 * shared memory is being set up.
203 	 */
204 #define TOTAL_MAX_CACHED_SUBXIDS \
205 	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
206 
207 	if (EnableHotStandby)
208 	{
209 		size = add_size(size,
210 						mul_size(sizeof(TransactionId),
211 								 TOTAL_MAX_CACHED_SUBXIDS));
212 		size = add_size(size,
213 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
214 	}
215 
216 	return size;
217 }
218 
219 /*
220  * Initialize the shared PGPROC array during postmaster startup.
221  */
222 void
CreateSharedProcArray(void)223 CreateSharedProcArray(void)
224 {
225 	bool		found;
226 
227 	/* Create or attach to the ProcArray shared structure */
228 	procArray = (ProcArrayStruct *)
229 		ShmemInitStruct("Proc Array",
230 						add_size(offsetof(ProcArrayStruct, pgprocnos),
231 								 mul_size(sizeof(int),
232 										  PROCARRAY_MAXPROCS)),
233 						&found);
234 
235 	if (!found)
236 	{
237 		/*
238 		 * We're the first - initialize.
239 		 */
240 		procArray->numProcs = 0;
241 		procArray->maxProcs = PROCARRAY_MAXPROCS;
242 		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
243 		procArray->numKnownAssignedXids = 0;
244 		procArray->tailKnownAssignedXids = 0;
245 		procArray->headKnownAssignedXids = 0;
246 		SpinLockInit(&procArray->known_assigned_xids_lck);
247 		procArray->lastOverflowedXid = InvalidTransactionId;
248 		procArray->replication_slot_xmin = InvalidTransactionId;
249 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
250 	}
251 
252 	allProcs = ProcGlobal->allProcs;
253 	allPgXact = ProcGlobal->allPgXact;
254 
255 	/* Create or attach to the KnownAssignedXids arrays too, if needed */
256 	if (EnableHotStandby)
257 	{
258 		KnownAssignedXids = (TransactionId *)
259 			ShmemInitStruct("KnownAssignedXids",
260 							mul_size(sizeof(TransactionId),
261 									 TOTAL_MAX_CACHED_SUBXIDS),
262 							&found);
263 		KnownAssignedXidsValid = (bool *)
264 			ShmemInitStruct("KnownAssignedXidsValid",
265 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
266 							&found);
267 	}
268 
269 	/* Register and initialize fields of ProcLWLockTranche */
270 	ProcLWLockTranche.name = "proc";
271 	ProcLWLockTranche.array_base = (char *) (ProcGlobal->allProcs) +
272 		offsetof(PGPROC, backendLock);
273 	ProcLWLockTranche.array_stride = sizeof(PGPROC);
274 	LWLockRegisterTranche(LWTRANCHE_PROC, &ProcLWLockTranche);
275 }
276 
277 /*
278  * Add the specified PGPROC to the shared array.
279  */
280 void
ProcArrayAdd(PGPROC * proc)281 ProcArrayAdd(PGPROC *proc)
282 {
283 	ProcArrayStruct *arrayP = procArray;
284 	int			index;
285 
286 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
287 
288 	if (arrayP->numProcs >= arrayP->maxProcs)
289 	{
290 		/*
291 		 * Ooops, no room.  (This really shouldn't happen, since there is a
292 		 * fixed supply of PGPROC structs too, and so we should have failed
293 		 * earlier.)
294 		 */
295 		LWLockRelease(ProcArrayLock);
296 		ereport(FATAL,
297 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
298 				 errmsg("sorry, too many clients already")));
299 	}
300 
301 	/*
302 	 * Keep the procs array sorted by (PGPROC *) so that we can utilize
303 	 * locality of references much better. This is useful while traversing the
304 	 * ProcArray because there is an increased likelihood of finding the next
305 	 * PGPROC structure in the cache.
306 	 *
307 	 * Since the occurrence of adding/removing a proc is much lower than the
308 	 * access to the ProcArray itself, the overhead should be marginal
309 	 */
310 	for (index = 0; index < arrayP->numProcs; index++)
311 	{
312 		/*
313 		 * If we are the first PGPROC or if we have found our right position
314 		 * in the array, break
315 		 */
316 		if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
317 			break;
318 	}
319 
320 	memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
321 			(arrayP->numProcs - index) * sizeof(int));
322 	arrayP->pgprocnos[index] = proc->pgprocno;
323 	arrayP->numProcs++;
324 
325 	LWLockRelease(ProcArrayLock);
326 }
327 
328 /*
329  * Remove the specified PGPROC from the shared array.
330  *
331  * When latestXid is a valid XID, we are removing a live 2PC gxact from the
332  * array, and thus causing it to appear as "not running" anymore.  In this
333  * case we must advance latestCompletedXid.  (This is essentially the same
334  * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
335  * the ProcArrayLock only once, and don't damage the content of the PGPROC;
336  * twophase.c depends on the latter.)
337  */
338 void
ProcArrayRemove(PGPROC * proc,TransactionId latestXid)339 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
340 {
341 	ProcArrayStruct *arrayP = procArray;
342 	int			index;
343 
344 #ifdef XIDCACHE_DEBUG
345 	/* dump stats at backend shutdown, but not prepared-xact end */
346 	if (proc->pid != 0)
347 		DisplayXidCache();
348 #endif
349 
350 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
351 
352 	if (TransactionIdIsValid(latestXid))
353 	{
354 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
355 
356 		/* Advance global latestCompletedXid while holding the lock */
357 		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
358 								  latestXid))
359 			ShmemVariableCache->latestCompletedXid = latestXid;
360 	}
361 	else
362 	{
363 		/* Shouldn't be trying to remove a live transaction here */
364 		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
365 	}
366 
367 	for (index = 0; index < arrayP->numProcs; index++)
368 	{
369 		if (arrayP->pgprocnos[index] == proc->pgprocno)
370 		{
371 			/* Keep the PGPROC array sorted. See notes above */
372 			memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
373 					(arrayP->numProcs - index - 1) * sizeof(int));
374 			arrayP->pgprocnos[arrayP->numProcs - 1] = -1;		/* for debugging */
375 			arrayP->numProcs--;
376 			LWLockRelease(ProcArrayLock);
377 			return;
378 		}
379 	}
380 
381 	/* Ooops */
382 	LWLockRelease(ProcArrayLock);
383 
384 	elog(LOG, "failed to find proc %p in ProcArray", proc);
385 }
386 
387 
388 /*
389  * ProcArrayEndTransaction -- mark a transaction as no longer running
390  *
391  * This is used interchangeably for commit and abort cases.  The transaction
392  * commit/abort must already be reported to WAL and pg_clog.
393  *
394  * proc is currently always MyProc, but we pass it explicitly for flexibility.
395  * latestXid is the latest Xid among the transaction's main XID and
396  * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
397  * the caller to pass latestXid, instead of computing it from the PGPROC's
398  * contents, because the subxid information in the PGPROC might be
399  * incomplete.)
400  */
401 void
ProcArrayEndTransaction(PGPROC * proc,TransactionId latestXid)402 ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
403 {
404 	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
405 
406 	if (TransactionIdIsValid(latestXid))
407 	{
408 		/*
409 		 * We must lock ProcArrayLock while clearing our advertised XID, so
410 		 * that we do not exit the set of "running" transactions while someone
411 		 * else is taking a snapshot.  See discussion in
412 		 * src/backend/access/transam/README.
413 		 */
414 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
415 
416 		/*
417 		 * If we can immediately acquire ProcArrayLock, we clear our own XID
418 		 * and release the lock.  If not, use group XID clearing to improve
419 		 * efficiency.
420 		 */
421 		if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
422 		{
423 			ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
424 			LWLockRelease(ProcArrayLock);
425 		}
426 		else
427 			ProcArrayGroupClearXid(proc, latestXid);
428 	}
429 	else
430 	{
431 		/*
432 		 * If we have no XID, we don't need to lock, since we won't affect
433 		 * anyone else's calculation of a snapshot.  We might change their
434 		 * estimate of global xmin, but that's OK.
435 		 */
436 		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
437 
438 		proc->lxid = InvalidLocalTransactionId;
439 		pgxact->xmin = InvalidTransactionId;
440 		/* must be cleared with xid/xmin: */
441 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
442 		pgxact->delayChkpt = false;		/* be sure this is cleared in abort */
443 		proc->recoveryConflictPending = false;
444 
445 		Assert(pgxact->nxids == 0);
446 		Assert(pgxact->overflowed == false);
447 	}
448 }
449 
450 /*
451  * Mark a write transaction as no longer running.
452  *
453  * We don't do any locking here; caller must handle that.
454  */
455 static inline void
ProcArrayEndTransactionInternal(PGPROC * proc,PGXACT * pgxact,TransactionId latestXid)456 ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
457 								TransactionId latestXid)
458 {
459 	pgxact->xid = InvalidTransactionId;
460 	proc->lxid = InvalidLocalTransactionId;
461 	pgxact->xmin = InvalidTransactionId;
462 	/* must be cleared with xid/xmin: */
463 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
464 	pgxact->delayChkpt = false; /* be sure this is cleared in abort */
465 	proc->recoveryConflictPending = false;
466 
467 	/* Clear the subtransaction-XID cache too while holding the lock */
468 	pgxact->nxids = 0;
469 	pgxact->overflowed = false;
470 
471 	/* Also advance global latestCompletedXid while holding the lock */
472 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
473 							  latestXid))
474 		ShmemVariableCache->latestCompletedXid = latestXid;
475 }
476 
477 /*
478  * ProcArrayGroupClearXid -- group XID clearing
479  *
480  * When we cannot immediately acquire ProcArrayLock in exclusive mode at
481  * commit time, add ourselves to a list of processes that need their XIDs
482  * cleared.  The first process to add itself to the list will acquire
483  * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
484  * on behalf of all group members.  This avoids a great deal of contention
485  * around ProcArrayLock when many processes are trying to commit at once,
486  * since the lock need not be repeatedly handed off from one committing
487  * process to the next.
488  */
489 static void
ProcArrayGroupClearXid(PGPROC * proc,TransactionId latestXid)490 ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
491 {
492 	volatile PROC_HDR *procglobal = ProcGlobal;
493 	uint32		nextidx;
494 	uint32		wakeidx;
495 
496 	/* We should definitely have an XID to clear. */
497 	Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
498 
499 	/* Add ourselves to the list of processes needing a group XID clear. */
500 	proc->procArrayGroupMember = true;
501 	proc->procArrayGroupMemberXid = latestXid;
502 	while (true)
503 	{
504 		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
505 		pg_atomic_write_u32(&proc->procArrayGroupNext, nextidx);
506 
507 		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
508 										   &nextidx,
509 										   (uint32) proc->pgprocno))
510 			break;
511 	}
512 
513 	/*
514 	 * If the list was not empty, the leader will clear our XID.  It is
515 	 * impossible to have followers without a leader because the first process
516 	 * that has added itself to the list will always have nextidx as
517 	 * INVALID_PGPROCNO.
518 	 */
519 	if (nextidx != INVALID_PGPROCNO)
520 	{
521 		int			extraWaits = 0;
522 
523 		/* Sleep until the leader clears our XID. */
524 		for (;;)
525 		{
526 			/* acts as a read barrier */
527 			PGSemaphoreLock(&proc->sem);
528 			if (!proc->procArrayGroupMember)
529 				break;
530 			extraWaits++;
531 		}
532 
533 		Assert(pg_atomic_read_u32(&proc->procArrayGroupNext) == INVALID_PGPROCNO);
534 
535 		/* Fix semaphore count for any absorbed wakeups */
536 		while (extraWaits-- > 0)
537 			PGSemaphoreUnlock(&proc->sem);
538 		return;
539 	}
540 
541 	/* We are the leader.  Acquire the lock on behalf of everyone. */
542 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
543 
544 	/*
545 	 * Now that we've got the lock, clear the list of processes waiting for
546 	 * group XID clearing, saving a pointer to the head of the list.  Trying
547 	 * to pop elements one at a time could lead to an ABA problem.
548 	 */
549 	while (true)
550 	{
551 		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
552 		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
553 										   &nextidx,
554 										   INVALID_PGPROCNO))
555 			break;
556 	}
557 
558 	/* Remember head of list so we can perform wakeups after dropping lock. */
559 	wakeidx = nextidx;
560 
561 	/* Walk the list and clear all XIDs. */
562 	while (nextidx != INVALID_PGPROCNO)
563 	{
564 		PGPROC	   *nextproc = &allProcs[nextidx];
565 		PGXACT	   *pgxact = &allPgXact[nextidx];
566 
567 		ProcArrayEndTransactionInternal(nextproc, pgxact, nextproc->procArrayGroupMemberXid);
568 
569 		/* Move to next proc in list. */
570 		nextidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
571 	}
572 
573 	/* We're done with the lock now. */
574 	LWLockRelease(ProcArrayLock);
575 
576 	/*
577 	 * Now that we've released the lock, go back and wake everybody up.  We
578 	 * don't do this under the lock so as to keep lock hold times to a
579 	 * minimum.  The system calls we need to perform to wake other processes
580 	 * up are probably much slower than the simple memory writes we did while
581 	 * holding the lock.
582 	 */
583 	while (wakeidx != INVALID_PGPROCNO)
584 	{
585 		PGPROC	   *nextproc = &allProcs[wakeidx];
586 
587 		wakeidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
588 		pg_atomic_write_u32(&nextproc->procArrayGroupNext, INVALID_PGPROCNO);
589 
590 		/* ensure all previous writes are visible before follower continues. */
591 		pg_write_barrier();
592 
593 		nextproc->procArrayGroupMember = false;
594 
595 		if (nextproc != MyProc)
596 			PGSemaphoreUnlock(&nextproc->sem);
597 	}
598 }
599 
600 /*
601  * ProcArrayClearTransaction -- clear the transaction fields
602  *
603  * This is used after successfully preparing a 2-phase transaction.  We are
604  * not actually reporting the transaction's XID as no longer running --- it
605  * will still appear as running because the 2PC's gxact is in the ProcArray
606  * too.  We just have to clear out our own PGXACT.
607  */
608 void
ProcArrayClearTransaction(PGPROC * proc)609 ProcArrayClearTransaction(PGPROC *proc)
610 {
611 	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
612 
613 	/*
614 	 * We can skip locking ProcArrayLock here, because this action does not
615 	 * actually change anyone's view of the set of running XIDs: our entry is
616 	 * duplicate with the gxact that has already been inserted into the
617 	 * ProcArray.
618 	 */
619 	pgxact->xid = InvalidTransactionId;
620 	proc->lxid = InvalidLocalTransactionId;
621 	pgxact->xmin = InvalidTransactionId;
622 	proc->recoveryConflictPending = false;
623 
624 	/* redundant, but just in case */
625 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
626 	pgxact->delayChkpt = false;
627 
628 	/* Clear the subtransaction-XID cache too */
629 	pgxact->nxids = 0;
630 	pgxact->overflowed = false;
631 }
632 
633 /*
634  * ProcArrayInitRecovery -- initialize recovery xid mgmt environment
635  *
636  * Remember up to where the startup process initialized the CLOG and subtrans
637  * so we can ensure it's initialized gaplessly up to the point where necessary
638  * while in recovery.
639  */
640 void
ProcArrayInitRecovery(TransactionId initializedUptoXID)641 ProcArrayInitRecovery(TransactionId initializedUptoXID)
642 {
643 	Assert(standbyState == STANDBY_INITIALIZED);
644 	Assert(TransactionIdIsNormal(initializedUptoXID));
645 
646 	/*
647 	 * we set latestObservedXid to the xid SUBTRANS has been initialized up
648 	 * to, so we can extend it from that point onwards in
649 	 * RecordKnownAssignedTransactionIds, and when we get consistent in
650 	 * ProcArrayApplyRecoveryInfo().
651 	 */
652 	latestObservedXid = initializedUptoXID;
653 	TransactionIdRetreat(latestObservedXid);
654 }
655 
656 /*
657  * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
658  *
659  * Takes us through 3 states: Initialized, Pending and Ready.
660  * Normal case is to go all the way to Ready straight away, though there
661  * are atypical cases where we need to take it in steps.
662  *
663  * Use the data about running transactions on master to create the initial
664  * state of KnownAssignedXids. We also use these records to regularly prune
665  * KnownAssignedXids because we know it is possible that some transactions
666  * with FATAL errors fail to write abort records, which could cause eventual
667  * overflow.
668  *
669  * See comments for LogStandbySnapshot().
670  */
671 void
ProcArrayApplyRecoveryInfo(RunningTransactions running)672 ProcArrayApplyRecoveryInfo(RunningTransactions running)
673 {
674 	TransactionId *xids;
675 	int			nxids;
676 	TransactionId nextXid;
677 	int			i;
678 
679 	Assert(standbyState >= STANDBY_INITIALIZED);
680 	Assert(TransactionIdIsValid(running->nextXid));
681 	Assert(TransactionIdIsValid(running->oldestRunningXid));
682 	Assert(TransactionIdIsNormal(running->latestCompletedXid));
683 
684 	/*
685 	 * Remove stale transactions, if any.
686 	 */
687 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
688 
689 	/*
690 	 * Remove stale locks, if any.
691 	 *
692 	 * Locks are always assigned to the toplevel xid so we don't need to care
693 	 * about subxcnt/subxids (and by extension not about ->suboverflowed).
694 	 */
695 	StandbyReleaseOldLocks(running->xcnt, running->xids);
696 
697 	/*
698 	 * If our snapshot is already valid, nothing else to do...
699 	 */
700 	if (standbyState == STANDBY_SNAPSHOT_READY)
701 		return;
702 
703 	/*
704 	 * If our initial RunningTransactionsData had an overflowed snapshot then
705 	 * we knew we were missing some subxids from our snapshot. If we continue
706 	 * to see overflowed snapshots then we might never be able to start up, so
707 	 * we make another test to see if our snapshot is now valid. We know that
708 	 * the missing subxids are equal to or earlier than nextXid. After we
709 	 * initialise we continue to apply changes during recovery, so once the
710 	 * oldestRunningXid is later than the nextXid from the initial snapshot we
711 	 * know that we no longer have missing information and can mark the
712 	 * snapshot as valid.
713 	 */
714 	if (standbyState == STANDBY_SNAPSHOT_PENDING)
715 	{
716 		/*
717 		 * If the snapshot isn't overflowed or if its empty we can reset our
718 		 * pending state and use this snapshot instead.
719 		 */
720 		if (!running->subxid_overflow || running->xcnt == 0)
721 		{
722 			/*
723 			 * If we have already collected known assigned xids, we need to
724 			 * throw them away before we apply the recovery snapshot.
725 			 */
726 			KnownAssignedXidsReset();
727 			standbyState = STANDBY_INITIALIZED;
728 		}
729 		else
730 		{
731 			if (TransactionIdPrecedes(standbySnapshotPendingXmin,
732 									  running->oldestRunningXid))
733 			{
734 				standbyState = STANDBY_SNAPSHOT_READY;
735 				elog(trace_recovery(DEBUG1),
736 					 "recovery snapshots are now enabled");
737 			}
738 			else
739 				elog(trace_recovery(DEBUG1),
740 				  "recovery snapshot waiting for non-overflowed snapshot or "
741 				"until oldest active xid on standby is at least %u (now %u)",
742 					 standbySnapshotPendingXmin,
743 					 running->oldestRunningXid);
744 			return;
745 		}
746 	}
747 
748 	Assert(standbyState == STANDBY_INITIALIZED);
749 
750 	/*
751 	 * OK, we need to initialise from the RunningTransactionsData record.
752 	 *
753 	 * NB: this can be reached at least twice, so make sure new code can deal
754 	 * with that.
755 	 */
756 
757 	/*
758 	 * Nobody else is running yet, but take locks anyhow
759 	 */
760 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
761 
762 	/*
763 	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
764 	 * sort them first.
765 	 *
766 	 * Some of the new xids are top-level xids and some are subtransactions.
767 	 * We don't call SubtransSetParent because it doesn't matter yet. If we
768 	 * aren't overflowed then all xids will fit in snapshot and so we don't
769 	 * need subtrans. If we later overflow, an xid assignment record will add
770 	 * xids to subtrans. If RunningXacts is overflowed then we don't have
771 	 * enough information to correctly update subtrans anyway.
772 	 */
773 
774 	/*
775 	 * Allocate a temporary array to avoid modifying the array passed as
776 	 * argument.
777 	 */
778 	xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
779 
780 	/*
781 	 * Add to the temp array any xids which have not already completed.
782 	 */
783 	nxids = 0;
784 	for (i = 0; i < running->xcnt + running->subxcnt; i++)
785 	{
786 		TransactionId xid = running->xids[i];
787 
788 		/*
789 		 * The running-xacts snapshot can contain xids that were still visible
790 		 * in the procarray when the snapshot was taken, but were already
791 		 * WAL-logged as completed. They're not running anymore, so ignore
792 		 * them.
793 		 */
794 		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
795 			continue;
796 
797 		xids[nxids++] = xid;
798 	}
799 
800 	if (nxids > 0)
801 	{
802 		if (procArray->numKnownAssignedXids != 0)
803 		{
804 			LWLockRelease(ProcArrayLock);
805 			elog(ERROR, "KnownAssignedXids is not empty");
806 		}
807 
808 		/*
809 		 * Sort the array so that we can add them safely into
810 		 * KnownAssignedXids.
811 		 */
812 		qsort(xids, nxids, sizeof(TransactionId), xidComparator);
813 
814 		/*
815 		 * Add the sorted snapshot into KnownAssignedXids.  The running-xacts
816 		 * snapshot may include duplicated xids because of prepared
817 		 * transactions, so ignore them.
818 		 */
819 		for (i = 0; i < nxids; i++)
820 		{
821 			if (i > 0 && TransactionIdEquals(xids[i - 1], xids[i]))
822 			{
823 				elog(DEBUG1,
824 					 "found duplicated transaction %u for KnownAssignedXids insertion",
825 					 xids[i]);
826 				continue;
827 			}
828 			KnownAssignedXidsAdd(xids[i], xids[i], true);
829 		}
830 
831 		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
832 	}
833 
834 	pfree(xids);
835 
836 	/*
837 	 * latestObservedXid is at least set to the point where SUBTRANS was
838 	 * started up to (c.f. ProcArrayInitRecovery()) or to the biggest xid
839 	 * RecordKnownAssignedTransactionIds() was called for.  Initialize
840 	 * subtrans from thereon, up to nextXid - 1.
841 	 *
842 	 * We need to duplicate parts of RecordKnownAssignedTransactionId() here,
843 	 * because we've just added xids to the known assigned xids machinery that
844 	 * haven't gone through RecordKnownAssignedTransactionId().
845 	 */
846 	Assert(TransactionIdIsNormal(latestObservedXid));
847 	TransactionIdAdvance(latestObservedXid);
848 	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
849 	{
850 		ExtendSUBTRANS(latestObservedXid);
851 		TransactionIdAdvance(latestObservedXid);
852 	}
853 	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
854 
855 	/* ----------
856 	 * Now we've got the running xids we need to set the global values that
857 	 * are used to track snapshots as they evolve further.
858 	 *
859 	 * - latestCompletedXid which will be the xmax for snapshots
860 	 * - lastOverflowedXid which shows whether snapshots overflow
861 	 * - nextXid
862 	 *
863 	 * If the snapshot overflowed, then we still initialise with what we know,
864 	 * but the recovery snapshot isn't fully valid yet because we know there
865 	 * are some subxids missing. We don't know the specific subxids that are
866 	 * missing, so conservatively assume the last one is latestObservedXid.
867 	 * ----------
868 	 */
869 	if (running->subxid_overflow)
870 	{
871 		standbyState = STANDBY_SNAPSHOT_PENDING;
872 
873 		standbySnapshotPendingXmin = latestObservedXid;
874 		procArray->lastOverflowedXid = latestObservedXid;
875 	}
876 	else
877 	{
878 		standbyState = STANDBY_SNAPSHOT_READY;
879 
880 		standbySnapshotPendingXmin = InvalidTransactionId;
881 	}
882 
883 	/*
884 	 * If a transaction wrote a commit record in the gap between taking and
885 	 * logging the snapshot then latestCompletedXid may already be higher than
886 	 * the value from the snapshot, so check before we use the incoming value.
887 	 */
888 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
889 							  running->latestCompletedXid))
890 		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
891 
892 	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
893 
894 	LWLockRelease(ProcArrayLock);
895 
896 	/*
897 	 * ShmemVariableCache->nextXid must be beyond any observed xid.
898 	 *
899 	 * We don't expect anyone else to modify nextXid, hence we don't need to
900 	 * hold a lock while examining it.  We still acquire the lock to modify
901 	 * it, though.
902 	 */
903 	nextXid = latestObservedXid;
904 	TransactionIdAdvance(nextXid);
905 	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
906 	{
907 		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
908 		ShmemVariableCache->nextXid = nextXid;
909 		LWLockRelease(XidGenLock);
910 	}
911 
912 	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
913 
914 	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
915 	if (standbyState == STANDBY_SNAPSHOT_READY)
916 		elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
917 	else
918 		elog(trace_recovery(DEBUG1),
919 			 "recovery snapshot waiting for non-overflowed snapshot or "
920 			 "until oldest active xid on standby is at least %u (now %u)",
921 			 standbySnapshotPendingXmin,
922 			 running->oldestRunningXid);
923 }
924 
925 /*
926  * ProcArrayApplyXidAssignment
927  *		Process an XLOG_XACT_ASSIGNMENT WAL record
928  */
929 void
ProcArrayApplyXidAssignment(TransactionId topxid,int nsubxids,TransactionId * subxids)930 ProcArrayApplyXidAssignment(TransactionId topxid,
931 							int nsubxids, TransactionId *subxids)
932 {
933 	TransactionId max_xid;
934 	int			i;
935 
936 	Assert(standbyState >= STANDBY_INITIALIZED);
937 
938 	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
939 
940 	/*
941 	 * Mark all the subtransactions as observed.
942 	 *
943 	 * NOTE: This will fail if the subxid contains too many previously
944 	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
945 	 * as the code stands, because xid-assignment records should never contain
946 	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
947 	 */
948 	RecordKnownAssignedTransactionIds(max_xid);
949 
950 	/*
951 	 * Notice that we update pg_subtrans with the top-level xid, rather than
952 	 * the parent xid. This is a difference between normal processing and
953 	 * recovery, yet is still correct in all cases. The reason is that
954 	 * subtransaction commit is not marked in clog until commit processing, so
955 	 * all aborted subtransactions have already been clearly marked in clog.
956 	 * As a result we are able to refer directly to the top-level
957 	 * transaction's state rather than skipping through all the intermediate
958 	 * states in the subtransaction tree. This should be the first time we
959 	 * have attempted to SubTransSetParent().
960 	 */
961 	for (i = 0; i < nsubxids; i++)
962 		SubTransSetParent(subxids[i], topxid, false);
963 
964 	/* KnownAssignedXids isn't maintained yet, so we're done for now */
965 	if (standbyState == STANDBY_INITIALIZED)
966 		return;
967 
968 	/*
969 	 * Uses same locking as transaction commit
970 	 */
971 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
972 
973 	/*
974 	 * Remove subxids from known-assigned-xacts.
975 	 */
976 	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
977 
978 	/*
979 	 * Advance lastOverflowedXid to be at least the last of these subxids.
980 	 */
981 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
982 		procArray->lastOverflowedXid = max_xid;
983 
984 	LWLockRelease(ProcArrayLock);
985 }
986 
987 /*
988  * TransactionIdIsInProgress -- is given transaction running in some backend
989  *
990  * Aside from some shortcuts such as checking RecentXmin and our own Xid,
991  * there are four possibilities for finding a running transaction:
992  *
993  * 1. The given Xid is a main transaction Id.  We will find this out cheaply
994  * by looking at the PGXACT struct for each backend.
995  *
996  * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
997  * We can find this out cheaply too.
998  *
999  * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
1000  * if the Xid is running on the master.
1001  *
1002  * 4. Search the SubTrans tree to find the Xid's topmost parent, and then see
1003  * if that is running according to PGXACT or KnownAssignedXids.  This is the
1004  * slowest way, but sadly it has to be done always if the others failed,
1005  * unless we see that the cached subxact sets are complete (none have
1006  * overflowed).
1007  *
1008  * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
1009  * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
1010  * This buys back some concurrency (and we can't retrieve the main Xids from
1011  * PGXACT again anyway; see GetNewTransactionId).
1012  */
1013 bool
TransactionIdIsInProgress(TransactionId xid)1014 TransactionIdIsInProgress(TransactionId xid)
1015 {
1016 	static TransactionId *xids = NULL;
1017 	int			nxids = 0;
1018 	ProcArrayStruct *arrayP = procArray;
1019 	TransactionId topxid;
1020 	int			i,
1021 				j;
1022 
1023 	/*
1024 	 * Don't bother checking a transaction older than RecentXmin; it could not
1025 	 * possibly still be running.  (Note: in particular, this guarantees that
1026 	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
1027 	 * running.)
1028 	 */
1029 	if (TransactionIdPrecedes(xid, RecentXmin))
1030 	{
1031 		xc_by_recent_xmin_inc();
1032 		return false;
1033 	}
1034 
1035 	/*
1036 	 * We may have just checked the status of this transaction, so if it is
1037 	 * already known to be completed, we can fall out without any access to
1038 	 * shared memory.
1039 	 */
1040 	if (TransactionIdIsKnownCompleted(xid))
1041 	{
1042 		xc_by_known_xact_inc();
1043 		return false;
1044 	}
1045 
1046 	/*
1047 	 * Also, we can handle our own transaction (and subtransactions) without
1048 	 * any access to shared memory.
1049 	 */
1050 	if (TransactionIdIsCurrentTransactionId(xid))
1051 	{
1052 		xc_by_my_xact_inc();
1053 		return true;
1054 	}
1055 
1056 	/*
1057 	 * If first time through, get workspace to remember main XIDs in. We
1058 	 * malloc it permanently to avoid repeated palloc/pfree overhead.
1059 	 */
1060 	if (xids == NULL)
1061 	{
1062 		/*
1063 		 * In hot standby mode, reserve enough space to hold all xids in the
1064 		 * known-assigned list. If we later finish recovery, we no longer need
1065 		 * the bigger array, but we don't bother to shrink it.
1066 		 */
1067 		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
1068 
1069 		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
1070 		if (xids == NULL)
1071 			ereport(ERROR,
1072 					(errcode(ERRCODE_OUT_OF_MEMORY),
1073 					 errmsg("out of memory")));
1074 	}
1075 
1076 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1077 
1078 	/*
1079 	 * Now that we have the lock, we can check latestCompletedXid; if the
1080 	 * target Xid is after that, it's surely still running.
1081 	 */
1082 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
1083 	{
1084 		LWLockRelease(ProcArrayLock);
1085 		xc_by_latest_xid_inc();
1086 		return true;
1087 	}
1088 
1089 	/* No shortcuts, gotta grovel through the array */
1090 	for (i = 0; i < arrayP->numProcs; i++)
1091 	{
1092 		int			pgprocno = arrayP->pgprocnos[i];
1093 		volatile PGPROC *proc = &allProcs[pgprocno];
1094 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1095 		TransactionId pxid;
1096 
1097 		/* Ignore my own proc --- dealt with it above */
1098 		if (proc == MyProc)
1099 			continue;
1100 
1101 		/* Fetch xid just once - see GetNewTransactionId */
1102 		pxid = pgxact->xid;
1103 
1104 		if (!TransactionIdIsValid(pxid))
1105 			continue;
1106 
1107 		/*
1108 		 * Step 1: check the main Xid
1109 		 */
1110 		if (TransactionIdEquals(pxid, xid))
1111 		{
1112 			LWLockRelease(ProcArrayLock);
1113 			xc_by_main_xid_inc();
1114 			return true;
1115 		}
1116 
1117 		/*
1118 		 * We can ignore main Xids that are younger than the target Xid, since
1119 		 * the target could not possibly be their child.
1120 		 */
1121 		if (TransactionIdPrecedes(xid, pxid))
1122 			continue;
1123 
1124 		/*
1125 		 * Step 2: check the cached child-Xids arrays
1126 		 */
1127 		for (j = pgxact->nxids - 1; j >= 0; j--)
1128 		{
1129 			/* Fetch xid just once - see GetNewTransactionId */
1130 			TransactionId cxid = proc->subxids.xids[j];
1131 
1132 			if (TransactionIdEquals(cxid, xid))
1133 			{
1134 				LWLockRelease(ProcArrayLock);
1135 				xc_by_child_xid_inc();
1136 				return true;
1137 			}
1138 		}
1139 
1140 		/*
1141 		 * Save the main Xid for step 4.  We only need to remember main Xids
1142 		 * that have uncached children.  (Note: there is no race condition
1143 		 * here because the overflowed flag cannot be cleared, only set, while
1144 		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
1145 		 * worry about.)
1146 		 */
1147 		if (pgxact->overflowed)
1148 			xids[nxids++] = pxid;
1149 	}
1150 
1151 	/*
1152 	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
1153 	 * in the list must be treated as running.
1154 	 */
1155 	if (RecoveryInProgress())
1156 	{
1157 		/* none of the PGXACT entries should have XIDs in hot standby mode */
1158 		Assert(nxids == 0);
1159 
1160 		if (KnownAssignedXidExists(xid))
1161 		{
1162 			LWLockRelease(ProcArrayLock);
1163 			xc_by_known_assigned_inc();
1164 			return true;
1165 		}
1166 
1167 		/*
1168 		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
1169 		 * too.  Fetch all xids from KnownAssignedXids that are lower than
1170 		 * xid, since if xid is a subtransaction its parent will always have a
1171 		 * lower value.  Note we will collect both main and subXIDs here, but
1172 		 * there's no help for it.
1173 		 */
1174 		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
1175 			nxids = KnownAssignedXidsGet(xids, xid);
1176 	}
1177 
1178 	LWLockRelease(ProcArrayLock);
1179 
1180 	/*
1181 	 * If none of the relevant caches overflowed, we know the Xid is not
1182 	 * running without even looking at pg_subtrans.
1183 	 */
1184 	if (nxids == 0)
1185 	{
1186 		xc_no_overflow_inc();
1187 		return false;
1188 	}
1189 
1190 	/*
1191 	 * Step 4: have to check pg_subtrans.
1192 	 *
1193 	 * At this point, we know it's either a subtransaction of one of the Xids
1194 	 * in xids[], or it's not running.  If it's an already-failed
1195 	 * subtransaction, we want to say "not running" even though its parent may
1196 	 * still be running.  So first, check pg_clog to see if it's been aborted.
1197 	 */
1198 	xc_slow_answer_inc();
1199 
1200 	if (TransactionIdDidAbort(xid))
1201 		return false;
1202 
1203 	/*
1204 	 * It isn't aborted, so check whether the transaction tree it belongs to
1205 	 * is still running (or, more precisely, whether it was running when we
1206 	 * held ProcArrayLock).
1207 	 */
1208 	topxid = SubTransGetTopmostTransaction(xid);
1209 	Assert(TransactionIdIsValid(topxid));
1210 	if (!TransactionIdEquals(topxid, xid))
1211 	{
1212 		for (i = 0; i < nxids; i++)
1213 		{
1214 			if (TransactionIdEquals(xids[i], topxid))
1215 				return true;
1216 		}
1217 	}
1218 
1219 	return false;
1220 }
1221 
1222 /*
1223  * TransactionIdIsActive -- is xid the top-level XID of an active backend?
1224  *
1225  * This differs from TransactionIdIsInProgress in that it ignores prepared
1226  * transactions, as well as transactions running on the master if we're in
1227  * hot standby.  Also, we ignore subtransactions since that's not needed
1228  * for current uses.
1229  */
1230 bool
TransactionIdIsActive(TransactionId xid)1231 TransactionIdIsActive(TransactionId xid)
1232 {
1233 	bool		result = false;
1234 	ProcArrayStruct *arrayP = procArray;
1235 	int			i;
1236 
1237 	/*
1238 	 * Don't bother checking a transaction older than RecentXmin; it could not
1239 	 * possibly still be running.
1240 	 */
1241 	if (TransactionIdPrecedes(xid, RecentXmin))
1242 		return false;
1243 
1244 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1245 
1246 	for (i = 0; i < arrayP->numProcs; i++)
1247 	{
1248 		int			pgprocno = arrayP->pgprocnos[i];
1249 		volatile PGPROC *proc = &allProcs[pgprocno];
1250 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1251 		TransactionId pxid;
1252 
1253 		/* Fetch xid just once - see GetNewTransactionId */
1254 		pxid = pgxact->xid;
1255 
1256 		if (!TransactionIdIsValid(pxid))
1257 			continue;
1258 
1259 		if (proc->pid == 0)
1260 			continue;			/* ignore prepared transactions */
1261 
1262 		if (TransactionIdEquals(pxid, xid))
1263 		{
1264 			result = true;
1265 			break;
1266 		}
1267 	}
1268 
1269 	LWLockRelease(ProcArrayLock);
1270 
1271 	return result;
1272 }
1273 
1274 
1275 /*
1276  * GetOldestXmin -- returns oldest transaction that was running
1277  *					when any current transaction was started.
1278  *
1279  * If rel is NULL or a shared relation, all backends are considered, otherwise
1280  * only backends running in this database are considered.
1281  *
1282  * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are
1283  * ignored.
1284  *
1285  * This is used by VACUUM to decide which deleted tuples must be preserved in
1286  * the passed in table. For shared relations backends in all databases must be
1287  * considered, but for non-shared relations that's not required, since only
1288  * backends in my own database could ever see the tuples in them. Also, we can
1289  * ignore concurrently running lazy VACUUMs because (a) they must be working
1290  * on other tables, and (b) they don't need to do snapshot-based lookups.
1291  *
1292  * This is also used to determine where to truncate pg_subtrans.  For that
1293  * backends in all databases have to be considered, so rel = NULL has to be
1294  * passed in.
1295  *
1296  * Note: we include all currently running xids in the set of considered xids.
1297  * This ensures that if a just-started xact has not yet set its snapshot,
1298  * when it does set the snapshot it cannot set xmin less than what we compute.
1299  * See notes in src/backend/access/transam/README.
1300  *
1301  * Note: despite the above, it's possible for the calculated value to move
1302  * backwards on repeated calls. The calculated value is conservative, so that
1303  * anything older is definitely not considered as running by anyone anymore,
1304  * but the exact value calculated depends on a number of things. For example,
1305  * if rel = NULL and there are no transactions running in the current
1306  * database, GetOldestXmin() returns latestCompletedXid. If a transaction
1307  * begins after that, its xmin will include in-progress transactions in other
1308  * databases that started earlier, so another call will return a lower value.
1309  * Nonetheless it is safe to vacuum a table in the current database with the
1310  * first result.  There are also replication-related effects: a walsender
1311  * process can set its xmin based on transactions that are no longer running
1312  * in the master but are still being replayed on the standby, thus possibly
1313  * making the GetOldestXmin reading go backwards.  In this case there is a
1314  * possibility that we lose data that the standby would like to have, but
1315  * there is little we can do about that --- data is only protected if the
1316  * walsender runs continuously while queries are executed on the standby.
1317  * (The Hot Standby code deals with such cases by failing standby queries
1318  * that needed to access already-removed data, so there's no integrity bug.)
1319  * The return value is also adjusted with vacuum_defer_cleanup_age, so
1320  * increasing that setting on the fly is another easy way to make
1321  * GetOldestXmin() move backwards, with no consequences for data integrity.
1322  */
1323 TransactionId
GetOldestXmin(Relation rel,bool ignoreVacuum)1324 GetOldestXmin(Relation rel, bool ignoreVacuum)
1325 {
1326 	ProcArrayStruct *arrayP = procArray;
1327 	TransactionId result;
1328 	int			index;
1329 	bool		allDbs;
1330 
1331 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
1332 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1333 
1334 	/*
1335 	 * If we're not computing a relation specific limit, or if a shared
1336 	 * relation has been passed in, backends in all databases have to be
1337 	 * considered.
1338 	 */
1339 	allDbs = rel == NULL || rel->rd_rel->relisshared;
1340 
1341 	/* Cannot look for individual databases during recovery */
1342 	Assert(allDbs || !RecoveryInProgress());
1343 
1344 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1345 
1346 	/*
1347 	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
1348 	 * is a lower bound for the XIDs that might appear in the ProcArray later,
1349 	 * and so protects us against overestimating the result due to future
1350 	 * additions.
1351 	 */
1352 	result = ShmemVariableCache->latestCompletedXid;
1353 	Assert(TransactionIdIsNormal(result));
1354 	TransactionIdAdvance(result);
1355 
1356 	for (index = 0; index < arrayP->numProcs; index++)
1357 	{
1358 		int			pgprocno = arrayP->pgprocnos[index];
1359 		volatile PGPROC *proc = &allProcs[pgprocno];
1360 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1361 
1362 		/*
1363 		 * Backend is doing logical decoding which manages xmin separately,
1364 		 * check below.
1365 		 */
1366 		if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
1367 			continue;
1368 
1369 		if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM))
1370 			continue;
1371 
1372 		if (allDbs ||
1373 			proc->databaseId == MyDatabaseId ||
1374 			proc->databaseId == 0)		/* always include WalSender */
1375 		{
1376 			/* Fetch xid just once - see GetNewTransactionId */
1377 			TransactionId xid = pgxact->xid;
1378 
1379 			/* First consider the transaction's own Xid, if any */
1380 			if (TransactionIdIsNormal(xid) &&
1381 				TransactionIdPrecedes(xid, result))
1382 				result = xid;
1383 
1384 			/*
1385 			 * Also consider the transaction's Xmin, if set.
1386 			 *
1387 			 * We must check both Xid and Xmin because a transaction might
1388 			 * have an Xmin but not (yet) an Xid; conversely, if it has an
1389 			 * Xid, that could determine some not-yet-set Xmin.
1390 			 */
1391 			xid = pgxact->xmin; /* Fetch just once */
1392 			if (TransactionIdIsNormal(xid) &&
1393 				TransactionIdPrecedes(xid, result))
1394 				result = xid;
1395 		}
1396 	}
1397 
1398 	/* fetch into volatile var while ProcArrayLock is held */
1399 	replication_slot_xmin = procArray->replication_slot_xmin;
1400 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1401 
1402 	if (RecoveryInProgress())
1403 	{
1404 		/*
1405 		 * Check to see whether KnownAssignedXids contains an xid value older
1406 		 * than the main procarray.
1407 		 */
1408 		TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1409 
1410 		LWLockRelease(ProcArrayLock);
1411 
1412 		if (TransactionIdIsNormal(kaxmin) &&
1413 			TransactionIdPrecedes(kaxmin, result))
1414 			result = kaxmin;
1415 	}
1416 	else
1417 	{
1418 		/*
1419 		 * No other information needed, so release the lock immediately.
1420 		 */
1421 		LWLockRelease(ProcArrayLock);
1422 
1423 		/*
1424 		 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
1425 		 * being careful not to generate a "permanent" XID.
1426 		 *
1427 		 * vacuum_defer_cleanup_age provides some additional "slop" for the
1428 		 * benefit of hot standby queries on slave servers.  This is quick and
1429 		 * dirty, and perhaps not all that useful unless the master has a
1430 		 * predictable transaction rate, but it offers some protection when
1431 		 * there's no walsender connection.  Note that we are assuming
1432 		 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
1433 		 * so guc.c should limit it to no more than the xidStopLimit threshold
1434 		 * in varsup.c.  Also note that we intentionally don't apply
1435 		 * vacuum_defer_cleanup_age on standby servers.
1436 		 */
1437 		result -= vacuum_defer_cleanup_age;
1438 		if (!TransactionIdIsNormal(result))
1439 			result = FirstNormalTransactionId;
1440 	}
1441 
1442 	/*
1443 	 * Check whether there are replication slots requiring an older xmin.
1444 	 */
1445 	if (TransactionIdIsValid(replication_slot_xmin) &&
1446 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
1447 		result = replication_slot_xmin;
1448 
1449 	/*
1450 	 * After locks have been released and defer_cleanup_age has been applied,
1451 	 * check whether we need to back up further to make logical decoding
1452 	 * possible. We need to do so if we're computing the global limit (rel =
1453 	 * NULL) or if the passed relation is a catalog relation of some kind.
1454 	 */
1455 	if ((rel == NULL ||
1456 		 RelationIsAccessibleInLogicalDecoding(rel)) &&
1457 		TransactionIdIsValid(replication_slot_catalog_xmin) &&
1458 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
1459 		result = replication_slot_catalog_xmin;
1460 
1461 	return result;
1462 }
1463 
1464 /*
1465  * GetMaxSnapshotXidCount -- get max size for snapshot XID array
1466  *
1467  * We have to export this for use by snapmgr.c.
1468  */
1469 int
GetMaxSnapshotXidCount(void)1470 GetMaxSnapshotXidCount(void)
1471 {
1472 	return procArray->maxProcs;
1473 }
1474 
1475 /*
1476  * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
1477  *
1478  * We have to export this for use by snapmgr.c.
1479  */
1480 int
GetMaxSnapshotSubxidCount(void)1481 GetMaxSnapshotSubxidCount(void)
1482 {
1483 	return TOTAL_MAX_CACHED_SUBXIDS;
1484 }
1485 
1486 /*
1487  * GetSnapshotData -- returns information about running transactions.
1488  *
1489  * The returned snapshot includes xmin (lowest still-running xact ID),
1490  * xmax (highest completed xact ID + 1), and a list of running xact IDs
1491  * in the range xmin <= xid < xmax.  It is used as follows:
1492  *		All xact IDs < xmin are considered finished.
1493  *		All xact IDs >= xmax are considered still running.
1494  *		For an xact ID xmin <= xid < xmax, consult list to see whether
1495  *		it is considered running or not.
1496  * This ensures that the set of transactions seen as "running" by the
1497  * current xact will not change after it takes the snapshot.
1498  *
1499  * All running top-level XIDs are included in the snapshot, except for lazy
1500  * VACUUM processes.  We also try to include running subtransaction XIDs,
1501  * but since PGPROC has only a limited cache area for subxact XIDs, full
1502  * information may not be available.  If we find any overflowed subxid arrays,
1503  * we have to mark the snapshot's subxid data as overflowed, and extra work
1504  * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1505  * in tqual.c).
1506  *
1507  * We also update the following backend-global variables:
1508  *		TransactionXmin: the oldest xmin of any snapshot in use in the
1509  *			current transaction (this is the same as MyPgXact->xmin).
1510  *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
1511  *			older than this are known not running any more.
 *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
 *			running transactions, except those running LAZY VACUUM).  This is
 *			the same computation done by GetOldestXmin(NULL, true).
1515  *		RecentGlobalDataXmin: the global xmin for non-catalog tables
1516  *			>= RecentGlobalXmin
1517  *
1518  * Note: this function should probably not be called with an argument that's
1519  * not statically allocated (see xip allocation below).
1520  */
1521 Snapshot
GetSnapshotData(Snapshot snapshot)1522 GetSnapshotData(Snapshot snapshot)
1523 {
1524 	ProcArrayStruct *arrayP = procArray;
1525 	TransactionId xmin;
1526 	TransactionId xmax;
1527 	TransactionId globalxmin;
1528 	int			index;
1529 	int			count = 0;
1530 	int			subcount = 0;
1531 	bool		suboverflowed = false;
1532 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
1533 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1534 
1535 	Assert(snapshot != NULL);
1536 
1537 	/*
1538 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
1539 	 * be sufficient.  But it seems better to do the malloc while not holding
1540 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
1541 	 * more subxip storage than is probably needed.
1542 	 *
1543 	 * This does open a possibility for avoiding repeated malloc/free: since
1544 	 * maxProcs does not change at runtime, we can simply reuse the previous
1545 	 * xip arrays if any.  (This relies on the fact that all callers pass
1546 	 * static SnapshotData structs.)
1547 	 */
1548 	if (snapshot->xip == NULL)
1549 	{
1550 		/*
1551 		 * First call for this snapshot. Snapshot is same size whether or not
1552 		 * we are in recovery, see later comments.
1553 		 */
1554 		snapshot->xip = (TransactionId *)
1555 			malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
1556 		if (snapshot->xip == NULL)
1557 			ereport(ERROR,
1558 					(errcode(ERRCODE_OUT_OF_MEMORY),
1559 					 errmsg("out of memory")));
1560 		Assert(snapshot->subxip == NULL);
1561 		snapshot->subxip = (TransactionId *)
1562 			malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
1563 		if (snapshot->subxip == NULL)
1564 			ereport(ERROR,
1565 					(errcode(ERRCODE_OUT_OF_MEMORY),
1566 					 errmsg("out of memory")));
1567 	}
1568 
1569 	/*
1570 	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1571 	 * going to set MyPgXact->xmin.
1572 	 */
1573 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1574 
1575 	/* xmax is always latestCompletedXid + 1 */
1576 	xmax = ShmemVariableCache->latestCompletedXid;
1577 	Assert(TransactionIdIsNormal(xmax));
1578 	TransactionIdAdvance(xmax);
1579 
1580 	/* initialize xmin calculation with xmax */
1581 	globalxmin = xmin = xmax;
1582 
1583 	snapshot->takenDuringRecovery = RecoveryInProgress();
1584 
1585 	if (!snapshot->takenDuringRecovery)
1586 	{
1587 		int		   *pgprocnos = arrayP->pgprocnos;
1588 		int			numProcs;
1589 
1590 		/*
1591 		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
1592 		 * to gather all active xids, find the lowest xmin, and try to record
1593 		 * subxids.
1594 		 */
1595 		numProcs = arrayP->numProcs;
1596 		for (index = 0; index < numProcs; index++)
1597 		{
1598 			int			pgprocno = pgprocnos[index];
1599 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
1600 			TransactionId xid;
1601 
1602 			/*
1603 			 * Backend is doing logical decoding which manages xmin
1604 			 * separately, check below.
1605 			 */
1606 			if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
1607 				continue;
1608 
1609 			/* Ignore procs running LAZY VACUUM */
1610 			if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1611 				continue;
1612 
1613 			/* Update globalxmin to be the smallest valid xmin */
1614 			xid = pgxact->xmin; /* fetch just once */
1615 			if (TransactionIdIsNormal(xid) &&
1616 				NormalTransactionIdPrecedes(xid, globalxmin))
1617 				globalxmin = xid;
1618 
1619 			/* Fetch xid just once - see GetNewTransactionId */
1620 			xid = pgxact->xid;
1621 
1622 			/*
1623 			 * If the transaction has no XID assigned, we can skip it; it
1624 			 * won't have sub-XIDs either.  If the XID is >= xmax, we can also
1625 			 * skip it; such transactions will be treated as running anyway
1626 			 * (and any sub-XIDs will also be >= xmax).
1627 			 */
1628 			if (!TransactionIdIsNormal(xid)
1629 				|| !NormalTransactionIdPrecedes(xid, xmax))
1630 				continue;
1631 
1632 			/*
1633 			 * We don't include our own XIDs (if any) in the snapshot, but we
1634 			 * must include them in xmin.
1635 			 */
1636 			if (NormalTransactionIdPrecedes(xid, xmin))
1637 				xmin = xid;
1638 			if (pgxact == MyPgXact)
1639 				continue;
1640 
1641 			/* Add XID to snapshot. */
1642 			snapshot->xip[count++] = xid;
1643 
1644 			/*
1645 			 * Save subtransaction XIDs if possible (if we've already
1646 			 * overflowed, there's no point).  Note that the subxact XIDs must
1647 			 * be later than their parent, so no need to check them against
1648 			 * xmin.  We could filter against xmax, but it seems better not to
1649 			 * do that much work while holding the ProcArrayLock.
1650 			 *
1651 			 * The other backend can add more subxids concurrently, but cannot
1652 			 * remove any.  Hence it's important to fetch nxids just once.
1653 			 * Should be safe to use memcpy, though.  (We needn't worry about
1654 			 * missing any xids added concurrently, because they must postdate
1655 			 * xmax.)
1656 			 *
1657 			 * Again, our own XIDs are not included in the snapshot.
1658 			 */
1659 			if (!suboverflowed)
1660 			{
1661 				if (pgxact->overflowed)
1662 					suboverflowed = true;
1663 				else
1664 				{
1665 					int			nxids = pgxact->nxids;
1666 
1667 					if (nxids > 0)
1668 					{
1669 						volatile PGPROC *proc = &allProcs[pgprocno];
1670 
1671 						memcpy(snapshot->subxip + subcount,
1672 							   (void *) proc->subxids.xids,
1673 							   nxids * sizeof(TransactionId));
1674 						subcount += nxids;
1675 					}
1676 				}
1677 			}
1678 		}
1679 	}
1680 	else
1681 	{
1682 		/*
1683 		 * We're in hot standby, so get XIDs from KnownAssignedXids.
1684 		 *
1685 		 * We store all xids directly into subxip[]. Here's why:
1686 		 *
1687 		 * In recovery we don't know which xids are top-level and which are
1688 		 * subxacts, a design choice that greatly simplifies xid processing.
1689 		 *
1690 		 * It seems like we would want to try to put xids into xip[] only, but
1691 		 * that is fairly small. We would either need to make that bigger or
1692 		 * to increase the rate at which we WAL-log xid assignment; neither is
1693 		 * an appealing choice.
1694 		 *
1695 		 * We could try to store xids into xip[] first and then into subxip[]
1696 		 * if there are too many xids. That only works if the snapshot doesn't
1697 		 * overflow because we do not search subxip[] in that case. A simpler
1698 		 * way is to just store all xids in the subxact array because this is
1699 		 * by far the bigger array. We just leave the xip array empty.
1700 		 *
1701 		 * Either way we need to change the way XidInMVCCSnapshot() works
1702 		 * depending upon when the snapshot was taken, or change normal
1703 		 * snapshot processing so it matches.
1704 		 *
1705 		 * Note: It is possible for recovery to end before we finish taking
1706 		 * the snapshot, and for newly assigned transaction ids to be added to
1707 		 * the ProcArray.  xmax cannot change while we hold ProcArrayLock, so
1708 		 * those newly added transaction ids would be filtered away, so we
1709 		 * need not be concerned about them.
1710 		 */
1711 		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
1712 												  xmax);
1713 
1714 		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1715 			suboverflowed = true;
1716 	}
1717 
1718 
1719 	/* fetch into volatile var while ProcArrayLock is held */
1720 	replication_slot_xmin = procArray->replication_slot_xmin;
1721 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1722 
1723 	if (!TransactionIdIsValid(MyPgXact->xmin))
1724 		MyPgXact->xmin = TransactionXmin = xmin;
1725 
1726 	LWLockRelease(ProcArrayLock);
1727 
1728 	/*
1729 	 * Update globalxmin to include actual process xids.  This is a slightly
1730 	 * different way of computing it than GetOldestXmin uses, but should give
1731 	 * the same result.
1732 	 */
1733 	if (TransactionIdPrecedes(xmin, globalxmin))
1734 		globalxmin = xmin;
1735 
1736 	/* Update global variables too */
1737 	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
1738 	if (!TransactionIdIsNormal(RecentGlobalXmin))
1739 		RecentGlobalXmin = FirstNormalTransactionId;
1740 
1741 	/* Check whether there's a replication slot requiring an older xmin. */
1742 	if (TransactionIdIsValid(replication_slot_xmin) &&
1743 		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
1744 		RecentGlobalXmin = replication_slot_xmin;
1745 
1746 	/* Non-catalog tables can be vacuumed if older than this xid */
1747 	RecentGlobalDataXmin = RecentGlobalXmin;
1748 
1749 	/*
1750 	 * Check whether there's a replication slot requiring an older catalog
1751 	 * xmin.
1752 	 */
1753 	if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
1754 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
1755 		RecentGlobalXmin = replication_slot_catalog_xmin;
1756 
1757 	RecentXmin = xmin;
1758 
1759 	snapshot->xmin = xmin;
1760 	snapshot->xmax = xmax;
1761 	snapshot->xcnt = count;
1762 	snapshot->subxcnt = subcount;
1763 	snapshot->suboverflowed = suboverflowed;
1764 
1765 	snapshot->curcid = GetCurrentCommandId(false);
1766 
1767 	/*
1768 	 * This is a new snapshot, so set both refcounts are zero, and mark it as
1769 	 * not copied in persistent memory.
1770 	 */
1771 	snapshot->active_count = 0;
1772 	snapshot->regd_count = 0;
1773 	snapshot->copied = false;
1774 
1775 	if (old_snapshot_threshold < 0)
1776 	{
1777 		/*
1778 		 * If not using "snapshot too old" feature, fill related fields with
1779 		 * dummy values that don't require any locking.
1780 		 */
1781 		snapshot->lsn = InvalidXLogRecPtr;
1782 		snapshot->whenTaken = 0;
1783 	}
1784 	else
1785 	{
1786 		/*
1787 		 * Capture the current time and WAL stream location in case this
1788 		 * snapshot becomes old enough to need to fall back on the special
1789 		 * "old snapshot" logic.
1790 		 */
1791 		snapshot->lsn = GetXLogInsertRecPtr();
1792 		snapshot->whenTaken = GetSnapshotCurrentTimestamp();
1793 		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
1794 	}
1795 
1796 	return snapshot;
1797 }
1798 
1799 /*
1800  * ProcArrayInstallImportedXmin -- install imported xmin into MyPgXact->xmin
1801  *
1802  * This is called when installing a snapshot imported from another
1803  * transaction.  To ensure that OldestXmin doesn't go backwards, we must
1804  * check that the source transaction is still running, and we'd better do
1805  * that atomically with installing the new xmin.
1806  *
1807  * Returns TRUE if successful, FALSE if source xact is no longer running.
1808  */
1809 bool
ProcArrayInstallImportedXmin(TransactionId xmin,TransactionId sourcexid)1810 ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
1811 {
1812 	bool		result = false;
1813 	ProcArrayStruct *arrayP = procArray;
1814 	int			index;
1815 
1816 	Assert(TransactionIdIsNormal(xmin));
1817 	if (!TransactionIdIsNormal(sourcexid))
1818 		return false;
1819 
1820 	/* Get lock so source xact can't end while we're doing this */
1821 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1822 
1823 	for (index = 0; index < arrayP->numProcs; index++)
1824 	{
1825 		int			pgprocno = arrayP->pgprocnos[index];
1826 		volatile PGPROC *proc = &allProcs[pgprocno];
1827 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1828 		TransactionId xid;
1829 
1830 		/* Ignore procs running LAZY VACUUM */
1831 		if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1832 			continue;
1833 
1834 		xid = pgxact->xid;		/* fetch just once */
1835 		if (xid != sourcexid)
1836 			continue;
1837 
1838 		/*
1839 		 * We check the transaction's database ID for paranoia's sake: if it's
1840 		 * in another DB then its xmin does not cover us.  Caller should have
1841 		 * detected this already, so we just treat any funny cases as
1842 		 * "transaction not found".
1843 		 */
1844 		if (proc->databaseId != MyDatabaseId)
1845 			continue;
1846 
1847 		/*
1848 		 * Likewise, let's just make real sure its xmin does cover us.
1849 		 */
1850 		xid = pgxact->xmin;		/* fetch just once */
1851 		if (!TransactionIdIsNormal(xid) ||
1852 			!TransactionIdPrecedesOrEquals(xid, xmin))
1853 			continue;
1854 
1855 		/*
1856 		 * We're good.  Install the new xmin.  As in GetSnapshotData, set
1857 		 * TransactionXmin too.  (Note that because snapmgr.c called
1858 		 * GetSnapshotData first, we'll be overwriting a valid xmin here, so
1859 		 * we don't check that.)
1860 		 */
1861 		MyPgXact->xmin = TransactionXmin = xmin;
1862 
1863 		result = true;
1864 		break;
1865 	}
1866 
1867 	LWLockRelease(ProcArrayLock);
1868 
1869 	return result;
1870 }
1871 
1872 /*
1873  * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
1874  *
1875  * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
1876  * PGPROC of the transaction from which we imported the snapshot, rather than
1877  * an XID.
1878  *
1879  * Returns TRUE if successful, FALSE if source xact is no longer running.
1880  */
1881 bool
ProcArrayInstallRestoredXmin(TransactionId xmin,PGPROC * proc)1882 ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
1883 {
1884 	bool		result = false;
1885 	TransactionId xid;
1886 	volatile PGXACT *pgxact;
1887 
1888 	Assert(TransactionIdIsNormal(xmin));
1889 	Assert(proc != NULL);
1890 
1891 	/* Get lock so source xact can't end while we're doing this */
1892 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1893 
1894 	pgxact = &allPgXact[proc->pgprocno];
1895 
1896 	/*
1897 	 * Be certain that the referenced PGPROC has an advertised xmin which is
1898 	 * no later than the one we're installing, so that the system-wide xmin
1899 	 * can't go backwards.  Also, make sure it's running in the same database,
1900 	 * so that the per-database xmin cannot go backwards.
1901 	 */
1902 	xid = pgxact->xmin;			/* fetch just once */
1903 	if (proc->databaseId == MyDatabaseId &&
1904 		TransactionIdIsNormal(xid) &&
1905 		TransactionIdPrecedesOrEquals(xid, xmin))
1906 	{
1907 		MyPgXact->xmin = TransactionXmin = xmin;
1908 		result = true;
1909 	}
1910 
1911 	LWLockRelease(ProcArrayLock);
1912 
1913 	return result;
1914 }
1915 
1916 /*
1917  * GetRunningTransactionData -- returns information about running transactions.
1918  *
1919  * Similar to GetSnapshotData but returns more information. We include
1920  * all PGXACTs with an assigned TransactionId, even VACUUM processes and
1921  * prepared transactions.
1922  *
1923  * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
1924  * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
1925  * array until the caller has WAL-logged this snapshot, and releases the
1926  * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
1927  * lock is released.
1928  *
1929  * The returned data structure is statically allocated; caller should not
1930  * modify it, and must not assume it is valid past the next call.
1931  *
1932  * This is never executed during recovery so there is no need to look at
1933  * KnownAssignedXids.
1934  *
1935  * Dummy PGXACTs from prepared transaction are included, meaning that this
1936  * may return entries with duplicated TransactionId values coming from
1937  * transaction finishing to prepare.  Nothing is done about duplicated
1938  * entries here to not hold on ProcArrayLock more than necessary.
1939  *
1940  * We don't worry about updating other counters, we want to keep this as
1941  * simple as possible and leave GetSnapshotData() as the primary code for
1942  * that bookkeeping.
1943  *
1944  * Note that if any transaction has overflowed its cached subtransactions
 * then there is no real need to include any subtransactions. That isn't a
1946  * common enough case to worry about optimising the size of the WAL record,
1947  * and we may wish to see that data for diagnostic purposes anyway.
1948  */
1949 RunningTransactions
GetRunningTransactionData(void)1950 GetRunningTransactionData(void)
1951 {
1952 	/* result workspace */
1953 	static RunningTransactionsData CurrentRunningXactsData;
1954 
1955 	ProcArrayStruct *arrayP = procArray;
1956 	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1957 	TransactionId latestCompletedXid;
1958 	TransactionId oldestRunningXid;
1959 	TransactionId *xids;
1960 	int			index;
1961 	int			count;
1962 	int			subcount;
1963 	bool		suboverflowed;
1964 
1965 	Assert(!RecoveryInProgress());
1966 
1967 	/*
1968 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
1969 	 * be sufficient.  But it seems better to do the malloc while not holding
1970 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
1971 	 * more subxip storage than is probably needed.
1972 	 *
1973 	 * Should only be allocated in bgwriter, since only ever executed during
1974 	 * checkpoints.
1975 	 */
1976 	if (CurrentRunningXacts->xids == NULL)
1977 	{
1978 		/*
1979 		 * First call
1980 		 */
1981 		CurrentRunningXacts->xids = (TransactionId *)
1982 			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1983 		if (CurrentRunningXacts->xids == NULL)
1984 			ereport(ERROR,
1985 					(errcode(ERRCODE_OUT_OF_MEMORY),
1986 					 errmsg("out of memory")));
1987 	}
1988 
1989 	xids = CurrentRunningXacts->xids;
1990 
1991 	count = subcount = 0;
1992 	suboverflowed = false;
1993 
1994 	/*
1995 	 * Ensure that no xids enter or leave the procarray while we obtain
1996 	 * snapshot.
1997 	 */
1998 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1999 	LWLockAcquire(XidGenLock, LW_SHARED);
2000 
2001 	latestCompletedXid = ShmemVariableCache->latestCompletedXid;
2002 
2003 	oldestRunningXid = ShmemVariableCache->nextXid;
2004 
2005 	/*
2006 	 * Spin over procArray collecting all xids
2007 	 */
2008 	for (index = 0; index < arrayP->numProcs; index++)
2009 	{
2010 		int			pgprocno = arrayP->pgprocnos[index];
2011 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2012 		TransactionId xid;
2013 
2014 		/* Fetch xid just once - see GetNewTransactionId */
2015 		xid = pgxact->xid;
2016 
2017 		/*
2018 		 * We don't need to store transactions that don't have a TransactionId
2019 		 * yet because they will not show as running on a standby server.
2020 		 */
2021 		if (!TransactionIdIsValid(xid))
2022 			continue;
2023 
2024 		xids[count++] = xid;
2025 
2026 		if (TransactionIdPrecedes(xid, oldestRunningXid))
2027 			oldestRunningXid = xid;
2028 
2029 		if (pgxact->overflowed)
2030 			suboverflowed = true;
2031 	}
2032 
2033 	/*
2034 	 * Spin over procArray collecting all subxids, but only if there hasn't
2035 	 * been a suboverflow.
2036 	 */
2037 	if (!suboverflowed)
2038 	{
2039 		for (index = 0; index < arrayP->numProcs; index++)
2040 		{
2041 			int			pgprocno = arrayP->pgprocnos[index];
2042 			volatile PGPROC *proc = &allProcs[pgprocno];
2043 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2044 			int			nxids;
2045 
2046 			/*
2047 			 * Save subtransaction XIDs. Other backends can't add or remove
2048 			 * entries while we're holding XidGenLock.
2049 			 */
2050 			nxids = pgxact->nxids;
2051 			if (nxids > 0)
2052 			{
2053 				memcpy(&xids[count], (void *) proc->subxids.xids,
2054 					   nxids * sizeof(TransactionId));
2055 				count += nxids;
2056 				subcount += nxids;
2057 
2058 				/*
2059 				 * Top-level XID of a transaction is always less than any of
2060 				 * its subxids, so we don't need to check if any of the
2061 				 * subxids are smaller than oldestRunningXid
2062 				 */
2063 			}
2064 		}
2065 	}
2066 
2067 	/*
2068 	 * It's important *not* to include the limits set by slots here because
2069 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
2070 	 * were to be included here the initial value could never increase because
2071 	 * of a circular dependency where slots only increase their limits when
2072 	 * running xacts increases oldestRunningXid and running xacts only
2073 	 * increases if slots do.
2074 	 */
2075 
2076 	CurrentRunningXacts->xcnt = count - subcount;
2077 	CurrentRunningXacts->subxcnt = subcount;
2078 	CurrentRunningXacts->subxid_overflow = suboverflowed;
2079 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
2080 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
2081 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
2082 
2083 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
2084 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
2085 	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
2086 
2087 	/* We don't release the locks here, the caller is responsible for that */
2088 
2089 	return CurrentRunningXacts;
2090 }
2091 
2092 /*
2093  * GetOldestActiveTransactionId()
2094  *
2095  * Similar to GetSnapshotData but returns just oldestActiveXid. We include
2096  * all PGXACTs with an assigned TransactionId, even VACUUM processes.
2097  * We look at all databases, though there is no need to include WALSender
2098  * since this has no effect on hot standby conflicts.
2099  *
2100  * This is never executed during recovery so there is no need to look at
2101  * KnownAssignedXids.
2102  *
2103  * We don't worry about updating other counters, we want to keep this as
2104  * simple as possible and leave GetSnapshotData() as the primary code for
2105  * that bookkeeping.
2106  */
2107 TransactionId
GetOldestActiveTransactionId(void)2108 GetOldestActiveTransactionId(void)
2109 {
2110 	ProcArrayStruct *arrayP = procArray;
2111 	TransactionId oldestRunningXid;
2112 	int			index;
2113 
2114 	Assert(!RecoveryInProgress());
2115 
2116 	/*
2117 	 * Read nextXid, as the upper bound of what's still active.
2118 	 *
2119 	 * Reading a TransactionId is atomic, but we must grab the lock to make
2120 	 * sure that all XIDs < nextXid are already present in the proc array (or
2121 	 * have already completed), when we spin over it.
2122 	 */
2123 	LWLockAcquire(XidGenLock, LW_SHARED);
2124 	oldestRunningXid = ShmemVariableCache->nextXid;
2125 	LWLockRelease(XidGenLock);
2126 
2127 	/*
2128 	 * Spin over procArray collecting all xids and subxids.
2129 	 */
2130 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2131 	for (index = 0; index < arrayP->numProcs; index++)
2132 	{
2133 		int			pgprocno = arrayP->pgprocnos[index];
2134 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2135 		TransactionId xid;
2136 
2137 		/* Fetch xid just once - see GetNewTransactionId */
2138 		xid = pgxact->xid;
2139 
2140 		if (!TransactionIdIsNormal(xid))
2141 			continue;
2142 
2143 		if (TransactionIdPrecedes(xid, oldestRunningXid))
2144 			oldestRunningXid = xid;
2145 
2146 		/*
2147 		 * Top-level XID of a transaction is always less than any of its
2148 		 * subxids, so we don't need to check if any of the subxids are
2149 		 * smaller than oldestRunningXid
2150 		 */
2151 	}
2152 	LWLockRelease(ProcArrayLock);
2153 
2154 	return oldestRunningXid;
2155 }
2156 
2157 /*
2158  * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum
2159  *
2160  * Returns the oldest xid that we can guarantee not to have been affected by
2161  * vacuum, i.e. no rows >= that xid have been vacuumed away unless the
2162  * transaction aborted. Note that the value can (and most of the time will) be
2163  * much more conservative than what really has been affected by vacuum, but we
2164  * currently don't have better data available.
2165  *
2166  * This is useful to initialize the cutoff xid after which a new changeset
2167  * extraction replication slot can start decoding changes.
2168  *
2169  * Must be called with ProcArrayLock held either shared or exclusively,
2170  * although most callers will want to use exclusive mode since it is expected
2171  * that the caller will immediately use the xid to peg the xmin horizon.
2172  */
2173 TransactionId
GetOldestSafeDecodingTransactionId(bool catalogOnly)2174 GetOldestSafeDecodingTransactionId(bool catalogOnly)
2175 {
2176 	ProcArrayStruct *arrayP = procArray;
2177 	TransactionId oldestSafeXid;
2178 	int			index;
2179 	bool		recovery_in_progress = RecoveryInProgress();
2180 
2181 	Assert(LWLockHeldByMe(ProcArrayLock));
2182 
2183 	/*
2184 	 * Acquire XidGenLock, so no transactions can acquire an xid while we're
2185 	 * running. If no transaction with xid were running concurrently a new xid
2186 	 * could influence the RecentXmin et al.
2187 	 *
2188 	 * We initialize the computation to nextXid since that's guaranteed to be
2189 	 * a safe, albeit pessimal, value.
2190 	 */
2191 	LWLockAcquire(XidGenLock, LW_SHARED);
2192 	oldestSafeXid = ShmemVariableCache->nextXid;
2193 
2194 	/*
2195 	 * If there's already a slot pegging the xmin horizon, we can start with
2196 	 * that value, it's guaranteed to be safe since it's computed by this
2197 	 * routine initially and has been enforced since.  We can always use the
2198 	 * slot's general xmin horizon, but the catalog horizon is only usable
2199 	 * when we only catalog data is going to be looked at.
2200 	 */
2201 	if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2202 		TransactionIdPrecedes(procArray->replication_slot_xmin,
2203 							  oldestSafeXid))
2204 		oldestSafeXid = procArray->replication_slot_xmin;
2205 
2206 	if (catalogOnly &&
2207 		TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2208 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
2209 							  oldestSafeXid))
2210 		oldestSafeXid = procArray->replication_slot_catalog_xmin;
2211 
2212 	/*
2213 	 * If we're not in recovery, we walk over the procarray and collect the
2214 	 * lowest xid. Since we're called with ProcArrayLock held and have
2215 	 * acquired XidGenLock, no entries can vanish concurrently, since
2216 	 * PGXACT->xid is only set with XidGenLock held and only cleared with
2217 	 * ProcArrayLock held.
2218 	 *
2219 	 * In recovery we can't lower the safe value besides what we've computed
2220 	 * above, so we'll have to wait a bit longer there. We unfortunately can
2221 	 * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
2222 	 * machinery can miss values and return an older value than is safe.
2223 	 */
2224 	if (!recovery_in_progress)
2225 	{
2226 		/*
2227 		 * Spin over procArray collecting all min(PGXACT->xid)
2228 		 */
2229 		for (index = 0; index < arrayP->numProcs; index++)
2230 		{
2231 			int			pgprocno = arrayP->pgprocnos[index];
2232 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2233 			TransactionId xid;
2234 
2235 			/* Fetch xid just once - see GetNewTransactionId */
2236 			xid = pgxact->xid;
2237 
2238 			if (!TransactionIdIsNormal(xid))
2239 				continue;
2240 
2241 			if (TransactionIdPrecedes(xid, oldestSafeXid))
2242 				oldestSafeXid = xid;
2243 		}
2244 	}
2245 
2246 	LWLockRelease(XidGenLock);
2247 
2248 	return oldestSafeXid;
2249 }
2250 
2251 /*
2252  * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
2253  * delaying checkpoint because they have critical actions in progress.
2254  *
2255  * Constructs an array of VXIDs of transactions that are currently in commit
2256  * critical sections, as shown by having delayChkpt set in their PGXACT.
2257  *
2258  * Returns a palloc'd array that should be freed by the caller.
2259  * *nvxids is the number of valid entries.
2260  *
2261  * Note that because backends set or clear delayChkpt without holding any lock,
2262  * the result is somewhat indeterminate, but we don't really care.  Even in
2263  * a multiprocessor with delayed writes to shared memory, it should be certain
2264  * that setting of delayChkpt will propagate to shared memory when the backend
2265  * takes a lock, so we cannot fail to see a virtual xact as delayChkpt if
2266  * it's already inserted its commit record.  Whether it takes a little while
2267  * for clearing of delayChkpt to propagate is unimportant for correctness.
2268  */
2269 VirtualTransactionId *
GetVirtualXIDsDelayingChkpt(int * nvxids)2270 GetVirtualXIDsDelayingChkpt(int *nvxids)
2271 {
2272 	VirtualTransactionId *vxids;
2273 	ProcArrayStruct *arrayP = procArray;
2274 	int			count = 0;
2275 	int			index;
2276 
2277 	/* allocate what's certainly enough result space */
2278 	vxids = (VirtualTransactionId *)
2279 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2280 
2281 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2282 
2283 	for (index = 0; index < arrayP->numProcs; index++)
2284 	{
2285 		int			pgprocno = arrayP->pgprocnos[index];
2286 		volatile PGPROC *proc = &allProcs[pgprocno];
2287 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2288 
2289 		if (pgxact->delayChkpt)
2290 		{
2291 			VirtualTransactionId vxid;
2292 
2293 			GET_VXID_FROM_PGPROC(vxid, *proc);
2294 			if (VirtualTransactionIdIsValid(vxid))
2295 				vxids[count++] = vxid;
2296 		}
2297 	}
2298 
2299 	LWLockRelease(ProcArrayLock);
2300 
2301 	*nvxids = count;
2302 	return vxids;
2303 }
2304 
2305 /*
2306  * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
2307  *
2308  * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
2309  * of the specified VXIDs are still in critical sections of code.
2310  *
2311  * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
2312  * those numbers should be small enough for it not to be a problem.
2313  */
2314 bool
HaveVirtualXIDsDelayingChkpt(VirtualTransactionId * vxids,int nvxids)2315 HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
2316 {
2317 	bool		result = false;
2318 	ProcArrayStruct *arrayP = procArray;
2319 	int			index;
2320 
2321 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2322 
2323 	for (index = 0; index < arrayP->numProcs; index++)
2324 	{
2325 		int			pgprocno = arrayP->pgprocnos[index];
2326 		volatile PGPROC *proc = &allProcs[pgprocno];
2327 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2328 		VirtualTransactionId vxid;
2329 
2330 		GET_VXID_FROM_PGPROC(vxid, *proc);
2331 
2332 		if (pgxact->delayChkpt && VirtualTransactionIdIsValid(vxid))
2333 		{
2334 			int			i;
2335 
2336 			for (i = 0; i < nvxids; i++)
2337 			{
2338 				if (VirtualTransactionIdEquals(vxid, vxids[i]))
2339 				{
2340 					result = true;
2341 					break;
2342 				}
2343 			}
2344 			if (result)
2345 				break;
2346 		}
2347 	}
2348 
2349 	LWLockRelease(ProcArrayLock);
2350 
2351 	return result;
2352 }
2353 
2354 /*
2355  * BackendPidGetProc -- get a backend's PGPROC given its PID
2356  *
2357  * Returns NULL if not found.  Note that it is up to the caller to be
2358  * sure that the question remains meaningful for long enough for the
2359  * answer to be used ...
2360  */
2361 PGPROC *
BackendPidGetProc(int pid)2362 BackendPidGetProc(int pid)
2363 {
2364 	PGPROC	   *result;
2365 
2366 	if (pid == 0)				/* never match dummy PGPROCs */
2367 		return NULL;
2368 
2369 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2370 
2371 	result = BackendPidGetProcWithLock(pid);
2372 
2373 	LWLockRelease(ProcArrayLock);
2374 
2375 	return result;
2376 }
2377 
2378 /*
2379  * BackendPidGetProcWithLock -- get a backend's PGPROC given its PID
2380  *
2381  * Same as above, except caller must be holding ProcArrayLock.  The found
2382  * entry, if any, can be assumed to be valid as long as the lock remains held.
2383  */
2384 PGPROC *
BackendPidGetProcWithLock(int pid)2385 BackendPidGetProcWithLock(int pid)
2386 {
2387 	PGPROC	   *result = NULL;
2388 	ProcArrayStruct *arrayP = procArray;
2389 	int			index;
2390 
2391 	if (pid == 0)				/* never match dummy PGPROCs */
2392 		return NULL;
2393 
2394 	for (index = 0; index < arrayP->numProcs; index++)
2395 	{
2396 		PGPROC	   *proc = &allProcs[arrayP->pgprocnos[index]];
2397 
2398 		if (proc->pid == pid)
2399 		{
2400 			result = proc;
2401 			break;
2402 		}
2403 	}
2404 
2405 	return result;
2406 }
2407 
2408 /*
2409  * BackendXidGetPid -- get a backend's pid given its XID
2410  *
2411  * Returns 0 if not found or it's a prepared transaction.  Note that
2412  * it is up to the caller to be sure that the question remains
2413  * meaningful for long enough for the answer to be used ...
2414  *
2415  * Only main transaction Ids are considered.  This function is mainly
2416  * useful for determining what backend owns a lock.
2417  *
2418  * Beware that not every xact has an XID assigned.  However, as long as you
2419  * only call this using an XID found on disk, you're safe.
2420  */
2421 int
BackendXidGetPid(TransactionId xid)2422 BackendXidGetPid(TransactionId xid)
2423 {
2424 	int			result = 0;
2425 	ProcArrayStruct *arrayP = procArray;
2426 	int			index;
2427 
2428 	if (xid == InvalidTransactionId)	/* never match invalid xid */
2429 		return 0;
2430 
2431 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2432 
2433 	for (index = 0; index < arrayP->numProcs; index++)
2434 	{
2435 		int			pgprocno = arrayP->pgprocnos[index];
2436 		volatile PGPROC *proc = &allProcs[pgprocno];
2437 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2438 
2439 		if (pgxact->xid == xid)
2440 		{
2441 			result = proc->pid;
2442 			break;
2443 		}
2444 	}
2445 
2446 	LWLockRelease(ProcArrayLock);
2447 
2448 	return result;
2449 }
2450 
2451 /*
2452  * IsBackendPid -- is a given pid a running backend
2453  *
2454  * This is not called by the backend, but is called by external modules.
2455  */
2456 bool
IsBackendPid(int pid)2457 IsBackendPid(int pid)
2458 {
2459 	return (BackendPidGetProc(pid) != NULL);
2460 }
2461 
2462 
2463 /*
2464  * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
2465  *
2466  * The array is palloc'd. The number of valid entries is returned into *nvxids.
2467  *
2468  * The arguments allow filtering the set of VXIDs returned.  Our own process
2469  * is always skipped.  In addition:
2470  *	If limitXmin is not InvalidTransactionId, skip processes with
2471  *		xmin > limitXmin.
2472  *	If excludeXmin0 is true, skip processes with xmin = 0.
2473  *	If allDbs is false, skip processes attached to other databases.
2474  *	If excludeVacuum isn't zero, skip processes for which
2475  *		(vacuumFlags & excludeVacuum) is not zero.
2476  *
2477  * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
2478  * allow skipping backends whose oldest live snapshot is no older than
2479  * some snapshot we have.  Since we examine the procarray with only shared
2480  * lock, there are race conditions: a backend could set its xmin just after
2481  * we look.  Indeed, on multiprocessors with weak memory ordering, the
2482  * other backend could have set its xmin *before* we look.  We know however
2483  * that such a backend must have held shared ProcArrayLock overlapping our
2484  * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
2485  * any snapshot the other backend is taking concurrently with our scan cannot
2486  * consider any transactions as still running that we think are committed
2487  * (since backends must hold ProcArrayLock exclusive to commit).
2488  */
2489 VirtualTransactionId *
GetCurrentVirtualXIDs(TransactionId limitXmin,bool excludeXmin0,bool allDbs,int excludeVacuum,int * nvxids)2490 GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
2491 					  bool allDbs, int excludeVacuum,
2492 					  int *nvxids)
2493 {
2494 	VirtualTransactionId *vxids;
2495 	ProcArrayStruct *arrayP = procArray;
2496 	int			count = 0;
2497 	int			index;
2498 
2499 	/* allocate what's certainly enough result space */
2500 	vxids = (VirtualTransactionId *)
2501 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2502 
2503 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2504 
2505 	for (index = 0; index < arrayP->numProcs; index++)
2506 	{
2507 		int			pgprocno = arrayP->pgprocnos[index];
2508 		volatile PGPROC *proc = &allProcs[pgprocno];
2509 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2510 
2511 		if (proc == MyProc)
2512 			continue;
2513 
2514 		if (excludeVacuum & pgxact->vacuumFlags)
2515 			continue;
2516 
2517 		if (allDbs || proc->databaseId == MyDatabaseId)
2518 		{
2519 			/* Fetch xmin just once - might change on us */
2520 			TransactionId pxmin = pgxact->xmin;
2521 
2522 			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
2523 				continue;
2524 
2525 			/*
2526 			 * InvalidTransactionId precedes all other XIDs, so a proc that
2527 			 * hasn't set xmin yet will not be rejected by this test.
2528 			 */
2529 			if (!TransactionIdIsValid(limitXmin) ||
2530 				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
2531 			{
2532 				VirtualTransactionId vxid;
2533 
2534 				GET_VXID_FROM_PGPROC(vxid, *proc);
2535 				if (VirtualTransactionIdIsValid(vxid))
2536 					vxids[count++] = vxid;
2537 			}
2538 		}
2539 	}
2540 
2541 	LWLockRelease(ProcArrayLock);
2542 
2543 	*nvxids = count;
2544 	return vxids;
2545 }
2546 
2547 /*
2548  * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
2549  *
2550  * Usage is limited to conflict resolution during recovery on standby servers.
2551  * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
2552  * in cases where we cannot accurately determine a value for latestRemovedXid.
2553  *
2554  * If limitXmin is InvalidTransactionId then we want to kill everybody,
2555  * so we're not worried if they have a snapshot or not, nor does it really
2556  * matter what type of lock we hold.
2557  *
2558  * All callers that are checking xmins always now supply a valid and useful
2559  * value for limitXmin. The limitXmin is always lower than the lowest
2560  * numbered KnownAssignedXid that is not already a FATAL error. This is
2561  * because we only care about cleanup records that are cleaning up tuple
2562  * versions from committed transactions. In that case they will only occur
2563  * at the point where the record is less than the lowest running xid. That
2564  * allows us to say that if any backend takes a snapshot concurrently with
2565  * us then the conflict assessment made here would never include the snapshot
2566  * that is being derived. So we take LW_SHARED on the ProcArray and allow
2567  * concurrent snapshots when limitXmin is valid. We might think about adding
2568  *	 Assert(limitXmin < lowest(KnownAssignedXids))
2569  * but that would not be true in the case of FATAL errors lagging in array,
2570  * but we already know those are bogus anyway, so we skip that test.
2571  *
2572  * If dbOid is valid we skip backends attached to other databases.
2573  *
2574  * Be careful to *not* pfree the result from this function. We reuse
2575  * this array sufficiently often that we use malloc for the result.
2576  */
2577 VirtualTransactionId *
GetConflictingVirtualXIDs(TransactionId limitXmin,Oid dbOid)2578 GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2579 {
2580 	static VirtualTransactionId *vxids;
2581 	ProcArrayStruct *arrayP = procArray;
2582 	int			count = 0;
2583 	int			index;
2584 
2585 	/*
2586 	 * If first time through, get workspace to remember main XIDs in. We
2587 	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
2588 	 * result space, remembering room for a terminator.
2589 	 */
2590 	if (vxids == NULL)
2591 	{
2592 		vxids = (VirtualTransactionId *)
2593 			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
2594 		if (vxids == NULL)
2595 			ereport(ERROR,
2596 					(errcode(ERRCODE_OUT_OF_MEMORY),
2597 					 errmsg("out of memory")));
2598 	}
2599 
2600 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2601 
2602 	for (index = 0; index < arrayP->numProcs; index++)
2603 	{
2604 		int			pgprocno = arrayP->pgprocnos[index];
2605 		volatile PGPROC *proc = &allProcs[pgprocno];
2606 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2607 
2608 		/* Exclude prepared transactions */
2609 		if (proc->pid == 0)
2610 			continue;
2611 
2612 		if (!OidIsValid(dbOid) ||
2613 			proc->databaseId == dbOid)
2614 		{
2615 			/* Fetch xmin just once - can't change on us, but good coding */
2616 			TransactionId pxmin = pgxact->xmin;
2617 
2618 			/*
2619 			 * We ignore an invalid pxmin because this means that backend has
2620 			 * no snapshot currently. We hold a Share lock to avoid contention
2621 			 * with users taking snapshots.  That is not a problem because the
2622 			 * current xmin is always at least one higher than the latest
2623 			 * removed xid, so any new snapshot would never conflict with the
2624 			 * test here.
2625 			 */
2626 			if (!TransactionIdIsValid(limitXmin) ||
2627 				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
2628 			{
2629 				VirtualTransactionId vxid;
2630 
2631 				GET_VXID_FROM_PGPROC(vxid, *proc);
2632 				if (VirtualTransactionIdIsValid(vxid))
2633 					vxids[count++] = vxid;
2634 			}
2635 		}
2636 	}
2637 
2638 	LWLockRelease(ProcArrayLock);
2639 
2640 	/* add the terminator */
2641 	vxids[count].backendId = InvalidBackendId;
2642 	vxids[count].localTransactionId = InvalidLocalTransactionId;
2643 
2644 	return vxids;
2645 }
2646 
2647 /*
2648  * CancelVirtualTransaction - used in recovery conflict processing
2649  *
2650  * Returns pid of the process signaled, or 0 if not found.
2651  */
2652 pid_t
CancelVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode)2653 CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
2654 {
2655 	return SignalVirtualTransaction(vxid, sigmode, true);
2656 }
2657 
2658 pid_t
SignalVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode,bool conflictPending)2659 SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
2660 						 bool conflictPending)
2661 {
2662 	ProcArrayStruct *arrayP = procArray;
2663 	int			index;
2664 	pid_t		pid = 0;
2665 
2666 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2667 
2668 	for (index = 0; index < arrayP->numProcs; index++)
2669 	{
2670 		int			pgprocno = arrayP->pgprocnos[index];
2671 		volatile PGPROC *proc = &allProcs[pgprocno];
2672 		VirtualTransactionId procvxid;
2673 
2674 		GET_VXID_FROM_PGPROC(procvxid, *proc);
2675 
2676 		if (procvxid.backendId == vxid.backendId &&
2677 			procvxid.localTransactionId == vxid.localTransactionId)
2678 		{
2679 			proc->recoveryConflictPending = conflictPending;
2680 			pid = proc->pid;
2681 			if (pid != 0)
2682 			{
2683 				/*
2684 				 * Kill the pid if it's still here. If not, that's what we
2685 				 * wanted so ignore any errors.
2686 				 */
2687 				(void) SendProcSignal(pid, sigmode, vxid.backendId);
2688 			}
2689 			break;
2690 		}
2691 	}
2692 
2693 	LWLockRelease(ProcArrayLock);
2694 
2695 	return pid;
2696 }
2697 
2698 /*
2699  * MinimumActiveBackends --- count backends (other than myself) that are
2700  *		in active transactions.  Return true if the count exceeds the
2701  *		minimum threshold passed.  This is used as a heuristic to decide if
2702  *		a pre-XLOG-flush delay is worthwhile during commit.
2703  *
2704  * Do not count backends that are blocked waiting for locks, since they are
2705  * not going to get to run until someone else commits.
2706  */
2707 bool
MinimumActiveBackends(int min)2708 MinimumActiveBackends(int min)
2709 {
2710 	ProcArrayStruct *arrayP = procArray;
2711 	int			count = 0;
2712 	int			index;
2713 
2714 	/* Quick short-circuit if no minimum is specified */
2715 	if (min == 0)
2716 		return true;
2717 
2718 	/*
2719 	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
2720 	 * bogus, but since we are only testing fields for zero or nonzero, it
2721 	 * should be OK.  The result is only used for heuristic purposes anyway...
2722 	 */
2723 	for (index = 0; index < arrayP->numProcs; index++)
2724 	{
2725 		int			pgprocno = arrayP->pgprocnos[index];
2726 		volatile PGPROC *proc = &allProcs[pgprocno];
2727 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2728 
2729 		/*
2730 		 * Since we're not holding a lock, need to be prepared to deal with
2731 		 * garbage, as someone could have incremented numProcs but not yet
2732 		 * filled the structure.
2733 		 *
2734 		 * If someone just decremented numProcs, 'proc' could also point to a
2735 		 * PGPROC entry that's no longer in the array. It still points to a
2736 		 * PGPROC struct, though, because freed PGPROC entries just go to the
2737 		 * free list and are recycled. Its contents are nonsense in that case,
2738 		 * but that's acceptable for this function.
2739 		 */
2740 		if (pgprocno == -1)
2741 			continue;			/* do not count deleted entries */
2742 		if (proc == MyProc)
2743 			continue;			/* do not count myself */
2744 		if (pgxact->xid == InvalidTransactionId)
2745 			continue;			/* do not count if no XID assigned */
2746 		if (proc->pid == 0)
2747 			continue;			/* do not count prepared xacts */
2748 		if (proc->waitLock != NULL)
2749 			continue;			/* do not count if blocked on a lock */
2750 		count++;
2751 		if (count >= min)
2752 			break;
2753 	}
2754 
2755 	return count >= min;
2756 }
2757 
2758 /*
2759  * CountDBBackends --- count backends that are using specified database
2760  */
2761 int
CountDBBackends(Oid databaseid)2762 CountDBBackends(Oid databaseid)
2763 {
2764 	ProcArrayStruct *arrayP = procArray;
2765 	int			count = 0;
2766 	int			index;
2767 
2768 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2769 
2770 	for (index = 0; index < arrayP->numProcs; index++)
2771 	{
2772 		int			pgprocno = arrayP->pgprocnos[index];
2773 		volatile PGPROC *proc = &allProcs[pgprocno];
2774 
2775 		if (proc->pid == 0)
2776 			continue;			/* do not count prepared xacts */
2777 		if (!OidIsValid(databaseid) ||
2778 			proc->databaseId == databaseid)
2779 			count++;
2780 	}
2781 
2782 	LWLockRelease(ProcArrayLock);
2783 
2784 	return count;
2785 }
2786 
2787 /*
2788  * CountDBConnections --- counts database backends ignoring any background
2789  *		worker processes
2790  */
2791 int
CountDBConnections(Oid databaseid)2792 CountDBConnections(Oid databaseid)
2793 {
2794 	ProcArrayStruct *arrayP = procArray;
2795 	int			count = 0;
2796 	int			index;
2797 
2798 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2799 
2800 	for (index = 0; index < arrayP->numProcs; index++)
2801 	{
2802 		int			pgprocno = arrayP->pgprocnos[index];
2803 		volatile PGPROC *proc = &allProcs[pgprocno];
2804 
2805 		if (proc->pid == 0)
2806 			continue;			/* do not count prepared xacts */
2807 		if (proc->isBackgroundWorker)
2808 			continue;			/* do not count background workers */
2809 		if (!OidIsValid(databaseid) ||
2810 			proc->databaseId == databaseid)
2811 			count++;
2812 	}
2813 
2814 	LWLockRelease(ProcArrayLock);
2815 
2816 	return count;
2817 }
2818 
2819 /*
2820  * CancelDBBackends --- cancel backends that are using specified database
2821  */
2822 void
CancelDBBackends(Oid databaseid,ProcSignalReason sigmode,bool conflictPending)2823 CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
2824 {
2825 	ProcArrayStruct *arrayP = procArray;
2826 	int			index;
2827 	pid_t		pid = 0;
2828 
2829 	/* tell all backends to die */
2830 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2831 
2832 	for (index = 0; index < arrayP->numProcs; index++)
2833 	{
2834 		int			pgprocno = arrayP->pgprocnos[index];
2835 		volatile PGPROC *proc = &allProcs[pgprocno];
2836 
2837 		if (databaseid == InvalidOid || proc->databaseId == databaseid)
2838 		{
2839 			VirtualTransactionId procvxid;
2840 
2841 			GET_VXID_FROM_PGPROC(procvxid, *proc);
2842 
2843 			proc->recoveryConflictPending = conflictPending;
2844 			pid = proc->pid;
2845 			if (pid != 0)
2846 			{
2847 				/*
2848 				 * Kill the pid if it's still here. If not, that's what we
2849 				 * wanted so ignore any errors.
2850 				 */
2851 				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
2852 			}
2853 		}
2854 	}
2855 
2856 	LWLockRelease(ProcArrayLock);
2857 }
2858 
2859 /*
2860  * CountUserBackends --- count backends that are used by specified user
2861  */
2862 int
CountUserBackends(Oid roleid)2863 CountUserBackends(Oid roleid)
2864 {
2865 	ProcArrayStruct *arrayP = procArray;
2866 	int			count = 0;
2867 	int			index;
2868 
2869 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2870 
2871 	for (index = 0; index < arrayP->numProcs; index++)
2872 	{
2873 		int			pgprocno = arrayP->pgprocnos[index];
2874 		volatile PGPROC *proc = &allProcs[pgprocno];
2875 
2876 		if (proc->pid == 0)
2877 			continue;			/* do not count prepared xacts */
2878 		if (proc->isBackgroundWorker)
2879 			continue;			/* do not count background workers */
2880 		if (proc->roleId == roleid)
2881 			count++;
2882 	}
2883 
2884 	LWLockRelease(ProcArrayLock);
2885 
2886 	return count;
2887 }
2888 
2889 /*
2890  * CountOtherDBBackends -- check for other backends running in the given DB
2891  *
2892  * If there are other backends in the DB, we will wait a maximum of 5 seconds
2893  * for them to exit.  Autovacuum backends are encouraged to exit early by
2894  * sending them SIGTERM, but normal user backends are just waited for.
2895  *
2896  * The current backend is always ignored; it is caller's responsibility to
2897  * check whether the current backend uses the given DB, if it's important.
2898  *
2899  * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
2900  * Also, *nbackends and *nprepared are set to the number of other backends
2901  * and prepared transactions in the DB, respectively.
2902  *
2903  * This function is used to interlock DROP DATABASE and related commands
2904  * against there being any active backends in the target DB --- dropping the
2905  * DB while active backends remain would be a Bad Thing.  Note that we cannot
2906  * detect here the possibility of a newly-started backend that is trying to
2907  * connect to the doomed database, so additional interlocking is needed during
2908  * backend startup.  The caller should normally hold an exclusive lock on the
2909  * target DB before calling this, which is one reason we mustn't wait
2910  * indefinitely.
2911  */
2912 bool
CountOtherDBBackends(Oid databaseId,int * nbackends,int * nprepared)2913 CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2914 {
2915 	ProcArrayStruct *arrayP = procArray;
2916 
2917 #define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
2918 	int			autovac_pids[MAXAUTOVACPIDS];
2919 	int			tries;
2920 
2921 	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
2922 	for (tries = 0; tries < 50; tries++)
2923 	{
2924 		int			nautovacs = 0;
2925 		bool		found = false;
2926 		int			index;
2927 
2928 		CHECK_FOR_INTERRUPTS();
2929 
2930 		*nbackends = *nprepared = 0;
2931 
2932 		LWLockAcquire(ProcArrayLock, LW_SHARED);
2933 
2934 		for (index = 0; index < arrayP->numProcs; index++)
2935 		{
2936 			int			pgprocno = arrayP->pgprocnos[index];
2937 			volatile PGPROC *proc = &allProcs[pgprocno];
2938 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2939 
2940 			if (proc->databaseId != databaseId)
2941 				continue;
2942 			if (proc == MyProc)
2943 				continue;
2944 
2945 			found = true;
2946 
2947 			if (proc->pid == 0)
2948 				(*nprepared)++;
2949 			else
2950 			{
2951 				(*nbackends)++;
2952 				if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
2953 					nautovacs < MAXAUTOVACPIDS)
2954 					autovac_pids[nautovacs++] = proc->pid;
2955 			}
2956 		}
2957 
2958 		LWLockRelease(ProcArrayLock);
2959 
2960 		if (!found)
2961 			return false;		/* no conflicting backends, so done */
2962 
2963 		/*
2964 		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
2965 		 * postpone this step until after the loop because we don't want to
2966 		 * hold ProcArrayLock while issuing kill(). We have no idea what might
2967 		 * block kill() inside the kernel...
2968 		 */
2969 		for (index = 0; index < nautovacs; index++)
2970 			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */
2971 
2972 		/* sleep, then try again */
2973 		pg_usleep(100 * 1000L); /* 100ms */
2974 	}
2975 
2976 	return true;				/* timed out, still conflicts */
2977 }
2978 
2979 /*
2980  * ProcArraySetReplicationSlotXmin
2981  *
2982  * Install limits to future computations of the xmin horizon to prevent vacuum
2983  * and HOT pruning from removing affected rows still needed by clients with
 * replication slots.
2985  */
2986 void
ProcArraySetReplicationSlotXmin(TransactionId xmin,TransactionId catalog_xmin,bool already_locked)2987 ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
2988 								bool already_locked)
2989 {
2990 	Assert(!already_locked || LWLockHeldByMe(ProcArrayLock));
2991 
2992 	if (!already_locked)
2993 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2994 
2995 	procArray->replication_slot_xmin = xmin;
2996 	procArray->replication_slot_catalog_xmin = catalog_xmin;
2997 
2998 	if (!already_locked)
2999 		LWLockRelease(ProcArrayLock);
3000 }
3001 
3002 /*
3003  * ProcArrayGetReplicationSlotXmin
3004  *
3005  * Return the current slot xmin limits. That's useful to be able to remove
3006  * data that's older than those limits.
3007  */
3008 void
ProcArrayGetReplicationSlotXmin(TransactionId * xmin,TransactionId * catalog_xmin)3009 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
3010 								TransactionId *catalog_xmin)
3011 {
3012 	LWLockAcquire(ProcArrayLock, LW_SHARED);
3013 
3014 	if (xmin != NULL)
3015 		*xmin = procArray->replication_slot_xmin;
3016 
3017 	if (catalog_xmin != NULL)
3018 		*catalog_xmin = procArray->replication_slot_catalog_xmin;
3019 
3020 	LWLockRelease(ProcArrayLock);
3021 }
3022 
3023 
/*
 * Drop entry i from my backend's subxid cache: the last cached entry is
 * copied over slot i and the entry count is then decremented, so the
 * relative order of the remaining entries is not preserved.
 */
#define XidCacheRemove(i) \
	do { \
		MyProc->subxids.xids[(i)] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
		MyPgXact->nxids--; \
	} while (0)
3029 
3030 /*
3031  * XidCacheRemoveRunningXids
3032  *
3033  * Remove a bunch of TransactionIds from the list of known-running
3034  * subtransactions for my backend.  Both the specified xid and those in
3035  * the xids[] array (of length nxids) are removed from the subxids cache.
3036  * latestXid must be the latest XID among the group.
3037  */
3038 void
XidCacheRemoveRunningXids(TransactionId xid,int nxids,const TransactionId * xids,TransactionId latestXid)3039 XidCacheRemoveRunningXids(TransactionId xid,
3040 						  int nxids, const TransactionId *xids,
3041 						  TransactionId latestXid)
3042 {
3043 	int			i,
3044 				j;
3045 
3046 	Assert(TransactionIdIsValid(xid));
3047 
3048 	/*
3049 	 * We must hold ProcArrayLock exclusively in order to remove transactions
3050 	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
3051 	 * possible this could be relaxed since we know this routine is only used
3052 	 * to abort subtransactions, but pending closer analysis we'd best be
3053 	 * conservative.
3054 	 */
3055 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3056 
3057 	/*
3058 	 * Under normal circumstances xid and xids[] will be in increasing order,
3059 	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
3060 	 * behavior when removing a lot of xids.
3061 	 */
3062 	for (i = nxids - 1; i >= 0; i--)
3063 	{
3064 		TransactionId anxid = xids[i];
3065 
3066 		for (j = MyPgXact->nxids - 1; j >= 0; j--)
3067 		{
3068 			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
3069 			{
3070 				XidCacheRemove(j);
3071 				break;
3072 			}
3073 		}
3074 
3075 		/*
3076 		 * Ordinarily we should have found it, unless the cache has
3077 		 * overflowed. However it's also possible for this routine to be
3078 		 * invoked multiple times for the same subtransaction, in case of an
3079 		 * error during AbortSubTransaction.  So instead of Assert, emit a
3080 		 * debug warning.
3081 		 */
3082 		if (j < 0 && !MyPgXact->overflowed)
3083 			elog(WARNING, "did not find subXID %u in MyProc", anxid);
3084 	}
3085 
3086 	for (j = MyPgXact->nxids - 1; j >= 0; j--)
3087 	{
3088 		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
3089 		{
3090 			XidCacheRemove(j);
3091 			break;
3092 		}
3093 	}
3094 	/* Ordinarily we should have found it, unless the cache has overflowed */
3095 	if (j < 0 && !MyPgXact->overflowed)
3096 		elog(WARNING, "did not find subXID %u in MyProc", xid);
3097 
3098 	/* Also advance global latestCompletedXid while holding the lock */
3099 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3100 							  latestXid))
3101 		ShmemVariableCache->latestCompletedXid = latestXid;
3102 
3103 	LWLockRelease(ProcArrayLock);
3104 }
3105 
#ifdef XIDCACHE_DEBUG

/*
 * Write the XID cache statistics variables to stderr on a single line.
 * Only compiled when XIDCACHE_DEBUG is defined.
 */
static void
DisplayXidCache(void)
{
	fprintf(stderr,
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
			xc_by_recent_xmin, xc_by_known_xact, xc_by_my_xact,
			xc_by_latest_xid, xc_by_main_xid, xc_by_child_xid,
			xc_by_known_assigned, xc_no_overflow, xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
3127 
3128 
3129 /* ----------------------------------------------
3130  *		KnownAssignedTransactions sub-module
3131  * ----------------------------------------------
3132  */
3133 
3134 /*
3135  * In Hot Standby mode, we maintain a list of transactions that are (or were)
3136  * running in the master at the current point in WAL.  These XIDs must be
3137  * treated as running by standby transactions, even though they are not in
3138  * the standby server's PGXACT array.
3139  *
3140  * We record all XIDs that we know have been assigned.  That includes all the
3141  * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
3142  * been assigned.  We can deduce the existence of unobserved XIDs because we
3143  * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
3144  * list expands as new XIDs are observed or inferred, and contracts when
3145  * transaction completion records arrive.
3146  *
3147  * During hot standby we do not fret too much about the distinction between
3148  * top-level XIDs and subtransaction XIDs. We store both together in the
3149  * KnownAssignedXids list.  In backends, this is copied into snapshots in
3150  * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
3151  * doesn't care about the distinction either.  Subtransaction XIDs are
3152  * effectively treated as top-level XIDs and in the typical case pg_subtrans
3153  * links are *not* maintained (which does not affect visibility).
3154  *
3155  * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
3156  * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
3157  * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
3158  * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
3159  * records, we mark the subXIDs as children of the top XID in pg_subtrans,
3160  * and then remove them from KnownAssignedXids.  This prevents overflow of
3161  * KnownAssignedXids and snapshots, at the cost that status checks for these
3162  * subXIDs will take a slower path through TransactionIdIsInProgress().
3163  * This means that KnownAssignedXids is not necessarily complete for subXIDs,
3164  * though it should be complete for top-level XIDs; this is the same situation
3165  * that holds with respect to the PGPROC entries in normal running.
3166  *
3167  * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
3168  * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
3169  * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
3170  * As long as that is within the range of interesting XIDs, we have to assume
3171  * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
3172  * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
3173  * subXID arrives - that is not an error.)
3174  *
3175  * Should a backend on primary somehow disappear before it can write an abort
3176  * record, then we just leave those XIDs in KnownAssignedXids. They actually
3177  * aborted but we think they were running; the distinction is irrelevant
3178  * because either way any changes done by the transaction are not visible to
3179  * backends in the standby.  We prune KnownAssignedXids when
3180  * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
3181  * array due to such dead XIDs.
3182  */
3183 
3184 /*
3185  * RecordKnownAssignedTransactionIds
3186  *		Record the given XID in KnownAssignedXids, as well as any preceding
3187  *		unobserved XIDs.
3188  *
3189  * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
3190  * associated with a transaction. Must be called for each record after we
 * have executed StartupCLOG() et al, since we must ExtendCLOG() etc.
3192  *
3193  * Called during recovery in analogy with and in place of GetNewTransactionId()
3194  */
3195 void
RecordKnownAssignedTransactionIds(TransactionId xid)3196 RecordKnownAssignedTransactionIds(TransactionId xid)
3197 {
3198 	Assert(standbyState >= STANDBY_INITIALIZED);
3199 	Assert(TransactionIdIsValid(xid));
3200 	Assert(TransactionIdIsValid(latestObservedXid));
3201 
3202 	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
3203 		 xid, latestObservedXid);
3204 
3205 	/*
3206 	 * When a newly observed xid arrives, it is frequently the case that it is
3207 	 * *not* the next xid in sequence. When this occurs, we must treat the
3208 	 * intervening xids as running also.
3209 	 */
3210 	if (TransactionIdFollows(xid, latestObservedXid))
3211 	{
3212 		TransactionId next_expected_xid;
3213 
3214 		/*
3215 		 * Extend subtrans like we do in GetNewTransactionId() during normal
3216 		 * operation using individual extend steps. Note that we do not need
3217 		 * to extend clog since its extensions are WAL logged.
3218 		 *
3219 		 * This part has to be done regardless of standbyState since we
3220 		 * immediately start assigning subtransactions to their toplevel
3221 		 * transactions.
3222 		 */
3223 		next_expected_xid = latestObservedXid;
3224 		while (TransactionIdPrecedes(next_expected_xid, xid))
3225 		{
3226 			TransactionIdAdvance(next_expected_xid);
3227 			ExtendSUBTRANS(next_expected_xid);
3228 		}
3229 		Assert(next_expected_xid == xid);
3230 
3231 		/*
3232 		 * If the KnownAssignedXids machinery isn't up yet, there's nothing
3233 		 * more to do since we don't track assigned xids yet.
3234 		 */
3235 		if (standbyState <= STANDBY_INITIALIZED)
3236 		{
3237 			latestObservedXid = xid;
3238 			return;
3239 		}
3240 
3241 		/*
3242 		 * Add (latestObservedXid, xid] onto the KnownAssignedXids array.
3243 		 */
3244 		next_expected_xid = latestObservedXid;
3245 		TransactionIdAdvance(next_expected_xid);
3246 		KnownAssignedXidsAdd(next_expected_xid, xid, false);
3247 
3248 		/*
3249 		 * Now we can advance latestObservedXid
3250 		 */
3251 		latestObservedXid = xid;
3252 
3253 		/* ShmemVariableCache->nextXid must be beyond any observed xid */
3254 		next_expected_xid = latestObservedXid;
3255 		TransactionIdAdvance(next_expected_xid);
3256 		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
3257 		if (TransactionIdFollows(next_expected_xid, ShmemVariableCache->nextXid))
3258 			ShmemVariableCache->nextXid = next_expected_xid;
3259 		LWLockRelease(XidGenLock);
3260 	}
3261 }
3262 
3263 /*
3264  * ExpireTreeKnownAssignedTransactionIds
3265  *		Remove the given XIDs from KnownAssignedXids.
3266  *
3267  * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
3268  */
3269 void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid,int nsubxids,TransactionId * subxids,TransactionId max_xid)3270 ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
3271 							   TransactionId *subxids, TransactionId max_xid)
3272 {
3273 	Assert(standbyState >= STANDBY_INITIALIZED);
3274 
3275 	/*
3276 	 * Uses same locking as transaction commit
3277 	 */
3278 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3279 
3280 	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
3281 
3282 	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
3283 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3284 							  max_xid))
3285 		ShmemVariableCache->latestCompletedXid = max_xid;
3286 
3287 	LWLockRelease(ProcArrayLock);
3288 }
3289 
3290 /*
3291  * ExpireAllKnownAssignedTransactionIds
3292  *		Remove all entries in KnownAssignedXids and reset lastOverflowedXid.
3293  */
3294 void
ExpireAllKnownAssignedTransactionIds(void)3295 ExpireAllKnownAssignedTransactionIds(void)
3296 {
3297 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3298 	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
3299 
3300 	/*
3301 	 * Reset lastOverflowedXid.  Currently, lastOverflowedXid has no use after
3302 	 * the call of this function.  But do this for unification with what
3303 	 * ExpireOldKnownAssignedTransactionIds() do.
3304 	 */
3305 	procArray->lastOverflowedXid = InvalidTransactionId;
3306 	LWLockRelease(ProcArrayLock);
3307 }
3308 
3309 /*
3310  * ExpireOldKnownAssignedTransactionIds
3311  *		Remove KnownAssignedXids entries preceding the given XID and
3312  *		potentially reset lastOverflowedXid.
3313  */
3314 void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)3315 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
3316 {
3317 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3318 
3319 	/*
3320 	 * Reset lastOverflowedXid if we know all transactions that have been
3321 	 * possibly running are being gone.  Not doing so could cause an incorrect
3322 	 * lastOverflowedXid value, which makes extra snapshots be marked as
3323 	 * suboverflowed.
3324 	 */
3325 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
3326 		procArray->lastOverflowedXid = InvalidTransactionId;
3327 	KnownAssignedXidsRemovePreceding(xid);
3328 	LWLockRelease(ProcArrayLock);
3329 }
3330 
3331 
3332 /*
3333  * Private module functions to manipulate KnownAssignedXids
3334  *
3335  * There are 5 main uses of the KnownAssignedXids data structure:
3336  *
3337  *	* backends taking snapshots - all valid XIDs need to be copied out
3338  *	* backends seeking to determine presence of a specific XID
3339  *	* startup process adding new known-assigned XIDs
3340  *	* startup process removing specific XIDs as transactions end
3341  *	* startup process pruning array when special WAL records arrive
3342  *
3343  * This data structure is known to be a hot spot during Hot Standby, so we
3344  * go to some lengths to make these operations as efficient and as concurrent
3345  * as possible.
3346  *
3347  * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
3348  * order, to be exact --- to allow binary search for specific XIDs.  Note:
3349  * in general TransactionIdPrecedes would not provide a total order, but
3350  * we know that the entries present at any instant should not extend across
3351  * a large enough fraction of XID space to wrap around (the master would
3352  * shut down for fear of XID wrap long before that happens).  So it's OK to
3353  * use TransactionIdPrecedes as a binary-search comparator.
3354  *
3355  * It's cheap to maintain the sortedness during insertions, since new known
3356  * XIDs are always reported in XID order; we just append them at the right.
3357  *
3358  * To keep individual deletions cheap, we need to allow gaps in the array.
3359  * This is implemented by marking array elements as valid or invalid using
3360  * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
3361  * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
3362  * XID entry itself.  This preserves the property that the XID entries are
3363  * sorted, so we can do binary searches easily.  Periodically we compress
3364  * out the unused entries; that's much cheaper than having to compress the
3365  * array immediately on every deletion.
3366  *
3367  * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
3368  * are those with indexes tail <= i < head; items outside this subscript range
3369  * have unspecified contents.  When head reaches the end of the array, we
3370  * force compression of unused entries rather than wrapping around, since
3371  * allowing wraparound would greatly complicate the search logic.  We maintain
3372  * an explicit tail pointer so that pruning of old XIDs can be done without
3373  * immediately moving the array contents.  In most cases only a small fraction
3374  * of the array contains valid entries at any instant.
3375  *
3376  * Although only the startup process can ever change the KnownAssignedXids
3377  * data structure, we still need interlocking so that standby backends will
3378  * not observe invalid intermediate states.  The convention is that backends
3379  * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
3380  * the array, the startup process must hold ProcArrayLock exclusively, for
3381  * the usual transactional reasons (compare commit/abort of a transaction
3382  * during normal running).  Compressing unused entries out of the array
3383  * likewise requires exclusive lock.  To add XIDs to the array, we just insert
3384  * them into slots to the right of the head pointer and then advance the head
3385  * pointer.  This wouldn't require any lock at all, except that on machines
3386  * with weak memory ordering we need to be careful that other processors
3387  * see the array element changes before they see the head pointer change.
3388  * We handle this by using a spinlock to protect reads and writes of the
3389  * head/tail pointers.  (We could dispense with the spinlock if we were to
3390  * create suitable memory access barrier primitives and use those instead.)
3391  * The spinlock must be taken to read or write the head/tail pointers unless
3392  * the caller holds ProcArrayLock exclusively.
3393  *
3394  * Algorithmic analysis:
3395  *
3396  * If we have a maximum of M slots, with N XIDs currently spread across
3397  * S elements then we have N <= S <= M always.
3398  *
3399  *	* Adding a new XID is O(1) and needs little locking (unless compression
3400  *		must happen)
3401  *	* Compressing the array is O(S) and requires exclusive lock
3402  *	* Removing an XID is O(logS) and requires exclusive lock
3403  *	* Taking a snapshot is O(S) and requires shared lock
3404  *	* Checking for an XID is O(logS) and requires shared lock
3405  *
3406  * In comparison, using a hash table for KnownAssignedXids would mean that
3407  * taking snapshots would be O(M). If we can maintain S << M then the
3408  * sorted array technique will deliver significantly faster snapshots.
3409  * If we try to keep S too small then we will spend too much time compressing,
3410  * so there is an optimal point for any workload mix. We use a heuristic to
3411  * decide when to compress the array, though trimming also helps reduce
3412  * frequency of compressing. The heuristic requires us to track the number of
3413  * currently valid XIDs in the array.
3414  */
3415 
3416 
3417 /*
3418  * Compress KnownAssignedXids by shifting valid data down to the start of the
3419  * array, removing any gaps.
3420  *
3421  * A compression step is forced if "force" is true, otherwise we do it
3422  * only if a heuristic indicates it's a good time to do it.
3423  *
3424  * Caller must hold ProcArrayLock in exclusive mode.
3425  */
3426 static void
KnownAssignedXidsCompress(bool force)3427 KnownAssignedXidsCompress(bool force)
3428 {
3429 	/* use volatile pointer to prevent code rearrangement */
3430 	volatile ProcArrayStruct *pArray = procArray;
3431 	int			head,
3432 				tail;
3433 	int			compress_index;
3434 	int			i;
3435 
3436 	/* no spinlock required since we hold ProcArrayLock exclusively */
3437 	head = pArray->headKnownAssignedXids;
3438 	tail = pArray->tailKnownAssignedXids;
3439 
3440 	if (!force)
3441 	{
3442 		/*
3443 		 * If we can choose how much to compress, use a heuristic to avoid
3444 		 * compressing too often or not often enough.
3445 		 *
3446 		 * Heuristic is if we have a large enough current spread and less than
3447 		 * 50% of the elements are currently in use, then compress. This
3448 		 * should ensure we compress fairly infrequently. We could compress
3449 		 * less often though the virtual array would spread out more and
3450 		 * snapshots would become more expensive.
3451 		 */
3452 		int			nelements = head - tail;
3453 
3454 		if (nelements < 4 * PROCARRAY_MAXPROCS ||
3455 			nelements < 2 * pArray->numKnownAssignedXids)
3456 			return;
3457 	}
3458 
3459 	/*
3460 	 * We compress the array by reading the valid values from tail to head,
3461 	 * re-aligning data to 0th element.
3462 	 */
3463 	compress_index = 0;
3464 	for (i = tail; i < head; i++)
3465 	{
3466 		if (KnownAssignedXidsValid[i])
3467 		{
3468 			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
3469 			KnownAssignedXidsValid[compress_index] = true;
3470 			compress_index++;
3471 		}
3472 	}
3473 
3474 	pArray->tailKnownAssignedXids = 0;
3475 	pArray->headKnownAssignedXids = compress_index;
3476 }
3477 
3478 /*
3479  * Add xids into KnownAssignedXids at the head of the array.
3480  *
3481  * xids from from_xid to to_xid, inclusive, are added to the array.
3482  *
3483  * If exclusive_lock is true then caller already holds ProcArrayLock in
3484  * exclusive mode, so we need no extra locking here.  Else caller holds no
3485  * lock, so we need to be sure we maintain sufficient interlocks against
3486  * concurrent readers.  (Only the startup process ever calls this, so no need
3487  * to worry about concurrent writers.)
3488  */
3489 static void
KnownAssignedXidsAdd(TransactionId from_xid,TransactionId to_xid,bool exclusive_lock)3490 KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
3491 					 bool exclusive_lock)
3492 {
3493 	/* use volatile pointer to prevent code rearrangement */
3494 	volatile ProcArrayStruct *pArray = procArray;
3495 	TransactionId next_xid;
3496 	int			head,
3497 				tail;
3498 	int			nxids;
3499 	int			i;
3500 
3501 	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
3502 
3503 	/*
3504 	 * Calculate how many array slots we'll need.  Normally this is cheap; in
3505 	 * the unusual case where the XIDs cross the wrap point, we do it the hard
3506 	 * way.
3507 	 */
3508 	if (to_xid >= from_xid)
3509 		nxids = to_xid - from_xid + 1;
3510 	else
3511 	{
3512 		nxids = 1;
3513 		next_xid = from_xid;
3514 		while (TransactionIdPrecedes(next_xid, to_xid))
3515 		{
3516 			nxids++;
3517 			TransactionIdAdvance(next_xid);
3518 		}
3519 	}
3520 
3521 	/*
3522 	 * Since only the startup process modifies the head/tail pointers, we
3523 	 * don't need a lock to read them here.
3524 	 */
3525 	head = pArray->headKnownAssignedXids;
3526 	tail = pArray->tailKnownAssignedXids;
3527 
3528 	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
3529 	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
3530 
3531 	/*
3532 	 * Verify that insertions occur in TransactionId sequence.  Note that even
3533 	 * if the last existing element is marked invalid, it must still have a
3534 	 * correctly sequenced XID value.
3535 	 */
3536 	if (head > tail &&
3537 		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
3538 	{
3539 		KnownAssignedXidsDisplay(LOG);
3540 		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
3541 	}
3542 
3543 	/*
3544 	 * If our xids won't fit in the remaining space, compress out free space
3545 	 */
3546 	if (head + nxids > pArray->maxKnownAssignedXids)
3547 	{
3548 		/* must hold lock to compress */
3549 		if (!exclusive_lock)
3550 			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3551 
3552 		KnownAssignedXidsCompress(true);
3553 
3554 		head = pArray->headKnownAssignedXids;
3555 		/* note: we no longer care about the tail pointer */
3556 
3557 		if (!exclusive_lock)
3558 			LWLockRelease(ProcArrayLock);
3559 
3560 		/*
3561 		 * If it still won't fit then we're out of memory
3562 		 */
3563 		if (head + nxids > pArray->maxKnownAssignedXids)
3564 			elog(ERROR, "too many KnownAssignedXids");
3565 	}
3566 
3567 	/* Now we can insert the xids into the space starting at head */
3568 	next_xid = from_xid;
3569 	for (i = 0; i < nxids; i++)
3570 	{
3571 		KnownAssignedXids[head] = next_xid;
3572 		KnownAssignedXidsValid[head] = true;
3573 		TransactionIdAdvance(next_xid);
3574 		head++;
3575 	}
3576 
3577 	/* Adjust count of number of valid entries */
3578 	pArray->numKnownAssignedXids += nxids;
3579 
3580 	/*
3581 	 * Now update the head pointer.  We use a spinlock to protect this
3582 	 * pointer, not because the update is likely to be non-atomic, but to
3583 	 * ensure that other processors see the above array updates before they
3584 	 * see the head pointer change.
3585 	 *
3586 	 * If we're holding ProcArrayLock exclusively, there's no need to take the
3587 	 * spinlock.
3588 	 */
3589 	if (exclusive_lock)
3590 		pArray->headKnownAssignedXids = head;
3591 	else
3592 	{
3593 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
3594 		pArray->headKnownAssignedXids = head;
3595 		SpinLockRelease(&pArray->known_assigned_xids_lck);
3596 	}
3597 }
3598 
3599 /*
3600  * KnownAssignedXidsSearch
3601  *
3602  * Searches KnownAssignedXids for a specific xid and optionally removes it.
3603  * Returns true if it was found, false if not.
3604  *
3605  * Caller must hold ProcArrayLock in shared or exclusive mode.
3606  * Exclusive lock must be held for remove = true.
3607  */
3608 static bool
KnownAssignedXidsSearch(TransactionId xid,bool remove)3609 KnownAssignedXidsSearch(TransactionId xid, bool remove)
3610 {
3611 	/* use volatile pointer to prevent code rearrangement */
3612 	volatile ProcArrayStruct *pArray = procArray;
3613 	int			first,
3614 				last;
3615 	int			head;
3616 	int			tail;
3617 	int			result_index = -1;
3618 
3619 	if (remove)
3620 	{
3621 		/* we hold ProcArrayLock exclusively, so no need for spinlock */
3622 		tail = pArray->tailKnownAssignedXids;
3623 		head = pArray->headKnownAssignedXids;
3624 	}
3625 	else
3626 	{
3627 		/* take spinlock to ensure we see up-to-date array contents */
3628 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
3629 		tail = pArray->tailKnownAssignedXids;
3630 		head = pArray->headKnownAssignedXids;
3631 		SpinLockRelease(&pArray->known_assigned_xids_lck);
3632 	}
3633 
3634 	/*
3635 	 * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
3636 	 * array here, since even invalid entries will contain sorted XIDs.
3637 	 */
3638 	first = tail;
3639 	last = head - 1;
3640 	while (first <= last)
3641 	{
3642 		int			mid_index;
3643 		TransactionId mid_xid;
3644 
3645 		mid_index = (first + last) / 2;
3646 		mid_xid = KnownAssignedXids[mid_index];
3647 
3648 		if (xid == mid_xid)
3649 		{
3650 			result_index = mid_index;
3651 			break;
3652 		}
3653 		else if (TransactionIdPrecedes(xid, mid_xid))
3654 			last = mid_index - 1;
3655 		else
3656 			first = mid_index + 1;
3657 	}
3658 
3659 	if (result_index < 0)
3660 		return false;			/* not in array */
3661 
3662 	if (!KnownAssignedXidsValid[result_index])
3663 		return false;			/* in array, but invalid */
3664 
3665 	if (remove)
3666 	{
3667 		KnownAssignedXidsValid[result_index] = false;
3668 
3669 		pArray->numKnownAssignedXids--;
3670 		Assert(pArray->numKnownAssignedXids >= 0);
3671 
3672 		/*
3673 		 * If we're removing the tail element then advance tail pointer over
3674 		 * any invalid elements.  This will speed future searches.
3675 		 */
3676 		if (result_index == tail)
3677 		{
3678 			tail++;
3679 			while (tail < head && !KnownAssignedXidsValid[tail])
3680 				tail++;
3681 			if (tail >= head)
3682 			{
3683 				/* Array is empty, so we can reset both pointers */
3684 				pArray->headKnownAssignedXids = 0;
3685 				pArray->tailKnownAssignedXids = 0;
3686 			}
3687 			else
3688 			{
3689 				pArray->tailKnownAssignedXids = tail;
3690 			}
3691 		}
3692 	}
3693 
3694 	return true;
3695 }
3696 
3697 /*
3698  * Is the specified XID present in KnownAssignedXids[]?
3699  *
3700  * Caller must hold ProcArrayLock in shared or exclusive mode.
3701  */
3702 static bool
KnownAssignedXidExists(TransactionId xid)3703 KnownAssignedXidExists(TransactionId xid)
3704 {
3705 	Assert(TransactionIdIsValid(xid));
3706 
3707 	return KnownAssignedXidsSearch(xid, false);
3708 }
3709 
3710 /*
3711  * Remove the specified XID from KnownAssignedXids[].
3712  *
3713  * Caller must hold ProcArrayLock in exclusive mode.
3714  */
3715 static void
KnownAssignedXidsRemove(TransactionId xid)3716 KnownAssignedXidsRemove(TransactionId xid)
3717 {
3718 	Assert(TransactionIdIsValid(xid));
3719 
3720 	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
3721 
3722 	/*
3723 	 * Note: we cannot consider it an error to remove an XID that's not
3724 	 * present.  We intentionally remove subxact IDs while processing
3725 	 * XLOG_XACT_ASSIGNMENT, to avoid array overflow.  Then those XIDs will be
3726 	 * removed again when the top-level xact commits or aborts.
3727 	 *
3728 	 * It might be possible to track such XIDs to distinguish this case from
3729 	 * actual errors, but it would be complicated and probably not worth it.
3730 	 * So, just ignore the search result.
3731 	 */
3732 	(void) KnownAssignedXidsSearch(xid, true);
3733 }
3734 
3735 /*
3736  * KnownAssignedXidsRemoveTree
3737  *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
3738  *
3739  * Caller must hold ProcArrayLock in exclusive mode.
3740  */
3741 static void
KnownAssignedXidsRemoveTree(TransactionId xid,int nsubxids,TransactionId * subxids)3742 KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
3743 							TransactionId *subxids)
3744 {
3745 	int			i;
3746 
3747 	if (TransactionIdIsValid(xid))
3748 		KnownAssignedXidsRemove(xid);
3749 
3750 	for (i = 0; i < nsubxids; i++)
3751 		KnownAssignedXidsRemove(subxids[i]);
3752 
3753 	/* Opportunistically compress the array */
3754 	KnownAssignedXidsCompress(false);
3755 }
3756 
3757 /*
3758  * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
3759  * then clear the whole table.
3760  *
3761  * Caller must hold ProcArrayLock in exclusive mode.
3762  */
3763 static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)3764 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
3765 {
3766 	/* use volatile pointer to prevent code rearrangement */
3767 	volatile ProcArrayStruct *pArray = procArray;
3768 	int			count = 0;
3769 	int			head,
3770 				tail,
3771 				i;
3772 
3773 	if (!TransactionIdIsValid(removeXid))
3774 	{
3775 		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
3776 		pArray->numKnownAssignedXids = 0;
3777 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
3778 		return;
3779 	}
3780 
3781 	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
3782 
3783 	/*
3784 	 * Mark entries invalid starting at the tail.  Since array is sorted, we
3785 	 * can stop as soon as we reach an entry >= removeXid.
3786 	 */
3787 	tail = pArray->tailKnownAssignedXids;
3788 	head = pArray->headKnownAssignedXids;
3789 
3790 	for (i = tail; i < head; i++)
3791 	{
3792 		if (KnownAssignedXidsValid[i])
3793 		{
3794 			TransactionId knownXid = KnownAssignedXids[i];
3795 
3796 			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
3797 				break;
3798 
3799 			if (!StandbyTransactionIdIsPrepared(knownXid))
3800 			{
3801 				KnownAssignedXidsValid[i] = false;
3802 				count++;
3803 			}
3804 		}
3805 	}
3806 
3807 	pArray->numKnownAssignedXids -= count;
3808 	Assert(pArray->numKnownAssignedXids >= 0);
3809 
3810 	/*
3811 	 * Advance the tail pointer if we've marked the tail item invalid.
3812 	 */
3813 	for (i = tail; i < head; i++)
3814 	{
3815 		if (KnownAssignedXidsValid[i])
3816 			break;
3817 	}
3818 	if (i >= head)
3819 	{
3820 		/* Array is empty, so we can reset both pointers */
3821 		pArray->headKnownAssignedXids = 0;
3822 		pArray->tailKnownAssignedXids = 0;
3823 	}
3824 	else
3825 	{
3826 		pArray->tailKnownAssignedXids = i;
3827 	}
3828 
3829 	/* Opportunistically compress the array */
3830 	KnownAssignedXidsCompress(false);
3831 }
3832 
3833 /*
3834  * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
3835  * We filter out anything >= xmax.
3836  *
3837  * Returns the number of XIDs stored into xarray[].  Caller is responsible
3838  * that array is large enough.
3839  *
3840  * Caller must hold ProcArrayLock in (at least) shared mode.
3841  */
3842 static int
KnownAssignedXidsGet(TransactionId * xarray,TransactionId xmax)3843 KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
3844 {
3845 	TransactionId xtmp = InvalidTransactionId;
3846 
3847 	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
3848 }
3849 
3850 /*
3851  * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
3852  * we reduce *xmin to the lowest xid value seen if not already lower.
3853  *
3854  * Caller must hold ProcArrayLock in (at least) shared mode.
3855  */
3856 static int
KnownAssignedXidsGetAndSetXmin(TransactionId * xarray,TransactionId * xmin,TransactionId xmax)3857 KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
3858 							   TransactionId xmax)
3859 {
3860 	int			count = 0;
3861 	int			head,
3862 				tail;
3863 	int			i;
3864 
3865 	/*
3866 	 * Fetch head just once, since it may change while we loop. We can stop
3867 	 * once we reach the initially seen head, since we are certain that an xid
3868 	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
3869 	 * might miss newly-added xids, but they should be >= xmax so irrelevant
3870 	 * anyway.
3871 	 *
3872 	 * Must take spinlock to ensure we see up-to-date array contents.
3873 	 */
3874 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
3875 	tail = procArray->tailKnownAssignedXids;
3876 	head = procArray->headKnownAssignedXids;
3877 	SpinLockRelease(&procArray->known_assigned_xids_lck);
3878 
3879 	for (i = tail; i < head; i++)
3880 	{
3881 		/* Skip any gaps in the array */
3882 		if (KnownAssignedXidsValid[i])
3883 		{
3884 			TransactionId knownXid = KnownAssignedXids[i];
3885 
3886 			/*
3887 			 * Update xmin if required.  Only the first XID need be checked,
3888 			 * since the array is sorted.
3889 			 */
3890 			if (count == 0 &&
3891 				TransactionIdPrecedes(knownXid, *xmin))
3892 				*xmin = knownXid;
3893 
3894 			/*
3895 			 * Filter out anything >= xmax, again relying on sorted property
3896 			 * of array.
3897 			 */
3898 			if (TransactionIdIsValid(xmax) &&
3899 				TransactionIdFollowsOrEquals(knownXid, xmax))
3900 				break;
3901 
3902 			/* Add knownXid into output array */
3903 			xarray[count++] = knownXid;
3904 		}
3905 	}
3906 
3907 	return count;
3908 }
3909 
3910 /*
3911  * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
3912  * if nothing there.
3913  */
3914 static TransactionId
KnownAssignedXidsGetOldestXmin(void)3915 KnownAssignedXidsGetOldestXmin(void)
3916 {
3917 	int			head,
3918 				tail;
3919 	int			i;
3920 
3921 	/*
3922 	 * Fetch head just once, since it may change while we loop.
3923 	 */
3924 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
3925 	tail = procArray->tailKnownAssignedXids;
3926 	head = procArray->headKnownAssignedXids;
3927 	SpinLockRelease(&procArray->known_assigned_xids_lck);
3928 
3929 	for (i = tail; i < head; i++)
3930 	{
3931 		/* Skip any gaps in the array */
3932 		if (KnownAssignedXidsValid[i])
3933 			return KnownAssignedXids[i];
3934 	}
3935 
3936 	return InvalidTransactionId;
3937 }
3938 
3939 /*
3940  * Display KnownAssignedXids to provide debug trail
3941  *
3942  * Currently this is only called within startup process, so we need no
3943  * special locking.
3944  *
3945  * Note this is pretty expensive, and much of the expense will be incurred
3946  * even if the elog message will get discarded.  It's not currently called
3947  * in any performance-critical places, however, so no need to be tenser.
3948  */
3949 static void
KnownAssignedXidsDisplay(int trace_level)3950 KnownAssignedXidsDisplay(int trace_level)
3951 {
3952 	/* use volatile pointer to prevent code rearrangement */
3953 	volatile ProcArrayStruct *pArray = procArray;
3954 	StringInfoData buf;
3955 	int			head,
3956 				tail,
3957 				i;
3958 	int			nxids = 0;
3959 
3960 	tail = pArray->tailKnownAssignedXids;
3961 	head = pArray->headKnownAssignedXids;
3962 
3963 	initStringInfo(&buf);
3964 
3965 	for (i = tail; i < head; i++)
3966 	{
3967 		if (KnownAssignedXidsValid[i])
3968 		{
3969 			nxids++;
3970 			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3971 		}
3972 	}
3973 
3974 	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3975 		 nxids,
3976 		 pArray->numKnownAssignedXids,
3977 		 pArray->tailKnownAssignedXids,
3978 		 pArray->headKnownAssignedXids,
3979 		 buf.data);
3980 
3981 	pfree(buf.data);
3982 }
3983 
3984 /*
3985  * KnownAssignedXidsReset
3986  *		Resets KnownAssignedXids to be empty
3987  */
3988 static void
KnownAssignedXidsReset(void)3989 KnownAssignedXidsReset(void)
3990 {
3991 	/* use volatile pointer to prevent code rearrangement */
3992 	volatile ProcArrayStruct *pArray = procArray;
3993 
3994 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3995 
3996 	pArray->numKnownAssignedXids = 0;
3997 	pArray->tailKnownAssignedXids = 0;
3998 	pArray->headKnownAssignedXids = 0;
3999 
4000 	LWLockRelease(ProcArrayLock);
4001 }
4002