1 /*-------------------------------------------------------------------------
2  *
3  * procarray.c
4  *	  POSTGRES process array code.
5  *
6  *
7  * This module maintains arrays of the PGPROC and PGXACT structures for all
8  * active backends.  Although there are several uses for this, the principal
9  * one is as a means of determining the set of currently running transactions.
10  *
11  * Because of various subtle race conditions it is critical that a backend
12  * hold the correct locks while setting or clearing its MyPgXact->xid field.
13  * See notes in src/backend/access/transam/README.
14  *
15  * The process arrays now also include structures representing prepared
16  * transactions.  The xid and subxids fields of these are valid, as are the
17  * myProcLocks lists.  They can be distinguished from regular backend PGPROCs
18  * at need by checking for pid == 0.
19  *
20  * During hot standby, we also keep a list of XIDs representing transactions
21  * that are known to be running in the master (or more precisely, were running
22  * as of the current point in the WAL stream).  This list is kept in the
23  * KnownAssignedXids array, and is updated by watching the sequence of
24  * arriving XIDs.  This is necessary because if we leave those XIDs out of
25  * snapshots taken for standby queries, then they will appear to be already
26  * complete, leading to MVCC failures.  Note that in hot standby, the PGPROC
27  * array represents standby processes, which by definition are not running
28  * transactions that have XIDs.
29  *
30  * It is perhaps possible for a backend on the master to terminate without
31  * writing an abort record for its transaction.  While that shouldn't really
32  * happen, it would tie up KnownAssignedXids indefinitely, so we protect
33  * ourselves by pruning the array when a valid list of running XIDs arrives.
34  *
35  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
36  * Portions Copyright (c) 1994, Regents of the University of California
37  *
38  *
39  * IDENTIFICATION
40  *	  src/backend/storage/ipc/procarray.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 
48 #include "access/clog.h"
49 #include "access/subtrans.h"
50 #include "access/transam.h"
51 #include "access/twophase.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "miscadmin.h"
56 #include "pgstat.h"
57 #include "storage/proc.h"
58 #include "storage/procarray.h"
59 #include "storage/spin.h"
60 #include "utils/builtins.h"
61 #include "utils/rel.h"
62 #include "utils/snapmgr.h"
63 
64 
65 /* Our shared memory area */
66 typedef struct ProcArrayStruct
67 {
68 	int			numProcs;		/* number of valid procs entries */
69 	int			maxProcs;		/* allocated size of procs array */
70 
71 	/*
72 	 * Known assigned XIDs handling
73 	 */
74 	int			maxKnownAssignedXids;	/* allocated size of array */
75 	int			numKnownAssignedXids;	/* current # of valid entries */
76 	int			tailKnownAssignedXids;	/* index of oldest valid element */
77 	int			headKnownAssignedXids;	/* index of newest element, + 1 */
78 	slock_t		known_assigned_xids_lck;	/* protects head/tail pointers */
79 
80 	/*
81 	 * Highest subxid that has been removed from KnownAssignedXids array to
82 	 * prevent overflow; or InvalidTransactionId if none.  We track this for
83 	 * similar reasons to tracking overflowing cached subxids in PGXACT
84 	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
85 	 * lock to read it.
86 	 */
87 	TransactionId lastOverflowedXid;
88 
89 	/* oldest xmin of any replication slot */
90 	TransactionId replication_slot_xmin;
91 	/* oldest catalog xmin of any replication slot */
92 	TransactionId replication_slot_catalog_xmin;
93 
94 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
95 	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
96 } ProcArrayStruct;
97 
98 static ProcArrayStruct *procArray;
99 
100 static PGPROC *allProcs;
101 static PGXACT *allPgXact;
102 
103 /*
104  * Bookkeeping for tracking emulated transactions in recovery
105  */
106 static TransactionId *KnownAssignedXids;
107 static bool *KnownAssignedXidsValid;
108 static TransactionId latestObservedXid = InvalidTransactionId;
109 
110 /*
111  * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
112  * the highest xid that might still be running that we don't have in
113  * KnownAssignedXids.
114  */
115 static TransactionId standbySnapshotPendingXmin;
116 
117 #ifdef XIDCACHE_DEBUG
118 
119 /* counters for XidCache measurement */
120 static long xc_by_recent_xmin = 0;
121 static long xc_by_known_xact = 0;
122 static long xc_by_my_xact = 0;
123 static long xc_by_latest_xid = 0;
124 static long xc_by_main_xid = 0;
125 static long xc_by_child_xid = 0;
126 static long xc_by_known_assigned = 0;
127 static long xc_no_overflow = 0;
128 static long xc_slow_answer = 0;
129 
130 #define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
131 #define xc_by_known_xact_inc()		(xc_by_known_xact++)
132 #define xc_by_my_xact_inc()			(xc_by_my_xact++)
133 #define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
134 #define xc_by_main_xid_inc()		(xc_by_main_xid++)
135 #define xc_by_child_xid_inc()		(xc_by_child_xid++)
136 #define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
137 #define xc_no_overflow_inc()		(xc_no_overflow++)
138 #define xc_slow_answer_inc()		(xc_slow_answer++)
139 
140 static void DisplayXidCache(void);
141 #else							/* !XIDCACHE_DEBUG */
142 
143 #define xc_by_recent_xmin_inc()		((void) 0)
144 #define xc_by_known_xact_inc()		((void) 0)
145 #define xc_by_my_xact_inc()			((void) 0)
146 #define xc_by_latest_xid_inc()		((void) 0)
147 #define xc_by_main_xid_inc()		((void) 0)
148 #define xc_by_child_xid_inc()		((void) 0)
149 #define xc_by_known_assigned_inc()	((void) 0)
150 #define xc_no_overflow_inc()		((void) 0)
151 #define xc_slow_answer_inc()		((void) 0)
152 #endif							/* XIDCACHE_DEBUG */
153 
154 /* Primitives for KnownAssignedXids array handling for standby */
155 static void KnownAssignedXidsCompress(bool force);
156 static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
157 					 bool exclusive_lock);
158 static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
159 static bool KnownAssignedXidExists(TransactionId xid);
160 static void KnownAssignedXidsRemove(TransactionId xid);
161 static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
162 							TransactionId *subxids);
163 static void KnownAssignedXidsRemovePreceding(TransactionId xid);
164 static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
165 static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
166 							   TransactionId *xmin,
167 							   TransactionId xmax);
168 static TransactionId KnownAssignedXidsGetOldestXmin(void);
169 static void KnownAssignedXidsDisplay(int trace_level);
170 static void KnownAssignedXidsReset(void);
171 static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
172 								PGXACT *pgxact, TransactionId latestXid);
173 static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
174 
175 /*
176  * Report shared-memory space needed by CreateSharedProcArray.
177  */
178 Size
ProcArrayShmemSize(void)179 ProcArrayShmemSize(void)
180 {
181 	Size		size;
182 
183 	/* Size of the ProcArray structure itself */
184 #define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
185 
186 	size = offsetof(ProcArrayStruct, pgprocnos);
187 	size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
188 
189 	/*
190 	 * During Hot Standby processing we have a data structure called
191 	 * KnownAssignedXids, created in shared memory. Local data structures are
192 	 * also created in various backends during GetSnapshotData(),
193 	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
194 	 * main structures created in those functions must be identically sized,
195 	 * since we may at times copy the whole of the data structures around. We
196 	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
197 	 *
198 	 * Ideally we'd only create this structure if we were actually doing hot
199 	 * standby in the current run, but we don't know that yet at the time
200 	 * shared memory is being set up.
201 	 */
202 #define TOTAL_MAX_CACHED_SUBXIDS \
203 	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
204 
205 	if (EnableHotStandby)
206 	{
207 		size = add_size(size,
208 						mul_size(sizeof(TransactionId),
209 								 TOTAL_MAX_CACHED_SUBXIDS));
210 		size = add_size(size,
211 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
212 	}
213 
214 	return size;
215 }
216 
217 /*
218  * Initialize the shared PGPROC array during postmaster startup.
219  */
220 void
CreateSharedProcArray(void)221 CreateSharedProcArray(void)
222 {
223 	bool		found;
224 
225 	/* Create or attach to the ProcArray shared structure */
226 	procArray = (ProcArrayStruct *)
227 		ShmemInitStruct("Proc Array",
228 						add_size(offsetof(ProcArrayStruct, pgprocnos),
229 								 mul_size(sizeof(int),
230 										  PROCARRAY_MAXPROCS)),
231 						&found);
232 
233 	if (!found)
234 	{
235 		/*
236 		 * We're the first - initialize.
237 		 */
238 		procArray->numProcs = 0;
239 		procArray->maxProcs = PROCARRAY_MAXPROCS;
240 		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
241 		procArray->numKnownAssignedXids = 0;
242 		procArray->tailKnownAssignedXids = 0;
243 		procArray->headKnownAssignedXids = 0;
244 		SpinLockInit(&procArray->known_assigned_xids_lck);
245 		procArray->lastOverflowedXid = InvalidTransactionId;
246 		procArray->replication_slot_xmin = InvalidTransactionId;
247 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
248 	}
249 
250 	allProcs = ProcGlobal->allProcs;
251 	allPgXact = ProcGlobal->allPgXact;
252 
253 	/* Create or attach to the KnownAssignedXids arrays too, if needed */
254 	if (EnableHotStandby)
255 	{
256 		KnownAssignedXids = (TransactionId *)
257 			ShmemInitStruct("KnownAssignedXids",
258 							mul_size(sizeof(TransactionId),
259 									 TOTAL_MAX_CACHED_SUBXIDS),
260 							&found);
261 		KnownAssignedXidsValid = (bool *)
262 			ShmemInitStruct("KnownAssignedXidsValid",
263 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
264 							&found);
265 	}
266 
267 	/* Register and initialize fields of ProcLWLockTranche */
268 	LWLockRegisterTranche(LWTRANCHE_PROC, "proc");
269 }
270 
271 /*
272  * Add the specified PGPROC to the shared array.
273  */
274 void
ProcArrayAdd(PGPROC * proc)275 ProcArrayAdd(PGPROC *proc)
276 {
277 	ProcArrayStruct *arrayP = procArray;
278 	int			index;
279 
280 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
281 
282 	if (arrayP->numProcs >= arrayP->maxProcs)
283 	{
284 		/*
285 		 * Oops, no room.  (This really shouldn't happen, since there is a
286 		 * fixed supply of PGPROC structs too, and so we should have failed
287 		 * earlier.)
288 		 */
289 		LWLockRelease(ProcArrayLock);
290 		ereport(FATAL,
291 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
292 				 errmsg("sorry, too many clients already")));
293 	}
294 
295 	/*
296 	 * Keep the procs array sorted by (PGPROC *) so that we can utilize
297 	 * locality of references much better. This is useful while traversing the
298 	 * ProcArray because there is an increased likelihood of finding the next
299 	 * PGPROC structure in the cache.
300 	 *
301 	 * Since the occurrence of adding/removing a proc is much lower than the
302 	 * access to the ProcArray itself, the overhead should be marginal
303 	 */
304 	for (index = 0; index < arrayP->numProcs; index++)
305 	{
306 		/*
307 		 * If we are the first PGPROC or if we have found our right position
308 		 * in the array, break
309 		 */
310 		if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
311 			break;
312 	}
313 
314 	memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
315 			(arrayP->numProcs - index) * sizeof(int));
316 	arrayP->pgprocnos[index] = proc->pgprocno;
317 	arrayP->numProcs++;
318 
319 	LWLockRelease(ProcArrayLock);
320 }
321 
322 /*
323  * Remove the specified PGPROC from the shared array.
324  *
325  * When latestXid is a valid XID, we are removing a live 2PC gxact from the
326  * array, and thus causing it to appear as "not running" anymore.  In this
327  * case we must advance latestCompletedXid.  (This is essentially the same
328  * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
329  * the ProcArrayLock only once, and don't damage the content of the PGPROC;
330  * twophase.c depends on the latter.)
331  */
332 void
ProcArrayRemove(PGPROC * proc,TransactionId latestXid)333 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
334 {
335 	ProcArrayStruct *arrayP = procArray;
336 	int			index;
337 
338 #ifdef XIDCACHE_DEBUG
339 	/* dump stats at backend shutdown, but not prepared-xact end */
340 	if (proc->pid != 0)
341 		DisplayXidCache();
342 #endif
343 
344 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
345 
346 	if (TransactionIdIsValid(latestXid))
347 	{
348 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
349 
350 		/* Advance global latestCompletedXid while holding the lock */
351 		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
352 								  latestXid))
353 			ShmemVariableCache->latestCompletedXid = latestXid;
354 	}
355 	else
356 	{
357 		/* Shouldn't be trying to remove a live transaction here */
358 		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
359 	}
360 
361 	for (index = 0; index < arrayP->numProcs; index++)
362 	{
363 		if (arrayP->pgprocnos[index] == proc->pgprocno)
364 		{
365 			/* Keep the PGPROC array sorted. See notes above */
366 			memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
367 					(arrayP->numProcs - index - 1) * sizeof(int));
368 			arrayP->pgprocnos[arrayP->numProcs - 1] = -1;	/* for debugging */
369 			arrayP->numProcs--;
370 			LWLockRelease(ProcArrayLock);
371 			return;
372 		}
373 	}
374 
375 	/* Oops */
376 	LWLockRelease(ProcArrayLock);
377 
378 	elog(LOG, "failed to find proc %p in ProcArray", proc);
379 }
380 
381 
382 /*
383  * ProcArrayEndTransaction -- mark a transaction as no longer running
384  *
385  * This is used interchangeably for commit and abort cases.  The transaction
386  * commit/abort must already be reported to WAL and pg_xact.
387  *
388  * proc is currently always MyProc, but we pass it explicitly for flexibility.
389  * latestXid is the latest Xid among the transaction's main XID and
390  * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
391  * the caller to pass latestXid, instead of computing it from the PGPROC's
392  * contents, because the subxid information in the PGPROC might be
393  * incomplete.)
394  */
395 void
ProcArrayEndTransaction(PGPROC * proc,TransactionId latestXid)396 ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
397 {
398 	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
399 
400 	if (TransactionIdIsValid(latestXid))
401 	{
402 		/*
403 		 * We must lock ProcArrayLock while clearing our advertised XID, so
404 		 * that we do not exit the set of "running" transactions while someone
405 		 * else is taking a snapshot.  See discussion in
406 		 * src/backend/access/transam/README.
407 		 */
408 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
409 
410 		/*
411 		 * If we can immediately acquire ProcArrayLock, we clear our own XID
412 		 * and release the lock.  If not, use group XID clearing to improve
413 		 * efficiency.
414 		 */
415 		if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
416 		{
417 			ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
418 			LWLockRelease(ProcArrayLock);
419 		}
420 		else
421 			ProcArrayGroupClearXid(proc, latestXid);
422 	}
423 	else
424 	{
425 		/*
426 		 * If we have no XID, we don't need to lock, since we won't affect
427 		 * anyone else's calculation of a snapshot.  We might change their
428 		 * estimate of global xmin, but that's OK.
429 		 */
430 		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
431 
432 		proc->lxid = InvalidLocalTransactionId;
433 		pgxact->xmin = InvalidTransactionId;
434 		/* must be cleared with xid/xmin: */
435 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
436 		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
437 		proc->recoveryConflictPending = false;
438 
439 		Assert(pgxact->nxids == 0);
440 		Assert(pgxact->overflowed == false);
441 	}
442 }
443 
444 /*
445  * Mark a write transaction as no longer running.
446  *
447  * We don't do any locking here; caller must handle that.
448  */
449 static inline void
ProcArrayEndTransactionInternal(PGPROC * proc,PGXACT * pgxact,TransactionId latestXid)450 ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
451 								TransactionId latestXid)
452 {
453 	pgxact->xid = InvalidTransactionId;
454 	proc->lxid = InvalidLocalTransactionId;
455 	pgxact->xmin = InvalidTransactionId;
456 	/* must be cleared with xid/xmin: */
457 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
458 	pgxact->delayChkpt = false; /* be sure this is cleared in abort */
459 	proc->recoveryConflictPending = false;
460 
461 	/* Clear the subtransaction-XID cache too while holding the lock */
462 	pgxact->nxids = 0;
463 	pgxact->overflowed = false;
464 
465 	/* Also advance global latestCompletedXid while holding the lock */
466 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
467 							  latestXid))
468 		ShmemVariableCache->latestCompletedXid = latestXid;
469 }
470 
471 /*
472  * ProcArrayGroupClearXid -- group XID clearing
473  *
474  * When we cannot immediately acquire ProcArrayLock in exclusive mode at
475  * commit time, add ourselves to a list of processes that need their XIDs
476  * cleared.  The first process to add itself to the list will acquire
477  * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
478  * on behalf of all group members.  This avoids a great deal of contention
479  * around ProcArrayLock when many processes are trying to commit at once,
480  * since the lock need not be repeatedly handed off from one committing
481  * process to the next.
482  */
483 static void
ProcArrayGroupClearXid(PGPROC * proc,TransactionId latestXid)484 ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
485 {
486 	volatile PROC_HDR *procglobal = ProcGlobal;
487 	uint32		nextidx;
488 	uint32		wakeidx;
489 
490 	/* We should definitely have an XID to clear. */
491 	Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
492 
493 	/* Add ourselves to the list of processes needing a group XID clear. */
494 	proc->procArrayGroupMember = true;
495 	proc->procArrayGroupMemberXid = latestXid;
496 	while (true)
497 	{
498 		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
499 		pg_atomic_write_u32(&proc->procArrayGroupNext, nextidx);
500 
501 		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
502 										   &nextidx,
503 										   (uint32) proc->pgprocno))
504 			break;
505 	}
506 
507 	/*
508 	 * If the list was not empty, the leader will clear our XID.  It is
509 	 * impossible to have followers without a leader because the first process
510 	 * that has added itself to the list will always have nextidx as
511 	 * INVALID_PGPROCNO.
512 	 */
513 	if (nextidx != INVALID_PGPROCNO)
514 	{
515 		int			extraWaits = 0;
516 
517 		/* Sleep until the leader clears our XID. */
518 		pgstat_report_wait_start(WAIT_EVENT_PROCARRAY_GROUP_UPDATE);
519 		for (;;)
520 		{
521 			/* acts as a read barrier */
522 			PGSemaphoreLock(proc->sem);
523 			if (!proc->procArrayGroupMember)
524 				break;
525 			extraWaits++;
526 		}
527 		pgstat_report_wait_end();
528 
529 		Assert(pg_atomic_read_u32(&proc->procArrayGroupNext) == INVALID_PGPROCNO);
530 
531 		/* Fix semaphore count for any absorbed wakeups */
532 		while (extraWaits-- > 0)
533 			PGSemaphoreUnlock(proc->sem);
534 		return;
535 	}
536 
537 	/* We are the leader.  Acquire the lock on behalf of everyone. */
538 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
539 
540 	/*
541 	 * Now that we've got the lock, clear the list of processes waiting for
542 	 * group XID clearing, saving a pointer to the head of the list.  Trying
543 	 * to pop elements one at a time could lead to an ABA problem.
544 	 */
545 	while (true)
546 	{
547 		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
548 		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
549 										   &nextidx,
550 										   INVALID_PGPROCNO))
551 			break;
552 	}
553 
554 	/* Remember head of list so we can perform wakeups after dropping lock. */
555 	wakeidx = nextidx;
556 
557 	/* Walk the list and clear all XIDs. */
558 	while (nextidx != INVALID_PGPROCNO)
559 	{
560 		PGPROC	   *nextproc = &allProcs[nextidx];
561 		PGXACT	   *pgxact = &allPgXact[nextidx];
562 
563 		ProcArrayEndTransactionInternal(nextproc, pgxact, nextproc->procArrayGroupMemberXid);
564 
565 		/* Move to next proc in list. */
566 		nextidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
567 	}
568 
569 	/* We're done with the lock now. */
570 	LWLockRelease(ProcArrayLock);
571 
572 	/*
573 	 * Now that we've released the lock, go back and wake everybody up.  We
574 	 * don't do this under the lock so as to keep lock hold times to a
575 	 * minimum.  The system calls we need to perform to wake other processes
576 	 * up are probably much slower than the simple memory writes we did while
577 	 * holding the lock.
578 	 */
579 	while (wakeidx != INVALID_PGPROCNO)
580 	{
581 		PGPROC	   *nextproc = &allProcs[wakeidx];
582 
583 		wakeidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
584 		pg_atomic_write_u32(&nextproc->procArrayGroupNext, INVALID_PGPROCNO);
585 
586 		/* ensure all previous writes are visible before follower continues. */
587 		pg_write_barrier();
588 
589 		nextproc->procArrayGroupMember = false;
590 
591 		if (nextproc != MyProc)
592 			PGSemaphoreUnlock(nextproc->sem);
593 	}
594 }
595 
596 /*
597  * ProcArrayClearTransaction -- clear the transaction fields
598  *
599  * This is used after successfully preparing a 2-phase transaction.  We are
600  * not actually reporting the transaction's XID as no longer running --- it
601  * will still appear as running because the 2PC's gxact is in the ProcArray
602  * too.  We just have to clear out our own PGXACT.
603  */
604 void
ProcArrayClearTransaction(PGPROC * proc)605 ProcArrayClearTransaction(PGPROC *proc)
606 {
607 	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
608 
609 	/*
610 	 * We can skip locking ProcArrayLock here, because this action does not
611 	 * actually change anyone's view of the set of running XIDs: our entry is
612 	 * duplicate with the gxact that has already been inserted into the
613 	 * ProcArray.
614 	 */
615 	pgxact->xid = InvalidTransactionId;
616 	proc->lxid = InvalidLocalTransactionId;
617 	pgxact->xmin = InvalidTransactionId;
618 	proc->recoveryConflictPending = false;
619 
620 	/* redundant, but just in case */
621 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
622 	pgxact->delayChkpt = false;
623 
624 	/* Clear the subtransaction-XID cache too */
625 	pgxact->nxids = 0;
626 	pgxact->overflowed = false;
627 }
628 
629 /*
630  * ProcArrayInitRecovery -- initialize recovery xid mgmt environment
631  *
632  * Remember up to where the startup process initialized the CLOG and subtrans
633  * so we can ensure it's initialized gaplessly up to the point where necessary
634  * while in recovery.
635  */
636 void
ProcArrayInitRecovery(TransactionId initializedUptoXID)637 ProcArrayInitRecovery(TransactionId initializedUptoXID)
638 {
639 	Assert(standbyState == STANDBY_INITIALIZED);
640 	Assert(TransactionIdIsNormal(initializedUptoXID));
641 
642 	/*
643 	 * we set latestObservedXid to the xid SUBTRANS has been initialized up
644 	 * to, so we can extend it from that point onwards in
645 	 * RecordKnownAssignedTransactionIds, and when we get consistent in
646 	 * ProcArrayApplyRecoveryInfo().
647 	 */
648 	latestObservedXid = initializedUptoXID;
649 	TransactionIdRetreat(latestObservedXid);
650 }
651 
652 /*
653  * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
654  *
655  * Takes us through 3 states: Initialized, Pending and Ready.
656  * Normal case is to go all the way to Ready straight away, though there
657  * are atypical cases where we need to take it in steps.
658  *
659  * Use the data about running transactions on master to create the initial
660  * state of KnownAssignedXids. We also use these records to regularly prune
661  * KnownAssignedXids because we know it is possible that some transactions
662  * with FATAL errors fail to write abort records, which could cause eventual
663  * overflow.
664  *
665  * See comments for LogStandbySnapshot().
666  */
667 void
ProcArrayApplyRecoveryInfo(RunningTransactions running)668 ProcArrayApplyRecoveryInfo(RunningTransactions running)
669 {
670 	TransactionId *xids;
671 	int			nxids;
672 	TransactionId nextXid;
673 	int			i;
674 
675 	Assert(standbyState >= STANDBY_INITIALIZED);
676 	Assert(TransactionIdIsValid(running->nextXid));
677 	Assert(TransactionIdIsValid(running->oldestRunningXid));
678 	Assert(TransactionIdIsNormal(running->latestCompletedXid));
679 
680 	/*
681 	 * Remove stale transactions, if any.
682 	 */
683 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
684 
685 	/*
686 	 * Remove stale locks, if any.
687 	 *
688 	 * Locks are always assigned to the toplevel xid so we don't need to care
689 	 * about subxcnt/subxids (and by extension not about ->suboverflowed).
690 	 */
691 	StandbyReleaseOldLocks(running->xcnt, running->xids);
692 
693 	/*
694 	 * If our snapshot is already valid, nothing else to do...
695 	 */
696 	if (standbyState == STANDBY_SNAPSHOT_READY)
697 		return;
698 
699 	/*
700 	 * If our initial RunningTransactionsData had an overflowed snapshot then
701 	 * we knew we were missing some subxids from our snapshot. If we continue
702 	 * to see overflowed snapshots then we might never be able to start up, so
703 	 * we make another test to see if our snapshot is now valid. We know that
704 	 * the missing subxids are equal to or earlier than nextXid. After we
705 	 * initialise we continue to apply changes during recovery, so once the
706 	 * oldestRunningXid is later than the nextXid from the initial snapshot we
707 	 * know that we no longer have missing information and can mark the
708 	 * snapshot as valid.
709 	 */
710 	if (standbyState == STANDBY_SNAPSHOT_PENDING)
711 	{
712 		/*
713 		 * If the snapshot isn't overflowed or if its empty we can reset our
714 		 * pending state and use this snapshot instead.
715 		 */
716 		if (!running->subxid_overflow || running->xcnt == 0)
717 		{
718 			/*
719 			 * If we have already collected known assigned xids, we need to
720 			 * throw them away before we apply the recovery snapshot.
721 			 */
722 			KnownAssignedXidsReset();
723 			standbyState = STANDBY_INITIALIZED;
724 		}
725 		else
726 		{
727 			if (TransactionIdPrecedes(standbySnapshotPendingXmin,
728 									  running->oldestRunningXid))
729 			{
730 				standbyState = STANDBY_SNAPSHOT_READY;
731 				elog(trace_recovery(DEBUG1),
732 					 "recovery snapshots are now enabled");
733 			}
734 			else
735 				elog(trace_recovery(DEBUG1),
736 					 "recovery snapshot waiting for non-overflowed snapshot or "
737 					 "until oldest active xid on standby is at least %u (now %u)",
738 					 standbySnapshotPendingXmin,
739 					 running->oldestRunningXid);
740 			return;
741 		}
742 	}
743 
744 	Assert(standbyState == STANDBY_INITIALIZED);
745 
746 	/*
747 	 * OK, we need to initialise from the RunningTransactionsData record.
748 	 *
749 	 * NB: this can be reached at least twice, so make sure new code can deal
750 	 * with that.
751 	 */
752 
753 	/*
754 	 * Nobody else is running yet, but take locks anyhow
755 	 */
756 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
757 
758 	/*
759 	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
760 	 * sort them first.
761 	 *
762 	 * Some of the new xids are top-level xids and some are subtransactions.
763 	 * We don't call SubtransSetParent because it doesn't matter yet. If we
764 	 * aren't overflowed then all xids will fit in snapshot and so we don't
765 	 * need subtrans. If we later overflow, an xid assignment record will add
766 	 * xids to subtrans. If RunningXacts is overflowed then we don't have
767 	 * enough information to correctly update subtrans anyway.
768 	 */
769 
770 	/*
771 	 * Allocate a temporary array to avoid modifying the array passed as
772 	 * argument.
773 	 */
774 	xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
775 
776 	/*
777 	 * Add to the temp array any xids which have not already completed.
778 	 */
779 	nxids = 0;
780 	for (i = 0; i < running->xcnt + running->subxcnt; i++)
781 	{
782 		TransactionId xid = running->xids[i];
783 
784 		/*
785 		 * The running-xacts snapshot can contain xids that were still visible
786 		 * in the procarray when the snapshot was taken, but were already
787 		 * WAL-logged as completed. They're not running anymore, so ignore
788 		 * them.
789 		 */
790 		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
791 			continue;
792 
793 		xids[nxids++] = xid;
794 	}
795 
796 	if (nxids > 0)
797 	{
798 		if (procArray->numKnownAssignedXids != 0)
799 		{
800 			LWLockRelease(ProcArrayLock);
801 			elog(ERROR, "KnownAssignedXids is not empty");
802 		}
803 
804 		/*
805 		 * Sort the array so that we can add them safely into
806 		 * KnownAssignedXids.
807 		 */
808 		qsort(xids, nxids, sizeof(TransactionId), xidComparator);
809 
810 		/*
811 		 * Add the sorted snapshot into KnownAssignedXids.  The running-xacts
812 		 * snapshot may include duplicated xids because of prepared
813 		 * transactions, so ignore them.
814 		 */
815 		for (i = 0; i < nxids; i++)
816 		{
817 			if (i > 0 && TransactionIdEquals(xids[i - 1], xids[i]))
818 			{
819 				elog(DEBUG1,
820 					 "found duplicated transaction %u for KnownAssignedXids insertion",
821 					 xids[i]);
822 				continue;
823 			}
824 			KnownAssignedXidsAdd(xids[i], xids[i], true);
825 		}
826 
827 		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
828 	}
829 
830 	pfree(xids);
831 
832 	/*
833 	 * latestObservedXid is at least set to the point where SUBTRANS was
834 	 * started up to (c.f. ProcArrayInitRecovery()) or to the biggest xid
835 	 * RecordKnownAssignedTransactionIds() was called for.  Initialize
836 	 * subtrans from thereon, up to nextXid - 1.
837 	 *
838 	 * We need to duplicate parts of RecordKnownAssignedTransactionId() here,
839 	 * because we've just added xids to the known assigned xids machinery that
840 	 * haven't gone through RecordKnownAssignedTransactionId().
841 	 */
842 	Assert(TransactionIdIsNormal(latestObservedXid));
843 	TransactionIdAdvance(latestObservedXid);
844 	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
845 	{
846 		ExtendSUBTRANS(latestObservedXid);
847 		TransactionIdAdvance(latestObservedXid);
848 	}
849 	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
850 
851 	/* ----------
852 	 * Now we've got the running xids we need to set the global values that
853 	 * are used to track snapshots as they evolve further.
854 	 *
855 	 * - latestCompletedXid which will be the xmax for snapshots
856 	 * - lastOverflowedXid which shows whether snapshots overflow
857 	 * - nextXid
858 	 *
859 	 * If the snapshot overflowed, then we still initialise with what we know,
860 	 * but the recovery snapshot isn't fully valid yet because we know there
861 	 * are some subxids missing. We don't know the specific subxids that are
862 	 * missing, so conservatively assume the last one is latestObservedXid.
863 	 * ----------
864 	 */
865 	if (running->subxid_overflow)
866 	{
867 		standbyState = STANDBY_SNAPSHOT_PENDING;
868 
869 		standbySnapshotPendingXmin = latestObservedXid;
870 		procArray->lastOverflowedXid = latestObservedXid;
871 	}
872 	else
873 	{
874 		standbyState = STANDBY_SNAPSHOT_READY;
875 
876 		standbySnapshotPendingXmin = InvalidTransactionId;
877 	}
878 
879 	/*
880 	 * If a transaction wrote a commit record in the gap between taking and
881 	 * logging the snapshot then latestCompletedXid may already be higher than
882 	 * the value from the snapshot, so check before we use the incoming value.
883 	 */
884 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
885 							  running->latestCompletedXid))
886 		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
887 
888 	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
889 
890 	LWLockRelease(ProcArrayLock);
891 
892 	/*
893 	 * ShmemVariableCache->nextXid must be beyond any observed xid.
894 	 *
895 	 * We don't expect anyone else to modify nextXid, hence we don't need to
896 	 * hold a lock while examining it.  We still acquire the lock to modify
897 	 * it, though.
898 	 */
899 	nextXid = latestObservedXid;
900 	TransactionIdAdvance(nextXid);
901 	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
902 	{
903 		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
904 		ShmemVariableCache->nextXid = nextXid;
905 		LWLockRelease(XidGenLock);
906 	}
907 
908 	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
909 
910 	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
911 	if (standbyState == STANDBY_SNAPSHOT_READY)
912 		elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
913 	else
914 		elog(trace_recovery(DEBUG1),
915 			 "recovery snapshot waiting for non-overflowed snapshot or "
916 			 "until oldest active xid on standby is at least %u (now %u)",
917 			 standbySnapshotPendingXmin,
918 			 running->oldestRunningXid);
919 }
920 
921 /*
922  * ProcArrayApplyXidAssignment
923  *		Process an XLOG_XACT_ASSIGNMENT WAL record
924  */
925 void
ProcArrayApplyXidAssignment(TransactionId topxid,int nsubxids,TransactionId * subxids)926 ProcArrayApplyXidAssignment(TransactionId topxid,
927 							int nsubxids, TransactionId *subxids)
928 {
929 	TransactionId max_xid;
930 	int			i;
931 
932 	Assert(standbyState >= STANDBY_INITIALIZED);
933 
934 	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
935 
936 	/*
937 	 * Mark all the subtransactions as observed.
938 	 *
939 	 * NOTE: This will fail if the subxid contains too many previously
940 	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
941 	 * as the code stands, because xid-assignment records should never contain
942 	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
943 	 */
944 	RecordKnownAssignedTransactionIds(max_xid);
945 
946 	/*
947 	 * Notice that we update pg_subtrans with the top-level xid, rather than
948 	 * the parent xid. This is a difference between normal processing and
949 	 * recovery, yet is still correct in all cases. The reason is that
950 	 * subtransaction commit is not marked in clog until commit processing, so
951 	 * all aborted subtransactions have already been clearly marked in clog.
952 	 * As a result we are able to refer directly to the top-level
953 	 * transaction's state rather than skipping through all the intermediate
954 	 * states in the subtransaction tree. This should be the first time we
955 	 * have attempted to SubTransSetParent().
956 	 */
957 	for (i = 0; i < nsubxids; i++)
958 		SubTransSetParent(subxids[i], topxid);
959 
960 	/* KnownAssignedXids isn't maintained yet, so we're done for now */
961 	if (standbyState == STANDBY_INITIALIZED)
962 		return;
963 
964 	/*
965 	 * Uses same locking as transaction commit
966 	 */
967 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
968 
969 	/*
970 	 * Remove subxids from known-assigned-xacts.
971 	 */
972 	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
973 
974 	/*
975 	 * Advance lastOverflowedXid to be at least the last of these subxids.
976 	 */
977 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
978 		procArray->lastOverflowedXid = max_xid;
979 
980 	LWLockRelease(ProcArrayLock);
981 }
982 
983 /*
984  * TransactionIdIsInProgress -- is given transaction running in some backend
985  *
986  * Aside from some shortcuts such as checking RecentXmin and our own Xid,
987  * there are four possibilities for finding a running transaction:
988  *
989  * 1. The given Xid is a main transaction Id.  We will find this out cheaply
990  * by looking at the PGXACT struct for each backend.
991  *
992  * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
993  * We can find this out cheaply too.
994  *
995  * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
996  * if the Xid is running on the master.
997  *
998  * 4. Search the SubTrans tree to find the Xid's topmost parent, and then see
999  * if that is running according to PGXACT or KnownAssignedXids.  This is the
1000  * slowest way, but sadly it has to be done always if the others failed,
1001  * unless we see that the cached subxact sets are complete (none have
1002  * overflowed).
1003  *
1004  * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
1005  * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
1006  * This buys back some concurrency (and we can't retrieve the main Xids from
1007  * PGXACT again anyway; see GetNewTransactionId).
1008  */
1009 bool
TransactionIdIsInProgress(TransactionId xid)1010 TransactionIdIsInProgress(TransactionId xid)
1011 {
1012 	static TransactionId *xids = NULL;
1013 	int			nxids = 0;
1014 	ProcArrayStruct *arrayP = procArray;
1015 	TransactionId topxid;
1016 	int			i,
1017 				j;
1018 
1019 	/*
1020 	 * Don't bother checking a transaction older than RecentXmin; it could not
1021 	 * possibly still be running.  (Note: in particular, this guarantees that
1022 	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
1023 	 * running.)
1024 	 */
1025 	if (TransactionIdPrecedes(xid, RecentXmin))
1026 	{
1027 		xc_by_recent_xmin_inc();
1028 		return false;
1029 	}
1030 
1031 	/*
1032 	 * We may have just checked the status of this transaction, so if it is
1033 	 * already known to be completed, we can fall out without any access to
1034 	 * shared memory.
1035 	 */
1036 	if (TransactionIdIsKnownCompleted(xid))
1037 	{
1038 		xc_by_known_xact_inc();
1039 		return false;
1040 	}
1041 
1042 	/*
1043 	 * Also, we can handle our own transaction (and subtransactions) without
1044 	 * any access to shared memory.
1045 	 */
1046 	if (TransactionIdIsCurrentTransactionId(xid))
1047 	{
1048 		xc_by_my_xact_inc();
1049 		return true;
1050 	}
1051 
1052 	/*
1053 	 * If first time through, get workspace to remember main XIDs in. We
1054 	 * malloc it permanently to avoid repeated palloc/pfree overhead.
1055 	 */
1056 	if (xids == NULL)
1057 	{
1058 		/*
1059 		 * In hot standby mode, reserve enough space to hold all xids in the
1060 		 * known-assigned list. If we later finish recovery, we no longer need
1061 		 * the bigger array, but we don't bother to shrink it.
1062 		 */
1063 		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
1064 
1065 		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
1066 		if (xids == NULL)
1067 			ereport(ERROR,
1068 					(errcode(ERRCODE_OUT_OF_MEMORY),
1069 					 errmsg("out of memory")));
1070 	}
1071 
1072 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1073 
1074 	/*
1075 	 * Now that we have the lock, we can check latestCompletedXid; if the
1076 	 * target Xid is after that, it's surely still running.
1077 	 */
1078 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
1079 	{
1080 		LWLockRelease(ProcArrayLock);
1081 		xc_by_latest_xid_inc();
1082 		return true;
1083 	}
1084 
1085 	/* No shortcuts, gotta grovel through the array */
1086 	for (i = 0; i < arrayP->numProcs; i++)
1087 	{
1088 		int			pgprocno = arrayP->pgprocnos[i];
1089 		volatile PGPROC *proc = &allProcs[pgprocno];
1090 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1091 		TransactionId pxid;
1092 
1093 		/* Ignore my own proc --- dealt with it above */
1094 		if (proc == MyProc)
1095 			continue;
1096 
1097 		/* Fetch xid just once - see GetNewTransactionId */
1098 		pxid = pgxact->xid;
1099 
1100 		if (!TransactionIdIsValid(pxid))
1101 			continue;
1102 
1103 		/*
1104 		 * Step 1: check the main Xid
1105 		 */
1106 		if (TransactionIdEquals(pxid, xid))
1107 		{
1108 			LWLockRelease(ProcArrayLock);
1109 			xc_by_main_xid_inc();
1110 			return true;
1111 		}
1112 
1113 		/*
1114 		 * We can ignore main Xids that are younger than the target Xid, since
1115 		 * the target could not possibly be their child.
1116 		 */
1117 		if (TransactionIdPrecedes(xid, pxid))
1118 			continue;
1119 
1120 		/*
1121 		 * Step 2: check the cached child-Xids arrays
1122 		 */
1123 		for (j = pgxact->nxids - 1; j >= 0; j--)
1124 		{
1125 			/* Fetch xid just once - see GetNewTransactionId */
1126 			TransactionId cxid = proc->subxids.xids[j];
1127 
1128 			if (TransactionIdEquals(cxid, xid))
1129 			{
1130 				LWLockRelease(ProcArrayLock);
1131 				xc_by_child_xid_inc();
1132 				return true;
1133 			}
1134 		}
1135 
1136 		/*
1137 		 * Save the main Xid for step 4.  We only need to remember main Xids
1138 		 * that have uncached children.  (Note: there is no race condition
1139 		 * here because the overflowed flag cannot be cleared, only set, while
1140 		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
1141 		 * worry about.)
1142 		 */
1143 		if (pgxact->overflowed)
1144 			xids[nxids++] = pxid;
1145 	}
1146 
1147 	/*
1148 	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
1149 	 * in the list must be treated as running.
1150 	 */
1151 	if (RecoveryInProgress())
1152 	{
1153 		/* none of the PGXACT entries should have XIDs in hot standby mode */
1154 		Assert(nxids == 0);
1155 
1156 		if (KnownAssignedXidExists(xid))
1157 		{
1158 			LWLockRelease(ProcArrayLock);
1159 			xc_by_known_assigned_inc();
1160 			return true;
1161 		}
1162 
1163 		/*
1164 		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
1165 		 * too.  Fetch all xids from KnownAssignedXids that are lower than
1166 		 * xid, since if xid is a subtransaction its parent will always have a
1167 		 * lower value.  Note we will collect both main and subXIDs here, but
1168 		 * there's no help for it.
1169 		 */
1170 		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
1171 			nxids = KnownAssignedXidsGet(xids, xid);
1172 	}
1173 
1174 	LWLockRelease(ProcArrayLock);
1175 
1176 	/*
1177 	 * If none of the relevant caches overflowed, we know the Xid is not
1178 	 * running without even looking at pg_subtrans.
1179 	 */
1180 	if (nxids == 0)
1181 	{
1182 		xc_no_overflow_inc();
1183 		return false;
1184 	}
1185 
1186 	/*
1187 	 * Step 4: have to check pg_subtrans.
1188 	 *
1189 	 * At this point, we know it's either a subtransaction of one of the Xids
1190 	 * in xids[], or it's not running.  If it's an already-failed
1191 	 * subtransaction, we want to say "not running" even though its parent may
1192 	 * still be running.  So first, check pg_xact to see if it's been aborted.
1193 	 */
1194 	xc_slow_answer_inc();
1195 
1196 	if (TransactionIdDidAbort(xid))
1197 		return false;
1198 
1199 	/*
1200 	 * It isn't aborted, so check whether the transaction tree it belongs to
1201 	 * is still running (or, more precisely, whether it was running when we
1202 	 * held ProcArrayLock).
1203 	 */
1204 	topxid = SubTransGetTopmostTransaction(xid);
1205 	Assert(TransactionIdIsValid(topxid));
1206 	if (!TransactionIdEquals(topxid, xid))
1207 	{
1208 		for (i = 0; i < nxids; i++)
1209 		{
1210 			if (TransactionIdEquals(xids[i], topxid))
1211 				return true;
1212 		}
1213 	}
1214 
1215 	return false;
1216 }
1217 
1218 /*
1219  * TransactionIdIsActive -- is xid the top-level XID of an active backend?
1220  *
1221  * This differs from TransactionIdIsInProgress in that it ignores prepared
1222  * transactions, as well as transactions running on the master if we're in
1223  * hot standby.  Also, we ignore subtransactions since that's not needed
1224  * for current uses.
1225  */
1226 bool
TransactionIdIsActive(TransactionId xid)1227 TransactionIdIsActive(TransactionId xid)
1228 {
1229 	bool		result = false;
1230 	ProcArrayStruct *arrayP = procArray;
1231 	int			i;
1232 
1233 	/*
1234 	 * Don't bother checking a transaction older than RecentXmin; it could not
1235 	 * possibly still be running.
1236 	 */
1237 	if (TransactionIdPrecedes(xid, RecentXmin))
1238 		return false;
1239 
1240 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1241 
1242 	for (i = 0; i < arrayP->numProcs; i++)
1243 	{
1244 		int			pgprocno = arrayP->pgprocnos[i];
1245 		volatile PGPROC *proc = &allProcs[pgprocno];
1246 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1247 		TransactionId pxid;
1248 
1249 		/* Fetch xid just once - see GetNewTransactionId */
1250 		pxid = pgxact->xid;
1251 
1252 		if (!TransactionIdIsValid(pxid))
1253 			continue;
1254 
1255 		if (proc->pid == 0)
1256 			continue;			/* ignore prepared transactions */
1257 
1258 		if (TransactionIdEquals(pxid, xid))
1259 		{
1260 			result = true;
1261 			break;
1262 		}
1263 	}
1264 
1265 	LWLockRelease(ProcArrayLock);
1266 
1267 	return result;
1268 }
1269 
1270 
1271 /*
1272  * GetOldestXmin -- returns oldest transaction that was running
1273  *					when any current transaction was started.
1274  *
1275  * If rel is NULL or a shared relation, all backends are considered, otherwise
1276  * only backends running in this database are considered.
1277  *
1278  * The flags are used to ignore the backends in calculation when any of the
1279  * corresponding flags is set. Typically, if you want to ignore ones with
1280  * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
1281  *
1282  * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
1283  * catalog_xmin of any replication slots that exist in the system when
1284  * calculating the oldest xmin.
1285  *
1286  * This is used by VACUUM to decide which deleted tuples must be preserved in
1287  * the passed in table. For shared relations backends in all databases must be
1288  * considered, but for non-shared relations that's not required, since only
1289  * backends in my own database could ever see the tuples in them. Also, we can
1290  * ignore concurrently running lazy VACUUMs because (a) they must be working
1291  * on other tables, and (b) they don't need to do snapshot-based lookups.
1292  *
1293  * This is also used to determine where to truncate pg_subtrans.  For that
1294  * backends in all databases have to be considered, so rel = NULL has to be
1295  * passed in.
1296  *
1297  * Note: we include all currently running xids in the set of considered xids.
1298  * This ensures that if a just-started xact has not yet set its snapshot,
1299  * when it does set the snapshot it cannot set xmin less than what we compute.
1300  * See notes in src/backend/access/transam/README.
1301  *
1302  * Note: despite the above, it's possible for the calculated value to move
1303  * backwards on repeated calls. The calculated value is conservative, so that
1304  * anything older is definitely not considered as running by anyone anymore,
1305  * but the exact value calculated depends on a number of things. For example,
1306  * if rel = NULL and there are no transactions running in the current
1307  * database, GetOldestXmin() returns latestCompletedXid. If a transaction
1308  * begins after that, its xmin will include in-progress transactions in other
1309  * databases that started earlier, so another call will return a lower value.
1310  * Nonetheless it is safe to vacuum a table in the current database with the
1311  * first result.  There are also replication-related effects: a walsender
1312  * process can set its xmin based on transactions that are no longer running
1313  * in the master but are still being replayed on the standby, thus possibly
1314  * making the GetOldestXmin reading go backwards.  In this case there is a
1315  * possibility that we lose data that the standby would like to have, but
1316  * unless the standby uses a replication slot to make its xmin persistent
1317  * there is little we can do about that --- data is only protected if the
1318  * walsender runs continuously while queries are executed on the standby.
1319  * (The Hot Standby code deals with such cases by failing standby queries
1320  * that needed to access already-removed data, so there's no integrity bug.)
1321  * The return value is also adjusted with vacuum_defer_cleanup_age, so
1322  * increasing that setting on the fly is another easy way to make
1323  * GetOldestXmin() move backwards, with no consequences for data integrity.
1324  */
1325 TransactionId
GetOldestXmin(Relation rel,int flags)1326 GetOldestXmin(Relation rel, int flags)
1327 {
1328 	ProcArrayStruct *arrayP = procArray;
1329 	TransactionId result;
1330 	int			index;
1331 	bool		allDbs;
1332 
1333 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
1334 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1335 
1336 	/*
1337 	 * If we're not computing a relation specific limit, or if a shared
1338 	 * relation has been passed in, backends in all databases have to be
1339 	 * considered.
1340 	 */
1341 	allDbs = rel == NULL || rel->rd_rel->relisshared;
1342 
1343 	/* Cannot look for individual databases during recovery */
1344 	Assert(allDbs || !RecoveryInProgress());
1345 
1346 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1347 
1348 	/*
1349 	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
1350 	 * is a lower bound for the XIDs that might appear in the ProcArray later,
1351 	 * and so protects us against overestimating the result due to future
1352 	 * additions.
1353 	 */
1354 	result = ShmemVariableCache->latestCompletedXid;
1355 	Assert(TransactionIdIsNormal(result));
1356 	TransactionIdAdvance(result);
1357 
1358 	for (index = 0; index < arrayP->numProcs; index++)
1359 	{
1360 		int			pgprocno = arrayP->pgprocnos[index];
1361 		volatile PGPROC *proc = &allProcs[pgprocno];
1362 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1363 
1364 		if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
1365 			continue;
1366 
1367 		if (allDbs ||
1368 			proc->databaseId == MyDatabaseId ||
1369 			proc->databaseId == 0)	/* always include WalSender */
1370 		{
1371 			/* Fetch xid just once - see GetNewTransactionId */
1372 			TransactionId xid = pgxact->xid;
1373 
1374 			/* First consider the transaction's own Xid, if any */
1375 			if (TransactionIdIsNormal(xid) &&
1376 				TransactionIdPrecedes(xid, result))
1377 				result = xid;
1378 
1379 			/*
1380 			 * Also consider the transaction's Xmin, if set.
1381 			 *
1382 			 * We must check both Xid and Xmin because a transaction might
1383 			 * have an Xmin but not (yet) an Xid; conversely, if it has an
1384 			 * Xid, that could determine some not-yet-set Xmin.
1385 			 */
1386 			xid = pgxact->xmin; /* Fetch just once */
1387 			if (TransactionIdIsNormal(xid) &&
1388 				TransactionIdPrecedes(xid, result))
1389 				result = xid;
1390 		}
1391 	}
1392 
1393 	/* fetch into volatile var while ProcArrayLock is held */
1394 	replication_slot_xmin = procArray->replication_slot_xmin;
1395 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1396 
1397 	if (RecoveryInProgress())
1398 	{
1399 		/*
1400 		 * Check to see whether KnownAssignedXids contains an xid value older
1401 		 * than the main procarray.
1402 		 */
1403 		TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1404 
1405 		LWLockRelease(ProcArrayLock);
1406 
1407 		if (TransactionIdIsNormal(kaxmin) &&
1408 			TransactionIdPrecedes(kaxmin, result))
1409 			result = kaxmin;
1410 	}
1411 	else
1412 	{
1413 		/*
1414 		 * No other information needed, so release the lock immediately.
1415 		 */
1416 		LWLockRelease(ProcArrayLock);
1417 
1418 		/*
1419 		 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
1420 		 * being careful not to generate a "permanent" XID.
1421 		 *
1422 		 * vacuum_defer_cleanup_age provides some additional "slop" for the
1423 		 * benefit of hot standby queries on standby servers.  This is quick
1424 		 * and dirty, and perhaps not all that useful unless the master has a
1425 		 * predictable transaction rate, but it offers some protection when
1426 		 * there's no walsender connection.  Note that we are assuming
1427 		 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
1428 		 * so guc.c should limit it to no more than the xidStopLimit threshold
1429 		 * in varsup.c.  Also note that we intentionally don't apply
1430 		 * vacuum_defer_cleanup_age on standby servers.
1431 		 */
1432 		result -= vacuum_defer_cleanup_age;
1433 		if (!TransactionIdIsNormal(result))
1434 			result = FirstNormalTransactionId;
1435 	}
1436 
1437 	/*
1438 	 * Check whether there are replication slots requiring an older xmin.
1439 	 */
1440 	if (!(flags & PROCARRAY_SLOTS_XMIN) &&
1441 		TransactionIdIsValid(replication_slot_xmin) &&
1442 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
1443 		result = replication_slot_xmin;
1444 
1445 	/*
1446 	 * After locks have been released and defer_cleanup_age has been applied,
1447 	 * check whether we need to back up further to make logical decoding
1448 	 * possible. We need to do so if we're computing the global limit (rel =
1449 	 * NULL) or if the passed relation is a catalog relation of some kind.
1450 	 */
1451 	if (!(flags & PROCARRAY_SLOTS_XMIN) &&
1452 		(rel == NULL ||
1453 		 RelationIsAccessibleInLogicalDecoding(rel)) &&
1454 		TransactionIdIsValid(replication_slot_catalog_xmin) &&
1455 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
1456 		result = replication_slot_catalog_xmin;
1457 
1458 	return result;
1459 }
1460 
1461 /*
1462  * GetMaxSnapshotXidCount -- get max size for snapshot XID array
1463  *
1464  * We have to export this for use by snapmgr.c.
1465  */
1466 int
GetMaxSnapshotXidCount(void)1467 GetMaxSnapshotXidCount(void)
1468 {
1469 	return procArray->maxProcs;
1470 }
1471 
1472 /*
1473  * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
1474  *
1475  * We have to export this for use by snapmgr.c.
1476  */
1477 int
GetMaxSnapshotSubxidCount(void)1478 GetMaxSnapshotSubxidCount(void)
1479 {
1480 	return TOTAL_MAX_CACHED_SUBXIDS;
1481 }
1482 
1483 /*
1484  * GetSnapshotData -- returns information about running transactions.
1485  *
1486  * The returned snapshot includes xmin (lowest still-running xact ID),
1487  * xmax (highest completed xact ID + 1), and a list of running xact IDs
1488  * in the range xmin <= xid < xmax.  It is used as follows:
1489  *		All xact IDs < xmin are considered finished.
1490  *		All xact IDs >= xmax are considered still running.
1491  *		For an xact ID xmin <= xid < xmax, consult list to see whether
1492  *		it is considered running or not.
1493  * This ensures that the set of transactions seen as "running" by the
1494  * current xact will not change after it takes the snapshot.
1495  *
1496  * All running top-level XIDs are included in the snapshot, except for lazy
1497  * VACUUM processes.  We also try to include running subtransaction XIDs,
1498  * but since PGPROC has only a limited cache area for subxact XIDs, full
1499  * information may not be available.  If we find any overflowed subxid arrays,
1500  * we have to mark the snapshot's subxid data as overflowed, and extra work
1501  * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1502  * in tqual.c).
1503  *
1504  * We also update the following backend-global variables:
1505  *		TransactionXmin: the oldest xmin of any snapshot in use in the
1506  *			current transaction (this is the same as MyPgXact->xmin).
1507  *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
1508  *			older than this are known not running any more.
1509  *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1510  *			running transactions, except those running LAZY VACUUM).  This is
1511  *			the same computation done by
1512  *			GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM).
1513  *		RecentGlobalDataXmin: the global xmin for non-catalog tables
1514  *			>= RecentGlobalXmin
1515  *
1516  * Note: this function should probably not be called with an argument that's
1517  * not statically allocated (see xip allocation below).
1518  */
1519 Snapshot
GetSnapshotData(Snapshot snapshot)1520 GetSnapshotData(Snapshot snapshot)
1521 {
1522 	ProcArrayStruct *arrayP = procArray;
1523 	TransactionId xmin;
1524 	TransactionId xmax;
1525 	TransactionId globalxmin;
1526 	int			index;
1527 	int			count = 0;
1528 	int			subcount = 0;
1529 	bool		suboverflowed = false;
1530 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
1531 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1532 
1533 	Assert(snapshot != NULL);
1534 
1535 	/*
1536 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
1537 	 * be sufficient.  But it seems better to do the malloc while not holding
1538 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
1539 	 * more subxip storage than is probably needed.
1540 	 *
1541 	 * This does open a possibility for avoiding repeated malloc/free: since
1542 	 * maxProcs does not change at runtime, we can simply reuse the previous
1543 	 * xip arrays if any.  (This relies on the fact that all callers pass
1544 	 * static SnapshotData structs.)
1545 	 */
1546 	if (snapshot->xip == NULL)
1547 	{
1548 		/*
1549 		 * First call for this snapshot. Snapshot is same size whether or not
1550 		 * we are in recovery, see later comments.
1551 		 */
1552 		snapshot->xip = (TransactionId *)
1553 			malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
1554 		if (snapshot->xip == NULL)
1555 			ereport(ERROR,
1556 					(errcode(ERRCODE_OUT_OF_MEMORY),
1557 					 errmsg("out of memory")));
1558 		Assert(snapshot->subxip == NULL);
1559 		snapshot->subxip = (TransactionId *)
1560 			malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
1561 		if (snapshot->subxip == NULL)
1562 			ereport(ERROR,
1563 					(errcode(ERRCODE_OUT_OF_MEMORY),
1564 					 errmsg("out of memory")));
1565 	}
1566 
1567 	/*
1568 	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1569 	 * going to set MyPgXact->xmin.
1570 	 */
1571 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1572 
1573 	/* xmax is always latestCompletedXid + 1 */
1574 	xmax = ShmemVariableCache->latestCompletedXid;
1575 	Assert(TransactionIdIsNormal(xmax));
1576 	TransactionIdAdvance(xmax);
1577 
1578 	/* initialize xmin calculation with xmax */
1579 	globalxmin = xmin = xmax;
1580 
1581 	snapshot->takenDuringRecovery = RecoveryInProgress();
1582 
1583 	if (!snapshot->takenDuringRecovery)
1584 	{
1585 		int		   *pgprocnos = arrayP->pgprocnos;
1586 		int			numProcs;
1587 
1588 		/*
1589 		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
1590 		 * to gather all active xids, find the lowest xmin, and try to record
1591 		 * subxids.
1592 		 */
1593 		numProcs = arrayP->numProcs;
1594 		for (index = 0; index < numProcs; index++)
1595 		{
1596 			int			pgprocno = pgprocnos[index];
1597 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
1598 			TransactionId xid;
1599 
1600 			/*
1601 			 * Backend is doing logical decoding which manages xmin
1602 			 * separately, check below.
1603 			 */
1604 			if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
1605 				continue;
1606 
1607 			/* Ignore procs running LAZY VACUUM */
1608 			if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1609 				continue;
1610 
1611 			/* Update globalxmin to be the smallest valid xmin */
1612 			xid = pgxact->xmin; /* fetch just once */
1613 			if (TransactionIdIsNormal(xid) &&
1614 				NormalTransactionIdPrecedes(xid, globalxmin))
1615 				globalxmin = xid;
1616 
1617 			/* Fetch xid just once - see GetNewTransactionId */
1618 			xid = pgxact->xid;
1619 
1620 			/*
1621 			 * If the transaction has no XID assigned, we can skip it; it
1622 			 * won't have sub-XIDs either.  If the XID is >= xmax, we can also
1623 			 * skip it; such transactions will be treated as running anyway
1624 			 * (and any sub-XIDs will also be >= xmax).
1625 			 */
1626 			if (!TransactionIdIsNormal(xid)
1627 				|| !NormalTransactionIdPrecedes(xid, xmax))
1628 				continue;
1629 
1630 			/*
1631 			 * We don't include our own XIDs (if any) in the snapshot, but we
1632 			 * must include them in xmin.
1633 			 */
1634 			if (NormalTransactionIdPrecedes(xid, xmin))
1635 				xmin = xid;
1636 			if (pgxact == MyPgXact)
1637 				continue;
1638 
1639 			/* Add XID to snapshot. */
1640 			snapshot->xip[count++] = xid;
1641 
1642 			/*
1643 			 * Save subtransaction XIDs if possible (if we've already
1644 			 * overflowed, there's no point).  Note that the subxact XIDs must
1645 			 * be later than their parent, so no need to check them against
1646 			 * xmin.  We could filter against xmax, but it seems better not to
1647 			 * do that much work while holding the ProcArrayLock.
1648 			 *
1649 			 * The other backend can add more subxids concurrently, but cannot
1650 			 * remove any.  Hence it's important to fetch nxids just once.
1651 			 * Should be safe to use memcpy, though.  (We needn't worry about
1652 			 * missing any xids added concurrently, because they must postdate
1653 			 * xmax.)
1654 			 *
1655 			 * Again, our own XIDs are not included in the snapshot.
1656 			 */
1657 			if (!suboverflowed)
1658 			{
1659 				if (pgxact->overflowed)
1660 					suboverflowed = true;
1661 				else
1662 				{
1663 					int			nxids = pgxact->nxids;
1664 
1665 					if (nxids > 0)
1666 					{
1667 						volatile PGPROC *proc = &allProcs[pgprocno];
1668 
1669 						memcpy(snapshot->subxip + subcount,
1670 							   (void *) proc->subxids.xids,
1671 							   nxids * sizeof(TransactionId));
1672 						subcount += nxids;
1673 					}
1674 				}
1675 			}
1676 		}
1677 	}
1678 	else
1679 	{
1680 		/*
1681 		 * We're in hot standby, so get XIDs from KnownAssignedXids.
1682 		 *
1683 		 * We store all xids directly into subxip[]. Here's why:
1684 		 *
1685 		 * In recovery we don't know which xids are top-level and which are
1686 		 * subxacts, a design choice that greatly simplifies xid processing.
1687 		 *
1688 		 * It seems like we would want to try to put xids into xip[] only, but
1689 		 * that is fairly small. We would either need to make that bigger or
1690 		 * to increase the rate at which we WAL-log xid assignment; neither is
1691 		 * an appealing choice.
1692 		 *
1693 		 * We could try to store xids into xip[] first and then into subxip[]
1694 		 * if there are too many xids. That only works if the snapshot doesn't
1695 		 * overflow because we do not search subxip[] in that case. A simpler
1696 		 * way is to just store all xids in the subxact array because this is
1697 		 * by far the bigger array. We just leave the xip array empty.
1698 		 *
1699 		 * Either way we need to change the way XidInMVCCSnapshot() works
1700 		 * depending upon when the snapshot was taken, or change normal
1701 		 * snapshot processing so it matches.
1702 		 *
1703 		 * Note: It is possible for recovery to end before we finish taking
1704 		 * the snapshot, and for newly assigned transaction ids to be added to
1705 		 * the ProcArray.  xmax cannot change while we hold ProcArrayLock, so
1706 		 * those newly added transaction ids would be filtered away, so we
1707 		 * need not be concerned about them.
1708 		 */
1709 		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
1710 												  xmax);
1711 
1712 		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1713 			suboverflowed = true;
1714 	}
1715 
1716 
1717 	/* fetch into volatile var while ProcArrayLock is held */
1718 	replication_slot_xmin = procArray->replication_slot_xmin;
1719 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1720 
1721 	if (!TransactionIdIsValid(MyPgXact->xmin))
1722 		MyPgXact->xmin = TransactionXmin = xmin;
1723 
1724 	LWLockRelease(ProcArrayLock);
1725 
1726 	/*
1727 	 * Update globalxmin to include actual process xids.  This is a slightly
1728 	 * different way of computing it than GetOldestXmin uses, but should give
1729 	 * the same result.
1730 	 */
1731 	if (TransactionIdPrecedes(xmin, globalxmin))
1732 		globalxmin = xmin;
1733 
1734 	/* Update global variables too */
1735 	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
1736 	if (!TransactionIdIsNormal(RecentGlobalXmin))
1737 		RecentGlobalXmin = FirstNormalTransactionId;
1738 
1739 	/* Check whether there's a replication slot requiring an older xmin. */
1740 	if (TransactionIdIsValid(replication_slot_xmin) &&
1741 		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
1742 		RecentGlobalXmin = replication_slot_xmin;
1743 
1744 	/* Non-catalog tables can be vacuumed if older than this xid */
1745 	RecentGlobalDataXmin = RecentGlobalXmin;
1746 
1747 	/*
1748 	 * Check whether there's a replication slot requiring an older catalog
1749 	 * xmin.
1750 	 */
1751 	if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
1752 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
1753 		RecentGlobalXmin = replication_slot_catalog_xmin;
1754 
1755 	RecentXmin = xmin;
1756 
1757 	snapshot->xmin = xmin;
1758 	snapshot->xmax = xmax;
1759 	snapshot->xcnt = count;
1760 	snapshot->subxcnt = subcount;
1761 	snapshot->suboverflowed = suboverflowed;
1762 
1763 	snapshot->curcid = GetCurrentCommandId(false);
1764 
1765 	/*
1766 	 * This is a new snapshot, so set both refcounts are zero, and mark it as
1767 	 * not copied in persistent memory.
1768 	 */
1769 	snapshot->active_count = 0;
1770 	snapshot->regd_count = 0;
1771 	snapshot->copied = false;
1772 
1773 	if (old_snapshot_threshold < 0)
1774 	{
1775 		/*
1776 		 * If not using "snapshot too old" feature, fill related fields with
1777 		 * dummy values that don't require any locking.
1778 		 */
1779 		snapshot->lsn = InvalidXLogRecPtr;
1780 		snapshot->whenTaken = 0;
1781 	}
1782 	else
1783 	{
1784 		/*
1785 		 * Capture the current time and WAL stream location in case this
1786 		 * snapshot becomes old enough to need to fall back on the special
1787 		 * "old snapshot" logic.
1788 		 */
1789 		snapshot->lsn = GetXLogInsertRecPtr();
1790 		snapshot->whenTaken = GetSnapshotCurrentTimestamp();
1791 		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
1792 	}
1793 
1794 	return snapshot;
1795 }
1796 
1797 /*
1798  * ProcArrayInstallImportedXmin -- install imported xmin into MyPgXact->xmin
1799  *
1800  * This is called when installing a snapshot imported from another
1801  * transaction.  To ensure that OldestXmin doesn't go backwards, we must
1802  * check that the source transaction is still running, and we'd better do
1803  * that atomically with installing the new xmin.
1804  *
1805  * Returns TRUE if successful, FALSE if source xact is no longer running.
1806  */
1807 bool
ProcArrayInstallImportedXmin(TransactionId xmin,VirtualTransactionId * sourcevxid)1808 ProcArrayInstallImportedXmin(TransactionId xmin,
1809 							 VirtualTransactionId *sourcevxid)
1810 {
1811 	bool		result = false;
1812 	ProcArrayStruct *arrayP = procArray;
1813 	int			index;
1814 
1815 	Assert(TransactionIdIsNormal(xmin));
1816 	if (!sourcevxid)
1817 		return false;
1818 
1819 	/* Get lock so source xact can't end while we're doing this */
1820 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1821 
1822 	for (index = 0; index < arrayP->numProcs; index++)
1823 	{
1824 		int			pgprocno = arrayP->pgprocnos[index];
1825 		volatile PGPROC *proc = &allProcs[pgprocno];
1826 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1827 		TransactionId xid;
1828 
1829 		/* Ignore procs running LAZY VACUUM */
1830 		if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1831 			continue;
1832 
1833 		/* We are only interested in the specific virtual transaction. */
1834 		if (proc->backendId != sourcevxid->backendId)
1835 			continue;
1836 		if (proc->lxid != sourcevxid->localTransactionId)
1837 			continue;
1838 
1839 		/*
1840 		 * We check the transaction's database ID for paranoia's sake: if it's
1841 		 * in another DB then its xmin does not cover us.  Caller should have
1842 		 * detected this already, so we just treat any funny cases as
1843 		 * "transaction not found".
1844 		 */
1845 		if (proc->databaseId != MyDatabaseId)
1846 			continue;
1847 
1848 		/*
1849 		 * Likewise, let's just make real sure its xmin does cover us.
1850 		 */
1851 		xid = pgxact->xmin;		/* fetch just once */
1852 		if (!TransactionIdIsNormal(xid) ||
1853 			!TransactionIdPrecedesOrEquals(xid, xmin))
1854 			continue;
1855 
1856 		/*
1857 		 * We're good.  Install the new xmin.  As in GetSnapshotData, set
1858 		 * TransactionXmin too.  (Note that because snapmgr.c called
1859 		 * GetSnapshotData first, we'll be overwriting a valid xmin here, so
1860 		 * we don't check that.)
1861 		 */
1862 		MyPgXact->xmin = TransactionXmin = xmin;
1863 
1864 		result = true;
1865 		break;
1866 	}
1867 
1868 	LWLockRelease(ProcArrayLock);
1869 
1870 	return result;
1871 }
1872 
1873 /*
1874  * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
1875  *
1876  * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
1877  * PGPROC of the transaction from which we imported the snapshot, rather than
1878  * an XID.
1879  *
1880  * Returns TRUE if successful, FALSE if source xact is no longer running.
1881  */
1882 bool
ProcArrayInstallRestoredXmin(TransactionId xmin,PGPROC * proc)1883 ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
1884 {
1885 	bool		result = false;
1886 	TransactionId xid;
1887 	volatile PGXACT *pgxact;
1888 
1889 	Assert(TransactionIdIsNormal(xmin));
1890 	Assert(proc != NULL);
1891 
1892 	/* Get lock so source xact can't end while we're doing this */
1893 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1894 
1895 	pgxact = &allPgXact[proc->pgprocno];
1896 
1897 	/*
1898 	 * Be certain that the referenced PGPROC has an advertised xmin which is
1899 	 * no later than the one we're installing, so that the system-wide xmin
1900 	 * can't go backwards.  Also, make sure it's running in the same database,
1901 	 * so that the per-database xmin cannot go backwards.
1902 	 */
1903 	xid = pgxact->xmin;			/* fetch just once */
1904 	if (proc->databaseId == MyDatabaseId &&
1905 		TransactionIdIsNormal(xid) &&
1906 		TransactionIdPrecedesOrEquals(xid, xmin))
1907 	{
1908 		MyPgXact->xmin = TransactionXmin = xmin;
1909 		result = true;
1910 	}
1911 
1912 	LWLockRelease(ProcArrayLock);
1913 
1914 	return result;
1915 }
1916 
1917 /*
1918  * GetRunningTransactionData -- returns information about running transactions.
1919  *
1920  * Similar to GetSnapshotData but returns more information. We include
1921  * all PGXACTs with an assigned TransactionId, even VACUUM processes and
1922  * prepared transactions.
1923  *
1924  * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
1925  * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
1926  * array until the caller has WAL-logged this snapshot, and releases the
1927  * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
1928  * lock is released.
1929  *
1930  * The returned data structure is statically allocated; caller should not
1931  * modify it, and must not assume it is valid past the next call.
1932  *
1933  * This is never executed during recovery so there is no need to look at
1934  * KnownAssignedXids.
1935  *
1936  * Dummy PGXACTs from prepared transaction are included, meaning that this
1937  * may return entries with duplicated TransactionId values coming from
1938  * transaction finishing to prepare.  Nothing is done about duplicated
1939  * entries here to not hold on ProcArrayLock more than necessary.
1940  *
1941  * We don't worry about updating other counters, we want to keep this as
1942  * simple as possible and leave GetSnapshotData() as the primary code for
1943  * that bookkeeping.
1944  *
1945  * Note that if any transaction has overflowed its cached subtransactions
1946  * then there is no real need include any subtransactions. That isn't a
1947  * common enough case to worry about optimising the size of the WAL record,
1948  * and we may wish to see that data for diagnostic purposes anyway.
1949  */
1950 RunningTransactions
GetRunningTransactionData(void)1951 GetRunningTransactionData(void)
1952 {
1953 	/* result workspace */
1954 	static RunningTransactionsData CurrentRunningXactsData;
1955 
1956 	ProcArrayStruct *arrayP = procArray;
1957 	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1958 	TransactionId latestCompletedXid;
1959 	TransactionId oldestRunningXid;
1960 	TransactionId *xids;
1961 	int			index;
1962 	int			count;
1963 	int			subcount;
1964 	bool		suboverflowed;
1965 
1966 	Assert(!RecoveryInProgress());
1967 
1968 	/*
1969 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
1970 	 * be sufficient.  But it seems better to do the malloc while not holding
1971 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
1972 	 * more subxip storage than is probably needed.
1973 	 *
1974 	 * Should only be allocated in bgwriter, since only ever executed during
1975 	 * checkpoints.
1976 	 */
1977 	if (CurrentRunningXacts->xids == NULL)
1978 	{
1979 		/*
1980 		 * First call
1981 		 */
1982 		CurrentRunningXacts->xids = (TransactionId *)
1983 			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1984 		if (CurrentRunningXacts->xids == NULL)
1985 			ereport(ERROR,
1986 					(errcode(ERRCODE_OUT_OF_MEMORY),
1987 					 errmsg("out of memory")));
1988 	}
1989 
1990 	xids = CurrentRunningXacts->xids;
1991 
1992 	count = subcount = 0;
1993 	suboverflowed = false;
1994 
1995 	/*
1996 	 * Ensure that no xids enter or leave the procarray while we obtain
1997 	 * snapshot.
1998 	 */
1999 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2000 	LWLockAcquire(XidGenLock, LW_SHARED);
2001 
2002 	latestCompletedXid = ShmemVariableCache->latestCompletedXid;
2003 
2004 	oldestRunningXid = ShmemVariableCache->nextXid;
2005 
2006 	/*
2007 	 * Spin over procArray collecting all xids
2008 	 */
2009 	for (index = 0; index < arrayP->numProcs; index++)
2010 	{
2011 		int			pgprocno = arrayP->pgprocnos[index];
2012 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2013 		TransactionId xid;
2014 
2015 		/* Fetch xid just once - see GetNewTransactionId */
2016 		xid = pgxact->xid;
2017 
2018 		/*
2019 		 * We don't need to store transactions that don't have a TransactionId
2020 		 * yet because they will not show as running on a standby server.
2021 		 */
2022 		if (!TransactionIdIsValid(xid))
2023 			continue;
2024 
2025 		xids[count++] = xid;
2026 
2027 		if (TransactionIdPrecedes(xid, oldestRunningXid))
2028 			oldestRunningXid = xid;
2029 
2030 		if (pgxact->overflowed)
2031 			suboverflowed = true;
2032 	}
2033 
2034 	/*
2035 	 * Spin over procArray collecting all subxids, but only if there hasn't
2036 	 * been a suboverflow.
2037 	 */
2038 	if (!suboverflowed)
2039 	{
2040 		for (index = 0; index < arrayP->numProcs; index++)
2041 		{
2042 			int			pgprocno = arrayP->pgprocnos[index];
2043 			volatile PGPROC *proc = &allProcs[pgprocno];
2044 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2045 			int			nxids;
2046 
2047 			/*
2048 			 * Save subtransaction XIDs. Other backends can't add or remove
2049 			 * entries while we're holding XidGenLock.
2050 			 */
2051 			nxids = pgxact->nxids;
2052 			if (nxids > 0)
2053 			{
2054 				memcpy(&xids[count], (void *) proc->subxids.xids,
2055 					   nxids * sizeof(TransactionId));
2056 				count += nxids;
2057 				subcount += nxids;
2058 
2059 				/*
2060 				 * Top-level XID of a transaction is always less than any of
2061 				 * its subxids, so we don't need to check if any of the
2062 				 * subxids are smaller than oldestRunningXid
2063 				 */
2064 			}
2065 		}
2066 	}
2067 
2068 	/*
2069 	 * It's important *not* to include the limits set by slots here because
2070 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
2071 	 * were to be included here the initial value could never increase because
2072 	 * of a circular dependency where slots only increase their limits when
2073 	 * running xacts increases oldestRunningXid and running xacts only
2074 	 * increases if slots do.
2075 	 */
2076 
2077 	CurrentRunningXacts->xcnt = count - subcount;
2078 	CurrentRunningXacts->subxcnt = subcount;
2079 	CurrentRunningXacts->subxid_overflow = suboverflowed;
2080 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
2081 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
2082 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
2083 
2084 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
2085 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
2086 	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
2087 
2088 	/* We don't release the locks here, the caller is responsible for that */
2089 
2090 	return CurrentRunningXacts;
2091 }
2092 
2093 /*
2094  * GetOldestActiveTransactionId()
2095  *
2096  * Similar to GetSnapshotData but returns just oldestActiveXid. We include
2097  * all PGXACTs with an assigned TransactionId, even VACUUM processes.
2098  * We look at all databases, though there is no need to include WALSender
2099  * since this has no effect on hot standby conflicts.
2100  *
2101  * This is never executed during recovery so there is no need to look at
2102  * KnownAssignedXids.
2103  *
2104  * We don't worry about updating other counters, we want to keep this as
2105  * simple as possible and leave GetSnapshotData() as the primary code for
2106  * that bookkeeping.
2107  */
2108 TransactionId
GetOldestActiveTransactionId(void)2109 GetOldestActiveTransactionId(void)
2110 {
2111 	ProcArrayStruct *arrayP = procArray;
2112 	TransactionId oldestRunningXid;
2113 	int			index;
2114 
2115 	Assert(!RecoveryInProgress());
2116 
2117 	/*
2118 	 * Read nextXid, as the upper bound of what's still active.
2119 	 *
2120 	 * Reading a TransactionId is atomic, but we must grab the lock to make
2121 	 * sure that all XIDs < nextXid are already present in the proc array (or
2122 	 * have already completed), when we spin over it.
2123 	 */
2124 	LWLockAcquire(XidGenLock, LW_SHARED);
2125 	oldestRunningXid = ShmemVariableCache->nextXid;
2126 	LWLockRelease(XidGenLock);
2127 
2128 	/*
2129 	 * Spin over procArray collecting all xids and subxids.
2130 	 */
2131 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2132 	for (index = 0; index < arrayP->numProcs; index++)
2133 	{
2134 		int			pgprocno = arrayP->pgprocnos[index];
2135 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2136 		TransactionId xid;
2137 
2138 		/* Fetch xid just once - see GetNewTransactionId */
2139 		xid = pgxact->xid;
2140 
2141 		if (!TransactionIdIsNormal(xid))
2142 			continue;
2143 
2144 		if (TransactionIdPrecedes(xid, oldestRunningXid))
2145 			oldestRunningXid = xid;
2146 
2147 		/*
2148 		 * Top-level XID of a transaction is always less than any of its
2149 		 * subxids, so we don't need to check if any of the subxids are
2150 		 * smaller than oldestRunningXid
2151 		 */
2152 	}
2153 	LWLockRelease(ProcArrayLock);
2154 
2155 	return oldestRunningXid;
2156 }
2157 
2158 /*
2159  * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum
2160  *
2161  * Returns the oldest xid that we can guarantee not to have been affected by
2162  * vacuum, i.e. no rows >= that xid have been vacuumed away unless the
2163  * transaction aborted. Note that the value can (and most of the time will) be
2164  * much more conservative than what really has been affected by vacuum, but we
2165  * currently don't have better data available.
2166  *
2167  * This is useful to initialize the cutoff xid after which a new changeset
2168  * extraction replication slot can start decoding changes.
2169  *
2170  * Must be called with ProcArrayLock held either shared or exclusively,
2171  * although most callers will want to use exclusive mode since it is expected
2172  * that the caller will immediately use the xid to peg the xmin horizon.
2173  */
2174 TransactionId
GetOldestSafeDecodingTransactionId(bool catalogOnly)2175 GetOldestSafeDecodingTransactionId(bool catalogOnly)
2176 {
2177 	ProcArrayStruct *arrayP = procArray;
2178 	TransactionId oldestSafeXid;
2179 	int			index;
2180 	bool		recovery_in_progress = RecoveryInProgress();
2181 
2182 	Assert(LWLockHeldByMe(ProcArrayLock));
2183 
2184 	/*
2185 	 * Acquire XidGenLock, so no transactions can acquire an xid while we're
2186 	 * running. If no transaction with xid were running concurrently a new xid
2187 	 * could influence the RecentXmin et al.
2188 	 *
2189 	 * We initialize the computation to nextXid since that's guaranteed to be
2190 	 * a safe, albeit pessimal, value.
2191 	 */
2192 	LWLockAcquire(XidGenLock, LW_SHARED);
2193 	oldestSafeXid = ShmemVariableCache->nextXid;
2194 
2195 	/*
2196 	 * If there's already a slot pegging the xmin horizon, we can start with
2197 	 * that value, it's guaranteed to be safe since it's computed by this
2198 	 * routine initially and has been enforced since.  We can always use the
2199 	 * slot's general xmin horizon, but the catalog horizon is only usable
2200 	 * when we only catalog data is going to be looked at.
2201 	 */
2202 	if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2203 		TransactionIdPrecedes(procArray->replication_slot_xmin,
2204 							  oldestSafeXid))
2205 		oldestSafeXid = procArray->replication_slot_xmin;
2206 
2207 	if (catalogOnly &&
2208 		TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2209 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
2210 							  oldestSafeXid))
2211 		oldestSafeXid = procArray->replication_slot_catalog_xmin;
2212 
2213 	/*
2214 	 * If we're not in recovery, we walk over the procarray and collect the
2215 	 * lowest xid. Since we're called with ProcArrayLock held and have
2216 	 * acquired XidGenLock, no entries can vanish concurrently, since
2217 	 * PGXACT->xid is only set with XidGenLock held and only cleared with
2218 	 * ProcArrayLock held.
2219 	 *
2220 	 * In recovery we can't lower the safe value besides what we've computed
2221 	 * above, so we'll have to wait a bit longer there. We unfortunately can
2222 	 * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
2223 	 * machinery can miss values and return an older value than is safe.
2224 	 */
2225 	if (!recovery_in_progress)
2226 	{
2227 		/*
2228 		 * Spin over procArray collecting all min(PGXACT->xid)
2229 		 */
2230 		for (index = 0; index < arrayP->numProcs; index++)
2231 		{
2232 			int			pgprocno = arrayP->pgprocnos[index];
2233 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2234 			TransactionId xid;
2235 
2236 			/* Fetch xid just once - see GetNewTransactionId */
2237 			xid = pgxact->xid;
2238 
2239 			if (!TransactionIdIsNormal(xid))
2240 				continue;
2241 
2242 			if (TransactionIdPrecedes(xid, oldestSafeXid))
2243 				oldestSafeXid = xid;
2244 		}
2245 	}
2246 
2247 	LWLockRelease(XidGenLock);
2248 
2249 	return oldestSafeXid;
2250 }
2251 
2252 /*
2253  * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
2254  * delaying checkpoint because they have critical actions in progress.
2255  *
2256  * Constructs an array of VXIDs of transactions that are currently in commit
2257  * critical sections, as shown by having delayChkpt set in their PGXACT.
2258  *
2259  * Returns a palloc'd array that should be freed by the caller.
2260  * *nvxids is the number of valid entries.
2261  *
2262  * Note that because backends set or clear delayChkpt without holding any lock,
2263  * the result is somewhat indeterminate, but we don't really care.  Even in
2264  * a multiprocessor with delayed writes to shared memory, it should be certain
2265  * that setting of delayChkpt will propagate to shared memory when the backend
2266  * takes a lock, so we cannot fail to see a virtual xact as delayChkpt if
2267  * it's already inserted its commit record.  Whether it takes a little while
2268  * for clearing of delayChkpt to propagate is unimportant for correctness.
2269  */
2270 VirtualTransactionId *
GetVirtualXIDsDelayingChkpt(int * nvxids)2271 GetVirtualXIDsDelayingChkpt(int *nvxids)
2272 {
2273 	VirtualTransactionId *vxids;
2274 	ProcArrayStruct *arrayP = procArray;
2275 	int			count = 0;
2276 	int			index;
2277 
2278 	/* allocate what's certainly enough result space */
2279 	vxids = (VirtualTransactionId *)
2280 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2281 
2282 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2283 
2284 	for (index = 0; index < arrayP->numProcs; index++)
2285 	{
2286 		int			pgprocno = arrayP->pgprocnos[index];
2287 		volatile PGPROC *proc = &allProcs[pgprocno];
2288 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2289 
2290 		if (pgxact->delayChkpt)
2291 		{
2292 			VirtualTransactionId vxid;
2293 
2294 			GET_VXID_FROM_PGPROC(vxid, *proc);
2295 			if (VirtualTransactionIdIsValid(vxid))
2296 				vxids[count++] = vxid;
2297 		}
2298 	}
2299 
2300 	LWLockRelease(ProcArrayLock);
2301 
2302 	*nvxids = count;
2303 	return vxids;
2304 }
2305 
2306 /*
2307  * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
2308  *
2309  * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
2310  * of the specified VXIDs are still in critical sections of code.
2311  *
2312  * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
2313  * those numbers should be small enough for it not to be a problem.
2314  */
2315 bool
HaveVirtualXIDsDelayingChkpt(VirtualTransactionId * vxids,int nvxids)2316 HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
2317 {
2318 	bool		result = false;
2319 	ProcArrayStruct *arrayP = procArray;
2320 	int			index;
2321 
2322 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2323 
2324 	for (index = 0; index < arrayP->numProcs; index++)
2325 	{
2326 		int			pgprocno = arrayP->pgprocnos[index];
2327 		volatile PGPROC *proc = &allProcs[pgprocno];
2328 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2329 		VirtualTransactionId vxid;
2330 
2331 		GET_VXID_FROM_PGPROC(vxid, *proc);
2332 
2333 		if (pgxact->delayChkpt && VirtualTransactionIdIsValid(vxid))
2334 		{
2335 			int			i;
2336 
2337 			for (i = 0; i < nvxids; i++)
2338 			{
2339 				if (VirtualTransactionIdEquals(vxid, vxids[i]))
2340 				{
2341 					result = true;
2342 					break;
2343 				}
2344 			}
2345 			if (result)
2346 				break;
2347 		}
2348 	}
2349 
2350 	LWLockRelease(ProcArrayLock);
2351 
2352 	return result;
2353 }
2354 
2355 /*
2356  * BackendPidGetProc -- get a backend's PGPROC given its PID
2357  *
2358  * Returns NULL if not found.  Note that it is up to the caller to be
2359  * sure that the question remains meaningful for long enough for the
2360  * answer to be used ...
2361  */
2362 PGPROC *
BackendPidGetProc(int pid)2363 BackendPidGetProc(int pid)
2364 {
2365 	PGPROC	   *result;
2366 
2367 	if (pid == 0)				/* never match dummy PGPROCs */
2368 		return NULL;
2369 
2370 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2371 
2372 	result = BackendPidGetProcWithLock(pid);
2373 
2374 	LWLockRelease(ProcArrayLock);
2375 
2376 	return result;
2377 }
2378 
2379 /*
2380  * BackendPidGetProcWithLock -- get a backend's PGPROC given its PID
2381  *
2382  * Same as above, except caller must be holding ProcArrayLock.  The found
2383  * entry, if any, can be assumed to be valid as long as the lock remains held.
2384  */
2385 PGPROC *
BackendPidGetProcWithLock(int pid)2386 BackendPidGetProcWithLock(int pid)
2387 {
2388 	PGPROC	   *result = NULL;
2389 	ProcArrayStruct *arrayP = procArray;
2390 	int			index;
2391 
2392 	if (pid == 0)				/* never match dummy PGPROCs */
2393 		return NULL;
2394 
2395 	for (index = 0; index < arrayP->numProcs; index++)
2396 	{
2397 		PGPROC	   *proc = &allProcs[arrayP->pgprocnos[index]];
2398 
2399 		if (proc->pid == pid)
2400 		{
2401 			result = proc;
2402 			break;
2403 		}
2404 	}
2405 
2406 	return result;
2407 }
2408 
2409 /*
2410  * BackendXidGetPid -- get a backend's pid given its XID
2411  *
2412  * Returns 0 if not found or it's a prepared transaction.  Note that
2413  * it is up to the caller to be sure that the question remains
2414  * meaningful for long enough for the answer to be used ...
2415  *
2416  * Only main transaction Ids are considered.  This function is mainly
2417  * useful for determining what backend owns a lock.
2418  *
2419  * Beware that not every xact has an XID assigned.  However, as long as you
2420  * only call this using an XID found on disk, you're safe.
2421  */
2422 int
BackendXidGetPid(TransactionId xid)2423 BackendXidGetPid(TransactionId xid)
2424 {
2425 	int			result = 0;
2426 	ProcArrayStruct *arrayP = procArray;
2427 	int			index;
2428 
2429 	if (xid == InvalidTransactionId)	/* never match invalid xid */
2430 		return 0;
2431 
2432 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2433 
2434 	for (index = 0; index < arrayP->numProcs; index++)
2435 	{
2436 		int			pgprocno = arrayP->pgprocnos[index];
2437 		volatile PGPROC *proc = &allProcs[pgprocno];
2438 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2439 
2440 		if (pgxact->xid == xid)
2441 		{
2442 			result = proc->pid;
2443 			break;
2444 		}
2445 	}
2446 
2447 	LWLockRelease(ProcArrayLock);
2448 
2449 	return result;
2450 }
2451 
2452 /*
2453  * IsBackendPid -- is a given pid a running backend
2454  *
2455  * This is not called by the backend, but is called by external modules.
2456  */
2457 bool
IsBackendPid(int pid)2458 IsBackendPid(int pid)
2459 {
2460 	return (BackendPidGetProc(pid) != NULL);
2461 }
2462 
2463 
2464 /*
2465  * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
2466  *
2467  * The array is palloc'd. The number of valid entries is returned into *nvxids.
2468  *
2469  * The arguments allow filtering the set of VXIDs returned.  Our own process
2470  * is always skipped.  In addition:
2471  *	If limitXmin is not InvalidTransactionId, skip processes with
2472  *		xmin > limitXmin.
2473  *	If excludeXmin0 is true, skip processes with xmin = 0.
2474  *	If allDbs is false, skip processes attached to other databases.
2475  *	If excludeVacuum isn't zero, skip processes for which
2476  *		(vacuumFlags & excludeVacuum) is not zero.
2477  *
2478  * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
2479  * allow skipping backends whose oldest live snapshot is no older than
2480  * some snapshot we have.  Since we examine the procarray with only shared
2481  * lock, there are race conditions: a backend could set its xmin just after
2482  * we look.  Indeed, on multiprocessors with weak memory ordering, the
2483  * other backend could have set its xmin *before* we look.  We know however
2484  * that such a backend must have held shared ProcArrayLock overlapping our
2485  * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
2486  * any snapshot the other backend is taking concurrently with our scan cannot
2487  * consider any transactions as still running that we think are committed
2488  * (since backends must hold ProcArrayLock exclusive to commit).
2489  */
2490 VirtualTransactionId *
GetCurrentVirtualXIDs(TransactionId limitXmin,bool excludeXmin0,bool allDbs,int excludeVacuum,int * nvxids)2491 GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
2492 					  bool allDbs, int excludeVacuum,
2493 					  int *nvxids)
2494 {
2495 	VirtualTransactionId *vxids;
2496 	ProcArrayStruct *arrayP = procArray;
2497 	int			count = 0;
2498 	int			index;
2499 
2500 	/* allocate what's certainly enough result space */
2501 	vxids = (VirtualTransactionId *)
2502 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2503 
2504 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2505 
2506 	for (index = 0; index < arrayP->numProcs; index++)
2507 	{
2508 		int			pgprocno = arrayP->pgprocnos[index];
2509 		volatile PGPROC *proc = &allProcs[pgprocno];
2510 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2511 
2512 		if (proc == MyProc)
2513 			continue;
2514 
2515 		if (excludeVacuum & pgxact->vacuumFlags)
2516 			continue;
2517 
2518 		if (allDbs || proc->databaseId == MyDatabaseId)
2519 		{
2520 			/* Fetch xmin just once - might change on us */
2521 			TransactionId pxmin = pgxact->xmin;
2522 
2523 			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
2524 				continue;
2525 
2526 			/*
2527 			 * InvalidTransactionId precedes all other XIDs, so a proc that
2528 			 * hasn't set xmin yet will not be rejected by this test.
2529 			 */
2530 			if (!TransactionIdIsValid(limitXmin) ||
2531 				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
2532 			{
2533 				VirtualTransactionId vxid;
2534 
2535 				GET_VXID_FROM_PGPROC(vxid, *proc);
2536 				if (VirtualTransactionIdIsValid(vxid))
2537 					vxids[count++] = vxid;
2538 			}
2539 		}
2540 	}
2541 
2542 	LWLockRelease(ProcArrayLock);
2543 
2544 	*nvxids = count;
2545 	return vxids;
2546 }
2547 
2548 /*
2549  * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
2550  *
2551  * Usage is limited to conflict resolution during recovery on standby servers.
2552  * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
2553  * in cases where we cannot accurately determine a value for latestRemovedXid.
2554  *
2555  * If limitXmin is InvalidTransactionId then we want to kill everybody,
2556  * so we're not worried if they have a snapshot or not, nor does it really
2557  * matter what type of lock we hold.
2558  *
2559  * All callers that are checking xmins always now supply a valid and useful
2560  * value for limitXmin. The limitXmin is always lower than the lowest
2561  * numbered KnownAssignedXid that is not already a FATAL error. This is
2562  * because we only care about cleanup records that are cleaning up tuple
2563  * versions from committed transactions. In that case they will only occur
2564  * at the point where the record is less than the lowest running xid. That
2565  * allows us to say that if any backend takes a snapshot concurrently with
2566  * us then the conflict assessment made here would never include the snapshot
2567  * that is being derived. So we take LW_SHARED on the ProcArray and allow
2568  * concurrent snapshots when limitXmin is valid. We might think about adding
2569  *	 Assert(limitXmin < lowest(KnownAssignedXids))
2570  * but that would not be true in the case of FATAL errors lagging in array,
2571  * but we already know those are bogus anyway, so we skip that test.
2572  *
2573  * If dbOid is valid we skip backends attached to other databases.
2574  *
2575  * Be careful to *not* pfree the result from this function. We reuse
2576  * this array sufficiently often that we use malloc for the result.
2577  */
2578 VirtualTransactionId *
GetConflictingVirtualXIDs(TransactionId limitXmin,Oid dbOid)2579 GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2580 {
2581 	static VirtualTransactionId *vxids;
2582 	ProcArrayStruct *arrayP = procArray;
2583 	int			count = 0;
2584 	int			index;
2585 
2586 	/*
2587 	 * If first time through, get workspace to remember main XIDs in. We
2588 	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
2589 	 * result space, remembering room for a terminator.
2590 	 */
2591 	if (vxids == NULL)
2592 	{
2593 		vxids = (VirtualTransactionId *)
2594 			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
2595 		if (vxids == NULL)
2596 			ereport(ERROR,
2597 					(errcode(ERRCODE_OUT_OF_MEMORY),
2598 					 errmsg("out of memory")));
2599 	}
2600 
2601 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2602 
2603 	for (index = 0; index < arrayP->numProcs; index++)
2604 	{
2605 		int			pgprocno = arrayP->pgprocnos[index];
2606 		volatile PGPROC *proc = &allProcs[pgprocno];
2607 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2608 
2609 		/* Exclude prepared transactions */
2610 		if (proc->pid == 0)
2611 			continue;
2612 
2613 		if (!OidIsValid(dbOid) ||
2614 			proc->databaseId == dbOid)
2615 		{
2616 			/* Fetch xmin just once - can't change on us, but good coding */
2617 			TransactionId pxmin = pgxact->xmin;
2618 
2619 			/*
2620 			 * We ignore an invalid pxmin because this means that backend has
2621 			 * no snapshot currently. We hold a Share lock to avoid contention
2622 			 * with users taking snapshots.  That is not a problem because the
2623 			 * current xmin is always at least one higher than the latest
2624 			 * removed xid, so any new snapshot would never conflict with the
2625 			 * test here.
2626 			 */
2627 			if (!TransactionIdIsValid(limitXmin) ||
2628 				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
2629 			{
2630 				VirtualTransactionId vxid;
2631 
2632 				GET_VXID_FROM_PGPROC(vxid, *proc);
2633 				if (VirtualTransactionIdIsValid(vxid))
2634 					vxids[count++] = vxid;
2635 			}
2636 		}
2637 	}
2638 
2639 	LWLockRelease(ProcArrayLock);
2640 
2641 	/* add the terminator */
2642 	vxids[count].backendId = InvalidBackendId;
2643 	vxids[count].localTransactionId = InvalidLocalTransactionId;
2644 
2645 	return vxids;
2646 }
2647 
2648 /*
2649  * CancelVirtualTransaction - used in recovery conflict processing
2650  *
2651  * Returns pid of the process signaled, or 0 if not found.
2652  */
2653 pid_t
CancelVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode)2654 CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
2655 {
2656 	return SignalVirtualTransaction(vxid, sigmode, true);
2657 }
2658 
2659 pid_t
SignalVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode,bool conflictPending)2660 SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
2661 						 bool conflictPending)
2662 {
2663 	ProcArrayStruct *arrayP = procArray;
2664 	int			index;
2665 	pid_t		pid = 0;
2666 
2667 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2668 
2669 	for (index = 0; index < arrayP->numProcs; index++)
2670 	{
2671 		int			pgprocno = arrayP->pgprocnos[index];
2672 		volatile PGPROC *proc = &allProcs[pgprocno];
2673 		VirtualTransactionId procvxid;
2674 
2675 		GET_VXID_FROM_PGPROC(procvxid, *proc);
2676 
2677 		if (procvxid.backendId == vxid.backendId &&
2678 			procvxid.localTransactionId == vxid.localTransactionId)
2679 		{
2680 			proc->recoveryConflictPending = conflictPending;
2681 			pid = proc->pid;
2682 			if (pid != 0)
2683 			{
2684 				/*
2685 				 * Kill the pid if it's still here. If not, that's what we
2686 				 * wanted so ignore any errors.
2687 				 */
2688 				(void) SendProcSignal(pid, sigmode, vxid.backendId);
2689 			}
2690 			break;
2691 		}
2692 	}
2693 
2694 	LWLockRelease(ProcArrayLock);
2695 
2696 	return pid;
2697 }
2698 
2699 /*
2700  * MinimumActiveBackends --- count backends (other than myself) that are
2701  *		in active transactions.  Return true if the count exceeds the
2702  *		minimum threshold passed.  This is used as a heuristic to decide if
2703  *		a pre-XLOG-flush delay is worthwhile during commit.
2704  *
2705  * Do not count backends that are blocked waiting for locks, since they are
2706  * not going to get to run until someone else commits.
2707  */
2708 bool
MinimumActiveBackends(int min)2709 MinimumActiveBackends(int min)
2710 {
2711 	ProcArrayStruct *arrayP = procArray;
2712 	int			count = 0;
2713 	int			index;
2714 
2715 	/* Quick short-circuit if no minimum is specified */
2716 	if (min == 0)
2717 		return true;
2718 
2719 	/*
2720 	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
2721 	 * bogus, but since we are only testing fields for zero or nonzero, it
2722 	 * should be OK.  The result is only used for heuristic purposes anyway...
2723 	 */
2724 	for (index = 0; index < arrayP->numProcs; index++)
2725 	{
2726 		int			pgprocno = arrayP->pgprocnos[index];
2727 		volatile PGPROC *proc = &allProcs[pgprocno];
2728 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2729 
2730 		/*
2731 		 * Since we're not holding a lock, need to be prepared to deal with
2732 		 * garbage, as someone could have incremented numProcs but not yet
2733 		 * filled the structure.
2734 		 *
2735 		 * If someone just decremented numProcs, 'proc' could also point to a
2736 		 * PGPROC entry that's no longer in the array. It still points to a
2737 		 * PGPROC struct, though, because freed PGPROC entries just go to the
2738 		 * free list and are recycled. Its contents are nonsense in that case,
2739 		 * but that's acceptable for this function.
2740 		 */
2741 		if (pgprocno == -1)
2742 			continue;			/* do not count deleted entries */
2743 		if (proc == MyProc)
2744 			continue;			/* do not count myself */
2745 		if (pgxact->xid == InvalidTransactionId)
2746 			continue;			/* do not count if no XID assigned */
2747 		if (proc->pid == 0)
2748 			continue;			/* do not count prepared xacts */
2749 		if (proc->waitLock != NULL)
2750 			continue;			/* do not count if blocked on a lock */
2751 		count++;
2752 		if (count >= min)
2753 			break;
2754 	}
2755 
2756 	return count >= min;
2757 }
2758 
2759 /*
2760  * CountDBBackends --- count backends that are using specified database
2761  */
2762 int
CountDBBackends(Oid databaseid)2763 CountDBBackends(Oid databaseid)
2764 {
2765 	ProcArrayStruct *arrayP = procArray;
2766 	int			count = 0;
2767 	int			index;
2768 
2769 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2770 
2771 	for (index = 0; index < arrayP->numProcs; index++)
2772 	{
2773 		int			pgprocno = arrayP->pgprocnos[index];
2774 		volatile PGPROC *proc = &allProcs[pgprocno];
2775 
2776 		if (proc->pid == 0)
2777 			continue;			/* do not count prepared xacts */
2778 		if (!OidIsValid(databaseid) ||
2779 			proc->databaseId == databaseid)
2780 			count++;
2781 	}
2782 
2783 	LWLockRelease(ProcArrayLock);
2784 
2785 	return count;
2786 }
2787 
2788 /*
2789  * CountDBConnections --- counts database backends ignoring any background
2790  *		worker processes
2791  */
2792 int
CountDBConnections(Oid databaseid)2793 CountDBConnections(Oid databaseid)
2794 {
2795 	ProcArrayStruct *arrayP = procArray;
2796 	int			count = 0;
2797 	int			index;
2798 
2799 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2800 
2801 	for (index = 0; index < arrayP->numProcs; index++)
2802 	{
2803 		int			pgprocno = arrayP->pgprocnos[index];
2804 		volatile PGPROC *proc = &allProcs[pgprocno];
2805 
2806 		if (proc->pid == 0)
2807 			continue;			/* do not count prepared xacts */
2808 		if (proc->isBackgroundWorker)
2809 			continue;			/* do not count background workers */
2810 		if (!OidIsValid(databaseid) ||
2811 			proc->databaseId == databaseid)
2812 			count++;
2813 	}
2814 
2815 	LWLockRelease(ProcArrayLock);
2816 
2817 	return count;
2818 }
2819 
2820 /*
2821  * CancelDBBackends --- cancel backends that are using specified database
2822  */
2823 void
CancelDBBackends(Oid databaseid,ProcSignalReason sigmode,bool conflictPending)2824 CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
2825 {
2826 	ProcArrayStruct *arrayP = procArray;
2827 	int			index;
2828 	pid_t		pid = 0;
2829 
2830 	/* tell all backends to die */
2831 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2832 
2833 	for (index = 0; index < arrayP->numProcs; index++)
2834 	{
2835 		int			pgprocno = arrayP->pgprocnos[index];
2836 		volatile PGPROC *proc = &allProcs[pgprocno];
2837 
2838 		if (databaseid == InvalidOid || proc->databaseId == databaseid)
2839 		{
2840 			VirtualTransactionId procvxid;
2841 
2842 			GET_VXID_FROM_PGPROC(procvxid, *proc);
2843 
2844 			proc->recoveryConflictPending = conflictPending;
2845 			pid = proc->pid;
2846 			if (pid != 0)
2847 			{
2848 				/*
2849 				 * Kill the pid if it's still here. If not, that's what we
2850 				 * wanted so ignore any errors.
2851 				 */
2852 				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
2853 			}
2854 		}
2855 	}
2856 
2857 	LWLockRelease(ProcArrayLock);
2858 }
2859 
2860 /*
2861  * CountUserBackends --- count backends that are used by specified user
2862  */
2863 int
CountUserBackends(Oid roleid)2864 CountUserBackends(Oid roleid)
2865 {
2866 	ProcArrayStruct *arrayP = procArray;
2867 	int			count = 0;
2868 	int			index;
2869 
2870 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2871 
2872 	for (index = 0; index < arrayP->numProcs; index++)
2873 	{
2874 		int			pgprocno = arrayP->pgprocnos[index];
2875 		volatile PGPROC *proc = &allProcs[pgprocno];
2876 
2877 		if (proc->pid == 0)
2878 			continue;			/* do not count prepared xacts */
2879 		if (proc->isBackgroundWorker)
2880 			continue;			/* do not count background workers */
2881 		if (proc->roleId == roleid)
2882 			count++;
2883 	}
2884 
2885 	LWLockRelease(ProcArrayLock);
2886 
2887 	return count;
2888 }
2889 
2890 /*
2891  * CountOtherDBBackends -- check for other backends running in the given DB
2892  *
2893  * If there are other backends in the DB, we will wait a maximum of 5 seconds
2894  * for them to exit.  Autovacuum backends are encouraged to exit early by
2895  * sending them SIGTERM, but normal user backends are just waited for.
2896  *
2897  * The current backend is always ignored; it is caller's responsibility to
2898  * check whether the current backend uses the given DB, if it's important.
2899  *
2900  * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
2901  * Also, *nbackends and *nprepared are set to the number of other backends
2902  * and prepared transactions in the DB, respectively.
2903  *
2904  * This function is used to interlock DROP DATABASE and related commands
2905  * against there being any active backends in the target DB --- dropping the
2906  * DB while active backends remain would be a Bad Thing.  Note that we cannot
2907  * detect here the possibility of a newly-started backend that is trying to
2908  * connect to the doomed database, so additional interlocking is needed during
2909  * backend startup.  The caller should normally hold an exclusive lock on the
2910  * target DB before calling this, which is one reason we mustn't wait
2911  * indefinitely.
2912  */
2913 bool
CountOtherDBBackends(Oid databaseId,int * nbackends,int * nprepared)2914 CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2915 {
2916 	ProcArrayStruct *arrayP = procArray;
2917 
2918 #define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
2919 	int			autovac_pids[MAXAUTOVACPIDS];
2920 	int			tries;
2921 
2922 	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
2923 	for (tries = 0; tries < 50; tries++)
2924 	{
2925 		int			nautovacs = 0;
2926 		bool		found = false;
2927 		int			index;
2928 
2929 		CHECK_FOR_INTERRUPTS();
2930 
2931 		*nbackends = *nprepared = 0;
2932 
2933 		LWLockAcquire(ProcArrayLock, LW_SHARED);
2934 
2935 		for (index = 0; index < arrayP->numProcs; index++)
2936 		{
2937 			int			pgprocno = arrayP->pgprocnos[index];
2938 			volatile PGPROC *proc = &allProcs[pgprocno];
2939 			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2940 
2941 			if (proc->databaseId != databaseId)
2942 				continue;
2943 			if (proc == MyProc)
2944 				continue;
2945 
2946 			found = true;
2947 
2948 			if (proc->pid == 0)
2949 				(*nprepared)++;
2950 			else
2951 			{
2952 				(*nbackends)++;
2953 				if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
2954 					nautovacs < MAXAUTOVACPIDS)
2955 					autovac_pids[nautovacs++] = proc->pid;
2956 			}
2957 		}
2958 
2959 		LWLockRelease(ProcArrayLock);
2960 
2961 		if (!found)
2962 			return false;		/* no conflicting backends, so done */
2963 
2964 		/*
2965 		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
2966 		 * postpone this step until after the loop because we don't want to
2967 		 * hold ProcArrayLock while issuing kill(). We have no idea what might
2968 		 * block kill() inside the kernel...
2969 		 */
2970 		for (index = 0; index < nautovacs; index++)
2971 			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */
2972 
2973 		/* sleep, then try again */
2974 		pg_usleep(100 * 1000L); /* 100ms */
2975 	}
2976 
2977 	return true;				/* timed out, still conflicts */
2978 }
2979 
2980 /*
2981  * ProcArraySetReplicationSlotXmin
2982  *
2983  * Install limits to future computations of the xmin horizon to prevent vacuum
2984  * and HOT pruning from removing affected rows still needed by clients with
2985  * replicaton slots.
2986  */
2987 void
ProcArraySetReplicationSlotXmin(TransactionId xmin,TransactionId catalog_xmin,bool already_locked)2988 ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
2989 								bool already_locked)
2990 {
2991 	Assert(!already_locked || LWLockHeldByMe(ProcArrayLock));
2992 
2993 	if (!already_locked)
2994 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2995 
2996 	procArray->replication_slot_xmin = xmin;
2997 	procArray->replication_slot_catalog_xmin = catalog_xmin;
2998 
2999 	if (!already_locked)
3000 		LWLockRelease(ProcArrayLock);
3001 }
3002 
3003 /*
3004  * ProcArrayGetReplicationSlotXmin
3005  *
3006  * Return the current slot xmin limits. That's useful to be able to remove
3007  * data that's older than those limits.
3008  */
3009 void
ProcArrayGetReplicationSlotXmin(TransactionId * xmin,TransactionId * catalog_xmin)3010 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
3011 								TransactionId *catalog_xmin)
3012 {
3013 	LWLockAcquire(ProcArrayLock, LW_SHARED);
3014 
3015 	if (xmin != NULL)
3016 		*xmin = procArray->replication_slot_xmin;
3017 
3018 	if (catalog_xmin != NULL)
3019 		*catalog_xmin = procArray->replication_slot_catalog_xmin;
3020 
3021 	LWLockRelease(ProcArrayLock);
3022 }
3023 
3024 
3025 #define XidCacheRemove(i) \
3026 	do { \
3027 		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
3028 		MyPgXact->nxids--; \
3029 	} while (0)
3030 
3031 /*
3032  * XidCacheRemoveRunningXids
3033  *
3034  * Remove a bunch of TransactionIds from the list of known-running
3035  * subtransactions for my backend.  Both the specified xid and those in
3036  * the xids[] array (of length nxids) are removed from the subxids cache.
3037  * latestXid must be the latest XID among the group.
3038  */
3039 void
XidCacheRemoveRunningXids(TransactionId xid,int nxids,const TransactionId * xids,TransactionId latestXid)3040 XidCacheRemoveRunningXids(TransactionId xid,
3041 						  int nxids, const TransactionId *xids,
3042 						  TransactionId latestXid)
3043 {
3044 	int			i,
3045 				j;
3046 
3047 	Assert(TransactionIdIsValid(xid));
3048 
3049 	/*
3050 	 * We must hold ProcArrayLock exclusively in order to remove transactions
3051 	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
3052 	 * possible this could be relaxed since we know this routine is only used
3053 	 * to abort subtransactions, but pending closer analysis we'd best be
3054 	 * conservative.
3055 	 */
3056 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3057 
3058 	/*
3059 	 * Under normal circumstances xid and xids[] will be in increasing order,
3060 	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
3061 	 * behavior when removing a lot of xids.
3062 	 */
3063 	for (i = nxids - 1; i >= 0; i--)
3064 	{
3065 		TransactionId anxid = xids[i];
3066 
3067 		for (j = MyPgXact->nxids - 1; j >= 0; j--)
3068 		{
3069 			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
3070 			{
3071 				XidCacheRemove(j);
3072 				break;
3073 			}
3074 		}
3075 
3076 		/*
3077 		 * Ordinarily we should have found it, unless the cache has
3078 		 * overflowed. However it's also possible for this routine to be
3079 		 * invoked multiple times for the same subtransaction, in case of an
3080 		 * error during AbortSubTransaction.  So instead of Assert, emit a
3081 		 * debug warning.
3082 		 */
3083 		if (j < 0 && !MyPgXact->overflowed)
3084 			elog(WARNING, "did not find subXID %u in MyProc", anxid);
3085 	}
3086 
3087 	for (j = MyPgXact->nxids - 1; j >= 0; j--)
3088 	{
3089 		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
3090 		{
3091 			XidCacheRemove(j);
3092 			break;
3093 		}
3094 	}
3095 	/* Ordinarily we should have found it, unless the cache has overflowed */
3096 	if (j < 0 && !MyPgXact->overflowed)
3097 		elog(WARNING, "did not find subXID %u in MyProc", xid);
3098 
3099 	/* Also advance global latestCompletedXid while holding the lock */
3100 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3101 							  latestXid))
3102 		ShmemVariableCache->latestCompletedXid = latestXid;
3103 
3104 	LWLockRelease(ProcArrayLock);
3105 }
3106 
3107 #ifdef XIDCACHE_DEBUG
3108 
3109 /*
3110  * Print stats about effectiveness of XID cache
3111  */
3112 static void
DisplayXidCache(void)3113 DisplayXidCache(void)
3114 {
3115 	fprintf(stderr,
3116 			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
3117 			xc_by_recent_xmin,
3118 			xc_by_known_xact,
3119 			xc_by_my_xact,
3120 			xc_by_latest_xid,
3121 			xc_by_main_xid,
3122 			xc_by_child_xid,
3123 			xc_by_known_assigned,
3124 			xc_no_overflow,
3125 			xc_slow_answer);
3126 }
3127 #endif							/* XIDCACHE_DEBUG */
3128 
3129 
3130 /* ----------------------------------------------
3131  *		KnownAssignedTransactions sub-module
3132  * ----------------------------------------------
3133  */
3134 
3135 /*
3136  * In Hot Standby mode, we maintain a list of transactions that are (or were)
3137  * running in the master at the current point in WAL.  These XIDs must be
3138  * treated as running by standby transactions, even though they are not in
3139  * the standby server's PGXACT array.
3140  *
3141  * We record all XIDs that we know have been assigned.  That includes all the
3142  * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
3143  * been assigned.  We can deduce the existence of unobserved XIDs because we
3144  * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
3145  * list expands as new XIDs are observed or inferred, and contracts when
3146  * transaction completion records arrive.
3147  *
3148  * During hot standby we do not fret too much about the distinction between
3149  * top-level XIDs and subtransaction XIDs. We store both together in the
3150  * KnownAssignedXids list.  In backends, this is copied into snapshots in
3151  * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
3152  * doesn't care about the distinction either.  Subtransaction XIDs are
3153  * effectively treated as top-level XIDs and in the typical case pg_subtrans
3154  * links are *not* maintained (which does not affect visibility).
3155  *
3156  * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
3157  * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
3158  * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
3159  * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
3160  * records, we mark the subXIDs as children of the top XID in pg_subtrans,
3161  * and then remove them from KnownAssignedXids.  This prevents overflow of
3162  * KnownAssignedXids and snapshots, at the cost that status checks for these
3163  * subXIDs will take a slower path through TransactionIdIsInProgress().
3164  * This means that KnownAssignedXids is not necessarily complete for subXIDs,
3165  * though it should be complete for top-level XIDs; this is the same situation
3166  * that holds with respect to the PGPROC entries in normal running.
3167  *
3168  * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
3169  * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
3170  * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
3171  * As long as that is within the range of interesting XIDs, we have to assume
3172  * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
3173  * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
3174  * subXID arrives - that is not an error.)
3175  *
3176  * Should a backend on primary somehow disappear before it can write an abort
3177  * record, then we just leave those XIDs in KnownAssignedXids. They actually
3178  * aborted but we think they were running; the distinction is irrelevant
3179  * because either way any changes done by the transaction are not visible to
3180  * backends in the standby.  We prune KnownAssignedXids when
3181  * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
3182  * array due to such dead XIDs.
3183  */
3184 
3185 /*
3186  * RecordKnownAssignedTransactionIds
3187  *		Record the given XID in KnownAssignedXids, as well as any preceding
3188  *		unobserved XIDs.
3189  *
3190  * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
3191  * associated with a transaction. Must be called for each record after we
3192  * have executed StartupCLOG() et al, since we must ExtendCLOG() etc..
3193  *
3194  * Called during recovery in analogy with and in place of GetNewTransactionId()
3195  */
3196 void
RecordKnownAssignedTransactionIds(TransactionId xid)3197 RecordKnownAssignedTransactionIds(TransactionId xid)
3198 {
3199 	Assert(standbyState >= STANDBY_INITIALIZED);
3200 	Assert(TransactionIdIsValid(xid));
3201 	Assert(TransactionIdIsValid(latestObservedXid));
3202 
3203 	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
3204 		 xid, latestObservedXid);
3205 
3206 	/*
3207 	 * When a newly observed xid arrives, it is frequently the case that it is
3208 	 * *not* the next xid in sequence. When this occurs, we must treat the
3209 	 * intervening xids as running also.
3210 	 */
3211 	if (TransactionIdFollows(xid, latestObservedXid))
3212 	{
3213 		TransactionId next_expected_xid;
3214 
3215 		/*
3216 		 * Extend subtrans like we do in GetNewTransactionId() during normal
3217 		 * operation using individual extend steps. Note that we do not need
3218 		 * to extend clog since its extensions are WAL logged.
3219 		 *
3220 		 * This part has to be done regardless of standbyState since we
3221 		 * immediately start assigning subtransactions to their toplevel
3222 		 * transactions.
3223 		 */
3224 		next_expected_xid = latestObservedXid;
3225 		while (TransactionIdPrecedes(next_expected_xid, xid))
3226 		{
3227 			TransactionIdAdvance(next_expected_xid);
3228 			ExtendSUBTRANS(next_expected_xid);
3229 		}
3230 		Assert(next_expected_xid == xid);
3231 
3232 		/*
3233 		 * If the KnownAssignedXids machinery isn't up yet, there's nothing
3234 		 * more to do since we don't track assigned xids yet.
3235 		 */
3236 		if (standbyState <= STANDBY_INITIALIZED)
3237 		{
3238 			latestObservedXid = xid;
3239 			return;
3240 		}
3241 
3242 		/*
3243 		 * Add (latestObservedXid, xid] onto the KnownAssignedXids array.
3244 		 */
3245 		next_expected_xid = latestObservedXid;
3246 		TransactionIdAdvance(next_expected_xid);
3247 		KnownAssignedXidsAdd(next_expected_xid, xid, false);
3248 
3249 		/*
3250 		 * Now we can advance latestObservedXid
3251 		 */
3252 		latestObservedXid = xid;
3253 
3254 		/* ShmemVariableCache->nextXid must be beyond any observed xid */
3255 		next_expected_xid = latestObservedXid;
3256 		TransactionIdAdvance(next_expected_xid);
3257 		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
3258 		if (TransactionIdFollows(next_expected_xid, ShmemVariableCache->nextXid))
3259 			ShmemVariableCache->nextXid = next_expected_xid;
3260 		LWLockRelease(XidGenLock);
3261 	}
3262 }
3263 
3264 /*
3265  * ExpireTreeKnownAssignedTransactionIds
3266  *		Remove the given XIDs from KnownAssignedXids.
3267  *
3268  * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
3269  */
3270 void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid,int nsubxids,TransactionId * subxids,TransactionId max_xid)3271 ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
3272 									  TransactionId *subxids, TransactionId max_xid)
3273 {
3274 	Assert(standbyState >= STANDBY_INITIALIZED);
3275 
3276 	/*
3277 	 * Uses same locking as transaction commit
3278 	 */
3279 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3280 
3281 	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
3282 
3283 	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
3284 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3285 							  max_xid))
3286 		ShmemVariableCache->latestCompletedXid = max_xid;
3287 
3288 	LWLockRelease(ProcArrayLock);
3289 }
3290 
3291 /*
3292  * ExpireAllKnownAssignedTransactionIds
3293  *		Remove all entries in KnownAssignedXids and reset lastOverflowedXid.
3294  */
3295 void
ExpireAllKnownAssignedTransactionIds(void)3296 ExpireAllKnownAssignedTransactionIds(void)
3297 {
3298 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3299 	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
3300 
3301 	/*
3302 	 * Reset lastOverflowedXid.  Currently, lastOverflowedXid has no use after
3303 	 * the call of this function.  But do this for unification with what
3304 	 * ExpireOldKnownAssignedTransactionIds() do.
3305 	 */
3306 	procArray->lastOverflowedXid = InvalidTransactionId;
3307 	LWLockRelease(ProcArrayLock);
3308 }
3309 
3310 /*
3311  * ExpireOldKnownAssignedTransactionIds
3312  *		Remove KnownAssignedXids entries preceding the given XID and
3313  *		potentially reset lastOverflowedXid.
3314  */
3315 void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)3316 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
3317 {
3318 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3319 
3320 	/*
3321 	 * Reset lastOverflowedXid if we know all transactions that have been
3322 	 * possibly running are being gone.  Not doing so could cause an incorrect
3323 	 * lastOverflowedXid value, which makes extra snapshots be marked as
3324 	 * suboverflowed.
3325 	 */
3326 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
3327 		procArray->lastOverflowedXid = InvalidTransactionId;
3328 	KnownAssignedXidsRemovePreceding(xid);
3329 	LWLockRelease(ProcArrayLock);
3330 }
3331 
3332 
3333 /*
3334  * Private module functions to manipulate KnownAssignedXids
3335  *
3336  * There are 5 main uses of the KnownAssignedXids data structure:
3337  *
3338  *	* backends taking snapshots - all valid XIDs need to be copied out
3339  *	* backends seeking to determine presence of a specific XID
3340  *	* startup process adding new known-assigned XIDs
3341  *	* startup process removing specific XIDs as transactions end
3342  *	* startup process pruning array when special WAL records arrive
3343  *
3344  * This data structure is known to be a hot spot during Hot Standby, so we
3345  * go to some lengths to make these operations as efficient and as concurrent
3346  * as possible.
3347  *
3348  * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
3349  * order, to be exact --- to allow binary search for specific XIDs.  Note:
3350  * in general TransactionIdPrecedes would not provide a total order, but
3351  * we know that the entries present at any instant should not extend across
3352  * a large enough fraction of XID space to wrap around (the master would
3353  * shut down for fear of XID wrap long before that happens).  So it's OK to
3354  * use TransactionIdPrecedes as a binary-search comparator.
3355  *
3356  * It's cheap to maintain the sortedness during insertions, since new known
3357  * XIDs are always reported in XID order; we just append them at the right.
3358  *
3359  * To keep individual deletions cheap, we need to allow gaps in the array.
3360  * This is implemented by marking array elements as valid or invalid using
3361  * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
3362  * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
3363  * XID entry itself.  This preserves the property that the XID entries are
3364  * sorted, so we can do binary searches easily.  Periodically we compress
3365  * out the unused entries; that's much cheaper than having to compress the
3366  * array immediately on every deletion.
3367  *
3368  * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
3369  * are those with indexes tail <= i < head; items outside this subscript range
3370  * have unspecified contents.  When head reaches the end of the array, we
3371  * force compression of unused entries rather than wrapping around, since
3372  * allowing wraparound would greatly complicate the search logic.  We maintain
3373  * an explicit tail pointer so that pruning of old XIDs can be done without
3374  * immediately moving the array contents.  In most cases only a small fraction
3375  * of the array contains valid entries at any instant.
3376  *
3377  * Although only the startup process can ever change the KnownAssignedXids
3378  * data structure, we still need interlocking so that standby backends will
3379  * not observe invalid intermediate states.  The convention is that backends
3380  * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
3381  * the array, the startup process must hold ProcArrayLock exclusively, for
3382  * the usual transactional reasons (compare commit/abort of a transaction
3383  * during normal running).  Compressing unused entries out of the array
3384  * likewise requires exclusive lock.  To add XIDs to the array, we just insert
3385  * them into slots to the right of the head pointer and then advance the head
3386  * pointer.  This wouldn't require any lock at all, except that on machines
3387  * with weak memory ordering we need to be careful that other processors
3388  * see the array element changes before they see the head pointer change.
3389  * We handle this by using a spinlock to protect reads and writes of the
3390  * head/tail pointers.  (We could dispense with the spinlock if we were to
3391  * create suitable memory access barrier primitives and use those instead.)
3392  * The spinlock must be taken to read or write the head/tail pointers unless
3393  * the caller holds ProcArrayLock exclusively.
3394  *
3395  * Algorithmic analysis:
3396  *
3397  * If we have a maximum of M slots, with N XIDs currently spread across
3398  * S elements then we have N <= S <= M always.
3399  *
3400  *	* Adding a new XID is O(1) and needs little locking (unless compression
3401  *		must happen)
3402  *	* Compressing the array is O(S) and requires exclusive lock
3403  *	* Removing an XID is O(logS) and requires exclusive lock
3404  *	* Taking a snapshot is O(S) and requires shared lock
3405  *	* Checking for an XID is O(logS) and requires shared lock
3406  *
3407  * In comparison, using a hash table for KnownAssignedXids would mean that
3408  * taking snapshots would be O(M). If we can maintain S << M then the
3409  * sorted array technique will deliver significantly faster snapshots.
3410  * If we try to keep S too small then we will spend too much time compressing,
3411  * so there is an optimal point for any workload mix. We use a heuristic to
3412  * decide when to compress the array, though trimming also helps reduce
3413  * frequency of compressing. The heuristic requires us to track the number of
3414  * currently valid XIDs in the array.
3415  */
3416 
3417 
3418 /*
3419  * Compress KnownAssignedXids by shifting valid data down to the start of the
3420  * array, removing any gaps.
3421  *
3422  * A compression step is forced if "force" is true, otherwise we do it
3423  * only if a heuristic indicates it's a good time to do it.
3424  *
3425  * Caller must hold ProcArrayLock in exclusive mode.
3426  */
3427 static void
KnownAssignedXidsCompress(bool force)3428 KnownAssignedXidsCompress(bool force)
3429 {
3430 	/* use volatile pointer to prevent code rearrangement */
3431 	volatile ProcArrayStruct *pArray = procArray;
3432 	int			head,
3433 				tail;
3434 	int			compress_index;
3435 	int			i;
3436 
3437 	/* no spinlock required since we hold ProcArrayLock exclusively */
3438 	head = pArray->headKnownAssignedXids;
3439 	tail = pArray->tailKnownAssignedXids;
3440 
3441 	if (!force)
3442 	{
3443 		/*
3444 		 * If we can choose how much to compress, use a heuristic to avoid
3445 		 * compressing too often or not often enough.
3446 		 *
3447 		 * Heuristic is if we have a large enough current spread and less than
3448 		 * 50% of the elements are currently in use, then compress. This
3449 		 * should ensure we compress fairly infrequently. We could compress
3450 		 * less often though the virtual array would spread out more and
3451 		 * snapshots would become more expensive.
3452 		 */
3453 		int			nelements = head - tail;
3454 
3455 		if (nelements < 4 * PROCARRAY_MAXPROCS ||
3456 			nelements < 2 * pArray->numKnownAssignedXids)
3457 			return;
3458 	}
3459 
3460 	/*
3461 	 * We compress the array by reading the valid values from tail to head,
3462 	 * re-aligning data to 0th element.
3463 	 */
3464 	compress_index = 0;
3465 	for (i = tail; i < head; i++)
3466 	{
3467 		if (KnownAssignedXidsValid[i])
3468 		{
3469 			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
3470 			KnownAssignedXidsValid[compress_index] = true;
3471 			compress_index++;
3472 		}
3473 	}
3474 
3475 	pArray->tailKnownAssignedXids = 0;
3476 	pArray->headKnownAssignedXids = compress_index;
3477 }
3478 
3479 /*
3480  * Add xids into KnownAssignedXids at the head of the array.
3481  *
3482  * xids from from_xid to to_xid, inclusive, are added to the array.
3483  *
3484  * If exclusive_lock is true then caller already holds ProcArrayLock in
3485  * exclusive mode, so we need no extra locking here.  Else caller holds no
3486  * lock, so we need to be sure we maintain sufficient interlocks against
3487  * concurrent readers.  (Only the startup process ever calls this, so no need
3488  * to worry about concurrent writers.)
3489  */
3490 static void
KnownAssignedXidsAdd(TransactionId from_xid,TransactionId to_xid,bool exclusive_lock)3491 KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
3492 					 bool exclusive_lock)
3493 {
3494 	/* use volatile pointer to prevent code rearrangement */
3495 	volatile ProcArrayStruct *pArray = procArray;
3496 	TransactionId next_xid;
3497 	int			head,
3498 				tail;
3499 	int			nxids;
3500 	int			i;
3501 
3502 	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
3503 
3504 	/*
3505 	 * Calculate how many array slots we'll need.  Normally this is cheap; in
3506 	 * the unusual case where the XIDs cross the wrap point, we do it the hard
3507 	 * way.
3508 	 */
3509 	if (to_xid >= from_xid)
3510 		nxids = to_xid - from_xid + 1;
3511 	else
3512 	{
3513 		nxids = 1;
3514 		next_xid = from_xid;
3515 		while (TransactionIdPrecedes(next_xid, to_xid))
3516 		{
3517 			nxids++;
3518 			TransactionIdAdvance(next_xid);
3519 		}
3520 	}
3521 
3522 	/*
3523 	 * Since only the startup process modifies the head/tail pointers, we
3524 	 * don't need a lock to read them here.
3525 	 */
3526 	head = pArray->headKnownAssignedXids;
3527 	tail = pArray->tailKnownAssignedXids;
3528 
3529 	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
3530 	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
3531 
3532 	/*
3533 	 * Verify that insertions occur in TransactionId sequence.  Note that even
3534 	 * if the last existing element is marked invalid, it must still have a
3535 	 * correctly sequenced XID value.
3536 	 */
3537 	if (head > tail &&
3538 		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
3539 	{
3540 		KnownAssignedXidsDisplay(LOG);
3541 		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
3542 	}
3543 
3544 	/*
3545 	 * If our xids won't fit in the remaining space, compress out free space
3546 	 */
3547 	if (head + nxids > pArray->maxKnownAssignedXids)
3548 	{
3549 		/* must hold lock to compress */
3550 		if (!exclusive_lock)
3551 			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3552 
3553 		KnownAssignedXidsCompress(true);
3554 
3555 		head = pArray->headKnownAssignedXids;
3556 		/* note: we no longer care about the tail pointer */
3557 
3558 		if (!exclusive_lock)
3559 			LWLockRelease(ProcArrayLock);
3560 
3561 		/*
3562 		 * If it still won't fit then we're out of memory
3563 		 */
3564 		if (head + nxids > pArray->maxKnownAssignedXids)
3565 			elog(ERROR, "too many KnownAssignedXids");
3566 	}
3567 
3568 	/* Now we can insert the xids into the space starting at head */
3569 	next_xid = from_xid;
3570 	for (i = 0; i < nxids; i++)
3571 	{
3572 		KnownAssignedXids[head] = next_xid;
3573 		KnownAssignedXidsValid[head] = true;
3574 		TransactionIdAdvance(next_xid);
3575 		head++;
3576 	}
3577 
3578 	/* Adjust count of number of valid entries */
3579 	pArray->numKnownAssignedXids += nxids;
3580 
3581 	/*
3582 	 * Now update the head pointer.  We use a spinlock to protect this
3583 	 * pointer, not because the update is likely to be non-atomic, but to
3584 	 * ensure that other processors see the above array updates before they
3585 	 * see the head pointer change.
3586 	 *
3587 	 * If we're holding ProcArrayLock exclusively, there's no need to take the
3588 	 * spinlock.
3589 	 */
3590 	if (exclusive_lock)
3591 		pArray->headKnownAssignedXids = head;
3592 	else
3593 	{
3594 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
3595 		pArray->headKnownAssignedXids = head;
3596 		SpinLockRelease(&pArray->known_assigned_xids_lck);
3597 	}
3598 }
3599 
3600 /*
3601  * KnownAssignedXidsSearch
3602  *
3603  * Searches KnownAssignedXids for a specific xid and optionally removes it.
3604  * Returns true if it was found, false if not.
3605  *
3606  * Caller must hold ProcArrayLock in shared or exclusive mode.
3607  * Exclusive lock must be held for remove = true.
3608  */
3609 static bool
KnownAssignedXidsSearch(TransactionId xid,bool remove)3610 KnownAssignedXidsSearch(TransactionId xid, bool remove)
3611 {
3612 	/* use volatile pointer to prevent code rearrangement */
3613 	volatile ProcArrayStruct *pArray = procArray;
3614 	int			first,
3615 				last;
3616 	int			head;
3617 	int			tail;
3618 	int			result_index = -1;
3619 
3620 	if (remove)
3621 	{
3622 		/* we hold ProcArrayLock exclusively, so no need for spinlock */
3623 		tail = pArray->tailKnownAssignedXids;
3624 		head = pArray->headKnownAssignedXids;
3625 	}
3626 	else
3627 	{
3628 		/* take spinlock to ensure we see up-to-date array contents */
3629 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
3630 		tail = pArray->tailKnownAssignedXids;
3631 		head = pArray->headKnownAssignedXids;
3632 		SpinLockRelease(&pArray->known_assigned_xids_lck);
3633 	}
3634 
3635 	/*
3636 	 * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
3637 	 * array here, since even invalid entries will contain sorted XIDs.
3638 	 */
3639 	first = tail;
3640 	last = head - 1;
3641 	while (first <= last)
3642 	{
3643 		int			mid_index;
3644 		TransactionId mid_xid;
3645 
3646 		mid_index = (first + last) / 2;
3647 		mid_xid = KnownAssignedXids[mid_index];
3648 
3649 		if (xid == mid_xid)
3650 		{
3651 			result_index = mid_index;
3652 			break;
3653 		}
3654 		else if (TransactionIdPrecedes(xid, mid_xid))
3655 			last = mid_index - 1;
3656 		else
3657 			first = mid_index + 1;
3658 	}
3659 
3660 	if (result_index < 0)
3661 		return false;			/* not in array */
3662 
3663 	if (!KnownAssignedXidsValid[result_index])
3664 		return false;			/* in array, but invalid */
3665 
3666 	if (remove)
3667 	{
3668 		KnownAssignedXidsValid[result_index] = false;
3669 
3670 		pArray->numKnownAssignedXids--;
3671 		Assert(pArray->numKnownAssignedXids >= 0);
3672 
3673 		/*
3674 		 * If we're removing the tail element then advance tail pointer over
3675 		 * any invalid elements.  This will speed future searches.
3676 		 */
3677 		if (result_index == tail)
3678 		{
3679 			tail++;
3680 			while (tail < head && !KnownAssignedXidsValid[tail])
3681 				tail++;
3682 			if (tail >= head)
3683 			{
3684 				/* Array is empty, so we can reset both pointers */
3685 				pArray->headKnownAssignedXids = 0;
3686 				pArray->tailKnownAssignedXids = 0;
3687 			}
3688 			else
3689 			{
3690 				pArray->tailKnownAssignedXids = tail;
3691 			}
3692 		}
3693 	}
3694 
3695 	return true;
3696 }
3697 
3698 /*
3699  * Is the specified XID present in KnownAssignedXids[]?
3700  *
3701  * Caller must hold ProcArrayLock in shared or exclusive mode.
3702  */
3703 static bool
KnownAssignedXidExists(TransactionId xid)3704 KnownAssignedXidExists(TransactionId xid)
3705 {
3706 	Assert(TransactionIdIsValid(xid));
3707 
3708 	return KnownAssignedXidsSearch(xid, false);
3709 }
3710 
3711 /*
3712  * Remove the specified XID from KnownAssignedXids[].
3713  *
3714  * Caller must hold ProcArrayLock in exclusive mode.
3715  */
3716 static void
KnownAssignedXidsRemove(TransactionId xid)3717 KnownAssignedXidsRemove(TransactionId xid)
3718 {
3719 	Assert(TransactionIdIsValid(xid));
3720 
3721 	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
3722 
3723 	/*
3724 	 * Note: we cannot consider it an error to remove an XID that's not
3725 	 * present.  We intentionally remove subxact IDs while processing
3726 	 * XLOG_XACT_ASSIGNMENT, to avoid array overflow.  Then those XIDs will be
3727 	 * removed again when the top-level xact commits or aborts.
3728 	 *
3729 	 * It might be possible to track such XIDs to distinguish this case from
3730 	 * actual errors, but it would be complicated and probably not worth it.
3731 	 * So, just ignore the search result.
3732 	 */
3733 	(void) KnownAssignedXidsSearch(xid, true);
3734 }
3735 
3736 /*
3737  * KnownAssignedXidsRemoveTree
3738  *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
3739  *
3740  * Caller must hold ProcArrayLock in exclusive mode.
3741  */
3742 static void
KnownAssignedXidsRemoveTree(TransactionId xid,int nsubxids,TransactionId * subxids)3743 KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
3744 							TransactionId *subxids)
3745 {
3746 	int			i;
3747 
3748 	if (TransactionIdIsValid(xid))
3749 		KnownAssignedXidsRemove(xid);
3750 
3751 	for (i = 0; i < nsubxids; i++)
3752 		KnownAssignedXidsRemove(subxids[i]);
3753 
3754 	/* Opportunistically compress the array */
3755 	KnownAssignedXidsCompress(false);
3756 }
3757 
3758 /*
3759  * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
3760  * then clear the whole table.
3761  *
3762  * Caller must hold ProcArrayLock in exclusive mode.
3763  */
3764 static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)3765 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
3766 {
3767 	/* use volatile pointer to prevent code rearrangement */
3768 	volatile ProcArrayStruct *pArray = procArray;
3769 	int			count = 0;
3770 	int			head,
3771 				tail,
3772 				i;
3773 
3774 	if (!TransactionIdIsValid(removeXid))
3775 	{
3776 		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
3777 		pArray->numKnownAssignedXids = 0;
3778 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
3779 		return;
3780 	}
3781 
3782 	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
3783 
3784 	/*
3785 	 * Mark entries invalid starting at the tail.  Since array is sorted, we
3786 	 * can stop as soon as we reach an entry >= removeXid.
3787 	 */
3788 	tail = pArray->tailKnownAssignedXids;
3789 	head = pArray->headKnownAssignedXids;
3790 
3791 	for (i = tail; i < head; i++)
3792 	{
3793 		if (KnownAssignedXidsValid[i])
3794 		{
3795 			TransactionId knownXid = KnownAssignedXids[i];
3796 
3797 			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
3798 				break;
3799 
3800 			if (!StandbyTransactionIdIsPrepared(knownXid))
3801 			{
3802 				KnownAssignedXidsValid[i] = false;
3803 				count++;
3804 			}
3805 		}
3806 	}
3807 
3808 	pArray->numKnownAssignedXids -= count;
3809 	Assert(pArray->numKnownAssignedXids >= 0);
3810 
3811 	/*
3812 	 * Advance the tail pointer if we've marked the tail item invalid.
3813 	 */
3814 	for (i = tail; i < head; i++)
3815 	{
3816 		if (KnownAssignedXidsValid[i])
3817 			break;
3818 	}
3819 	if (i >= head)
3820 	{
3821 		/* Array is empty, so we can reset both pointers */
3822 		pArray->headKnownAssignedXids = 0;
3823 		pArray->tailKnownAssignedXids = 0;
3824 	}
3825 	else
3826 	{
3827 		pArray->tailKnownAssignedXids = i;
3828 	}
3829 
3830 	/* Opportunistically compress the array */
3831 	KnownAssignedXidsCompress(false);
3832 }
3833 
3834 /*
3835  * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
3836  * We filter out anything >= xmax.
3837  *
3838  * Returns the number of XIDs stored into xarray[].  Caller is responsible
3839  * that array is large enough.
3840  *
3841  * Caller must hold ProcArrayLock in (at least) shared mode.
3842  */
3843 static int
KnownAssignedXidsGet(TransactionId * xarray,TransactionId xmax)3844 KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
3845 {
3846 	TransactionId xtmp = InvalidTransactionId;
3847 
3848 	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
3849 }
3850 
3851 /*
3852  * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
3853  * we reduce *xmin to the lowest xid value seen if not already lower.
3854  *
3855  * Caller must hold ProcArrayLock in (at least) shared mode.
3856  */
3857 static int
KnownAssignedXidsGetAndSetXmin(TransactionId * xarray,TransactionId * xmin,TransactionId xmax)3858 KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
3859 							   TransactionId xmax)
3860 {
3861 	int			count = 0;
3862 	int			head,
3863 				tail;
3864 	int			i;
3865 
3866 	/*
3867 	 * Fetch head just once, since it may change while we loop. We can stop
3868 	 * once we reach the initially seen head, since we are certain that an xid
3869 	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
3870 	 * might miss newly-added xids, but they should be >= xmax so irrelevant
3871 	 * anyway.
3872 	 *
3873 	 * Must take spinlock to ensure we see up-to-date array contents.
3874 	 */
3875 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
3876 	tail = procArray->tailKnownAssignedXids;
3877 	head = procArray->headKnownAssignedXids;
3878 	SpinLockRelease(&procArray->known_assigned_xids_lck);
3879 
3880 	for (i = tail; i < head; i++)
3881 	{
3882 		/* Skip any gaps in the array */
3883 		if (KnownAssignedXidsValid[i])
3884 		{
3885 			TransactionId knownXid = KnownAssignedXids[i];
3886 
3887 			/*
3888 			 * Update xmin if required.  Only the first XID need be checked,
3889 			 * since the array is sorted.
3890 			 */
3891 			if (count == 0 &&
3892 				TransactionIdPrecedes(knownXid, *xmin))
3893 				*xmin = knownXid;
3894 
3895 			/*
3896 			 * Filter out anything >= xmax, again relying on sorted property
3897 			 * of array.
3898 			 */
3899 			if (TransactionIdIsValid(xmax) &&
3900 				TransactionIdFollowsOrEquals(knownXid, xmax))
3901 				break;
3902 
3903 			/* Add knownXid into output array */
3904 			xarray[count++] = knownXid;
3905 		}
3906 	}
3907 
3908 	return count;
3909 }
3910 
3911 /*
3912  * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
3913  * if nothing there.
3914  */
3915 static TransactionId
KnownAssignedXidsGetOldestXmin(void)3916 KnownAssignedXidsGetOldestXmin(void)
3917 {
3918 	int			head,
3919 				tail;
3920 	int			i;
3921 
3922 	/*
3923 	 * Fetch head just once, since it may change while we loop.
3924 	 */
3925 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
3926 	tail = procArray->tailKnownAssignedXids;
3927 	head = procArray->headKnownAssignedXids;
3928 	SpinLockRelease(&procArray->known_assigned_xids_lck);
3929 
3930 	for (i = tail; i < head; i++)
3931 	{
3932 		/* Skip any gaps in the array */
3933 		if (KnownAssignedXidsValid[i])
3934 			return KnownAssignedXids[i];
3935 	}
3936 
3937 	return InvalidTransactionId;
3938 }
3939 
3940 /*
3941  * Display KnownAssignedXids to provide debug trail
3942  *
3943  * Currently this is only called within startup process, so we need no
3944  * special locking.
3945  *
3946  * Note this is pretty expensive, and much of the expense will be incurred
3947  * even if the elog message will get discarded.  It's not currently called
3948  * in any performance-critical places, however, so no need to be tenser.
3949  */
3950 static void
KnownAssignedXidsDisplay(int trace_level)3951 KnownAssignedXidsDisplay(int trace_level)
3952 {
3953 	/* use volatile pointer to prevent code rearrangement */
3954 	volatile ProcArrayStruct *pArray = procArray;
3955 	StringInfoData buf;
3956 	int			head,
3957 				tail,
3958 				i;
3959 	int			nxids = 0;
3960 
3961 	tail = pArray->tailKnownAssignedXids;
3962 	head = pArray->headKnownAssignedXids;
3963 
3964 	initStringInfo(&buf);
3965 
3966 	for (i = tail; i < head; i++)
3967 	{
3968 		if (KnownAssignedXidsValid[i])
3969 		{
3970 			nxids++;
3971 			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3972 		}
3973 	}
3974 
3975 	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3976 		 nxids,
3977 		 pArray->numKnownAssignedXids,
3978 		 pArray->tailKnownAssignedXids,
3979 		 pArray->headKnownAssignedXids,
3980 		 buf.data);
3981 
3982 	pfree(buf.data);
3983 }
3984 
3985 /*
3986  * KnownAssignedXidsReset
3987  *		Resets KnownAssignedXids to be empty
3988  */
3989 static void
KnownAssignedXidsReset(void)3990 KnownAssignedXidsReset(void)
3991 {
3992 	/* use volatile pointer to prevent code rearrangement */
3993 	volatile ProcArrayStruct *pArray = procArray;
3994 
3995 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3996 
3997 	pArray->numKnownAssignedXids = 0;
3998 	pArray->tailKnownAssignedXids = 0;
3999 	pArray->headKnownAssignedXids = 0;
4000 
4001 	LWLockRelease(ProcArrayLock);
4002 }
4003