1 /*-------------------------------------------------------------------------
2  *
3  * procarray.c
4  *	  POSTGRES process array code.
5  *
6  *
7  * This module maintains arrays of the PGPROC and PGXACT structures for all
8  * active backends.  Although there are several uses for this, the principal
9  * one is as a means of determining the set of currently running transactions.
10  *
11  * Because of various subtle race conditions it is critical that a backend
12  * hold the correct locks while setting or clearing its MyPgXact->xid field.
13  * See notes in src/backend/access/transam/README.
14  *
15  * The process arrays now also include structures representing prepared
16  * transactions.  The xid and subxids fields of these are valid, as are the
17  * myProcLocks lists.  They can be distinguished from regular backend PGPROCs
18  * at need by checking for pid == 0.
19  *
20  * During hot standby, we also keep a list of XIDs representing transactions
21  * that are known to be running in the master (or more precisely, were running
22  * as of the current point in the WAL stream).  This list is kept in the
23  * KnownAssignedXids array, and is updated by watching the sequence of
24  * arriving XIDs.  This is necessary because if we leave those XIDs out of
25  * snapshots taken for standby queries, then they will appear to be already
26  * complete, leading to MVCC failures.  Note that in hot standby, the PGPROC
27  * array represents standby processes, which by definition are not running
28  * transactions that have XIDs.
29  *
30  * It is perhaps possible for a backend on the master to terminate without
31  * writing an abort record for its transaction.  While that shouldn't really
32  * happen, it would tie up KnownAssignedXids indefinitely, so we protect
33  * ourselves by pruning the array when a valid list of running XIDs arrives.
34  *
35  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
36  * Portions Copyright (c) 1994, Regents of the University of California
37  *
38  *
39  * IDENTIFICATION
40  *	  src/backend/storage/ipc/procarray.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 
48 #include "access/clog.h"
49 #include "access/subtrans.h"
50 #include "access/transam.h"
51 #include "access/twophase.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "miscadmin.h"
56 #include "pgstat.h"
57 #include "storage/proc.h"
58 #include "storage/procarray.h"
59 #include "storage/spin.h"
60 #include "utils/builtins.h"
61 #include "utils/rel.h"
62 #include "utils/snapmgr.h"
63 
64 #define UINT32_ACCESS_ONCE(var)		 ((uint32)(*((volatile uint32 *)&(var))))
65 
66 /* Our shared memory area */
67 typedef struct ProcArrayStruct
68 {
69 	int			numProcs;		/* number of valid procs entries */
70 	int			maxProcs;		/* allocated size of procs array */
71 
72 	/*
73 	 * Known assigned XIDs handling
74 	 */
75 	int			maxKnownAssignedXids;	/* allocated size of array */
76 	int			numKnownAssignedXids;	/* current # of valid entries */
77 	int			tailKnownAssignedXids;	/* index of oldest valid element */
78 	int			headKnownAssignedXids;	/* index of newest element, + 1 */
79 	slock_t		known_assigned_xids_lck;	/* protects head/tail pointers */
80 
81 	/*
82 	 * Highest subxid that has been removed from KnownAssignedXids array to
83 	 * prevent overflow; or InvalidTransactionId if none.  We track this for
84 	 * similar reasons to tracking overflowing cached subxids in PGXACT
85 	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
86 	 * lock to read it.
87 	 */
88 	TransactionId lastOverflowedXid;
89 
90 	/* oldest xmin of any replication slot */
91 	TransactionId replication_slot_xmin;
92 	/* oldest catalog xmin of any replication slot */
93 	TransactionId replication_slot_catalog_xmin;
94 
95 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
96 	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
97 } ProcArrayStruct;
98 
99 static ProcArrayStruct *procArray;
100 
101 static PGPROC *allProcs;
102 static PGXACT *allPgXact;
103 
104 /*
105  * Bookkeeping for tracking emulated transactions in recovery
106  */
107 static TransactionId *KnownAssignedXids;
108 static bool *KnownAssignedXidsValid;
109 static TransactionId latestObservedXid = InvalidTransactionId;
110 
111 /*
112  * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
113  * the highest xid that might still be running that we don't have in
114  * KnownAssignedXids.
115  */
116 static TransactionId standbySnapshotPendingXmin;
117 
118 #ifdef XIDCACHE_DEBUG
119 
120 /* counters for XidCache measurement */
121 static long xc_by_recent_xmin = 0;
122 static long xc_by_known_xact = 0;
123 static long xc_by_my_xact = 0;
124 static long xc_by_latest_xid = 0;
125 static long xc_by_main_xid = 0;
126 static long xc_by_child_xid = 0;
127 static long xc_by_known_assigned = 0;
128 static long xc_no_overflow = 0;
129 static long xc_slow_answer = 0;
130 
131 #define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
132 #define xc_by_known_xact_inc()		(xc_by_known_xact++)
133 #define xc_by_my_xact_inc()			(xc_by_my_xact++)
134 #define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
135 #define xc_by_main_xid_inc()		(xc_by_main_xid++)
136 #define xc_by_child_xid_inc()		(xc_by_child_xid++)
137 #define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
138 #define xc_no_overflow_inc()		(xc_no_overflow++)
139 #define xc_slow_answer_inc()		(xc_slow_answer++)
140 
141 static void DisplayXidCache(void);
142 #else							/* !XIDCACHE_DEBUG */
143 
144 #define xc_by_recent_xmin_inc()		((void) 0)
145 #define xc_by_known_xact_inc()		((void) 0)
146 #define xc_by_my_xact_inc()			((void) 0)
147 #define xc_by_latest_xid_inc()		((void) 0)
148 #define xc_by_main_xid_inc()		((void) 0)
149 #define xc_by_child_xid_inc()		((void) 0)
150 #define xc_by_known_assigned_inc()	((void) 0)
151 #define xc_no_overflow_inc()		((void) 0)
152 #define xc_slow_answer_inc()		((void) 0)
153 #endif							/* XIDCACHE_DEBUG */
154 
155 /* Primitives for KnownAssignedXids array handling for standby */
156 static void KnownAssignedXidsCompress(bool force);
157 static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
158 								 bool exclusive_lock);
159 static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
160 static bool KnownAssignedXidExists(TransactionId xid);
161 static void KnownAssignedXidsRemove(TransactionId xid);
162 static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
163 										TransactionId *subxids);
164 static void KnownAssignedXidsRemovePreceding(TransactionId xid);
165 static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
166 static int	KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
167 										   TransactionId *xmin,
168 										   TransactionId xmax);
169 static TransactionId KnownAssignedXidsGetOldestXmin(void);
170 static void KnownAssignedXidsDisplay(int trace_level);
171 static void KnownAssignedXidsReset(void);
172 static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
173 												   PGXACT *pgxact, TransactionId latestXid);
174 static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
175 
176 /*
177  * Report shared-memory space needed by CreateSharedProcArray.
178  */
179 Size
ProcArrayShmemSize(void)180 ProcArrayShmemSize(void)
181 {
182 	Size		size;
183 
184 	/* Size of the ProcArray structure itself */
185 #define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
186 
187 	size = offsetof(ProcArrayStruct, pgprocnos);
188 	size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
189 
190 	/*
191 	 * During Hot Standby processing we have a data structure called
192 	 * KnownAssignedXids, created in shared memory. Local data structures are
193 	 * also created in various backends during GetSnapshotData(),
194 	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
195 	 * main structures created in those functions must be identically sized,
196 	 * since we may at times copy the whole of the data structures around. We
197 	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
198 	 *
199 	 * Ideally we'd only create this structure if we were actually doing hot
200 	 * standby in the current run, but we don't know that yet at the time
201 	 * shared memory is being set up.
202 	 */
203 #define TOTAL_MAX_CACHED_SUBXIDS \
204 	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
205 
206 	if (EnableHotStandby)
207 	{
208 		size = add_size(size,
209 						mul_size(sizeof(TransactionId),
210 								 TOTAL_MAX_CACHED_SUBXIDS));
211 		size = add_size(size,
212 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
213 	}
214 
215 	return size;
216 }
217 
218 /*
219  * Initialize the shared PGPROC array during postmaster startup.
220  */
221 void
CreateSharedProcArray(void)222 CreateSharedProcArray(void)
223 {
224 	bool		found;
225 
226 	/* Create or attach to the ProcArray shared structure */
227 	procArray = (ProcArrayStruct *)
228 		ShmemInitStruct("Proc Array",
229 						add_size(offsetof(ProcArrayStruct, pgprocnos),
230 								 mul_size(sizeof(int),
231 										  PROCARRAY_MAXPROCS)),
232 						&found);
233 
234 	if (!found)
235 	{
236 		/*
237 		 * We're the first - initialize.
238 		 */
239 		procArray->numProcs = 0;
240 		procArray->maxProcs = PROCARRAY_MAXPROCS;
241 		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
242 		procArray->numKnownAssignedXids = 0;
243 		procArray->tailKnownAssignedXids = 0;
244 		procArray->headKnownAssignedXids = 0;
245 		SpinLockInit(&procArray->known_assigned_xids_lck);
246 		procArray->lastOverflowedXid = InvalidTransactionId;
247 		procArray->replication_slot_xmin = InvalidTransactionId;
248 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
249 	}
250 
251 	allProcs = ProcGlobal->allProcs;
252 	allPgXact = ProcGlobal->allPgXact;
253 
254 	/* Create or attach to the KnownAssignedXids arrays too, if needed */
255 	if (EnableHotStandby)
256 	{
257 		KnownAssignedXids = (TransactionId *)
258 			ShmemInitStruct("KnownAssignedXids",
259 							mul_size(sizeof(TransactionId),
260 									 TOTAL_MAX_CACHED_SUBXIDS),
261 							&found);
262 		KnownAssignedXidsValid = (bool *)
263 			ShmemInitStruct("KnownAssignedXidsValid",
264 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
265 							&found);
266 	}
267 
268 	/* Register and initialize fields of ProcLWLockTranche */
269 	LWLockRegisterTranche(LWTRANCHE_PROC, "proc");
270 }
271 
272 /*
273  * Add the specified PGPROC to the shared array.
274  */
275 void
ProcArrayAdd(PGPROC * proc)276 ProcArrayAdd(PGPROC *proc)
277 {
278 	ProcArrayStruct *arrayP = procArray;
279 	int			index;
280 
281 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
282 
283 	if (arrayP->numProcs >= arrayP->maxProcs)
284 	{
285 		/*
286 		 * Oops, no room.  (This really shouldn't happen, since there is a
287 		 * fixed supply of PGPROC structs too, and so we should have failed
288 		 * earlier.)
289 		 */
290 		LWLockRelease(ProcArrayLock);
291 		ereport(FATAL,
292 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
293 				 errmsg("sorry, too many clients already")));
294 	}
295 
296 	/*
297 	 * Keep the procs array sorted by (PGPROC *) so that we can utilize
298 	 * locality of references much better. This is useful while traversing the
299 	 * ProcArray because there is an increased likelihood of finding the next
300 	 * PGPROC structure in the cache.
301 	 *
302 	 * Since the occurrence of adding/removing a proc is much lower than the
303 	 * access to the ProcArray itself, the overhead should be marginal
304 	 */
305 	for (index = 0; index < arrayP->numProcs; index++)
306 	{
307 		/*
308 		 * If we are the first PGPROC or if we have found our right position
309 		 * in the array, break
310 		 */
311 		if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
312 			break;
313 	}
314 
315 	memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
316 			(arrayP->numProcs - index) * sizeof(int));
317 	arrayP->pgprocnos[index] = proc->pgprocno;
318 	arrayP->numProcs++;
319 
320 	LWLockRelease(ProcArrayLock);
321 }
322 
323 /*
324  * Remove the specified PGPROC from the shared array.
325  *
326  * When latestXid is a valid XID, we are removing a live 2PC gxact from the
327  * array, and thus causing it to appear as "not running" anymore.  In this
328  * case we must advance latestCompletedXid.  (This is essentially the same
329  * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
330  * the ProcArrayLock only once, and don't damage the content of the PGPROC;
331  * twophase.c depends on the latter.)
332  */
333 void
ProcArrayRemove(PGPROC * proc,TransactionId latestXid)334 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
335 {
336 	ProcArrayStruct *arrayP = procArray;
337 	int			index;
338 
339 #ifdef XIDCACHE_DEBUG
340 	/* dump stats at backend shutdown, but not prepared-xact end */
341 	if (proc->pid != 0)
342 		DisplayXidCache();
343 #endif
344 
345 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
346 
347 	if (TransactionIdIsValid(latestXid))
348 	{
349 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
350 
351 		/* Advance global latestCompletedXid while holding the lock */
352 		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
353 								  latestXid))
354 			ShmemVariableCache->latestCompletedXid = latestXid;
355 	}
356 	else
357 	{
358 		/* Shouldn't be trying to remove a live transaction here */
359 		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
360 	}
361 
362 	for (index = 0; index < arrayP->numProcs; index++)
363 	{
364 		if (arrayP->pgprocnos[index] == proc->pgprocno)
365 		{
366 			/* Keep the PGPROC array sorted. See notes above */
367 			memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
368 					(arrayP->numProcs - index - 1) * sizeof(int));
369 			arrayP->pgprocnos[arrayP->numProcs - 1] = -1;	/* for debugging */
370 			arrayP->numProcs--;
371 			LWLockRelease(ProcArrayLock);
372 			return;
373 		}
374 	}
375 
376 	/* Oops */
377 	LWLockRelease(ProcArrayLock);
378 
379 	elog(LOG, "failed to find proc %p in ProcArray", proc);
380 }
381 
382 
383 /*
384  * ProcArrayEndTransaction -- mark a transaction as no longer running
385  *
386  * This is used interchangeably for commit and abort cases.  The transaction
387  * commit/abort must already be reported to WAL and pg_xact.
388  *
389  * proc is currently always MyProc, but we pass it explicitly for flexibility.
390  * latestXid is the latest Xid among the transaction's main XID and
391  * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
392  * the caller to pass latestXid, instead of computing it from the PGPROC's
393  * contents, because the subxid information in the PGPROC might be
394  * incomplete.)
395  */
396 void
ProcArrayEndTransaction(PGPROC * proc,TransactionId latestXid)397 ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
398 {
399 	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
400 
401 	if (TransactionIdIsValid(latestXid))
402 	{
403 		/*
404 		 * We must lock ProcArrayLock while clearing our advertised XID, so
405 		 * that we do not exit the set of "running" transactions while someone
406 		 * else is taking a snapshot.  See discussion in
407 		 * src/backend/access/transam/README.
408 		 */
409 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
410 
411 		/*
412 		 * If we can immediately acquire ProcArrayLock, we clear our own XID
413 		 * and release the lock.  If not, use group XID clearing to improve
414 		 * efficiency.
415 		 */
416 		if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
417 		{
418 			ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
419 			LWLockRelease(ProcArrayLock);
420 		}
421 		else
422 			ProcArrayGroupClearXid(proc, latestXid);
423 	}
424 	else
425 	{
426 		/*
427 		 * If we have no XID, we don't need to lock, since we won't affect
428 		 * anyone else's calculation of a snapshot.  We might change their
429 		 * estimate of global xmin, but that's OK.
430 		 */
431 		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
432 
433 		proc->lxid = InvalidLocalTransactionId;
434 		pgxact->xmin = InvalidTransactionId;
435 		/* must be cleared with xid/xmin: */
436 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
437 		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
438 		proc->recoveryConflictPending = false;
439 
440 		Assert(pgxact->nxids == 0);
441 		Assert(pgxact->overflowed == false);
442 	}
443 }
444 
445 /*
446  * Mark a write transaction as no longer running.
447  *
448  * We don't do any locking here; caller must handle that.
449  */
450 static inline void
ProcArrayEndTransactionInternal(PGPROC * proc,PGXACT * pgxact,TransactionId latestXid)451 ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
452 								TransactionId latestXid)
453 {
454 	pgxact->xid = InvalidTransactionId;
455 	proc->lxid = InvalidLocalTransactionId;
456 	pgxact->xmin = InvalidTransactionId;
457 	/* must be cleared with xid/xmin: */
458 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
459 	pgxact->delayChkpt = false; /* be sure this is cleared in abort */
460 	proc->recoveryConflictPending = false;
461 
462 	/* Clear the subtransaction-XID cache too while holding the lock */
463 	pgxact->nxids = 0;
464 	pgxact->overflowed = false;
465 
466 	/* Also advance global latestCompletedXid while holding the lock */
467 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
468 							  latestXid))
469 		ShmemVariableCache->latestCompletedXid = latestXid;
470 }
471 
472 /*
473  * ProcArrayGroupClearXid -- group XID clearing
474  *
475  * When we cannot immediately acquire ProcArrayLock in exclusive mode at
476  * commit time, add ourselves to a list of processes that need their XIDs
477  * cleared.  The first process to add itself to the list will acquire
478  * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
479  * on behalf of all group members.  This avoids a great deal of contention
480  * around ProcArrayLock when many processes are trying to commit at once,
481  * since the lock need not be repeatedly handed off from one committing
482  * process to the next.
483  */
484 static void
ProcArrayGroupClearXid(PGPROC * proc,TransactionId latestXid)485 ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
486 {
487 	PROC_HDR   *procglobal = ProcGlobal;
488 	uint32		nextidx;
489 	uint32		wakeidx;
490 
491 	/* We should definitely have an XID to clear. */
492 	Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
493 
494 	/* Add ourselves to the list of processes needing a group XID clear. */
495 	proc->procArrayGroupMember = true;
496 	proc->procArrayGroupMemberXid = latestXid;
497 	while (true)
498 	{
499 		nextidx = pg_atomic_read_u32(&procglobal->procArrayGroupFirst);
500 		pg_atomic_write_u32(&proc->procArrayGroupNext, nextidx);
501 
502 		if (pg_atomic_compare_exchange_u32(&procglobal->procArrayGroupFirst,
503 										   &nextidx,
504 										   (uint32) proc->pgprocno))
505 			break;
506 	}
507 
508 	/*
509 	 * If the list was not empty, the leader will clear our XID.  It is
510 	 * impossible to have followers without a leader because the first process
511 	 * that has added itself to the list will always have nextidx as
512 	 * INVALID_PGPROCNO.
513 	 */
514 	if (nextidx != INVALID_PGPROCNO)
515 	{
516 		int			extraWaits = 0;
517 
518 		/* Sleep until the leader clears our XID. */
519 		pgstat_report_wait_start(WAIT_EVENT_PROCARRAY_GROUP_UPDATE);
520 		for (;;)
521 		{
522 			/* acts as a read barrier */
523 			PGSemaphoreLock(proc->sem);
524 			if (!proc->procArrayGroupMember)
525 				break;
526 			extraWaits++;
527 		}
528 		pgstat_report_wait_end();
529 
530 		Assert(pg_atomic_read_u32(&proc->procArrayGroupNext) == INVALID_PGPROCNO);
531 
532 		/* Fix semaphore count for any absorbed wakeups */
533 		while (extraWaits-- > 0)
534 			PGSemaphoreUnlock(proc->sem);
535 		return;
536 	}
537 
538 	/* We are the leader.  Acquire the lock on behalf of everyone. */
539 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
540 
541 	/*
542 	 * Now that we've got the lock, clear the list of processes waiting for
543 	 * group XID clearing, saving a pointer to the head of the list.  Trying
544 	 * to pop elements one at a time could lead to an ABA problem.
545 	 */
546 	nextidx = pg_atomic_exchange_u32(&procglobal->procArrayGroupFirst,
547 									 INVALID_PGPROCNO);
548 
549 	/* Remember head of list so we can perform wakeups after dropping lock. */
550 	wakeidx = nextidx;
551 
552 	/* Walk the list and clear all XIDs. */
553 	while (nextidx != INVALID_PGPROCNO)
554 	{
555 		PGPROC	   *nextproc = &allProcs[nextidx];
556 		PGXACT	   *pgxact = &allPgXact[nextidx];
557 
558 		ProcArrayEndTransactionInternal(nextproc, pgxact, nextproc->procArrayGroupMemberXid);
559 
560 		/* Move to next proc in list. */
561 		nextidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
562 	}
563 
564 	/* We're done with the lock now. */
565 	LWLockRelease(ProcArrayLock);
566 
567 	/*
568 	 * Now that we've released the lock, go back and wake everybody up.  We
569 	 * don't do this under the lock so as to keep lock hold times to a
570 	 * minimum.  The system calls we need to perform to wake other processes
571 	 * up are probably much slower than the simple memory writes we did while
572 	 * holding the lock.
573 	 */
574 	while (wakeidx != INVALID_PGPROCNO)
575 	{
576 		PGPROC	   *nextproc = &allProcs[wakeidx];
577 
578 		wakeidx = pg_atomic_read_u32(&nextproc->procArrayGroupNext);
579 		pg_atomic_write_u32(&nextproc->procArrayGroupNext, INVALID_PGPROCNO);
580 
581 		/* ensure all previous writes are visible before follower continues. */
582 		pg_write_barrier();
583 
584 		nextproc->procArrayGroupMember = false;
585 
586 		if (nextproc != MyProc)
587 			PGSemaphoreUnlock(nextproc->sem);
588 	}
589 }
590 
591 /*
592  * ProcArrayClearTransaction -- clear the transaction fields
593  *
594  * This is used after successfully preparing a 2-phase transaction.  We are
595  * not actually reporting the transaction's XID as no longer running --- it
596  * will still appear as running because the 2PC's gxact is in the ProcArray
597  * too.  We just have to clear out our own PGXACT.
598  */
599 void
ProcArrayClearTransaction(PGPROC * proc)600 ProcArrayClearTransaction(PGPROC *proc)
601 {
602 	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
603 
604 	/*
605 	 * We can skip locking ProcArrayLock here, because this action does not
606 	 * actually change anyone's view of the set of running XIDs: our entry is
607 	 * duplicate with the gxact that has already been inserted into the
608 	 * ProcArray.
609 	 */
610 	pgxact->xid = InvalidTransactionId;
611 	proc->lxid = InvalidLocalTransactionId;
612 	pgxact->xmin = InvalidTransactionId;
613 	proc->recoveryConflictPending = false;
614 
615 	/* redundant, but just in case */
616 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
617 	pgxact->delayChkpt = false;
618 
619 	/* Clear the subtransaction-XID cache too */
620 	pgxact->nxids = 0;
621 	pgxact->overflowed = false;
622 }
623 
624 /*
625  * ProcArrayInitRecovery -- initialize recovery xid mgmt environment
626  *
627  * Remember up to where the startup process initialized the CLOG and subtrans
628  * so we can ensure it's initialized gaplessly up to the point where necessary
629  * while in recovery.
630  */
631 void
ProcArrayInitRecovery(TransactionId initializedUptoXID)632 ProcArrayInitRecovery(TransactionId initializedUptoXID)
633 {
634 	Assert(standbyState == STANDBY_INITIALIZED);
635 	Assert(TransactionIdIsNormal(initializedUptoXID));
636 
637 	/*
638 	 * we set latestObservedXid to the xid SUBTRANS has been initialized up
639 	 * to, so we can extend it from that point onwards in
640 	 * RecordKnownAssignedTransactionIds, and when we get consistent in
641 	 * ProcArrayApplyRecoveryInfo().
642 	 */
643 	latestObservedXid = initializedUptoXID;
644 	TransactionIdRetreat(latestObservedXid);
645 }
646 
647 /*
648  * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
649  *
650  * Takes us through 3 states: Initialized, Pending and Ready.
651  * Normal case is to go all the way to Ready straight away, though there
652  * are atypical cases where we need to take it in steps.
653  *
654  * Use the data about running transactions on master to create the initial
655  * state of KnownAssignedXids. We also use these records to regularly prune
656  * KnownAssignedXids because we know it is possible that some transactions
657  * with FATAL errors fail to write abort records, which could cause eventual
658  * overflow.
659  *
660  * See comments for LogStandbySnapshot().
661  */
662 void
ProcArrayApplyRecoveryInfo(RunningTransactions running)663 ProcArrayApplyRecoveryInfo(RunningTransactions running)
664 {
665 	TransactionId *xids;
666 	int			nxids;
667 	int			i;
668 
669 	Assert(standbyState >= STANDBY_INITIALIZED);
670 	Assert(TransactionIdIsValid(running->nextXid));
671 	Assert(TransactionIdIsValid(running->oldestRunningXid));
672 	Assert(TransactionIdIsNormal(running->latestCompletedXid));
673 
674 	/*
675 	 * Remove stale transactions, if any.
676 	 */
677 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
678 
679 	/*
680 	 * Remove stale locks, if any.
681 	 */
682 	StandbyReleaseOldLocks(running->oldestRunningXid);
683 
684 	/*
685 	 * If our snapshot is already valid, nothing else to do...
686 	 */
687 	if (standbyState == STANDBY_SNAPSHOT_READY)
688 		return;
689 
690 	/*
691 	 * If our initial RunningTransactionsData had an overflowed snapshot then
692 	 * we knew we were missing some subxids from our snapshot. If we continue
693 	 * to see overflowed snapshots then we might never be able to start up, so
694 	 * we make another test to see if our snapshot is now valid. We know that
695 	 * the missing subxids are equal to or earlier than nextXid. After we
696 	 * initialise we continue to apply changes during recovery, so once the
697 	 * oldestRunningXid is later than the nextXid from the initial snapshot we
698 	 * know that we no longer have missing information and can mark the
699 	 * snapshot as valid.
700 	 */
701 	if (standbyState == STANDBY_SNAPSHOT_PENDING)
702 	{
703 		/*
704 		 * If the snapshot isn't overflowed or if its empty we can reset our
705 		 * pending state and use this snapshot instead.
706 		 */
707 		if (!running->subxid_overflow || running->xcnt == 0)
708 		{
709 			/*
710 			 * If we have already collected known assigned xids, we need to
711 			 * throw them away before we apply the recovery snapshot.
712 			 */
713 			KnownAssignedXidsReset();
714 			standbyState = STANDBY_INITIALIZED;
715 		}
716 		else
717 		{
718 			if (TransactionIdPrecedes(standbySnapshotPendingXmin,
719 									  running->oldestRunningXid))
720 			{
721 				standbyState = STANDBY_SNAPSHOT_READY;
722 				elog(trace_recovery(DEBUG1),
723 					 "recovery snapshots are now enabled");
724 			}
725 			else
726 				elog(trace_recovery(DEBUG1),
727 					 "recovery snapshot waiting for non-overflowed snapshot or "
728 					 "until oldest active xid on standby is at least %u (now %u)",
729 					 standbySnapshotPendingXmin,
730 					 running->oldestRunningXid);
731 			return;
732 		}
733 	}
734 
735 	Assert(standbyState == STANDBY_INITIALIZED);
736 
737 	/*
738 	 * OK, we need to initialise from the RunningTransactionsData record.
739 	 *
740 	 * NB: this can be reached at least twice, so make sure new code can deal
741 	 * with that.
742 	 */
743 
744 	/*
745 	 * Nobody else is running yet, but take locks anyhow
746 	 */
747 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
748 
749 	/*
750 	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
751 	 * sort them first.
752 	 *
753 	 * Some of the new xids are top-level xids and some are subtransactions.
754 	 * We don't call SubtransSetParent because it doesn't matter yet. If we
755 	 * aren't overflowed then all xids will fit in snapshot and so we don't
756 	 * need subtrans. If we later overflow, an xid assignment record will add
757 	 * xids to subtrans. If RunningXacts is overflowed then we don't have
758 	 * enough information to correctly update subtrans anyway.
759 	 */
760 
761 	/*
762 	 * Allocate a temporary array to avoid modifying the array passed as
763 	 * argument.
764 	 */
765 	xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
766 
767 	/*
768 	 * Add to the temp array any xids which have not already completed.
769 	 */
770 	nxids = 0;
771 	for (i = 0; i < running->xcnt + running->subxcnt; i++)
772 	{
773 		TransactionId xid = running->xids[i];
774 
775 		/*
776 		 * The running-xacts snapshot can contain xids that were still visible
777 		 * in the procarray when the snapshot was taken, but were already
778 		 * WAL-logged as completed. They're not running anymore, so ignore
779 		 * them.
780 		 */
781 		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
782 			continue;
783 
784 		xids[nxids++] = xid;
785 	}
786 
787 	if (nxids > 0)
788 	{
789 		if (procArray->numKnownAssignedXids != 0)
790 		{
791 			LWLockRelease(ProcArrayLock);
792 			elog(ERROR, "KnownAssignedXids is not empty");
793 		}
794 
795 		/*
796 		 * Sort the array so that we can add them safely into
797 		 * KnownAssignedXids.
798 		 */
799 		qsort(xids, nxids, sizeof(TransactionId), xidComparator);
800 
801 		/*
802 		 * Add the sorted snapshot into KnownAssignedXids.  The running-xacts
803 		 * snapshot may include duplicated xids because of prepared
804 		 * transactions, so ignore them.
805 		 */
806 		for (i = 0; i < nxids; i++)
807 		{
808 			if (i > 0 && TransactionIdEquals(xids[i - 1], xids[i]))
809 			{
810 				elog(DEBUG1,
811 					 "found duplicated transaction %u for KnownAssignedXids insertion",
812 					 xids[i]);
813 				continue;
814 			}
815 			KnownAssignedXidsAdd(xids[i], xids[i], true);
816 		}
817 
818 		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
819 	}
820 
821 	pfree(xids);
822 
823 	/*
824 	 * latestObservedXid is at least set to the point where SUBTRANS was
825 	 * started up to (cf. ProcArrayInitRecovery()) or to the biggest xid
826 	 * RecordKnownAssignedTransactionIds() was called for.  Initialize
827 	 * subtrans from thereon, up to nextXid - 1.
828 	 *
829 	 * We need to duplicate parts of RecordKnownAssignedTransactionId() here,
830 	 * because we've just added xids to the known assigned xids machinery that
831 	 * haven't gone through RecordKnownAssignedTransactionId().
832 	 */
833 	Assert(TransactionIdIsNormal(latestObservedXid));
834 	TransactionIdAdvance(latestObservedXid);
835 	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
836 	{
837 		ExtendSUBTRANS(latestObservedXid);
838 		TransactionIdAdvance(latestObservedXid);
839 	}
840 	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
841 
842 	/* ----------
843 	 * Now we've got the running xids we need to set the global values that
844 	 * are used to track snapshots as they evolve further.
845 	 *
846 	 * - latestCompletedXid which will be the xmax for snapshots
847 	 * - lastOverflowedXid which shows whether snapshots overflow
848 	 * - nextXid
849 	 *
850 	 * If the snapshot overflowed, then we still initialise with what we know,
851 	 * but the recovery snapshot isn't fully valid yet because we know there
852 	 * are some subxids missing. We don't know the specific subxids that are
853 	 * missing, so conservatively assume the last one is latestObservedXid.
854 	 * ----------
855 	 */
856 	if (running->subxid_overflow)
857 	{
858 		standbyState = STANDBY_SNAPSHOT_PENDING;
859 
860 		standbySnapshotPendingXmin = latestObservedXid;
861 		procArray->lastOverflowedXid = latestObservedXid;
862 	}
863 	else
864 	{
865 		standbyState = STANDBY_SNAPSHOT_READY;
866 
867 		standbySnapshotPendingXmin = InvalidTransactionId;
868 	}
869 
870 	/*
871 	 * If a transaction wrote a commit record in the gap between taking and
872 	 * logging the snapshot then latestCompletedXid may already be higher than
873 	 * the value from the snapshot, so check before we use the incoming value.
874 	 */
875 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
876 							  running->latestCompletedXid))
877 		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
878 
879 	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
880 
881 	LWLockRelease(ProcArrayLock);
882 
883 	/* ShmemVariableCache->nextFullXid must be beyond any observed xid. */
884 	AdvanceNextFullTransactionIdPastXid(latestObservedXid);
885 
886 	Assert(FullTransactionIdIsValid(ShmemVariableCache->nextFullXid));
887 
888 	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
889 	if (standbyState == STANDBY_SNAPSHOT_READY)
890 		elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
891 	else
892 		elog(trace_recovery(DEBUG1),
893 			 "recovery snapshot waiting for non-overflowed snapshot or "
894 			 "until oldest active xid on standby is at least %u (now %u)",
895 			 standbySnapshotPendingXmin,
896 			 running->oldestRunningXid);
897 }
898 
899 /*
900  * ProcArrayApplyXidAssignment
901  *		Process an XLOG_XACT_ASSIGNMENT WAL record
902  */
903 void
ProcArrayApplyXidAssignment(TransactionId topxid,int nsubxids,TransactionId * subxids)904 ProcArrayApplyXidAssignment(TransactionId topxid,
905 							int nsubxids, TransactionId *subxids)
906 {
907 	TransactionId max_xid;
908 	int			i;
909 
910 	Assert(standbyState >= STANDBY_INITIALIZED);
911 
912 	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
913 
914 	/*
915 	 * Mark all the subtransactions as observed.
916 	 *
917 	 * NOTE: This will fail if the subxid contains too many previously
918 	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
919 	 * as the code stands, because xid-assignment records should never contain
920 	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
921 	 */
922 	RecordKnownAssignedTransactionIds(max_xid);
923 
924 	/*
925 	 * Notice that we update pg_subtrans with the top-level xid, rather than
926 	 * the parent xid. This is a difference between normal processing and
927 	 * recovery, yet is still correct in all cases. The reason is that
928 	 * subtransaction commit is not marked in clog until commit processing, so
929 	 * all aborted subtransactions have already been clearly marked in clog.
930 	 * As a result we are able to refer directly to the top-level
931 	 * transaction's state rather than skipping through all the intermediate
932 	 * states in the subtransaction tree. This should be the first time we
933 	 * have attempted to SubTransSetParent().
934 	 */
935 	for (i = 0; i < nsubxids; i++)
936 		SubTransSetParent(subxids[i], topxid);
937 
938 	/* KnownAssignedXids isn't maintained yet, so we're done for now */
939 	if (standbyState == STANDBY_INITIALIZED)
940 		return;
941 
942 	/*
943 	 * Uses same locking as transaction commit
944 	 */
945 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
946 
947 	/*
948 	 * Remove subxids from known-assigned-xacts.
949 	 */
950 	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
951 
952 	/*
953 	 * Advance lastOverflowedXid to be at least the last of these subxids.
954 	 */
955 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
956 		procArray->lastOverflowedXid = max_xid;
957 
958 	LWLockRelease(ProcArrayLock);
959 }
960 
961 /*
962  * TransactionIdIsInProgress -- is given transaction running in some backend
963  *
964  * Aside from some shortcuts such as checking RecentXmin and our own Xid,
965  * there are four possibilities for finding a running transaction:
966  *
967  * 1. The given Xid is a main transaction Id.  We will find this out cheaply
968  * by looking at the PGXACT struct for each backend.
969  *
970  * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
971  * We can find this out cheaply too.
972  *
973  * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
974  * if the Xid is running on the master.
975  *
976  * 4. Search the SubTrans tree to find the Xid's topmost parent, and then see
977  * if that is running according to PGXACT or KnownAssignedXids.  This is the
978  * slowest way, but sadly it has to be done always if the others failed,
979  * unless we see that the cached subxact sets are complete (none have
980  * overflowed).
981  *
982  * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
983  * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
984  * This buys back some concurrency (and we can't retrieve the main Xids from
985  * PGXACT again anyway; see GetNewTransactionId).
986  */
987 bool
TransactionIdIsInProgress(TransactionId xid)988 TransactionIdIsInProgress(TransactionId xid)
989 {
990 	static TransactionId *xids = NULL;
991 	int			nxids = 0;
992 	ProcArrayStruct *arrayP = procArray;
993 	TransactionId topxid;
994 	int			i,
995 				j;
996 
997 	/*
998 	 * Don't bother checking a transaction older than RecentXmin; it could not
999 	 * possibly still be running.  (Note: in particular, this guarantees that
1000 	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
1001 	 * running.)
1002 	 */
1003 	if (TransactionIdPrecedes(xid, RecentXmin))
1004 	{
1005 		xc_by_recent_xmin_inc();
1006 		return false;
1007 	}
1008 
1009 	/*
1010 	 * We may have just checked the status of this transaction, so if it is
1011 	 * already known to be completed, we can fall out without any access to
1012 	 * shared memory.
1013 	 */
1014 	if (TransactionIdIsKnownCompleted(xid))
1015 	{
1016 		xc_by_known_xact_inc();
1017 		return false;
1018 	}
1019 
1020 	/*
1021 	 * Also, we can handle our own transaction (and subtransactions) without
1022 	 * any access to shared memory.
1023 	 */
1024 	if (TransactionIdIsCurrentTransactionId(xid))
1025 	{
1026 		xc_by_my_xact_inc();
1027 		return true;
1028 	}
1029 
1030 	/*
1031 	 * If first time through, get workspace to remember main XIDs in. We
1032 	 * malloc it permanently to avoid repeated palloc/pfree overhead.
1033 	 */
1034 	if (xids == NULL)
1035 	{
1036 		/*
1037 		 * In hot standby mode, reserve enough space to hold all xids in the
1038 		 * known-assigned list. If we later finish recovery, we no longer need
1039 		 * the bigger array, but we don't bother to shrink it.
1040 		 */
1041 		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
1042 
1043 		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
1044 		if (xids == NULL)
1045 			ereport(ERROR,
1046 					(errcode(ERRCODE_OUT_OF_MEMORY),
1047 					 errmsg("out of memory")));
1048 	}
1049 
1050 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1051 
1052 	/*
1053 	 * Now that we have the lock, we can check latestCompletedXid; if the
1054 	 * target Xid is after that, it's surely still running.
1055 	 */
1056 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
1057 	{
1058 		LWLockRelease(ProcArrayLock);
1059 		xc_by_latest_xid_inc();
1060 		return true;
1061 	}
1062 
1063 	/* No shortcuts, gotta grovel through the array */
1064 	for (i = 0; i < arrayP->numProcs; i++)
1065 	{
1066 		int			pgprocno = arrayP->pgprocnos[i];
1067 		PGPROC	   *proc = &allProcs[pgprocno];
1068 		PGXACT	   *pgxact = &allPgXact[pgprocno];
1069 		TransactionId pxid;
1070 		int			pxids;
1071 
1072 		/* Ignore my own proc --- dealt with it above */
1073 		if (proc == MyProc)
1074 			continue;
1075 
1076 		/* Fetch xid just once - see GetNewTransactionId */
1077 		pxid = UINT32_ACCESS_ONCE(pgxact->xid);
1078 
1079 		if (!TransactionIdIsValid(pxid))
1080 			continue;
1081 
1082 		/*
1083 		 * Step 1: check the main Xid
1084 		 */
1085 		if (TransactionIdEquals(pxid, xid))
1086 		{
1087 			LWLockRelease(ProcArrayLock);
1088 			xc_by_main_xid_inc();
1089 			return true;
1090 		}
1091 
1092 		/*
1093 		 * We can ignore main Xids that are younger than the target Xid, since
1094 		 * the target could not possibly be their child.
1095 		 */
1096 		if (TransactionIdPrecedes(xid, pxid))
1097 			continue;
1098 
1099 		/*
1100 		 * Step 2: check the cached child-Xids arrays
1101 		 */
1102 		pxids = pgxact->nxids;
1103 		pg_read_barrier();		/* pairs with barrier in GetNewTransactionId() */
1104 		for (j = pxids - 1; j >= 0; j--)
1105 		{
1106 			/* Fetch xid just once - see GetNewTransactionId */
1107 			TransactionId cxid = UINT32_ACCESS_ONCE(proc->subxids.xids[j]);
1108 
1109 			if (TransactionIdEquals(cxid, xid))
1110 			{
1111 				LWLockRelease(ProcArrayLock);
1112 				xc_by_child_xid_inc();
1113 				return true;
1114 			}
1115 		}
1116 
1117 		/*
1118 		 * Save the main Xid for step 4.  We only need to remember main Xids
1119 		 * that have uncached children.  (Note: there is no race condition
1120 		 * here because the overflowed flag cannot be cleared, only set, while
1121 		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
1122 		 * worry about.)
1123 		 */
1124 		if (pgxact->overflowed)
1125 			xids[nxids++] = pxid;
1126 	}
1127 
1128 	/*
1129 	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
1130 	 * in the list must be treated as running.
1131 	 */
1132 	if (RecoveryInProgress())
1133 	{
1134 		/* none of the PGXACT entries should have XIDs in hot standby mode */
1135 		Assert(nxids == 0);
1136 
1137 		if (KnownAssignedXidExists(xid))
1138 		{
1139 			LWLockRelease(ProcArrayLock);
1140 			xc_by_known_assigned_inc();
1141 			return true;
1142 		}
1143 
1144 		/*
1145 		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
1146 		 * too.  Fetch all xids from KnownAssignedXids that are lower than
1147 		 * xid, since if xid is a subtransaction its parent will always have a
1148 		 * lower value.  Note we will collect both main and subXIDs here, but
1149 		 * there's no help for it.
1150 		 */
1151 		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
1152 			nxids = KnownAssignedXidsGet(xids, xid);
1153 	}
1154 
1155 	LWLockRelease(ProcArrayLock);
1156 
1157 	/*
1158 	 * If none of the relevant caches overflowed, we know the Xid is not
1159 	 * running without even looking at pg_subtrans.
1160 	 */
1161 	if (nxids == 0)
1162 	{
1163 		xc_no_overflow_inc();
1164 		return false;
1165 	}
1166 
1167 	/*
1168 	 * Step 4: have to check pg_subtrans.
1169 	 *
1170 	 * At this point, we know it's either a subtransaction of one of the Xids
1171 	 * in xids[], or it's not running.  If it's an already-failed
1172 	 * subtransaction, we want to say "not running" even though its parent may
1173 	 * still be running.  So first, check pg_xact to see if it's been aborted.
1174 	 */
1175 	xc_slow_answer_inc();
1176 
1177 	if (TransactionIdDidAbort(xid))
1178 		return false;
1179 
1180 	/*
1181 	 * It isn't aborted, so check whether the transaction tree it belongs to
1182 	 * is still running (or, more precisely, whether it was running when we
1183 	 * held ProcArrayLock).
1184 	 */
1185 	topxid = SubTransGetTopmostTransaction(xid);
1186 	Assert(TransactionIdIsValid(topxid));
1187 	if (!TransactionIdEquals(topxid, xid))
1188 	{
1189 		for (i = 0; i < nxids; i++)
1190 		{
1191 			if (TransactionIdEquals(xids[i], topxid))
1192 				return true;
1193 		}
1194 	}
1195 
1196 	return false;
1197 }
1198 
1199 /*
1200  * TransactionIdIsActive -- is xid the top-level XID of an active backend?
1201  *
1202  * This differs from TransactionIdIsInProgress in that it ignores prepared
1203  * transactions, as well as transactions running on the master if we're in
1204  * hot standby.  Also, we ignore subtransactions since that's not needed
1205  * for current uses.
1206  */
1207 bool
TransactionIdIsActive(TransactionId xid)1208 TransactionIdIsActive(TransactionId xid)
1209 {
1210 	bool		result = false;
1211 	ProcArrayStruct *arrayP = procArray;
1212 	int			i;
1213 
1214 	/*
1215 	 * Don't bother checking a transaction older than RecentXmin; it could not
1216 	 * possibly still be running.
1217 	 */
1218 	if (TransactionIdPrecedes(xid, RecentXmin))
1219 		return false;
1220 
1221 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1222 
1223 	for (i = 0; i < arrayP->numProcs; i++)
1224 	{
1225 		int			pgprocno = arrayP->pgprocnos[i];
1226 		PGPROC	   *proc = &allProcs[pgprocno];
1227 		PGXACT	   *pgxact = &allPgXact[pgprocno];
1228 		TransactionId pxid;
1229 
1230 		/* Fetch xid just once - see GetNewTransactionId */
1231 		pxid = UINT32_ACCESS_ONCE(pgxact->xid);
1232 
1233 		if (!TransactionIdIsValid(pxid))
1234 			continue;
1235 
1236 		if (proc->pid == 0)
1237 			continue;			/* ignore prepared transactions */
1238 
1239 		if (TransactionIdEquals(pxid, xid))
1240 		{
1241 			result = true;
1242 			break;
1243 		}
1244 	}
1245 
1246 	LWLockRelease(ProcArrayLock);
1247 
1248 	return result;
1249 }
1250 
1251 
1252 /*
1253  * GetOldestXmin -- returns oldest transaction that was running
1254  *					when any current transaction was started.
1255  *
1256  * If rel is NULL or a shared relation, all backends are considered, otherwise
1257  * only backends running in this database are considered.
1258  *
1259  * The flags are used to ignore the backends in calculation when any of the
1260  * corresponding flags is set. Typically, if you want to ignore ones with
1261  * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
1262  *
1263  * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
1264  * catalog_xmin of any replication slots that exist in the system when
1265  * calculating the oldest xmin.
1266  *
1267  * This is used by VACUUM to decide which deleted tuples must be preserved in
1268  * the passed in table. For shared relations backends in all databases must be
1269  * considered, but for non-shared relations that's not required, since only
1270  * backends in my own database could ever see the tuples in them. Also, we can
1271  * ignore concurrently running lazy VACUUMs because (a) they must be working
1272  * on other tables, and (b) they don't need to do snapshot-based lookups.
1273  *
1274  * This is also used to determine where to truncate pg_subtrans.  For that
1275  * backends in all databases have to be considered, so rel = NULL has to be
1276  * passed in.
1277  *
1278  * Note: we include all currently running xids in the set of considered xids.
1279  * This ensures that if a just-started xact has not yet set its snapshot,
1280  * when it does set the snapshot it cannot set xmin less than what we compute.
1281  * See notes in src/backend/access/transam/README.
1282  *
1283  * Note: despite the above, it's possible for the calculated value to move
1284  * backwards on repeated calls. The calculated value is conservative, so that
1285  * anything older is definitely not considered as running by anyone anymore,
1286  * but the exact value calculated depends on a number of things. For example,
1287  * if rel = NULL and there are no transactions running in the current
1288  * database, GetOldestXmin() returns latestCompletedXid. If a transaction
1289  * begins after that, its xmin will include in-progress transactions in other
1290  * databases that started earlier, so another call will return a lower value.
1291  * Nonetheless it is safe to vacuum a table in the current database with the
1292  * first result.  There are also replication-related effects: a walsender
1293  * process can set its xmin based on transactions that are no longer running
1294  * in the master but are still being replayed on the standby, thus possibly
1295  * making the GetOldestXmin reading go backwards.  In this case there is a
1296  * possibility that we lose data that the standby would like to have, but
1297  * unless the standby uses a replication slot to make its xmin persistent
1298  * there is little we can do about that --- data is only protected if the
1299  * walsender runs continuously while queries are executed on the standby.
1300  * (The Hot Standby code deals with such cases by failing standby queries
1301  * that needed to access already-removed data, so there's no integrity bug.)
1302  * The return value is also adjusted with vacuum_defer_cleanup_age, so
1303  * increasing that setting on the fly is another easy way to make
1304  * GetOldestXmin() move backwards, with no consequences for data integrity.
1305  */
1306 TransactionId
GetOldestXmin(Relation rel,int flags)1307 GetOldestXmin(Relation rel, int flags)
1308 {
1309 	ProcArrayStruct *arrayP = procArray;
1310 	TransactionId result;
1311 	int			index;
1312 	bool		allDbs;
1313 
1314 	TransactionId replication_slot_xmin = InvalidTransactionId;
1315 	TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1316 
1317 	/*
1318 	 * If we're not computing a relation specific limit, or if a shared
1319 	 * relation has been passed in, backends in all databases have to be
1320 	 * considered.
1321 	 */
1322 	allDbs = rel == NULL || rel->rd_rel->relisshared;
1323 
1324 	/* Cannot look for individual databases during recovery */
1325 	Assert(allDbs || !RecoveryInProgress());
1326 
1327 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1328 
1329 	/*
1330 	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
1331 	 * is a lower bound for the XIDs that might appear in the ProcArray later,
1332 	 * and so protects us against overestimating the result due to future
1333 	 * additions.
1334 	 */
1335 	result = ShmemVariableCache->latestCompletedXid;
1336 	Assert(TransactionIdIsNormal(result));
1337 	TransactionIdAdvance(result);
1338 
1339 	for (index = 0; index < arrayP->numProcs; index++)
1340 	{
1341 		int			pgprocno = arrayP->pgprocnos[index];
1342 		PGPROC	   *proc = &allProcs[pgprocno];
1343 		PGXACT	   *pgxact = &allPgXact[pgprocno];
1344 
1345 		if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
1346 			continue;
1347 
1348 		if (allDbs ||
1349 			proc->databaseId == MyDatabaseId ||
1350 			proc->databaseId == 0)	/* always include WalSender */
1351 		{
1352 			/* Fetch xid just once - see GetNewTransactionId */
1353 			TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid);
1354 
1355 			/* First consider the transaction's own Xid, if any */
1356 			if (TransactionIdIsNormal(xid) &&
1357 				TransactionIdPrecedes(xid, result))
1358 				result = xid;
1359 
1360 			/*
1361 			 * Also consider the transaction's Xmin, if set.
1362 			 *
1363 			 * We must check both Xid and Xmin because a transaction might
1364 			 * have an Xmin but not (yet) an Xid; conversely, if it has an
1365 			 * Xid, that could determine some not-yet-set Xmin.
1366 			 */
1367 			xid = UINT32_ACCESS_ONCE(pgxact->xmin);
1368 			if (TransactionIdIsNormal(xid) &&
1369 				TransactionIdPrecedes(xid, result))
1370 				result = xid;
1371 		}
1372 	}
1373 
1374 	/*
1375 	 * Fetch into local variable while ProcArrayLock is held - the
1376 	 * LWLockRelease below is a barrier, ensuring this happens inside the
1377 	 * lock.
1378 	 */
1379 	replication_slot_xmin = procArray->replication_slot_xmin;
1380 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1381 
1382 	if (RecoveryInProgress())
1383 	{
1384 		/*
1385 		 * Check to see whether KnownAssignedXids contains an xid value older
1386 		 * than the main procarray.
1387 		 */
1388 		TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1389 
1390 		LWLockRelease(ProcArrayLock);
1391 
1392 		if (TransactionIdIsNormal(kaxmin) &&
1393 			TransactionIdPrecedes(kaxmin, result))
1394 			result = kaxmin;
1395 	}
1396 	else
1397 	{
1398 		/*
1399 		 * No other information needed, so release the lock immediately.
1400 		 */
1401 		LWLockRelease(ProcArrayLock);
1402 
1403 		/*
1404 		 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
1405 		 * being careful not to generate a "permanent" XID.
1406 		 *
1407 		 * vacuum_defer_cleanup_age provides some additional "slop" for the
1408 		 * benefit of hot standby queries on standby servers.  This is quick
1409 		 * and dirty, and perhaps not all that useful unless the master has a
1410 		 * predictable transaction rate, but it offers some protection when
1411 		 * there's no walsender connection.  Note that we are assuming
1412 		 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
1413 		 * so guc.c should limit it to no more than the xidStopLimit threshold
1414 		 * in varsup.c.  Also note that we intentionally don't apply
1415 		 * vacuum_defer_cleanup_age on standby servers.
1416 		 */
1417 		result -= vacuum_defer_cleanup_age;
1418 		if (!TransactionIdIsNormal(result))
1419 			result = FirstNormalTransactionId;
1420 	}
1421 
1422 	/*
1423 	 * Check whether there are replication slots requiring an older xmin.
1424 	 */
1425 	if (!(flags & PROCARRAY_SLOTS_XMIN) &&
1426 		TransactionIdIsValid(replication_slot_xmin) &&
1427 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
1428 		result = replication_slot_xmin;
1429 
1430 	/*
1431 	 * After locks have been released and defer_cleanup_age has been applied,
1432 	 * check whether we need to back up further to make logical decoding
1433 	 * possible. We need to do so if we're computing the global limit (rel =
1434 	 * NULL) or if the passed relation is a catalog relation of some kind.
1435 	 */
1436 	if (!(flags & PROCARRAY_SLOTS_XMIN) &&
1437 		(rel == NULL ||
1438 		 RelationIsAccessibleInLogicalDecoding(rel)) &&
1439 		TransactionIdIsValid(replication_slot_catalog_xmin) &&
1440 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
1441 		result = replication_slot_catalog_xmin;
1442 
1443 	return result;
1444 }
1445 
1446 /*
1447  * GetMaxSnapshotXidCount -- get max size for snapshot XID array
1448  *
1449  * We have to export this for use by snapmgr.c.
1450  */
1451 int
GetMaxSnapshotXidCount(void)1452 GetMaxSnapshotXidCount(void)
1453 {
1454 	return procArray->maxProcs;
1455 }
1456 
1457 /*
1458  * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
1459  *
1460  * We have to export this for use by snapmgr.c.
1461  */
1462 int
GetMaxSnapshotSubxidCount(void)1463 GetMaxSnapshotSubxidCount(void)
1464 {
1465 	return TOTAL_MAX_CACHED_SUBXIDS;
1466 }
1467 
1468 /*
1469  * GetSnapshotData -- returns information about running transactions.
1470  *
1471  * The returned snapshot includes xmin (lowest still-running xact ID),
1472  * xmax (highest completed xact ID + 1), and a list of running xact IDs
1473  * in the range xmin <= xid < xmax.  It is used as follows:
1474  *		All xact IDs < xmin are considered finished.
1475  *		All xact IDs >= xmax are considered still running.
1476  *		For an xact ID xmin <= xid < xmax, consult list to see whether
1477  *		it is considered running or not.
1478  * This ensures that the set of transactions seen as "running" by the
1479  * current xact will not change after it takes the snapshot.
1480  *
1481  * All running top-level XIDs are included in the snapshot, except for lazy
1482  * VACUUM processes.  We also try to include running subtransaction XIDs,
1483  * but since PGPROC has only a limited cache area for subxact XIDs, full
1484  * information may not be available.  If we find any overflowed subxid arrays,
1485  * we have to mark the snapshot's subxid data as overflowed, and extra work
1486  * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1487  * in heapam_visibility.c).
1488  *
1489  * We also update the following backend-global variables:
1490  *		TransactionXmin: the oldest xmin of any snapshot in use in the
1491  *			current transaction (this is the same as MyPgXact->xmin).
1492  *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
1493  *			older than this are known not running any more.
1494  *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1495  *			running transactions, except those running LAZY VACUUM).  This is
1496  *			the same computation done by
1497  *			GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM).
1498  *		RecentGlobalDataXmin: the global xmin for non-catalog tables
1499  *			>= RecentGlobalXmin
1500  *
1501  * Note: this function should probably not be called with an argument that's
1502  * not statically allocated (see xip allocation below).
1503  */
1504 Snapshot
GetSnapshotData(Snapshot snapshot)1505 GetSnapshotData(Snapshot snapshot)
1506 {
1507 	ProcArrayStruct *arrayP = procArray;
1508 	TransactionId xmin;
1509 	TransactionId xmax;
1510 	TransactionId globalxmin;
1511 	int			index;
1512 	int			count = 0;
1513 	int			subcount = 0;
1514 	bool		suboverflowed = false;
1515 	TransactionId replication_slot_xmin = InvalidTransactionId;
1516 	TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1517 
1518 	Assert(snapshot != NULL);
1519 
1520 	/*
1521 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
1522 	 * be sufficient.  But it seems better to do the malloc while not holding
1523 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
1524 	 * more subxip storage than is probably needed.
1525 	 *
1526 	 * This does open a possibility for avoiding repeated malloc/free: since
1527 	 * maxProcs does not change at runtime, we can simply reuse the previous
1528 	 * xip arrays if any.  (This relies on the fact that all callers pass
1529 	 * static SnapshotData structs.)
1530 	 */
1531 	if (snapshot->xip == NULL)
1532 	{
1533 		/*
1534 		 * First call for this snapshot. Snapshot is same size whether or not
1535 		 * we are in recovery, see later comments.
1536 		 */
1537 		snapshot->xip = (TransactionId *)
1538 			malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
1539 		if (snapshot->xip == NULL)
1540 			ereport(ERROR,
1541 					(errcode(ERRCODE_OUT_OF_MEMORY),
1542 					 errmsg("out of memory")));
1543 		Assert(snapshot->subxip == NULL);
1544 		snapshot->subxip = (TransactionId *)
1545 			malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
1546 		if (snapshot->subxip == NULL)
1547 			ereport(ERROR,
1548 					(errcode(ERRCODE_OUT_OF_MEMORY),
1549 					 errmsg("out of memory")));
1550 	}
1551 
1552 	/*
1553 	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1554 	 * going to set MyPgXact->xmin.
1555 	 */
1556 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1557 
1558 	/* xmax is always latestCompletedXid + 1 */
1559 	xmax = ShmemVariableCache->latestCompletedXid;
1560 	Assert(TransactionIdIsNormal(xmax));
1561 	TransactionIdAdvance(xmax);
1562 
1563 	/* initialize xmin calculation with xmax */
1564 	globalxmin = xmin = xmax;
1565 
1566 	snapshot->takenDuringRecovery = RecoveryInProgress();
1567 
1568 	if (!snapshot->takenDuringRecovery)
1569 	{
1570 		int		   *pgprocnos = arrayP->pgprocnos;
1571 		int			numProcs;
1572 
1573 		/*
1574 		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
1575 		 * to gather all active xids, find the lowest xmin, and try to record
1576 		 * subxids.
1577 		 */
1578 		numProcs = arrayP->numProcs;
1579 		for (index = 0; index < numProcs; index++)
1580 		{
1581 			int			pgprocno = pgprocnos[index];
1582 			PGXACT	   *pgxact = &allPgXact[pgprocno];
1583 			TransactionId xid;
1584 
1585 			/*
1586 			 * Skip over backends doing logical decoding which manages xmin
1587 			 * separately (check below) and ones running LAZY VACUUM.
1588 			 */
1589 			if (pgxact->vacuumFlags &
1590 				(PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM))
1591 				continue;
1592 
1593 			/* Update globalxmin to be the smallest valid xmin */
1594 			xid = UINT32_ACCESS_ONCE(pgxact->xmin);
1595 			if (TransactionIdIsNormal(xid) &&
1596 				NormalTransactionIdPrecedes(xid, globalxmin))
1597 				globalxmin = xid;
1598 
1599 			/* Fetch xid just once - see GetNewTransactionId */
1600 			xid = UINT32_ACCESS_ONCE(pgxact->xid);
1601 
1602 			/*
1603 			 * If the transaction has no XID assigned, we can skip it; it
1604 			 * won't have sub-XIDs either.  If the XID is >= xmax, we can also
1605 			 * skip it; such transactions will be treated as running anyway
1606 			 * (and any sub-XIDs will also be >= xmax).
1607 			 */
1608 			if (!TransactionIdIsNormal(xid)
1609 				|| !NormalTransactionIdPrecedes(xid, xmax))
1610 				continue;
1611 
1612 			/*
1613 			 * We don't include our own XIDs (if any) in the snapshot, but we
1614 			 * must include them in xmin.
1615 			 */
1616 			if (NormalTransactionIdPrecedes(xid, xmin))
1617 				xmin = xid;
1618 			if (pgxact == MyPgXact)
1619 				continue;
1620 
1621 			/* Add XID to snapshot. */
1622 			snapshot->xip[count++] = xid;
1623 
1624 			/*
1625 			 * Save subtransaction XIDs if possible (if we've already
1626 			 * overflowed, there's no point).  Note that the subxact XIDs must
1627 			 * be later than their parent, so no need to check them against
1628 			 * xmin.  We could filter against xmax, but it seems better not to
1629 			 * do that much work while holding the ProcArrayLock.
1630 			 *
1631 			 * The other backend can add more subxids concurrently, but cannot
1632 			 * remove any.  Hence it's important to fetch nxids just once.
1633 			 * Should be safe to use memcpy, though.  (We needn't worry about
1634 			 * missing any xids added concurrently, because they must postdate
1635 			 * xmax.)
1636 			 *
1637 			 * Again, our own XIDs are not included in the snapshot.
1638 			 */
1639 			if (!suboverflowed)
1640 			{
1641 				if (pgxact->overflowed)
1642 					suboverflowed = true;
1643 				else
1644 				{
1645 					int			nxids = pgxact->nxids;
1646 
1647 					if (nxids > 0)
1648 					{
1649 						PGPROC	   *proc = &allProcs[pgprocno];
1650 
1651 						pg_read_barrier();	/* pairs with GetNewTransactionId */
1652 
1653 						memcpy(snapshot->subxip + subcount,
1654 							   (void *) proc->subxids.xids,
1655 							   nxids * sizeof(TransactionId));
1656 						subcount += nxids;
1657 					}
1658 				}
1659 			}
1660 		}
1661 	}
1662 	else
1663 	{
1664 		/*
1665 		 * We're in hot standby, so get XIDs from KnownAssignedXids.
1666 		 *
1667 		 * We store all xids directly into subxip[]. Here's why:
1668 		 *
1669 		 * In recovery we don't know which xids are top-level and which are
1670 		 * subxacts, a design choice that greatly simplifies xid processing.
1671 		 *
1672 		 * It seems like we would want to try to put xids into xip[] only, but
1673 		 * that is fairly small. We would either need to make that bigger or
1674 		 * to increase the rate at which we WAL-log xid assignment; neither is
1675 		 * an appealing choice.
1676 		 *
1677 		 * We could try to store xids into xip[] first and then into subxip[]
1678 		 * if there are too many xids. That only works if the snapshot doesn't
1679 		 * overflow because we do not search subxip[] in that case. A simpler
1680 		 * way is to just store all xids in the subxact array because this is
1681 		 * by far the bigger array. We just leave the xip array empty.
1682 		 *
1683 		 * Either way we need to change the way XidInMVCCSnapshot() works
1684 		 * depending upon when the snapshot was taken, or change normal
1685 		 * snapshot processing so it matches.
1686 		 *
1687 		 * Note: It is possible for recovery to end before we finish taking
1688 		 * the snapshot, and for newly assigned transaction ids to be added to
1689 		 * the ProcArray.  xmax cannot change while we hold ProcArrayLock, so
1690 		 * those newly added transaction ids would be filtered away, so we
1691 		 * need not be concerned about them.
1692 		 */
1693 		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
1694 												  xmax);
1695 
1696 		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1697 			suboverflowed = true;
1698 	}
1699 
1700 
1701 	/*
1702 	 * Fetch into local variable while ProcArrayLock is held - the
1703 	 * LWLockRelease below is a barrier, ensuring this happens inside the
1704 	 * lock.
1705 	 */
1706 	replication_slot_xmin = procArray->replication_slot_xmin;
1707 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1708 
1709 	if (!TransactionIdIsValid(MyPgXact->xmin))
1710 		MyPgXact->xmin = TransactionXmin = xmin;
1711 
1712 	LWLockRelease(ProcArrayLock);
1713 
1714 	/*
1715 	 * Update globalxmin to include actual process xids.  This is a slightly
1716 	 * different way of computing it than GetOldestXmin uses, but should give
1717 	 * the same result.
1718 	 */
1719 	if (TransactionIdPrecedes(xmin, globalxmin))
1720 		globalxmin = xmin;
1721 
1722 	/* Update global variables too */
1723 	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
1724 	if (!TransactionIdIsNormal(RecentGlobalXmin))
1725 		RecentGlobalXmin = FirstNormalTransactionId;
1726 
1727 	/* Check whether there's a replication slot requiring an older xmin. */
1728 	if (TransactionIdIsValid(replication_slot_xmin) &&
1729 		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
1730 		RecentGlobalXmin = replication_slot_xmin;
1731 
1732 	/* Non-catalog tables can be vacuumed if older than this xid */
1733 	RecentGlobalDataXmin = RecentGlobalXmin;
1734 
1735 	/*
1736 	 * Check whether there's a replication slot requiring an older catalog
1737 	 * xmin.
1738 	 */
1739 	if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
1740 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
1741 		RecentGlobalXmin = replication_slot_catalog_xmin;
1742 
1743 	RecentXmin = xmin;
1744 
1745 	snapshot->xmin = xmin;
1746 	snapshot->xmax = xmax;
1747 	snapshot->xcnt = count;
1748 	snapshot->subxcnt = subcount;
1749 	snapshot->suboverflowed = suboverflowed;
1750 
1751 	snapshot->curcid = GetCurrentCommandId(false);
1752 
1753 	/*
1754 	 * This is a new snapshot, so set both refcounts are zero, and mark it as
1755 	 * not copied in persistent memory.
1756 	 */
1757 	snapshot->active_count = 0;
1758 	snapshot->regd_count = 0;
1759 	snapshot->copied = false;
1760 
1761 	if (old_snapshot_threshold < 0)
1762 	{
1763 		/*
1764 		 * If not using "snapshot too old" feature, fill related fields with
1765 		 * dummy values that don't require any locking.
1766 		 */
1767 		snapshot->lsn = InvalidXLogRecPtr;
1768 		snapshot->whenTaken = 0;
1769 	}
1770 	else
1771 	{
1772 		/*
1773 		 * Capture the current time and WAL stream location in case this
1774 		 * snapshot becomes old enough to need to fall back on the special
1775 		 * "old snapshot" logic.
1776 		 */
1777 		snapshot->lsn = GetXLogInsertRecPtr();
1778 		snapshot->whenTaken = GetSnapshotCurrentTimestamp();
1779 		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
1780 	}
1781 
1782 	return snapshot;
1783 }
1784 
1785 /*
1786  * ProcArrayInstallImportedXmin -- install imported xmin into MyPgXact->xmin
1787  *
1788  * This is called when installing a snapshot imported from another
1789  * transaction.  To ensure that OldestXmin doesn't go backwards, we must
1790  * check that the source transaction is still running, and we'd better do
1791  * that atomically with installing the new xmin.
1792  *
1793  * Returns true if successful, false if source xact is no longer running.
1794  */
1795 bool
ProcArrayInstallImportedXmin(TransactionId xmin,VirtualTransactionId * sourcevxid)1796 ProcArrayInstallImportedXmin(TransactionId xmin,
1797 							 VirtualTransactionId *sourcevxid)
1798 {
1799 	bool		result = false;
1800 	ProcArrayStruct *arrayP = procArray;
1801 	int			index;
1802 
1803 	Assert(TransactionIdIsNormal(xmin));
1804 	if (!sourcevxid)
1805 		return false;
1806 
1807 	/* Get lock so source xact can't end while we're doing this */
1808 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1809 
1810 	for (index = 0; index < arrayP->numProcs; index++)
1811 	{
1812 		int			pgprocno = arrayP->pgprocnos[index];
1813 		PGPROC	   *proc = &allProcs[pgprocno];
1814 		PGXACT	   *pgxact = &allPgXact[pgprocno];
1815 		TransactionId xid;
1816 
1817 		/* Ignore procs running LAZY VACUUM */
1818 		if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1819 			continue;
1820 
1821 		/* We are only interested in the specific virtual transaction. */
1822 		if (proc->backendId != sourcevxid->backendId)
1823 			continue;
1824 		if (proc->lxid != sourcevxid->localTransactionId)
1825 			continue;
1826 
1827 		/*
1828 		 * We check the transaction's database ID for paranoia's sake: if it's
1829 		 * in another DB then its xmin does not cover us.  Caller should have
1830 		 * detected this already, so we just treat any funny cases as
1831 		 * "transaction not found".
1832 		 */
1833 		if (proc->databaseId != MyDatabaseId)
1834 			continue;
1835 
1836 		/*
1837 		 * Likewise, let's just make real sure its xmin does cover us.
1838 		 */
1839 		xid = UINT32_ACCESS_ONCE(pgxact->xmin);
1840 		if (!TransactionIdIsNormal(xid) ||
1841 			!TransactionIdPrecedesOrEquals(xid, xmin))
1842 			continue;
1843 
1844 		/*
1845 		 * We're good.  Install the new xmin.  As in GetSnapshotData, set
1846 		 * TransactionXmin too.  (Note that because snapmgr.c called
1847 		 * GetSnapshotData first, we'll be overwriting a valid xmin here, so
1848 		 * we don't check that.)
1849 		 */
1850 		MyPgXact->xmin = TransactionXmin = xmin;
1851 
1852 		result = true;
1853 		break;
1854 	}
1855 
1856 	LWLockRelease(ProcArrayLock);
1857 
1858 	return result;
1859 }
1860 
1861 /*
1862  * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
1863  *
1864  * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
1865  * PGPROC of the transaction from which we imported the snapshot, rather than
1866  * an XID.
1867  *
1868  * Returns true if successful, false if source xact is no longer running.
1869  */
1870 bool
ProcArrayInstallRestoredXmin(TransactionId xmin,PGPROC * proc)1871 ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
1872 {
1873 	bool		result = false;
1874 	TransactionId xid;
1875 	PGXACT	   *pgxact;
1876 
1877 	Assert(TransactionIdIsNormal(xmin));
1878 	Assert(proc != NULL);
1879 
1880 	/* Get lock so source xact can't end while we're doing this */
1881 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1882 
1883 	pgxact = &allPgXact[proc->pgprocno];
1884 
1885 	/*
1886 	 * Be certain that the referenced PGPROC has an advertised xmin which is
1887 	 * no later than the one we're installing, so that the system-wide xmin
1888 	 * can't go backwards.  Also, make sure it's running in the same database,
1889 	 * so that the per-database xmin cannot go backwards.
1890 	 */
1891 	xid = UINT32_ACCESS_ONCE(pgxact->xmin);
1892 	if (proc->databaseId == MyDatabaseId &&
1893 		TransactionIdIsNormal(xid) &&
1894 		TransactionIdPrecedesOrEquals(xid, xmin))
1895 	{
1896 		MyPgXact->xmin = TransactionXmin = xmin;
1897 		result = true;
1898 	}
1899 
1900 	LWLockRelease(ProcArrayLock);
1901 
1902 	return result;
1903 }
1904 
1905 /*
1906  * GetRunningTransactionData -- returns information about running transactions.
1907  *
1908  * Similar to GetSnapshotData but returns more information. We include
1909  * all PGXACTs with an assigned TransactionId, even VACUUM processes and
1910  * prepared transactions.
1911  *
1912  * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
1913  * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
1914  * array until the caller has WAL-logged this snapshot, and releases the
1915  * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
1916  * lock is released.
1917  *
1918  * The returned data structure is statically allocated; caller should not
1919  * modify it, and must not assume it is valid past the next call.
1920  *
1921  * This is never executed during recovery so there is no need to look at
1922  * KnownAssignedXids.
1923  *
1924  * Dummy PGXACTs from prepared transaction are included, meaning that this
1925  * may return entries with duplicated TransactionId values coming from
1926  * transaction finishing to prepare.  Nothing is done about duplicated
1927  * entries here to not hold on ProcArrayLock more than necessary.
1928  *
1929  * We don't worry about updating other counters, we want to keep this as
1930  * simple as possible and leave GetSnapshotData() as the primary code for
1931  * that bookkeeping.
1932  *
1933  * Note that if any transaction has overflowed its cached subtransactions
1934  * then there is no real need include any subtransactions.
1935  */
1936 RunningTransactions
GetRunningTransactionData(void)1937 GetRunningTransactionData(void)
1938 {
1939 	/* result workspace */
1940 	static RunningTransactionsData CurrentRunningXactsData;
1941 
1942 	ProcArrayStruct *arrayP = procArray;
1943 	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1944 	TransactionId latestCompletedXid;
1945 	TransactionId oldestRunningXid;
1946 	TransactionId *xids;
1947 	int			index;
1948 	int			count;
1949 	int			subcount;
1950 	bool		suboverflowed;
1951 
1952 	Assert(!RecoveryInProgress());
1953 
1954 	/*
1955 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
1956 	 * be sufficient.  But it seems better to do the malloc while not holding
1957 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
1958 	 * more subxip storage than is probably needed.
1959 	 *
1960 	 * Should only be allocated in bgwriter, since only ever executed during
1961 	 * checkpoints.
1962 	 */
1963 	if (CurrentRunningXacts->xids == NULL)
1964 	{
1965 		/*
1966 		 * First call
1967 		 */
1968 		CurrentRunningXacts->xids = (TransactionId *)
1969 			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1970 		if (CurrentRunningXacts->xids == NULL)
1971 			ereport(ERROR,
1972 					(errcode(ERRCODE_OUT_OF_MEMORY),
1973 					 errmsg("out of memory")));
1974 	}
1975 
1976 	xids = CurrentRunningXacts->xids;
1977 
1978 	count = subcount = 0;
1979 	suboverflowed = false;
1980 
1981 	/*
1982 	 * Ensure that no xids enter or leave the procarray while we obtain
1983 	 * snapshot.
1984 	 */
1985 	LWLockAcquire(ProcArrayLock, LW_SHARED);
1986 	LWLockAcquire(XidGenLock, LW_SHARED);
1987 
1988 	latestCompletedXid = ShmemVariableCache->latestCompletedXid;
1989 
1990 	oldestRunningXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
1991 
1992 	/*
1993 	 * Spin over procArray collecting all xids
1994 	 */
1995 	for (index = 0; index < arrayP->numProcs; index++)
1996 	{
1997 		int			pgprocno = arrayP->pgprocnos[index];
1998 		PGXACT	   *pgxact = &allPgXact[pgprocno];
1999 		TransactionId xid;
2000 
2001 		/* Fetch xid just once - see GetNewTransactionId */
2002 		xid = UINT32_ACCESS_ONCE(pgxact->xid);
2003 
2004 		/*
2005 		 * We don't need to store transactions that don't have a TransactionId
2006 		 * yet because they will not show as running on a standby server.
2007 		 */
2008 		if (!TransactionIdIsValid(xid))
2009 			continue;
2010 
2011 		/*
2012 		 * Be careful not to exclude any xids before calculating the values of
2013 		 * oldestRunningXid and suboverflowed, since these are used to clean
2014 		 * up transaction information held on standbys.
2015 		 */
2016 		if (TransactionIdPrecedes(xid, oldestRunningXid))
2017 			oldestRunningXid = xid;
2018 
2019 		if (pgxact->overflowed)
2020 			suboverflowed = true;
2021 
2022 		/*
2023 		 * If we wished to exclude xids this would be the right place for it.
2024 		 * Procs with the PROC_IN_VACUUM flag set don't usually assign xids,
2025 		 * but they do during truncation at the end when they get the lock and
2026 		 * truncate, so it is not much of a problem to include them if they
2027 		 * are seen and it is cleaner to include them.
2028 		 */
2029 
2030 		xids[count++] = xid;
2031 	}
2032 
2033 	/*
2034 	 * Spin over procArray collecting all subxids, but only if there hasn't
2035 	 * been a suboverflow.
2036 	 */
2037 	if (!suboverflowed)
2038 	{
2039 		for (index = 0; index < arrayP->numProcs; index++)
2040 		{
2041 			int			pgprocno = arrayP->pgprocnos[index];
2042 			PGPROC	   *proc = &allProcs[pgprocno];
2043 			PGXACT	   *pgxact = &allPgXact[pgprocno];
2044 			int			nxids;
2045 
2046 			/*
2047 			 * Save subtransaction XIDs. Other backends can't add or remove
2048 			 * entries while we're holding XidGenLock.
2049 			 */
2050 			nxids = pgxact->nxids;
2051 			if (nxids > 0)
2052 			{
2053 				/* barrier not really required, as XidGenLock is held, but ... */
2054 				pg_read_barrier();	/* pairs with GetNewTransactionId */
2055 
2056 				memcpy(&xids[count], (void *) proc->subxids.xids,
2057 					   nxids * sizeof(TransactionId));
2058 				count += nxids;
2059 				subcount += nxids;
2060 
2061 				/*
2062 				 * Top-level XID of a transaction is always less than any of
2063 				 * its subxids, so we don't need to check if any of the
2064 				 * subxids are smaller than oldestRunningXid
2065 				 */
2066 			}
2067 		}
2068 	}
2069 
2070 	/*
2071 	 * It's important *not* to include the limits set by slots here because
2072 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
2073 	 * were to be included here the initial value could never increase because
2074 	 * of a circular dependency where slots only increase their limits when
2075 	 * running xacts increases oldestRunningXid and running xacts only
2076 	 * increases if slots do.
2077 	 */
2078 
2079 	CurrentRunningXacts->xcnt = count - subcount;
2080 	CurrentRunningXacts->subxcnt = subcount;
2081 	CurrentRunningXacts->subxid_overflow = suboverflowed;
2082 	CurrentRunningXacts->nextXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
2083 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
2084 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
2085 
2086 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
2087 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
2088 	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
2089 
2090 	/* We don't release the locks here, the caller is responsible for that */
2091 
2092 	return CurrentRunningXacts;
2093 }
2094 
2095 /*
2096  * GetOldestActiveTransactionId()
2097  *
2098  * Similar to GetSnapshotData but returns just oldestActiveXid. We include
2099  * all PGXACTs with an assigned TransactionId, even VACUUM processes.
2100  * We look at all databases, though there is no need to include WALSender
2101  * since this has no effect on hot standby conflicts.
2102  *
2103  * This is never executed during recovery so there is no need to look at
2104  * KnownAssignedXids.
2105  *
2106  * We don't worry about updating other counters, we want to keep this as
2107  * simple as possible and leave GetSnapshotData() as the primary code for
2108  * that bookkeeping.
2109  */
2110 TransactionId
GetOldestActiveTransactionId(void)2111 GetOldestActiveTransactionId(void)
2112 {
2113 	ProcArrayStruct *arrayP = procArray;
2114 	TransactionId oldestRunningXid;
2115 	int			index;
2116 
2117 	Assert(!RecoveryInProgress());
2118 
2119 	/*
2120 	 * Read nextXid, as the upper bound of what's still active.
2121 	 *
2122 	 * Reading a TransactionId is atomic, but we must grab the lock to make
2123 	 * sure that all XIDs < nextXid are already present in the proc array (or
2124 	 * have already completed), when we spin over it.
2125 	 */
2126 	LWLockAcquire(XidGenLock, LW_SHARED);
2127 	oldestRunningXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
2128 	LWLockRelease(XidGenLock);
2129 
2130 	/*
2131 	 * Spin over procArray collecting all xids and subxids.
2132 	 */
2133 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2134 	for (index = 0; index < arrayP->numProcs; index++)
2135 	{
2136 		int			pgprocno = arrayP->pgprocnos[index];
2137 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2138 		TransactionId xid;
2139 
2140 		/* Fetch xid just once - see GetNewTransactionId */
2141 		xid = UINT32_ACCESS_ONCE(pgxact->xid);
2142 
2143 		if (!TransactionIdIsNormal(xid))
2144 			continue;
2145 
2146 		if (TransactionIdPrecedes(xid, oldestRunningXid))
2147 			oldestRunningXid = xid;
2148 
2149 		/*
2150 		 * Top-level XID of a transaction is always less than any of its
2151 		 * subxids, so we don't need to check if any of the subxids are
2152 		 * smaller than oldestRunningXid
2153 		 */
2154 	}
2155 	LWLockRelease(ProcArrayLock);
2156 
2157 	return oldestRunningXid;
2158 }
2159 
2160 /*
2161  * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum
2162  *
2163  * Returns the oldest xid that we can guarantee not to have been affected by
2164  * vacuum, i.e. no rows >= that xid have been vacuumed away unless the
2165  * transaction aborted. Note that the value can (and most of the time will) be
2166  * much more conservative than what really has been affected by vacuum, but we
2167  * currently don't have better data available.
2168  *
2169  * This is useful to initialize the cutoff xid after which a new changeset
2170  * extraction replication slot can start decoding changes.
2171  *
2172  * Must be called with ProcArrayLock held either shared or exclusively,
2173  * although most callers will want to use exclusive mode since it is expected
2174  * that the caller will immediately use the xid to peg the xmin horizon.
2175  */
2176 TransactionId
GetOldestSafeDecodingTransactionId(bool catalogOnly)2177 GetOldestSafeDecodingTransactionId(bool catalogOnly)
2178 {
2179 	ProcArrayStruct *arrayP = procArray;
2180 	TransactionId oldestSafeXid;
2181 	int			index;
2182 	bool		recovery_in_progress = RecoveryInProgress();
2183 
2184 	Assert(LWLockHeldByMe(ProcArrayLock));
2185 
2186 	/*
2187 	 * Acquire XidGenLock, so no transactions can acquire an xid while we're
2188 	 * running. If no transaction with xid were running concurrently a new xid
2189 	 * could influence the RecentXmin et al.
2190 	 *
2191 	 * We initialize the computation to nextXid since that's guaranteed to be
2192 	 * a safe, albeit pessimal, value.
2193 	 */
2194 	LWLockAcquire(XidGenLock, LW_SHARED);
2195 	oldestSafeXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
2196 
2197 	/*
2198 	 * If there's already a slot pegging the xmin horizon, we can start with
2199 	 * that value, it's guaranteed to be safe since it's computed by this
2200 	 * routine initially and has been enforced since.  We can always use the
2201 	 * slot's general xmin horizon, but the catalog horizon is only usable
2202 	 * when only catalog data is going to be looked at.
2203 	 */
2204 	if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2205 		TransactionIdPrecedes(procArray->replication_slot_xmin,
2206 							  oldestSafeXid))
2207 		oldestSafeXid = procArray->replication_slot_xmin;
2208 
2209 	if (catalogOnly &&
2210 		TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2211 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
2212 							  oldestSafeXid))
2213 		oldestSafeXid = procArray->replication_slot_catalog_xmin;
2214 
2215 	/*
2216 	 * If we're not in recovery, we walk over the procarray and collect the
2217 	 * lowest xid. Since we're called with ProcArrayLock held and have
2218 	 * acquired XidGenLock, no entries can vanish concurrently, since
2219 	 * PGXACT->xid is only set with XidGenLock held and only cleared with
2220 	 * ProcArrayLock held.
2221 	 *
2222 	 * In recovery we can't lower the safe value besides what we've computed
2223 	 * above, so we'll have to wait a bit longer there. We unfortunately can
2224 	 * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
2225 	 * machinery can miss values and return an older value than is safe.
2226 	 */
2227 	if (!recovery_in_progress)
2228 	{
2229 		/*
2230 		 * Spin over procArray collecting all min(PGXACT->xid)
2231 		 */
2232 		for (index = 0; index < arrayP->numProcs; index++)
2233 		{
2234 			int			pgprocno = arrayP->pgprocnos[index];
2235 			PGXACT	   *pgxact = &allPgXact[pgprocno];
2236 			TransactionId xid;
2237 
2238 			/* Fetch xid just once - see GetNewTransactionId */
2239 			xid = UINT32_ACCESS_ONCE(pgxact->xid);
2240 
2241 			if (!TransactionIdIsNormal(xid))
2242 				continue;
2243 
2244 			if (TransactionIdPrecedes(xid, oldestSafeXid))
2245 				oldestSafeXid = xid;
2246 		}
2247 	}
2248 
2249 	LWLockRelease(XidGenLock);
2250 
2251 	return oldestSafeXid;
2252 }
2253 
2254 /*
2255  * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
2256  * delaying checkpoint because they have critical actions in progress.
2257  *
2258  * Constructs an array of VXIDs of transactions that are currently in commit
2259  * critical sections, as shown by having delayChkpt set in their PGXACT.
2260  *
2261  * Returns a palloc'd array that should be freed by the caller.
2262  * *nvxids is the number of valid entries.
2263  *
2264  * Note that because backends set or clear delayChkpt without holding any lock,
2265  * the result is somewhat indeterminate, but we don't really care.  Even in
2266  * a multiprocessor with delayed writes to shared memory, it should be certain
2267  * that setting of delayChkpt will propagate to shared memory when the backend
2268  * takes a lock, so we cannot fail to see a virtual xact as delayChkpt if
2269  * it's already inserted its commit record.  Whether it takes a little while
2270  * for clearing of delayChkpt to propagate is unimportant for correctness.
2271  */
2272 VirtualTransactionId *
GetVirtualXIDsDelayingChkpt(int * nvxids)2273 GetVirtualXIDsDelayingChkpt(int *nvxids)
2274 {
2275 	VirtualTransactionId *vxids;
2276 	ProcArrayStruct *arrayP = procArray;
2277 	int			count = 0;
2278 	int			index;
2279 
2280 	/* allocate what's certainly enough result space */
2281 	vxids = (VirtualTransactionId *)
2282 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2283 
2284 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2285 
2286 	for (index = 0; index < arrayP->numProcs; index++)
2287 	{
2288 		int			pgprocno = arrayP->pgprocnos[index];
2289 		PGPROC	   *proc = &allProcs[pgprocno];
2290 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2291 
2292 		if (pgxact->delayChkpt)
2293 		{
2294 			VirtualTransactionId vxid;
2295 
2296 			GET_VXID_FROM_PGPROC(vxid, *proc);
2297 			if (VirtualTransactionIdIsValid(vxid))
2298 				vxids[count++] = vxid;
2299 		}
2300 	}
2301 
2302 	LWLockRelease(ProcArrayLock);
2303 
2304 	*nvxids = count;
2305 	return vxids;
2306 }
2307 
2308 /*
2309  * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
2310  *
2311  * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
2312  * of the specified VXIDs are still in critical sections of code.
2313  *
2314  * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
2315  * those numbers should be small enough for it not to be a problem.
2316  */
2317 bool
HaveVirtualXIDsDelayingChkpt(VirtualTransactionId * vxids,int nvxids)2318 HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
2319 {
2320 	bool		result = false;
2321 	ProcArrayStruct *arrayP = procArray;
2322 	int			index;
2323 
2324 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2325 
2326 	for (index = 0; index < arrayP->numProcs; index++)
2327 	{
2328 		int			pgprocno = arrayP->pgprocnos[index];
2329 		PGPROC	   *proc = &allProcs[pgprocno];
2330 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2331 		VirtualTransactionId vxid;
2332 
2333 		GET_VXID_FROM_PGPROC(vxid, *proc);
2334 
2335 		if (pgxact->delayChkpt && VirtualTransactionIdIsValid(vxid))
2336 		{
2337 			int			i;
2338 
2339 			for (i = 0; i < nvxids; i++)
2340 			{
2341 				if (VirtualTransactionIdEquals(vxid, vxids[i]))
2342 				{
2343 					result = true;
2344 					break;
2345 				}
2346 			}
2347 			if (result)
2348 				break;
2349 		}
2350 	}
2351 
2352 	LWLockRelease(ProcArrayLock);
2353 
2354 	return result;
2355 }
2356 
2357 /*
2358  * BackendPidGetProc -- get a backend's PGPROC given its PID
2359  *
2360  * Returns NULL if not found.  Note that it is up to the caller to be
2361  * sure that the question remains meaningful for long enough for the
2362  * answer to be used ...
2363  */
2364 PGPROC *
BackendPidGetProc(int pid)2365 BackendPidGetProc(int pid)
2366 {
2367 	PGPROC	   *result;
2368 
2369 	if (pid == 0)				/* never match dummy PGPROCs */
2370 		return NULL;
2371 
2372 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2373 
2374 	result = BackendPidGetProcWithLock(pid);
2375 
2376 	LWLockRelease(ProcArrayLock);
2377 
2378 	return result;
2379 }
2380 
2381 /*
2382  * BackendPidGetProcWithLock -- get a backend's PGPROC given its PID
2383  *
2384  * Same as above, except caller must be holding ProcArrayLock.  The found
2385  * entry, if any, can be assumed to be valid as long as the lock remains held.
2386  */
2387 PGPROC *
BackendPidGetProcWithLock(int pid)2388 BackendPidGetProcWithLock(int pid)
2389 {
2390 	PGPROC	   *result = NULL;
2391 	ProcArrayStruct *arrayP = procArray;
2392 	int			index;
2393 
2394 	if (pid == 0)				/* never match dummy PGPROCs */
2395 		return NULL;
2396 
2397 	for (index = 0; index < arrayP->numProcs; index++)
2398 	{
2399 		PGPROC	   *proc = &allProcs[arrayP->pgprocnos[index]];
2400 
2401 		if (proc->pid == pid)
2402 		{
2403 			result = proc;
2404 			break;
2405 		}
2406 	}
2407 
2408 	return result;
2409 }
2410 
2411 /*
2412  * BackendXidGetPid -- get a backend's pid given its XID
2413  *
2414  * Returns 0 if not found or it's a prepared transaction.  Note that
2415  * it is up to the caller to be sure that the question remains
2416  * meaningful for long enough for the answer to be used ...
2417  *
2418  * Only main transaction Ids are considered.  This function is mainly
2419  * useful for determining what backend owns a lock.
2420  *
2421  * Beware that not every xact has an XID assigned.  However, as long as you
2422  * only call this using an XID found on disk, you're safe.
2423  */
2424 int
BackendXidGetPid(TransactionId xid)2425 BackendXidGetPid(TransactionId xid)
2426 {
2427 	int			result = 0;
2428 	ProcArrayStruct *arrayP = procArray;
2429 	int			index;
2430 
2431 	if (xid == InvalidTransactionId)	/* never match invalid xid */
2432 		return 0;
2433 
2434 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2435 
2436 	for (index = 0; index < arrayP->numProcs; index++)
2437 	{
2438 		int			pgprocno = arrayP->pgprocnos[index];
2439 		PGPROC	   *proc = &allProcs[pgprocno];
2440 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2441 
2442 		if (pgxact->xid == xid)
2443 		{
2444 			result = proc->pid;
2445 			break;
2446 		}
2447 	}
2448 
2449 	LWLockRelease(ProcArrayLock);
2450 
2451 	return result;
2452 }
2453 
2454 /*
2455  * IsBackendPid -- is a given pid a running backend
2456  *
2457  * This is not called by the backend, but is called by external modules.
2458  */
2459 bool
IsBackendPid(int pid)2460 IsBackendPid(int pid)
2461 {
2462 	return (BackendPidGetProc(pid) != NULL);
2463 }
2464 
2465 
2466 /*
2467  * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
2468  *
2469  * The array is palloc'd. The number of valid entries is returned into *nvxids.
2470  *
2471  * The arguments allow filtering the set of VXIDs returned.  Our own process
2472  * is always skipped.  In addition:
2473  *	If limitXmin is not InvalidTransactionId, skip processes with
2474  *		xmin > limitXmin.
2475  *	If excludeXmin0 is true, skip processes with xmin = 0.
2476  *	If allDbs is false, skip processes attached to other databases.
2477  *	If excludeVacuum isn't zero, skip processes for which
2478  *		(vacuumFlags & excludeVacuum) is not zero.
2479  *
2480  * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
2481  * allow skipping backends whose oldest live snapshot is no older than
2482  * some snapshot we have.  Since we examine the procarray with only shared
2483  * lock, there are race conditions: a backend could set its xmin just after
2484  * we look.  Indeed, on multiprocessors with weak memory ordering, the
2485  * other backend could have set its xmin *before* we look.  We know however
2486  * that such a backend must have held shared ProcArrayLock overlapping our
2487  * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
2488  * any snapshot the other backend is taking concurrently with our scan cannot
2489  * consider any transactions as still running that we think are committed
2490  * (since backends must hold ProcArrayLock exclusive to commit).
2491  */
2492 VirtualTransactionId *
GetCurrentVirtualXIDs(TransactionId limitXmin,bool excludeXmin0,bool allDbs,int excludeVacuum,int * nvxids)2493 GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
2494 					  bool allDbs, int excludeVacuum,
2495 					  int *nvxids)
2496 {
2497 	VirtualTransactionId *vxids;
2498 	ProcArrayStruct *arrayP = procArray;
2499 	int			count = 0;
2500 	int			index;
2501 
2502 	/* allocate what's certainly enough result space */
2503 	vxids = (VirtualTransactionId *)
2504 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2505 
2506 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2507 
2508 	for (index = 0; index < arrayP->numProcs; index++)
2509 	{
2510 		int			pgprocno = arrayP->pgprocnos[index];
2511 		PGPROC	   *proc = &allProcs[pgprocno];
2512 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2513 
2514 		if (proc == MyProc)
2515 			continue;
2516 
2517 		if (excludeVacuum & pgxact->vacuumFlags)
2518 			continue;
2519 
2520 		if (allDbs || proc->databaseId == MyDatabaseId)
2521 		{
2522 			/* Fetch xmin just once - might change on us */
2523 			TransactionId pxmin = UINT32_ACCESS_ONCE(pgxact->xmin);
2524 
2525 			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
2526 				continue;
2527 
2528 			/*
2529 			 * InvalidTransactionId precedes all other XIDs, so a proc that
2530 			 * hasn't set xmin yet will not be rejected by this test.
2531 			 */
2532 			if (!TransactionIdIsValid(limitXmin) ||
2533 				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
2534 			{
2535 				VirtualTransactionId vxid;
2536 
2537 				GET_VXID_FROM_PGPROC(vxid, *proc);
2538 				if (VirtualTransactionIdIsValid(vxid))
2539 					vxids[count++] = vxid;
2540 			}
2541 		}
2542 	}
2543 
2544 	LWLockRelease(ProcArrayLock);
2545 
2546 	*nvxids = count;
2547 	return vxids;
2548 }
2549 
2550 /*
2551  * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
2552  *
2553  * Usage is limited to conflict resolution during recovery on standby servers.
2554  * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
2555  * in cases where we cannot accurately determine a value for latestRemovedXid.
2556  *
2557  * If limitXmin is InvalidTransactionId then we want to kill everybody,
2558  * so we're not worried if they have a snapshot or not, nor does it really
2559  * matter what type of lock we hold.
2560  *
2561  * All callers that are checking xmins always now supply a valid and useful
2562  * value for limitXmin. The limitXmin is always lower than the lowest
2563  * numbered KnownAssignedXid that is not already a FATAL error. This is
2564  * because we only care about cleanup records that are cleaning up tuple
2565  * versions from committed transactions. In that case they will only occur
2566  * at the point where the record is less than the lowest running xid. That
2567  * allows us to say that if any backend takes a snapshot concurrently with
2568  * us then the conflict assessment made here would never include the snapshot
2569  * that is being derived. So we take LW_SHARED on the ProcArray and allow
2570  * concurrent snapshots when limitXmin is valid. We might think about adding
2571  *	 Assert(limitXmin < lowest(KnownAssignedXids))
2572  * but that would not be true in the case of FATAL errors lagging in array,
2573  * but we already know those are bogus anyway, so we skip that test.
2574  *
2575  * If dbOid is valid we skip backends attached to other databases.
2576  *
2577  * Be careful to *not* pfree the result from this function. We reuse
2578  * this array sufficiently often that we use malloc for the result.
2579  */
2580 VirtualTransactionId *
GetConflictingVirtualXIDs(TransactionId limitXmin,Oid dbOid)2581 GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2582 {
2583 	static VirtualTransactionId *vxids;
2584 	ProcArrayStruct *arrayP = procArray;
2585 	int			count = 0;
2586 	int			index;
2587 
2588 	/*
2589 	 * If first time through, get workspace to remember main XIDs in. We
2590 	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
2591 	 * result space, remembering room for a terminator.
2592 	 */
2593 	if (vxids == NULL)
2594 	{
2595 		vxids = (VirtualTransactionId *)
2596 			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
2597 		if (vxids == NULL)
2598 			ereport(ERROR,
2599 					(errcode(ERRCODE_OUT_OF_MEMORY),
2600 					 errmsg("out of memory")));
2601 	}
2602 
2603 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2604 
2605 	for (index = 0; index < arrayP->numProcs; index++)
2606 	{
2607 		int			pgprocno = arrayP->pgprocnos[index];
2608 		PGPROC	   *proc = &allProcs[pgprocno];
2609 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2610 
2611 		/* Exclude prepared transactions */
2612 		if (proc->pid == 0)
2613 			continue;
2614 
2615 		if (!OidIsValid(dbOid) ||
2616 			proc->databaseId == dbOid)
2617 		{
2618 			/* Fetch xmin just once - can't change on us, but good coding */
2619 			TransactionId pxmin = UINT32_ACCESS_ONCE(pgxact->xmin);
2620 
2621 			/*
2622 			 * We ignore an invalid pxmin because this means that backend has
2623 			 * no snapshot currently. We hold a Share lock to avoid contention
2624 			 * with users taking snapshots.  That is not a problem because the
2625 			 * current xmin is always at least one higher than the latest
2626 			 * removed xid, so any new snapshot would never conflict with the
2627 			 * test here.
2628 			 */
2629 			if (!TransactionIdIsValid(limitXmin) ||
2630 				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
2631 			{
2632 				VirtualTransactionId vxid;
2633 
2634 				GET_VXID_FROM_PGPROC(vxid, *proc);
2635 				if (VirtualTransactionIdIsValid(vxid))
2636 					vxids[count++] = vxid;
2637 			}
2638 		}
2639 	}
2640 
2641 	LWLockRelease(ProcArrayLock);
2642 
2643 	/* add the terminator */
2644 	vxids[count].backendId = InvalidBackendId;
2645 	vxids[count].localTransactionId = InvalidLocalTransactionId;
2646 
2647 	return vxids;
2648 }
2649 
2650 /*
2651  * CancelVirtualTransaction - used in recovery conflict processing
2652  *
2653  * Returns pid of the process signaled, or 0 if not found.
2654  */
2655 pid_t
CancelVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode)2656 CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
2657 {
2658 	return SignalVirtualTransaction(vxid, sigmode, true);
2659 }
2660 
2661 pid_t
SignalVirtualTransaction(VirtualTransactionId vxid,ProcSignalReason sigmode,bool conflictPending)2662 SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
2663 						 bool conflictPending)
2664 {
2665 	ProcArrayStruct *arrayP = procArray;
2666 	int			index;
2667 	pid_t		pid = 0;
2668 
2669 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2670 
2671 	for (index = 0; index < arrayP->numProcs; index++)
2672 	{
2673 		int			pgprocno = arrayP->pgprocnos[index];
2674 		PGPROC	   *proc = &allProcs[pgprocno];
2675 		VirtualTransactionId procvxid;
2676 
2677 		GET_VXID_FROM_PGPROC(procvxid, *proc);
2678 
2679 		if (procvxid.backendId == vxid.backendId &&
2680 			procvxid.localTransactionId == vxid.localTransactionId)
2681 		{
2682 			proc->recoveryConflictPending = conflictPending;
2683 			pid = proc->pid;
2684 			if (pid != 0)
2685 			{
2686 				/*
2687 				 * Kill the pid if it's still here. If not, that's what we
2688 				 * wanted so ignore any errors.
2689 				 */
2690 				(void) SendProcSignal(pid, sigmode, vxid.backendId);
2691 			}
2692 			break;
2693 		}
2694 	}
2695 
2696 	LWLockRelease(ProcArrayLock);
2697 
2698 	return pid;
2699 }
2700 
2701 /*
2702  * MinimumActiveBackends --- count backends (other than myself) that are
2703  *		in active transactions.  Return true if the count exceeds the
2704  *		minimum threshold passed.  This is used as a heuristic to decide if
2705  *		a pre-XLOG-flush delay is worthwhile during commit.
2706  *
2707  * Do not count backends that are blocked waiting for locks, since they are
2708  * not going to get to run until someone else commits.
2709  */
2710 bool
MinimumActiveBackends(int min)2711 MinimumActiveBackends(int min)
2712 {
2713 	ProcArrayStruct *arrayP = procArray;
2714 	int			count = 0;
2715 	int			index;
2716 
2717 	/* Quick short-circuit if no minimum is specified */
2718 	if (min == 0)
2719 		return true;
2720 
2721 	/*
2722 	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
2723 	 * bogus, but since we are only testing fields for zero or nonzero, it
2724 	 * should be OK.  The result is only used for heuristic purposes anyway...
2725 	 */
2726 	for (index = 0; index < arrayP->numProcs; index++)
2727 	{
2728 		int			pgprocno = arrayP->pgprocnos[index];
2729 		PGPROC	   *proc = &allProcs[pgprocno];
2730 		PGXACT	   *pgxact = &allPgXact[pgprocno];
2731 
2732 		/*
2733 		 * Since we're not holding a lock, need to be prepared to deal with
2734 		 * garbage, as someone could have incremented numProcs but not yet
2735 		 * filled the structure.
2736 		 *
2737 		 * If someone just decremented numProcs, 'proc' could also point to a
2738 		 * PGPROC entry that's no longer in the array. It still points to a
2739 		 * PGPROC struct, though, because freed PGPROC entries just go to the
2740 		 * free list and are recycled. Its contents are nonsense in that case,
2741 		 * but that's acceptable for this function.
2742 		 */
2743 		if (pgprocno == -1)
2744 			continue;			/* do not count deleted entries */
2745 		if (proc == MyProc)
2746 			continue;			/* do not count myself */
2747 		if (pgxact->xid == InvalidTransactionId)
2748 			continue;			/* do not count if no XID assigned */
2749 		if (proc->pid == 0)
2750 			continue;			/* do not count prepared xacts */
2751 		if (proc->waitLock != NULL)
2752 			continue;			/* do not count if blocked on a lock */
2753 		count++;
2754 		if (count >= min)
2755 			break;
2756 	}
2757 
2758 	return count >= min;
2759 }
2760 
2761 /*
2762  * CountDBBackends --- count backends that are using specified database
2763  */
2764 int
CountDBBackends(Oid databaseid)2765 CountDBBackends(Oid databaseid)
2766 {
2767 	ProcArrayStruct *arrayP = procArray;
2768 	int			count = 0;
2769 	int			index;
2770 
2771 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2772 
2773 	for (index = 0; index < arrayP->numProcs; index++)
2774 	{
2775 		int			pgprocno = arrayP->pgprocnos[index];
2776 		PGPROC	   *proc = &allProcs[pgprocno];
2777 
2778 		if (proc->pid == 0)
2779 			continue;			/* do not count prepared xacts */
2780 		if (!OidIsValid(databaseid) ||
2781 			proc->databaseId == databaseid)
2782 			count++;
2783 	}
2784 
2785 	LWLockRelease(ProcArrayLock);
2786 
2787 	return count;
2788 }
2789 
2790 /*
2791  * CountDBConnections --- counts database backends ignoring any background
2792  *		worker processes
2793  */
2794 int
CountDBConnections(Oid databaseid)2795 CountDBConnections(Oid databaseid)
2796 {
2797 	ProcArrayStruct *arrayP = procArray;
2798 	int			count = 0;
2799 	int			index;
2800 
2801 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2802 
2803 	for (index = 0; index < arrayP->numProcs; index++)
2804 	{
2805 		int			pgprocno = arrayP->pgprocnos[index];
2806 		PGPROC	   *proc = &allProcs[pgprocno];
2807 
2808 		if (proc->pid == 0)
2809 			continue;			/* do not count prepared xacts */
2810 		if (proc->isBackgroundWorker)
2811 			continue;			/* do not count background workers */
2812 		if (!OidIsValid(databaseid) ||
2813 			proc->databaseId == databaseid)
2814 			count++;
2815 	}
2816 
2817 	LWLockRelease(ProcArrayLock);
2818 
2819 	return count;
2820 }
2821 
2822 /*
2823  * CancelDBBackends --- cancel backends that are using specified database
2824  */
2825 void
CancelDBBackends(Oid databaseid,ProcSignalReason sigmode,bool conflictPending)2826 CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
2827 {
2828 	ProcArrayStruct *arrayP = procArray;
2829 	int			index;
2830 	pid_t		pid = 0;
2831 
2832 	/* tell all backends to die */
2833 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2834 
2835 	for (index = 0; index < arrayP->numProcs; index++)
2836 	{
2837 		int			pgprocno = arrayP->pgprocnos[index];
2838 		PGPROC	   *proc = &allProcs[pgprocno];
2839 
2840 		if (databaseid == InvalidOid || proc->databaseId == databaseid)
2841 		{
2842 			VirtualTransactionId procvxid;
2843 
2844 			GET_VXID_FROM_PGPROC(procvxid, *proc);
2845 
2846 			proc->recoveryConflictPending = conflictPending;
2847 			pid = proc->pid;
2848 			if (pid != 0)
2849 			{
2850 				/*
2851 				 * Kill the pid if it's still here. If not, that's what we
2852 				 * wanted so ignore any errors.
2853 				 */
2854 				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
2855 			}
2856 		}
2857 	}
2858 
2859 	LWLockRelease(ProcArrayLock);
2860 }
2861 
2862 /*
2863  * CountUserBackends --- count backends that are used by specified user
2864  */
2865 int
CountUserBackends(Oid roleid)2866 CountUserBackends(Oid roleid)
2867 {
2868 	ProcArrayStruct *arrayP = procArray;
2869 	int			count = 0;
2870 	int			index;
2871 
2872 	LWLockAcquire(ProcArrayLock, LW_SHARED);
2873 
2874 	for (index = 0; index < arrayP->numProcs; index++)
2875 	{
2876 		int			pgprocno = arrayP->pgprocnos[index];
2877 		PGPROC	   *proc = &allProcs[pgprocno];
2878 
2879 		if (proc->pid == 0)
2880 			continue;			/* do not count prepared xacts */
2881 		if (proc->isBackgroundWorker)
2882 			continue;			/* do not count background workers */
2883 		if (proc->roleId == roleid)
2884 			count++;
2885 	}
2886 
2887 	LWLockRelease(ProcArrayLock);
2888 
2889 	return count;
2890 }
2891 
2892 /*
2893  * CountOtherDBBackends -- check for other backends running in the given DB
2894  *
2895  * If there are other backends in the DB, we will wait a maximum of 5 seconds
2896  * for them to exit.  Autovacuum backends are encouraged to exit early by
2897  * sending them SIGTERM, but normal user backends are just waited for.
2898  *
2899  * The current backend is always ignored; it is caller's responsibility to
2900  * check whether the current backend uses the given DB, if it's important.
2901  *
2902  * Returns true if there are (still) other backends in the DB, false if not.
2903  * Also, *nbackends and *nprepared are set to the number of other backends
2904  * and prepared transactions in the DB, respectively.
2905  *
2906  * This function is used to interlock DROP DATABASE and related commands
2907  * against there being any active backends in the target DB --- dropping the
2908  * DB while active backends remain would be a Bad Thing.  Note that we cannot
2909  * detect here the possibility of a newly-started backend that is trying to
2910  * connect to the doomed database, so additional interlocking is needed during
2911  * backend startup.  The caller should normally hold an exclusive lock on the
2912  * target DB before calling this, which is one reason we mustn't wait
2913  * indefinitely.
2914  */
2915 bool
CountOtherDBBackends(Oid databaseId,int * nbackends,int * nprepared)2916 CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2917 {
2918 	ProcArrayStruct *arrayP = procArray;
2919 
2920 #define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
2921 	int			autovac_pids[MAXAUTOVACPIDS];
2922 	int			tries;
2923 
2924 	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
2925 	for (tries = 0; tries < 50; tries++)
2926 	{
2927 		int			nautovacs = 0;
2928 		bool		found = false;
2929 		int			index;
2930 
2931 		CHECK_FOR_INTERRUPTS();
2932 
2933 		*nbackends = *nprepared = 0;
2934 
2935 		LWLockAcquire(ProcArrayLock, LW_SHARED);
2936 
2937 		for (index = 0; index < arrayP->numProcs; index++)
2938 		{
2939 			int			pgprocno = arrayP->pgprocnos[index];
2940 			PGPROC	   *proc = &allProcs[pgprocno];
2941 			PGXACT	   *pgxact = &allPgXact[pgprocno];
2942 
2943 			if (proc->databaseId != databaseId)
2944 				continue;
2945 			if (proc == MyProc)
2946 				continue;
2947 
2948 			found = true;
2949 
2950 			if (proc->pid == 0)
2951 				(*nprepared)++;
2952 			else
2953 			{
2954 				(*nbackends)++;
2955 				if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
2956 					nautovacs < MAXAUTOVACPIDS)
2957 					autovac_pids[nautovacs++] = proc->pid;
2958 			}
2959 		}
2960 
2961 		LWLockRelease(ProcArrayLock);
2962 
2963 		if (!found)
2964 			return false;		/* no conflicting backends, so done */
2965 
2966 		/*
2967 		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
2968 		 * postpone this step until after the loop because we don't want to
2969 		 * hold ProcArrayLock while issuing kill(). We have no idea what might
2970 		 * block kill() inside the kernel...
2971 		 */
2972 		for (index = 0; index < nautovacs; index++)
2973 			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */
2974 
2975 		/* sleep, then try again */
2976 		pg_usleep(100 * 1000L); /* 100ms */
2977 	}
2978 
2979 	return true;				/* timed out, still conflicts */
2980 }
2981 
2982 /*
2983  * ProcArraySetReplicationSlotXmin
2984  *
2985  * Install limits to future computations of the xmin horizon to prevent vacuum
2986  * and HOT pruning from removing affected rows still needed by clients with
2987  * replication slots.
2988  */
2989 void
ProcArraySetReplicationSlotXmin(TransactionId xmin,TransactionId catalog_xmin,bool already_locked)2990 ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
2991 								bool already_locked)
2992 {
2993 	Assert(!already_locked || LWLockHeldByMe(ProcArrayLock));
2994 
2995 	if (!already_locked)
2996 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2997 
2998 	procArray->replication_slot_xmin = xmin;
2999 	procArray->replication_slot_catalog_xmin = catalog_xmin;
3000 
3001 	if (!already_locked)
3002 		LWLockRelease(ProcArrayLock);
3003 }
3004 
3005 /*
3006  * ProcArrayGetReplicationSlotXmin
3007  *
3008  * Return the current slot xmin limits. That's useful to be able to remove
3009  * data that's older than those limits.
3010  */
3011 void
ProcArrayGetReplicationSlotXmin(TransactionId * xmin,TransactionId * catalog_xmin)3012 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
3013 								TransactionId *catalog_xmin)
3014 {
3015 	LWLockAcquire(ProcArrayLock, LW_SHARED);
3016 
3017 	if (xmin != NULL)
3018 		*xmin = procArray->replication_slot_xmin;
3019 
3020 	if (catalog_xmin != NULL)
3021 		*catalog_xmin = procArray->replication_slot_catalog_xmin;
3022 
3023 	LWLockRelease(ProcArrayLock);
3024 }
3025 
3026 
3027 #define XidCacheRemove(i) \
3028 	do { \
3029 		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
3030 		pg_write_barrier(); \
3031 		MyPgXact->nxids--; \
3032 	} while (0)
3033 
3034 /*
3035  * XidCacheRemoveRunningXids
3036  *
3037  * Remove a bunch of TransactionIds from the list of known-running
3038  * subtransactions for my backend.  Both the specified xid and those in
3039  * the xids[] array (of length nxids) are removed from the subxids cache.
3040  * latestXid must be the latest XID among the group.
3041  */
3042 void
XidCacheRemoveRunningXids(TransactionId xid,int nxids,const TransactionId * xids,TransactionId latestXid)3043 XidCacheRemoveRunningXids(TransactionId xid,
3044 						  int nxids, const TransactionId *xids,
3045 						  TransactionId latestXid)
3046 {
3047 	int			i,
3048 				j;
3049 
3050 	Assert(TransactionIdIsValid(xid));
3051 
3052 	/*
3053 	 * We must hold ProcArrayLock exclusively in order to remove transactions
3054 	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
3055 	 * possible this could be relaxed since we know this routine is only used
3056 	 * to abort subtransactions, but pending closer analysis we'd best be
3057 	 * conservative.
3058 	 *
3059 	 * Note that we do not have to be careful about memory ordering of our own
3060 	 * reads wrt. GetNewTransactionId() here - only this process can modify
3061 	 * relevant fields of MyProc/MyPgXact.  But we do have to be careful about
3062 	 * our own writes being well ordered.
3063 	 */
3064 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3065 
3066 	/*
3067 	 * Under normal circumstances xid and xids[] will be in increasing order,
3068 	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
3069 	 * behavior when removing a lot of xids.
3070 	 */
3071 	for (i = nxids - 1; i >= 0; i--)
3072 	{
3073 		TransactionId anxid = xids[i];
3074 
3075 		for (j = MyPgXact->nxids - 1; j >= 0; j--)
3076 		{
3077 			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
3078 			{
3079 				XidCacheRemove(j);
3080 				break;
3081 			}
3082 		}
3083 
3084 		/*
3085 		 * Ordinarily we should have found it, unless the cache has
3086 		 * overflowed. However it's also possible for this routine to be
3087 		 * invoked multiple times for the same subtransaction, in case of an
3088 		 * error during AbortSubTransaction.  So instead of Assert, emit a
3089 		 * debug warning.
3090 		 */
3091 		if (j < 0 && !MyPgXact->overflowed)
3092 			elog(WARNING, "did not find subXID %u in MyProc", anxid);
3093 	}
3094 
3095 	for (j = MyPgXact->nxids - 1; j >= 0; j--)
3096 	{
3097 		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
3098 		{
3099 			XidCacheRemove(j);
3100 			break;
3101 		}
3102 	}
3103 	/* Ordinarily we should have found it, unless the cache has overflowed */
3104 	if (j < 0 && !MyPgXact->overflowed)
3105 		elog(WARNING, "did not find subXID %u in MyProc", xid);
3106 
3107 	/* Also advance global latestCompletedXid while holding the lock */
3108 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3109 							  latestXid))
3110 		ShmemVariableCache->latestCompletedXid = latestXid;
3111 
3112 	LWLockRelease(ProcArrayLock);
3113 }
3114 
3115 #ifdef XIDCACHE_DEBUG
3116 
3117 /*
3118  * Print stats about effectiveness of XID cache
3119  */
3120 static void
DisplayXidCache(void)3121 DisplayXidCache(void)
3122 {
3123 	fprintf(stderr,
3124 			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
3125 			xc_by_recent_xmin,
3126 			xc_by_known_xact,
3127 			xc_by_my_xact,
3128 			xc_by_latest_xid,
3129 			xc_by_main_xid,
3130 			xc_by_child_xid,
3131 			xc_by_known_assigned,
3132 			xc_no_overflow,
3133 			xc_slow_answer);
3134 }
3135 #endif							/* XIDCACHE_DEBUG */
3136 
3137 
3138 /* ----------------------------------------------
3139  *		KnownAssignedTransactions sub-module
3140  * ----------------------------------------------
3141  */
3142 
3143 /*
3144  * In Hot Standby mode, we maintain a list of transactions that are (or were)
3145  * running in the master at the current point in WAL.  These XIDs must be
3146  * treated as running by standby transactions, even though they are not in
3147  * the standby server's PGXACT array.
3148  *
3149  * We record all XIDs that we know have been assigned.  That includes all the
3150  * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
3151  * been assigned.  We can deduce the existence of unobserved XIDs because we
3152  * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
3153  * list expands as new XIDs are observed or inferred, and contracts when
3154  * transaction completion records arrive.
3155  *
3156  * During hot standby we do not fret too much about the distinction between
3157  * top-level XIDs and subtransaction XIDs. We store both together in the
3158  * KnownAssignedXids list.  In backends, this is copied into snapshots in
3159  * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
3160  * doesn't care about the distinction either.  Subtransaction XIDs are
3161  * effectively treated as top-level XIDs and in the typical case pg_subtrans
3162  * links are *not* maintained (which does not affect visibility).
3163  *
3164  * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
3165  * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
3166  * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
3167  * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
3168  * records, we mark the subXIDs as children of the top XID in pg_subtrans,
3169  * and then remove them from KnownAssignedXids.  This prevents overflow of
3170  * KnownAssignedXids and snapshots, at the cost that status checks for these
3171  * subXIDs will take a slower path through TransactionIdIsInProgress().
3172  * This means that KnownAssignedXids is not necessarily complete for subXIDs,
3173  * though it should be complete for top-level XIDs; this is the same situation
3174  * that holds with respect to the PGPROC entries in normal running.
3175  *
3176  * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
3177  * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
3178  * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
3179  * As long as that is within the range of interesting XIDs, we have to assume
3180  * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
3181  * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
3182  * subXID arrives - that is not an error.)
3183  *
3184  * Should a backend on primary somehow disappear before it can write an abort
3185  * record, then we just leave those XIDs in KnownAssignedXids. They actually
3186  * aborted but we think they were running; the distinction is irrelevant
3187  * because either way any changes done by the transaction are not visible to
3188  * backends in the standby.  We prune KnownAssignedXids when
3189  * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
3190  * array due to such dead XIDs.
3191  */
3192 
3193 /*
3194  * RecordKnownAssignedTransactionIds
3195  *		Record the given XID in KnownAssignedXids, as well as any preceding
3196  *		unobserved XIDs.
3197  *
3198  * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
3199  * associated with a transaction. Must be called for each record after we
3200  * have executed StartupCLOG() et al, since we must ExtendCLOG() etc..
3201  *
3202  * Called during recovery in analogy with and in place of GetNewTransactionId()
3203  */
3204 void
RecordKnownAssignedTransactionIds(TransactionId xid)3205 RecordKnownAssignedTransactionIds(TransactionId xid)
3206 {
3207 	Assert(standbyState >= STANDBY_INITIALIZED);
3208 	Assert(TransactionIdIsValid(xid));
3209 	Assert(TransactionIdIsValid(latestObservedXid));
3210 
3211 	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
3212 		 xid, latestObservedXid);
3213 
3214 	/*
3215 	 * When a newly observed xid arrives, it is frequently the case that it is
3216 	 * *not* the next xid in sequence. When this occurs, we must treat the
3217 	 * intervening xids as running also.
3218 	 */
3219 	if (TransactionIdFollows(xid, latestObservedXid))
3220 	{
3221 		TransactionId next_expected_xid;
3222 
3223 		/*
3224 		 * Extend subtrans like we do in GetNewTransactionId() during normal
3225 		 * operation using individual extend steps. Note that we do not need
3226 		 * to extend clog since its extensions are WAL logged.
3227 		 *
3228 		 * This part has to be done regardless of standbyState since we
3229 		 * immediately start assigning subtransactions to their toplevel
3230 		 * transactions.
3231 		 */
3232 		next_expected_xid = latestObservedXid;
3233 		while (TransactionIdPrecedes(next_expected_xid, xid))
3234 		{
3235 			TransactionIdAdvance(next_expected_xid);
3236 			ExtendSUBTRANS(next_expected_xid);
3237 		}
3238 		Assert(next_expected_xid == xid);
3239 
3240 		/*
3241 		 * If the KnownAssignedXids machinery isn't up yet, there's nothing
3242 		 * more to do since we don't track assigned xids yet.
3243 		 */
3244 		if (standbyState <= STANDBY_INITIALIZED)
3245 		{
3246 			latestObservedXid = xid;
3247 			return;
3248 		}
3249 
3250 		/*
3251 		 * Add (latestObservedXid, xid] onto the KnownAssignedXids array.
3252 		 */
3253 		next_expected_xid = latestObservedXid;
3254 		TransactionIdAdvance(next_expected_xid);
3255 		KnownAssignedXidsAdd(next_expected_xid, xid, false);
3256 
3257 		/*
3258 		 * Now we can advance latestObservedXid
3259 		 */
3260 		latestObservedXid = xid;
3261 
3262 		/* ShmemVariableCache->nextFullXid must be beyond any observed xid */
3263 		AdvanceNextFullTransactionIdPastXid(latestObservedXid);
3264 		next_expected_xid = latestObservedXid;
3265 		TransactionIdAdvance(next_expected_xid);
3266 	}
3267 }
3268 
3269 /*
3270  * ExpireTreeKnownAssignedTransactionIds
3271  *		Remove the given XIDs from KnownAssignedXids.
3272  *
3273  * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
3274  */
3275 void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid,int nsubxids,TransactionId * subxids,TransactionId max_xid)3276 ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
3277 									  TransactionId *subxids, TransactionId max_xid)
3278 {
3279 	Assert(standbyState >= STANDBY_INITIALIZED);
3280 
3281 	/*
3282 	 * Uses same locking as transaction commit
3283 	 */
3284 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3285 
3286 	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
3287 
3288 	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
3289 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
3290 							  max_xid))
3291 		ShmemVariableCache->latestCompletedXid = max_xid;
3292 
3293 	LWLockRelease(ProcArrayLock);
3294 }
3295 
3296 /*
3297  * ExpireAllKnownAssignedTransactionIds
3298  *		Remove all entries in KnownAssignedXids and reset lastOverflowedXid.
3299  */
3300 void
ExpireAllKnownAssignedTransactionIds(void)3301 ExpireAllKnownAssignedTransactionIds(void)
3302 {
3303 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3304 	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
3305 
3306 	/*
3307 	 * Reset lastOverflowedXid.  Currently, lastOverflowedXid has no use after
3308 	 * the call of this function.  But do this for unification with what
3309 	 * ExpireOldKnownAssignedTransactionIds() do.
3310 	 */
3311 	procArray->lastOverflowedXid = InvalidTransactionId;
3312 	LWLockRelease(ProcArrayLock);
3313 }
3314 
3315 /*
3316  * ExpireOldKnownAssignedTransactionIds
3317  *		Remove KnownAssignedXids entries preceding the given XID and
3318  *		potentially reset lastOverflowedXid.
3319  */
3320 void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)3321 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
3322 {
3323 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3324 
3325 	/*
3326 	 * Reset lastOverflowedXid if we know all transactions that have been
3327 	 * possibly running are being gone.  Not doing so could cause an incorrect
3328 	 * lastOverflowedXid value, which makes extra snapshots be marked as
3329 	 * suboverflowed.
3330 	 */
3331 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
3332 		procArray->lastOverflowedXid = InvalidTransactionId;
3333 	KnownAssignedXidsRemovePreceding(xid);
3334 	LWLockRelease(ProcArrayLock);
3335 }
3336 
3337 
3338 /*
3339  * Private module functions to manipulate KnownAssignedXids
3340  *
3341  * There are 5 main uses of the KnownAssignedXids data structure:
3342  *
3343  *	* backends taking snapshots - all valid XIDs need to be copied out
3344  *	* backends seeking to determine presence of a specific XID
3345  *	* startup process adding new known-assigned XIDs
3346  *	* startup process removing specific XIDs as transactions end
3347  *	* startup process pruning array when special WAL records arrive
3348  *
3349  * This data structure is known to be a hot spot during Hot Standby, so we
3350  * go to some lengths to make these operations as efficient and as concurrent
3351  * as possible.
3352  *
3353  * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
3354  * order, to be exact --- to allow binary search for specific XIDs.  Note:
3355  * in general TransactionIdPrecedes would not provide a total order, but
3356  * we know that the entries present at any instant should not extend across
3357  * a large enough fraction of XID space to wrap around (the master would
3358  * shut down for fear of XID wrap long before that happens).  So it's OK to
3359  * use TransactionIdPrecedes as a binary-search comparator.
3360  *
3361  * It's cheap to maintain the sortedness during insertions, since new known
3362  * XIDs are always reported in XID order; we just append them at the right.
3363  *
3364  * To keep individual deletions cheap, we need to allow gaps in the array.
3365  * This is implemented by marking array elements as valid or invalid using
3366  * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
3367  * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
3368  * XID entry itself.  This preserves the property that the XID entries are
3369  * sorted, so we can do binary searches easily.  Periodically we compress
3370  * out the unused entries; that's much cheaper than having to compress the
3371  * array immediately on every deletion.
3372  *
3373  * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
3374  * are those with indexes tail <= i < head; items outside this subscript range
3375  * have unspecified contents.  When head reaches the end of the array, we
3376  * force compression of unused entries rather than wrapping around, since
3377  * allowing wraparound would greatly complicate the search logic.  We maintain
3378  * an explicit tail pointer so that pruning of old XIDs can be done without
3379  * immediately moving the array contents.  In most cases only a small fraction
3380  * of the array contains valid entries at any instant.
3381  *
3382  * Although only the startup process can ever change the KnownAssignedXids
3383  * data structure, we still need interlocking so that standby backends will
3384  * not observe invalid intermediate states.  The convention is that backends
3385  * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
3386  * the array, the startup process must hold ProcArrayLock exclusively, for
3387  * the usual transactional reasons (compare commit/abort of a transaction
3388  * during normal running).  Compressing unused entries out of the array
3389  * likewise requires exclusive lock.  To add XIDs to the array, we just insert
3390  * them into slots to the right of the head pointer and then advance the head
3391  * pointer.  This wouldn't require any lock at all, except that on machines
3392  * with weak memory ordering we need to be careful that other processors
3393  * see the array element changes before they see the head pointer change.
3394  * We handle this by using a spinlock to protect reads and writes of the
3395  * head/tail pointers.  (We could dispense with the spinlock if we were to
3396  * create suitable memory access barrier primitives and use those instead.)
3397  * The spinlock must be taken to read or write the head/tail pointers unless
3398  * the caller holds ProcArrayLock exclusively.
3399  *
3400  * Algorithmic analysis:
3401  *
3402  * If we have a maximum of M slots, with N XIDs currently spread across
3403  * S elements then we have N <= S <= M always.
3404  *
3405  *	* Adding a new XID is O(1) and needs little locking (unless compression
3406  *		must happen)
3407  *	* Compressing the array is O(S) and requires exclusive lock
3408  *	* Removing an XID is O(logS) and requires exclusive lock
3409  *	* Taking a snapshot is O(S) and requires shared lock
3410  *	* Checking for an XID is O(logS) and requires shared lock
3411  *
3412  * In comparison, using a hash table for KnownAssignedXids would mean that
3413  * taking snapshots would be O(M). If we can maintain S << M then the
3414  * sorted array technique will deliver significantly faster snapshots.
3415  * If we try to keep S too small then we will spend too much time compressing,
3416  * so there is an optimal point for any workload mix. We use a heuristic to
3417  * decide when to compress the array, though trimming also helps reduce
3418  * frequency of compressing. The heuristic requires us to track the number of
3419  * currently valid XIDs in the array.
3420  */
3421 
3422 
3423 /*
3424  * Compress KnownAssignedXids by shifting valid data down to the start of the
3425  * array, removing any gaps.
3426  *
3427  * A compression step is forced if "force" is true, otherwise we do it
3428  * only if a heuristic indicates it's a good time to do it.
3429  *
3430  * Caller must hold ProcArrayLock in exclusive mode.
3431  */
3432 static void
KnownAssignedXidsCompress(bool force)3433 KnownAssignedXidsCompress(bool force)
3434 {
3435 	ProcArrayStruct *pArray = procArray;
3436 	int			head,
3437 				tail;
3438 	int			compress_index;
3439 	int			i;
3440 
3441 	/* no spinlock required since we hold ProcArrayLock exclusively */
3442 	head = pArray->headKnownAssignedXids;
3443 	tail = pArray->tailKnownAssignedXids;
3444 
3445 	if (!force)
3446 	{
3447 		/*
3448 		 * If we can choose how much to compress, use a heuristic to avoid
3449 		 * compressing too often or not often enough.
3450 		 *
3451 		 * Heuristic is if we have a large enough current spread and less than
3452 		 * 50% of the elements are currently in use, then compress. This
3453 		 * should ensure we compress fairly infrequently. We could compress
3454 		 * less often though the virtual array would spread out more and
3455 		 * snapshots would become more expensive.
3456 		 */
3457 		int			nelements = head - tail;
3458 
3459 		if (nelements < 4 * PROCARRAY_MAXPROCS ||
3460 			nelements < 2 * pArray->numKnownAssignedXids)
3461 			return;
3462 	}
3463 
3464 	/*
3465 	 * We compress the array by reading the valid values from tail to head,
3466 	 * re-aligning data to 0th element.
3467 	 */
3468 	compress_index = 0;
3469 	for (i = tail; i < head; i++)
3470 	{
3471 		if (KnownAssignedXidsValid[i])
3472 		{
3473 			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
3474 			KnownAssignedXidsValid[compress_index] = true;
3475 			compress_index++;
3476 		}
3477 	}
3478 
3479 	pArray->tailKnownAssignedXids = 0;
3480 	pArray->headKnownAssignedXids = compress_index;
3481 }
3482 
3483 /*
3484  * Add xids into KnownAssignedXids at the head of the array.
3485  *
3486  * xids from from_xid to to_xid, inclusive, are added to the array.
3487  *
3488  * If exclusive_lock is true then caller already holds ProcArrayLock in
3489  * exclusive mode, so we need no extra locking here.  Else caller holds no
3490  * lock, so we need to be sure we maintain sufficient interlocks against
3491  * concurrent readers.  (Only the startup process ever calls this, so no need
3492  * to worry about concurrent writers.)
3493  */
3494 static void
KnownAssignedXidsAdd(TransactionId from_xid,TransactionId to_xid,bool exclusive_lock)3495 KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
3496 					 bool exclusive_lock)
3497 {
3498 	ProcArrayStruct *pArray = procArray;
3499 	TransactionId next_xid;
3500 	int			head,
3501 				tail;
3502 	int			nxids;
3503 	int			i;
3504 
3505 	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
3506 
3507 	/*
3508 	 * Calculate how many array slots we'll need.  Normally this is cheap; in
3509 	 * the unusual case where the XIDs cross the wrap point, we do it the hard
3510 	 * way.
3511 	 */
3512 	if (to_xid >= from_xid)
3513 		nxids = to_xid - from_xid + 1;
3514 	else
3515 	{
3516 		nxids = 1;
3517 		next_xid = from_xid;
3518 		while (TransactionIdPrecedes(next_xid, to_xid))
3519 		{
3520 			nxids++;
3521 			TransactionIdAdvance(next_xid);
3522 		}
3523 	}
3524 
3525 	/*
3526 	 * Since only the startup process modifies the head/tail pointers, we
3527 	 * don't need a lock to read them here.
3528 	 */
3529 	head = pArray->headKnownAssignedXids;
3530 	tail = pArray->tailKnownAssignedXids;
3531 
3532 	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
3533 	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
3534 
3535 	/*
3536 	 * Verify that insertions occur in TransactionId sequence.  Note that even
3537 	 * if the last existing element is marked invalid, it must still have a
3538 	 * correctly sequenced XID value.
3539 	 */
3540 	if (head > tail &&
3541 		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
3542 	{
3543 		KnownAssignedXidsDisplay(LOG);
3544 		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
3545 	}
3546 
3547 	/*
3548 	 * If our xids won't fit in the remaining space, compress out free space
3549 	 */
3550 	if (head + nxids > pArray->maxKnownAssignedXids)
3551 	{
3552 		/* must hold lock to compress */
3553 		if (!exclusive_lock)
3554 			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3555 
3556 		KnownAssignedXidsCompress(true);
3557 
3558 		head = pArray->headKnownAssignedXids;
3559 		/* note: we no longer care about the tail pointer */
3560 
3561 		if (!exclusive_lock)
3562 			LWLockRelease(ProcArrayLock);
3563 
3564 		/*
3565 		 * If it still won't fit then we're out of memory
3566 		 */
3567 		if (head + nxids > pArray->maxKnownAssignedXids)
3568 			elog(ERROR, "too many KnownAssignedXids");
3569 	}
3570 
3571 	/* Now we can insert the xids into the space starting at head */
3572 	next_xid = from_xid;
3573 	for (i = 0; i < nxids; i++)
3574 	{
3575 		KnownAssignedXids[head] = next_xid;
3576 		KnownAssignedXidsValid[head] = true;
3577 		TransactionIdAdvance(next_xid);
3578 		head++;
3579 	}
3580 
3581 	/* Adjust count of number of valid entries */
3582 	pArray->numKnownAssignedXids += nxids;
3583 
3584 	/*
3585 	 * Now update the head pointer.  We use a spinlock to protect this
3586 	 * pointer, not because the update is likely to be non-atomic, but to
3587 	 * ensure that other processors see the above array updates before they
3588 	 * see the head pointer change.
3589 	 *
3590 	 * If we're holding ProcArrayLock exclusively, there's no need to take the
3591 	 * spinlock.
3592 	 */
3593 	if (exclusive_lock)
3594 		pArray->headKnownAssignedXids = head;
3595 	else
3596 	{
3597 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
3598 		pArray->headKnownAssignedXids = head;
3599 		SpinLockRelease(&pArray->known_assigned_xids_lck);
3600 	}
3601 }
3602 
3603 /*
3604  * KnownAssignedXidsSearch
3605  *
3606  * Searches KnownAssignedXids for a specific xid and optionally removes it.
3607  * Returns true if it was found, false if not.
3608  *
3609  * Caller must hold ProcArrayLock in shared or exclusive mode.
3610  * Exclusive lock must be held for remove = true.
3611  */
3612 static bool
KnownAssignedXidsSearch(TransactionId xid,bool remove)3613 KnownAssignedXidsSearch(TransactionId xid, bool remove)
3614 {
3615 	ProcArrayStruct *pArray = procArray;
3616 	int			first,
3617 				last;
3618 	int			head;
3619 	int			tail;
3620 	int			result_index = -1;
3621 
3622 	if (remove)
3623 	{
3624 		/* we hold ProcArrayLock exclusively, so no need for spinlock */
3625 		tail = pArray->tailKnownAssignedXids;
3626 		head = pArray->headKnownAssignedXids;
3627 	}
3628 	else
3629 	{
3630 		/* take spinlock to ensure we see up-to-date array contents */
3631 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
3632 		tail = pArray->tailKnownAssignedXids;
3633 		head = pArray->headKnownAssignedXids;
3634 		SpinLockRelease(&pArray->known_assigned_xids_lck);
3635 	}
3636 
3637 	/*
3638 	 * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
3639 	 * array here, since even invalid entries will contain sorted XIDs.
3640 	 */
3641 	first = tail;
3642 	last = head - 1;
3643 	while (first <= last)
3644 	{
3645 		int			mid_index;
3646 		TransactionId mid_xid;
3647 
3648 		mid_index = (first + last) / 2;
3649 		mid_xid = KnownAssignedXids[mid_index];
3650 
3651 		if (xid == mid_xid)
3652 		{
3653 			result_index = mid_index;
3654 			break;
3655 		}
3656 		else if (TransactionIdPrecedes(xid, mid_xid))
3657 			last = mid_index - 1;
3658 		else
3659 			first = mid_index + 1;
3660 	}
3661 
3662 	if (result_index < 0)
3663 		return false;			/* not in array */
3664 
3665 	if (!KnownAssignedXidsValid[result_index])
3666 		return false;			/* in array, but invalid */
3667 
3668 	if (remove)
3669 	{
3670 		KnownAssignedXidsValid[result_index] = false;
3671 
3672 		pArray->numKnownAssignedXids--;
3673 		Assert(pArray->numKnownAssignedXids >= 0);
3674 
3675 		/*
3676 		 * If we're removing the tail element then advance tail pointer over
3677 		 * any invalid elements.  This will speed future searches.
3678 		 */
3679 		if (result_index == tail)
3680 		{
3681 			tail++;
3682 			while (tail < head && !KnownAssignedXidsValid[tail])
3683 				tail++;
3684 			if (tail >= head)
3685 			{
3686 				/* Array is empty, so we can reset both pointers */
3687 				pArray->headKnownAssignedXids = 0;
3688 				pArray->tailKnownAssignedXids = 0;
3689 			}
3690 			else
3691 			{
3692 				pArray->tailKnownAssignedXids = tail;
3693 			}
3694 		}
3695 	}
3696 
3697 	return true;
3698 }
3699 
3700 /*
3701  * Is the specified XID present in KnownAssignedXids[]?
3702  *
3703  * Caller must hold ProcArrayLock in shared or exclusive mode.
3704  */
3705 static bool
KnownAssignedXidExists(TransactionId xid)3706 KnownAssignedXidExists(TransactionId xid)
3707 {
3708 	Assert(TransactionIdIsValid(xid));
3709 
3710 	return KnownAssignedXidsSearch(xid, false);
3711 }
3712 
3713 /*
3714  * Remove the specified XID from KnownAssignedXids[].
3715  *
3716  * Caller must hold ProcArrayLock in exclusive mode.
3717  */
3718 static void
KnownAssignedXidsRemove(TransactionId xid)3719 KnownAssignedXidsRemove(TransactionId xid)
3720 {
3721 	Assert(TransactionIdIsValid(xid));
3722 
3723 	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
3724 
3725 	/*
3726 	 * Note: we cannot consider it an error to remove an XID that's not
3727 	 * present.  We intentionally remove subxact IDs while processing
3728 	 * XLOG_XACT_ASSIGNMENT, to avoid array overflow.  Then those XIDs will be
3729 	 * removed again when the top-level xact commits or aborts.
3730 	 *
3731 	 * It might be possible to track such XIDs to distinguish this case from
3732 	 * actual errors, but it would be complicated and probably not worth it.
3733 	 * So, just ignore the search result.
3734 	 */
3735 	(void) KnownAssignedXidsSearch(xid, true);
3736 }
3737 
3738 /*
3739  * KnownAssignedXidsRemoveTree
3740  *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
3741  *
3742  * Caller must hold ProcArrayLock in exclusive mode.
3743  */
3744 static void
KnownAssignedXidsRemoveTree(TransactionId xid,int nsubxids,TransactionId * subxids)3745 KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
3746 							TransactionId *subxids)
3747 {
3748 	int			i;
3749 
3750 	if (TransactionIdIsValid(xid))
3751 		KnownAssignedXidsRemove(xid);
3752 
3753 	for (i = 0; i < nsubxids; i++)
3754 		KnownAssignedXidsRemove(subxids[i]);
3755 
3756 	/* Opportunistically compress the array */
3757 	KnownAssignedXidsCompress(false);
3758 }
3759 
3760 /*
3761  * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
3762  * then clear the whole table.
3763  *
3764  * Caller must hold ProcArrayLock in exclusive mode.
3765  */
3766 static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)3767 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
3768 {
3769 	ProcArrayStruct *pArray = procArray;
3770 	int			count = 0;
3771 	int			head,
3772 				tail,
3773 				i;
3774 
3775 	if (!TransactionIdIsValid(removeXid))
3776 	{
3777 		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
3778 		pArray->numKnownAssignedXids = 0;
3779 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
3780 		return;
3781 	}
3782 
3783 	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
3784 
3785 	/*
3786 	 * Mark entries invalid starting at the tail.  Since array is sorted, we
3787 	 * can stop as soon as we reach an entry >= removeXid.
3788 	 */
3789 	tail = pArray->tailKnownAssignedXids;
3790 	head = pArray->headKnownAssignedXids;
3791 
3792 	for (i = tail; i < head; i++)
3793 	{
3794 		if (KnownAssignedXidsValid[i])
3795 		{
3796 			TransactionId knownXid = KnownAssignedXids[i];
3797 
3798 			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
3799 				break;
3800 
3801 			if (!StandbyTransactionIdIsPrepared(knownXid))
3802 			{
3803 				KnownAssignedXidsValid[i] = false;
3804 				count++;
3805 			}
3806 		}
3807 	}
3808 
3809 	pArray->numKnownAssignedXids -= count;
3810 	Assert(pArray->numKnownAssignedXids >= 0);
3811 
3812 	/*
3813 	 * Advance the tail pointer if we've marked the tail item invalid.
3814 	 */
3815 	for (i = tail; i < head; i++)
3816 	{
3817 		if (KnownAssignedXidsValid[i])
3818 			break;
3819 	}
3820 	if (i >= head)
3821 	{
3822 		/* Array is empty, so we can reset both pointers */
3823 		pArray->headKnownAssignedXids = 0;
3824 		pArray->tailKnownAssignedXids = 0;
3825 	}
3826 	else
3827 	{
3828 		pArray->tailKnownAssignedXids = i;
3829 	}
3830 
3831 	/* Opportunistically compress the array */
3832 	KnownAssignedXidsCompress(false);
3833 }
3834 
3835 /*
3836  * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
3837  * We filter out anything >= xmax.
3838  *
3839  * Returns the number of XIDs stored into xarray[].  Caller is responsible
3840  * that array is large enough.
3841  *
3842  * Caller must hold ProcArrayLock in (at least) shared mode.
3843  */
3844 static int
KnownAssignedXidsGet(TransactionId * xarray,TransactionId xmax)3845 KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
3846 {
3847 	TransactionId xtmp = InvalidTransactionId;
3848 
3849 	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
3850 }
3851 
3852 /*
3853  * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
3854  * we reduce *xmin to the lowest xid value seen if not already lower.
3855  *
3856  * Caller must hold ProcArrayLock in (at least) shared mode.
3857  */
3858 static int
KnownAssignedXidsGetAndSetXmin(TransactionId * xarray,TransactionId * xmin,TransactionId xmax)3859 KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
3860 							   TransactionId xmax)
3861 {
3862 	int			count = 0;
3863 	int			head,
3864 				tail;
3865 	int			i;
3866 
3867 	/*
3868 	 * Fetch head just once, since it may change while we loop. We can stop
3869 	 * once we reach the initially seen head, since we are certain that an xid
3870 	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
3871 	 * might miss newly-added xids, but they should be >= xmax so irrelevant
3872 	 * anyway.
3873 	 *
3874 	 * Must take spinlock to ensure we see up-to-date array contents.
3875 	 */
3876 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
3877 	tail = procArray->tailKnownAssignedXids;
3878 	head = procArray->headKnownAssignedXids;
3879 	SpinLockRelease(&procArray->known_assigned_xids_lck);
3880 
3881 	for (i = tail; i < head; i++)
3882 	{
3883 		/* Skip any gaps in the array */
3884 		if (KnownAssignedXidsValid[i])
3885 		{
3886 			TransactionId knownXid = KnownAssignedXids[i];
3887 
3888 			/*
3889 			 * Update xmin if required.  Only the first XID need be checked,
3890 			 * since the array is sorted.
3891 			 */
3892 			if (count == 0 &&
3893 				TransactionIdPrecedes(knownXid, *xmin))
3894 				*xmin = knownXid;
3895 
3896 			/*
3897 			 * Filter out anything >= xmax, again relying on sorted property
3898 			 * of array.
3899 			 */
3900 			if (TransactionIdIsValid(xmax) &&
3901 				TransactionIdFollowsOrEquals(knownXid, xmax))
3902 				break;
3903 
3904 			/* Add knownXid into output array */
3905 			xarray[count++] = knownXid;
3906 		}
3907 	}
3908 
3909 	return count;
3910 }
3911 
3912 /*
3913  * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
3914  * if nothing there.
3915  */
3916 static TransactionId
KnownAssignedXidsGetOldestXmin(void)3917 KnownAssignedXidsGetOldestXmin(void)
3918 {
3919 	int			head,
3920 				tail;
3921 	int			i;
3922 
3923 	/*
3924 	 * Fetch head just once, since it may change while we loop.
3925 	 */
3926 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
3927 	tail = procArray->tailKnownAssignedXids;
3928 	head = procArray->headKnownAssignedXids;
3929 	SpinLockRelease(&procArray->known_assigned_xids_lck);
3930 
3931 	for (i = tail; i < head; i++)
3932 	{
3933 		/* Skip any gaps in the array */
3934 		if (KnownAssignedXidsValid[i])
3935 			return KnownAssignedXids[i];
3936 	}
3937 
3938 	return InvalidTransactionId;
3939 }
3940 
3941 /*
3942  * Display KnownAssignedXids to provide debug trail
3943  *
3944  * Currently this is only called within startup process, so we need no
3945  * special locking.
3946  *
3947  * Note this is pretty expensive, and much of the expense will be incurred
3948  * even if the elog message will get discarded.  It's not currently called
3949  * in any performance-critical places, however, so no need to be tenser.
3950  */
3951 static void
KnownAssignedXidsDisplay(int trace_level)3952 KnownAssignedXidsDisplay(int trace_level)
3953 {
3954 	ProcArrayStruct *pArray = procArray;
3955 	StringInfoData buf;
3956 	int			head,
3957 				tail,
3958 				i;
3959 	int			nxids = 0;
3960 
3961 	tail = pArray->tailKnownAssignedXids;
3962 	head = pArray->headKnownAssignedXids;
3963 
3964 	initStringInfo(&buf);
3965 
3966 	for (i = tail; i < head; i++)
3967 	{
3968 		if (KnownAssignedXidsValid[i])
3969 		{
3970 			nxids++;
3971 			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3972 		}
3973 	}
3974 
3975 	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3976 		 nxids,
3977 		 pArray->numKnownAssignedXids,
3978 		 pArray->tailKnownAssignedXids,
3979 		 pArray->headKnownAssignedXids,
3980 		 buf.data);
3981 
3982 	pfree(buf.data);
3983 }
3984 
3985 /*
3986  * KnownAssignedXidsReset
3987  *		Resets KnownAssignedXids to be empty
3988  */
3989 static void
KnownAssignedXidsReset(void)3990 KnownAssignedXidsReset(void)
3991 {
3992 	ProcArrayStruct *pArray = procArray;
3993 
3994 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3995 
3996 	pArray->numKnownAssignedXids = 0;
3997 	pArray->tailKnownAssignedXids = 0;
3998 	pArray->headKnownAssignedXids = 0;
3999 
4000 	LWLockRelease(ProcArrayLock);
4001 }
4002