1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *	  Misc functions used in Hot Standby mode.
5  *
6  *	All functions for handling RM_STANDBY_ID, which relate to
7  *	AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *	Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *	  src/backend/storage/ipc/standby.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "miscadmin.h"
25 #include "pgstat.h"
26 #include "storage/bufmgr.h"
27 #include "storage/lmgr.h"
28 #include "storage/proc.h"
29 #include "storage/procarray.h"
30 #include "storage/sinvaladt.h"
31 #include "storage/standby.h"
32 #include "utils/hsearch.h"
33 #include "utils/memutils.h"
34 #include "utils/ps_status.h"
35 #include "utils/timeout.h"
36 #include "utils/timestamp.h"
37 
/* User-settable GUC parameters */
int			vacuum_defer_cleanup_age;	/* cleanup-deferral horizon; set via GUC */
int			max_standby_archive_delay = 30 * 1000;	/* ms; -1 = wait forever */
int			max_standby_streaming_delay = 30 * 1000;	/* ms; -1 = wait forever */
bool		log_recovery_conflict_waits = false;	/* log long conflict waits? */

/* Maps xid -> list of AccessExclusiveLocks it holds (see RecoveryLockListsEntry) */
static HTAB *RecoveryLockLists;

/* Flags set by timeout handlers (sig_atomic_t: written from interrupt context) */
static volatile sig_atomic_t got_standby_deadlock_timeout = false;
static volatile sig_atomic_t got_standby_lock_timeout = false;

/* Forward declarations for local routines */
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
												   ProcSignalReason reason,
												   uint32 wait_event_info,
												   bool report_waiting);
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
static const char *get_recovery_conflict_desc(ProcSignalReason reason);
58 
59 /*
60  * Keep track of all the locks owned by a given transaction.
61  */
/*
 * Keep track of all the locks owned by a given transaction.
 */
typedef struct RecoveryLockListsEntry
{
	TransactionId xid;			/* hash key (keysize is sizeof(TransactionId)) */
	List	   *locks;			/* locks held by this transaction */
} RecoveryLockListsEntry;
67 
68 /*
69  * InitRecoveryTransactionEnvironment
70  *		Initialize tracking of our primary's in-progress transactions.
71  *
72  * We need to issue shared invalidations and hold locks. Holding locks
73  * means others may want to wait on us, so we need to make a lock table
74  * vxact entry like a real transaction. We could create and delete
75  * lock table entries for each transaction but its simpler just to create
76  * one permanent entry and leave it there all the time. Locks are then
77  * acquired and released as needed. Yes, this means you can see the
78  * Startup process in pg_locks once we have run this.
79  */
void
InitRecoveryTransactionEnvironment(void)
{
	VirtualTransactionId vxid;
	HASHCTL		hash_ctl;

	/*
	 * Initialize the hash table for tracking the list of locks held by each
	 * transaction.
	 */
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
	RecoveryLockLists = hash_create("RecoveryLockLists",
									64,
									&hash_ctl,
									HASH_ELEM | HASH_BLOBS);

	/*
	 * Initialize shared invalidation management for Startup process, being
	 * careful to register ourselves as a sendOnly process so we don't need to
	 * read messages, nor will we get signaled when the queue starts filling
	 * up.
	 */
	SharedInvalBackendInit(true);

	/*
	 * Lock a virtual transaction id for Startup process.
	 *
	 * We need to do GetNextLocalTransactionId() because
	 * SharedInvalBackendInit() leaves localTransactionId invalid and the lock
	 * manager doesn't like that at all.
	 *
	 * Note that we don't need to run XactLockTableInsert() because nobody
	 * needs to wait on xids. That sounds a little strange, but table locks
	 * are held by vxids and row level locks are held by xids. All queries
	 * hold AccessShareLocks so never block while we write or lock new rows.
	 */
	vxid.backendId = MyBackendId;
	vxid.localTransactionId = GetNextLocalTransactionId();
	VirtualXactLockTableInsert(vxid);

	/* Conflict tracking is now operational. */
	standbyState = STANDBY_INITIALIZED;
}
123 
124 /*
125  * ShutdownRecoveryTransactionEnvironment
126  *		Shut down transaction tracking
127  *
128  * Prepare to switch from hot standby mode to normal operation. Shut down
129  * recovery-time transaction tracking.
130  *
 * This must be called even in shutdown of the startup process if transaction
 * tracking has been initialized. Otherwise some locks the tracked
 * transactions were holding will not be released and may interfere with
 * the processes still running (but which will exit soon) at the exit of
 * the startup process.
136  */
void
ShutdownRecoveryTransactionEnvironment(void)
{
	/*
	 * Do nothing if RecoveryLockLists is NULL, which means that transaction
	 * tracking has not yet been initialized or has already been shut down.
	 * This prevents transaction tracking from being shut down unexpectedly
	 * more than once.
	 */
	if (RecoveryLockLists == NULL)
		return;

	/* Mark all tracked in-progress transactions as finished. */
	ExpireAllKnownAssignedTransactionIds();

	/* Release all locks the tracked transactions were holding */
	StandbyReleaseAllLocks();

	/* Destroy the hash table of locks. */
	hash_destroy(RecoveryLockLists);
	RecoveryLockLists = NULL;	/* mark as shut down (see guard above) */

	/* Cleanup our VirtualTransaction */
	VirtualXactLockTableCleanup();
}
162 
163 
164 /*
165  * -----------------------------------------------------
166  *		Standby wait timers and backend cancel logic
167  * -----------------------------------------------------
168  */
169 
170 /*
171  * Determine the cutoff time at which we want to start canceling conflicting
172  * transactions.  Returns zero (a time safely in the past) if we are willing
173  * to wait forever.
174  */
175 static TimestampTz
GetStandbyLimitTime(void)176 GetStandbyLimitTime(void)
177 {
178 	TimestampTz rtime;
179 	bool		fromStream;
180 
181 	/*
182 	 * The cutoff time is the last WAL data receipt time plus the appropriate
183 	 * delay variable.  Delay of -1 means wait forever.
184 	 */
185 	GetXLogReceiptTime(&rtime, &fromStream);
186 	if (fromStream)
187 	{
188 		if (max_standby_streaming_delay < 0)
189 			return 0;			/* wait forever */
190 		return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
191 	}
192 	else
193 	{
194 		if (max_standby_archive_delay < 0)
195 			return 0;			/* wait forever */
196 		return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
197 	}
198 }
199 
/* Initial per-xact sleep, in microseconds; doubled each retry up to 1s. */
#define STANDBY_INITIAL_WAIT_US  1000
static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
202 
203 /*
204  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
205  * We wait here for a while then return. If we decide we can't wait any
206  * more then we return true, if we can wait some more return false.
207  */
208 static bool
WaitExceedsMaxStandbyDelay(uint32 wait_event_info)209 WaitExceedsMaxStandbyDelay(uint32 wait_event_info)
210 {
211 	TimestampTz ltime;
212 
213 	CHECK_FOR_INTERRUPTS();
214 
215 	/* Are we past the limit time? */
216 	ltime = GetStandbyLimitTime();
217 	if (ltime && GetCurrentTimestamp() >= ltime)
218 		return true;
219 
220 	/*
221 	 * Sleep a bit (this is essential to avoid busy-waiting).
222 	 */
223 	pgstat_report_wait_start(wait_event_info);
224 	pg_usleep(standbyWait_us);
225 	pgstat_report_wait_end();
226 
227 	/*
228 	 * Progressively increase the sleep times, but not to more than 1s, since
229 	 * pg_usleep isn't interruptible on some platforms.
230 	 */
231 	standbyWait_us *= 2;
232 	if (standbyWait_us > 1000000)
233 		standbyWait_us = 1000000;
234 
235 	return false;
236 }
237 
238 /*
239  * Log the recovery conflict.
240  *
241  * wait_start is the timestamp when the caller started to wait.
242  * now is the timestamp when this function has been called.
243  * wait_list is the list of virtual transaction ids assigned to
244  * conflicting processes. still_waiting indicates whether
245  * the startup process is still waiting for the recovery conflict
246  * to be resolved or not.
247  */
void
LogRecoveryConflict(ProcSignalReason reason, TimestampTz wait_start,
					TimestampTz now, VirtualTransactionId *wait_list,
					bool still_waiting)
{
	long		secs;
	int			usecs;
	long		msecs;
	StringInfoData buf;			/* only initialized once nprocs > 0 */
	int			nprocs = 0;

	/*
	 * There must be no conflicting processes when the recovery conflict has
	 * already been resolved.
	 */
	Assert(still_waiting || wait_list == NULL);

	/* Split the elapsed wait into whole milliseconds and leftover usecs. */
	TimestampDifference(wait_start, now, &secs, &usecs);
	msecs = secs * 1000 + usecs / 1000;
	usecs = usecs % 1000;

	if (wait_list)
	{
		VirtualTransactionId *vxids;

		/* Construct a string of list of the conflicting processes */
		vxids = wait_list;
		while (VirtualTransactionIdIsValid(*vxids))
		{
			PGPROC	   *proc = BackendIdGetProc(vxids->backendId);

			/* proc can be NULL if the target backend is not active */
			if (proc)
			{
				if (nprocs == 0)
				{
					/* first active backend: build the PID list lazily */
					initStringInfo(&buf);
					appendStringInfo(&buf, "%d", proc->pid);
				}
				else
					appendStringInfo(&buf, ", %d", proc->pid);

				nprocs++;
			}

			vxids++;
		}
	}

	/*
	 * If wait_list is specified, report the list of PIDs of active
	 * conflicting backends in a detail message. Note that if all the backends
	 * in the list are not active, no detail message is logged.
	 */
	if (still_waiting)
	{
		ereport(LOG,
				errmsg("recovery still waiting after %ld.%03d ms: %s",
					   msecs, usecs, get_recovery_conflict_desc(reason)),
				nprocs > 0 ? errdetail_log_plural("Conflicting process: %s.",
												  "Conflicting processes: %s.",
												  nprocs, buf.data) : 0);
	}
	else
	{
		ereport(LOG,
				errmsg("recovery finished waiting after %ld.%03d ms: %s",
					   msecs, usecs, get_recovery_conflict_desc(reason)));
	}

	/* buf was only allocated if at least one active backend was found */
	if (nprocs > 0)
		pfree(buf.data);
}
321 
322 /*
323  * This is the main executioner for any query backend that conflicts with
324  * recovery processing. Judgement has already been passed on it within
325  * a specific rmgr. Here we just issue the orders to the procs. The procs
326  * then throw the required error as instructed.
327  *
328  * If report_waiting is true, "waiting" is reported in PS display and the
329  * wait for recovery conflict is reported in the log, if necessary. If
330  * the caller is responsible for reporting them, report_waiting should be
331  * false. Otherwise, both the caller and this function report the same
332  * thing unexpectedly.
333  */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason, uint32 wait_event_info,
									   bool report_waiting)
{
	TimestampTz waitStart = 0;	/* 0 means "not reporting" */
	char	   *new_status = NULL;	/* non-NULL once ps display was changed */
	bool		logged_recovery_conflict = false;

	/* Fast exit, to avoid a kernel call if there's no work to be done. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	/* Set the wait start timestamp for reporting */
	if (report_waiting && (log_recovery_conflict_waits || update_process_title))
		waitStart = GetCurrentTimestamp();

	/* Outer loop: deal with each conflicting vxid in turn. */
	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* reset standbyWait_us for each xact we wait for */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/* wait until the virtual xid is gone */
		while (!VirtualXactLock(*waitlist, false))
		{
			/* Is it time to kill it? */
			if (WaitExceedsMaxStandbyDelay(wait_event_info))
			{
				pid_t		pid;

				/*
				 * Now find out who to throw out of the balloon.
				 */
				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * Wait a little bit for it to die so that we avoid flooding
				 * an unresponsive backend when system is heavily loaded.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}

			/*
			 * Reporting is only needed while there is still something left
			 * to report: the log message is emitted at most once, and the ps
			 * display is changed at most once.
			 */
			if (waitStart != 0 && (!logged_recovery_conflict || new_status == NULL))
			{
				TimestampTz now = 0;
				bool		maybe_log_conflict;
				bool		maybe_update_title;

				maybe_log_conflict = (log_recovery_conflict_waits && !logged_recovery_conflict);
				maybe_update_title = (update_process_title && new_status == NULL);

				/* Get the current timestamp if not report yet */
				if (maybe_log_conflict || maybe_update_title)
					now = GetCurrentTimestamp();

				/*
				 * Report via ps if we have been waiting for more than 500
				 * msec (should that be configurable?)
				 */
				if (maybe_update_title &&
					TimestampDifferenceExceeds(waitStart, now, 500))
				{
					const char *old_status;
					int			len;

					/* Append " waiting" to the current ps display. */
					old_status = get_ps_display(&len);
					new_status = (char *) palloc(len + 8 + 1);
					memcpy(new_status, old_status, len);
					strcpy(new_status + len, " waiting");
					set_ps_display(new_status);
					new_status[len] = '\0'; /* truncate off " waiting" */
				}

				/*
				 * Emit the log message if the startup process is waiting
				 * longer than deadlock_timeout for recovery conflict.
				 */
				if (maybe_log_conflict &&
					TimestampDifferenceExceeds(waitStart, now, DeadlockTimeout))
				{
					LogRecoveryConflict(reason, waitStart, now, waitlist, true);
					logged_recovery_conflict = true;
				}
			}
		}

		/* The virtual transaction is gone now, wait for the next one */
		waitlist++;
	}

	/*
	 * Emit the log message if recovery conflict was resolved but the startup
	 * process waited longer than deadlock_timeout for it.
	 */
	if (logged_recovery_conflict)
		LogRecoveryConflict(reason, waitStart, GetCurrentTimestamp(),
							NULL, false);

	/* Reset ps display if we changed it */
	if (new_status)
	{
		set_ps_display(new_status);
		pfree(new_status);
	}
}
441 
/*
 * ResolveRecoveryConflictWithSnapshot
 *		Resolve a snapshot conflict: find backends in node's database whose
 *		xmin conflicts with latestRemovedXid and cancel them (after waiting
 *		out the configured standby delay) via
 *		ResolveRecoveryConflictWithVirtualXIDs().
 */
void
ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
{
	VirtualTransactionId *backends;

	/*
	 * If we get passed InvalidTransactionId then we do nothing (no conflict).
	 *
	 * This can happen when replaying already-applied WAL records after a
	 * standby crash or restart, or when replaying an XLOG_HEAP2_VISIBLE
	 * record that marks as frozen a page which was already all-visible.  It's
	 * also quite common with records generated during index deletion
	 * (original execution of the deletion can reason that a recovery conflict
	 * which is sufficient for the deletion operation must take place before
	 * replay of the deletion record itself).
	 */
	if (!TransactionIdIsValid(latestRemovedXid))
		return;

	backends = GetConflictingVirtualXIDs(latestRemovedXid,
										 node.dbNode);

	ResolveRecoveryConflictWithVirtualXIDs(backends,
										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
										   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
										   true);
}
469 
470 /*
471  * Variant of ResolveRecoveryConflictWithSnapshot that works with
472  * FullTransactionId values
473  */
474 void
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXid,RelFileNode node)475 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXid,
476 										   RelFileNode node)
477 {
478 	/*
479 	 * ResolveRecoveryConflictWithSnapshot operates on 32-bit TransactionIds,
480 	 * so truncate the logged FullTransactionId.  If the logged value is very
481 	 * old, so that XID wrap-around already happened on it, there can't be any
482 	 * snapshots that still see it.
483 	 */
484 	FullTransactionId nextXid = ReadNextFullTransactionId();
485 	uint64		diff;
486 
487 	diff = U64FromFullTransactionId(nextXid) -
488 		U64FromFullTransactionId(latestRemovedFullXid);
489 	if (diff < MaxTransactionId / 2)
490 	{
491 		TransactionId latestRemovedXid;
492 
493 		latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid);
494 		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, node);
495 	}
496 }
497 
/*
 * ResolveRecoveryConflictWithTablespace
 *		Cancel queries that might be using temp files in a tablespace that
 *		is about to be dropped.
 *
 * Note: tsid is currently unused; we conservatively cancel all backends
 * (GetConflictingVirtualXIDs is called with InvalidTransactionId/InvalidOid).
 */
void
ResolveRecoveryConflictWithTablespace(Oid tsid)
{
	VirtualTransactionId *temp_file_users;

	/*
	 * Standby users may be currently using this tablespace for their
	 * temporary files. We only care about current users because
	 * temp_tablespace parameter will just ignore tablespaces that no longer
	 * exist.
	 *
	 * Ask everybody to cancel their queries immediately so we can ensure no
	 * temp files remain and we can remove the tablespace. Nuke the entire
	 * site from orbit, it's the only way to be sure.
	 *
	 * XXX: We could work out the pids of active backends using this
	 * tablespace by examining the temp filenames in the directory. We would
	 * then convert the pids into VirtualXIDs before attempting to cancel
	 * them.
	 *
	 * We don't wait for commit because drop tablespace is non-transactional.
	 */
	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
												InvalidOid);
	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
										   WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
										   true);
}
527 
528 void
ResolveRecoveryConflictWithDatabase(Oid dbid)529 ResolveRecoveryConflictWithDatabase(Oid dbid)
530 {
531 	/*
532 	 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
533 	 * only waits for transactions and completely idle sessions would block
534 	 * us. This is rare enough that we do this as simply as possible: no wait,
535 	 * just force them off immediately.
536 	 *
537 	 * No locking is required here because we already acquired
538 	 * AccessExclusiveLock. Anybody trying to connect while we do this will
539 	 * block during InitPostgres() and then disconnect when they see the
540 	 * database has been removed.
541 	 */
542 	while (CountDBBackends(dbid) > 0)
543 	{
544 		CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
545 
546 		/*
547 		 * Wait awhile for them to die so that we avoid flooding an
548 		 * unresponsive backend when system is heavily loaded.
549 		 */
550 		pg_usleep(10000);
551 	}
552 }
553 
554 /*
555  * ResolveRecoveryConflictWithLock is called from ProcSleep()
556  * to resolve conflicts with other backends holding relation locks.
557  *
558  * The WaitLatch sleep normally done in ProcSleep()
559  * (when not InHotStandby) is performed here, for code clarity.
560  *
561  * We either resolve conflicts immediately or set a timeout to wake us at
562  * the limit of our patience.
563  *
 * Resolve conflicts by canceling all backends holding a conflicting
565  * lock.  As we are already queued to be granted the lock, no new lock
566  * requests conflicting with ours will be granted in the meantime.
567  *
568  * We also must check for deadlocks involving the Startup process and
569  * hot-standby backend processes. If deadlock_timeout is reached in
570  * this function, all the backends holding the conflicting locks are
571  * requested to check themselves for deadlocks.
572  *
573  * logging_conflict should be true if the recovery conflict has not been
574  * logged yet even though logging is enabled. After deadlock_timeout is
575  * reached and the request for deadlock check is sent, we wait again to
576  * be signaled by the release of the lock if logging_conflict is false.
577  * Otherwise we return without waiting again so that the caller can report
578  * the recovery conflict. In this case, then, this function is called again
579  * with logging_conflict=false (because the recovery conflict has already
580  * been logged) and we will wait again for the lock to be released.
581  */
void
ResolveRecoveryConflictWithLock(LOCKTAG locktag, bool logging_conflict)
{
	TimestampTz ltime;			/* limit time; 0 means wait forever */
	TimestampTz now;

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();
	now = GetCurrentTimestamp();

	/*
	 * Update waitStart if first time through after the startup process
	 * started waiting for the lock. It should not be updated every time
	 * ResolveRecoveryConflictWithLock() is called during the wait.
	 *
	 * Use the current time obtained for comparison with ltime as waitStart
	 * (i.e., the time when this process started waiting for the lock). Since
	 * getting the current time newly can cause overhead, we reuse the
	 * already-obtained time to avoid that overhead.
	 *
	 * Note that waitStart is updated without holding the lock table's
	 * partition lock, to avoid the overhead by additional lock acquisition.
	 * This can cause "waitstart" in pg_locks to become NULL for a very short
	 * period of time after the wait started even though "granted" is false.
	 * This is OK in practice because we can assume that users are likely to
	 * look at "waitstart" when waiting for the lock for a long time.
	 */
	if (pg_atomic_read_u64(&MyProc->waitStart) == 0)
		pg_atomic_write_u64(&MyProc->waitStart, now);

	if (now >= ltime && ltime != 0)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 */
		VirtualTransactionId *backends;

		backends = GetLockConflicts(&locktag, AccessExclusiveLock, NULL);

		/*
		 * Prevent ResolveRecoveryConflictWithVirtualXIDs() from reporting
		 * "waiting" in PS display by disabling its argument report_waiting
		 * because the caller, WaitOnLock(), has already reported that.
		 */
		ResolveRecoveryConflictWithVirtualXIDs(backends,
											   PROCSIG_RECOVERY_CONFLICT_LOCK,
											   PG_WAIT_LOCK | locktag.locktag_type,
											   false);
	}
	else
	{
		/*
		 * Wait (or wait again) until ltime, and check for deadlocks as well
		 * if we will be waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];
		int			cnt = 0;

		/* Only arm the lock timeout when there is a finite limit. */
		if (ltime != 0)
		{
			got_standby_lock_timeout = false;
			timeouts[cnt].id = STANDBY_LOCK_TIMEOUT;
			timeouts[cnt].type = TMPARAM_AT;
			timeouts[cnt].fin_time = ltime;
			cnt++;
		}

		got_standby_deadlock_timeout = false;
		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[cnt].type = TMPARAM_AFTER;
		timeouts[cnt].delay_ms = DeadlockTimeout;
		cnt++;

		enable_timeouts(timeouts, cnt);
	}

	/* Wait to be signaled by the release of the Relation Lock */
	ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);

	/*
	 * Exit if ltime is reached. Then all the backends holding conflicting
	 * locks will be canceled in the next ResolveRecoveryConflictWithLock()
	 * call.
	 */
	if (got_standby_lock_timeout)
		goto cleanup;

	if (got_standby_deadlock_timeout)
	{
		VirtualTransactionId *backends;

		backends = GetLockConflicts(&locktag, AccessExclusiveLock, NULL);

		/* Quick exit if there's no work to be done */
		if (!VirtualTransactionIdIsValid(*backends))
			goto cleanup;

		/*
		 * Send signals to all the backends holding the conflicting locks, to
		 * ask them to check themselves for deadlocks.
		 */
		while (VirtualTransactionIdIsValid(*backends))
		{
			SignalVirtualTransaction(*backends,
									 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
									 false);
			backends++;
		}

		/*
		 * Exit if the recovery conflict has not been logged yet even though
		 * logging is enabled, so that the caller can log that. Then
		 * RecoveryConflictWithLock() is called again and we will wait again
		 * for the lock to be released.
		 */
		if (logging_conflict)
			goto cleanup;

		/*
		 * Wait again here to be signaled by the release of the Relation Lock,
		 * to prevent the subsequent RecoveryConflictWithLock() from causing
		 * deadlock_timeout and sending a request for deadlocks check again.
		 * Otherwise the request continues to be sent every deadlock_timeout
		 * until the relation locks are released or ltime is reached.
		 */
		got_standby_deadlock_timeout = false;
		ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);
	}

cleanup:

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other outstanding timeouts than those
	 * used by this function. If that stops being true, we could cancel the
	 * timeouts individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
	got_standby_lock_timeout = false;
	got_standby_deadlock_timeout = false;
}
724 
725 /*
726  * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
727  * to resolve conflicts with other backends holding buffer pins.
728  *
729  * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
730  * (when not InHotStandby) is performed here, for code clarity.
731  *
732  * We either resolve conflicts immediately or set a timeout to wake us at
733  * the limit of our patience.
734  *
735  * Resolve conflicts by sending a PROCSIG signal to all backends to check if
736  * they hold one of the buffer pins that is blocking Startup process. If so,
737  * those backends will take an appropriate error action, ERROR or FATAL.
738  *
739  * We also must check for deadlocks.  Deadlocks occur because if queries
740  * wait on a lock, that must be behind an AccessExclusiveLock, which can only
741  * be cleared if the Startup process replays a transaction completion record.
742  * If Startup process is also waiting then that is a deadlock. The deadlock
743  * can occur if the query is waiting and then the Startup sleeps, or if
744  * Startup is sleeping and the query waits on a lock. We protect against
745  * only the former sequence here, the latter sequence is checked prior to
746  * the query sleeping, in CheckRecoveryConflictDeadlock().
747  *
748  * Deadlocks are extremely rare, and relatively expensive to check for,
749  * so we don't do a deadlock check right away ... only if we have had to wait
750  * at least deadlock_timeout.
751  */
void
ResolveRecoveryConflictWithBufferPin(void)
{
	TimestampTz ltime;			/* limit time; 0 means wait forever */

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();

	if (GetCurrentTimestamp() >= ltime && ltime != 0)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 */
		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
	}
	else
	{
		/*
		 * Wake up at ltime, and check for deadlocks as well if we will be
		 * waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];
		int			cnt = 0;

		/* Only arm the standby timeout when there is a finite limit. */
		if (ltime != 0)
		{
			timeouts[cnt].id = STANDBY_TIMEOUT;
			timeouts[cnt].type = TMPARAM_AT;
			timeouts[cnt].fin_time = ltime;
			cnt++;
		}

		got_standby_deadlock_timeout = false;
		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[cnt].type = TMPARAM_AFTER;
		timeouts[cnt].delay_ms = DeadlockTimeout;
		cnt++;

		enable_timeouts(timeouts, cnt);
	}

	/*
	 * Wait to be signaled by UnpinBuffer().
	 *
	 * We assume that only UnpinBuffer() and the timeout requests established
	 * above can wake us up here. WakeupRecovery() called by walreceiver or
	 * SIGHUP signal handler, etc cannot do that because it uses the different
	 * latch from that ProcWaitForSignal() waits on.
	 */
	ProcWaitForSignal(PG_WAIT_BUFFER_PIN);

	if (got_standby_deadlock_timeout)
	{
		/*
		 * Send out a request for hot-standby backends to check themselves for
		 * deadlocks.
		 *
		 * XXX The subsequent ResolveRecoveryConflictWithBufferPin() will wait
		 * to be signaled by UnpinBuffer() again and send a request for
		 * deadlocks check if deadlock_timeout happens. This causes the
		 * request to continue to be sent every deadlock_timeout until the
		 * buffer is unpinned or ltime is reached. This would increase the
		 * workload in the startup process and backends. In practice it may
		 * not be so harmful because the period that the buffer is kept pinned
		 * is basically not so long. But we should fix this?
		 */
		SendRecoveryConflictWithBufferPin(
										  PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
	}

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other timeouts than what this function
	 * uses.  If that stops being true, we could cancel the timeouts
	 * individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
	got_standby_deadlock_timeout = false;
}
832 
/*
 * SendRecoveryConflictWithBufferPin
 *		Broadcast a conflict signal asking backends to release buffer pins
 *		(or check for a startup-process deadlock) that block recovery.
 */
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
	/* Only the two buffer-pin-related conflict reasons are valid here. */
	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

	/*
	 * We send signal to all backends to ask them if they are holding the
	 * buffer pin which is delaying the Startup process. We must not set the
	 * conflict flag yet, since most backends will be innocent. Let the
	 * SIGUSR1 handling in each backend decide their own fate.
	 */
	CancelDBBackends(InvalidOid, reason, false);
}
847 
848 /*
849  * In Hot Standby perform early deadlock detection.  We abort the lock
850  * wait if we are about to sleep while holding the buffer pin that Startup
851  * process is waiting for.
852  *
853  * Note: this code is pessimistic, because there is no way for it to
854  * determine whether an actual deadlock condition is present: the lock we
855  * need to wait for might be unrelated to any held by the Startup process.
856  * Sooner or later, this mechanism should get ripped out in favor of somehow
857  * accounting for buffer locks in DeadLockCheck().  However, errors here
858  * seem to be very low-probability in practice, so for now it's not worth
859  * the trouble.
860  */
void
CheckRecoveryConflictDeadlock(void)
{
	Assert(!InRecovery);		/* do not call in Startup process */

	/* Nothing to do unless we hold a pin the Startup process is waiting on. */
	if (!HoldingBufferPinThatDelaysRecovery())
		return;

	/*
	 * Error message should match ProcessInterrupts() but we avoid calling
	 * that because we aren't handling an interrupt at this point. Note that
	 * we only cancel the current transaction here, so if we are in a
	 * subtransaction and the pin is held by a parent, then the Startup
	 * process will continue to wait even though we have avoided deadlock.
	 */
	ereport(ERROR,
			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
			 errmsg("canceling statement due to conflict with recovery"),
			 errdetail("User transaction caused buffer deadlock with recovery.")));
}
881 
882 
883 /* --------------------------------
884  *		timeout handler routines
885  * --------------------------------
886  */
887 
888 /*
889  * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
890  * occurs before STANDBY_TIMEOUT.
891  */
void
StandbyDeadLockHandler(void)
{
	/*
	 * Runs in timeout-interrupt context: only set the async-safe
	 * sig_atomic_t flag here; the code that established the timeout polls
	 * it later.
	 */
	got_standby_deadlock_timeout = true;
}
897 
898 /*
899  * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
900  * Send out a request to release conflicting buffer pins unconditionally,
901  * so we can press ahead with applying changes in recovery.
902  */
void
StandbyTimeoutHandler(void)
{
	/* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	/* Demand (unconditionally) that conflicting buffer pins be dropped. */
	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
911 
912 /*
913  * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is exceeded.
914  */
void
StandbyLockTimeoutHandler(void)
{
	/*
	 * Runs in timeout-interrupt context: only set the async-safe
	 * sig_atomic_t flag; the waiting code checks it later.
	 */
	got_standby_lock_timeout = true;
}
920 
921 /*
922  * -----------------------------------------------------
923  * Locking in Recovery Mode
924  * -----------------------------------------------------
925  *
926  * All locks are held by the Startup process using a single virtual
927  * transaction. This implementation is both simpler and in some senses,
928  * more correct. The locks held mean "some original transaction held
929  * this lock, so query access is not allowed at this time". So the Startup
930  * process is the proxy by which the original locks are implemented.
931  *
932  * We only keep track of AccessExclusiveLocks, which are only ever held by
933  * one transaction on one relation.
934  *
935  * We keep a hash table of lists of locks in local memory keyed by xid,
936  * RecoveryLockLists, so we can keep track of the various entries made by
937  * the Startup process's virtual xid in the shared lock table.
938  *
939  * List elements use type xl_standby_lock, since the WAL record type exactly
940  * matches the information that we need to keep track of.
941  *
942  * We use session locks rather than normal locks so we don't need
943  * ResourceOwners.
944  */
945 
946 
947 void
StandbyAcquireAccessExclusiveLock(TransactionId xid,Oid dbOid,Oid relOid)948 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
949 {
950 	RecoveryLockListsEntry *entry;
951 	xl_standby_lock *newlock;
952 	LOCKTAG		locktag;
953 	bool		found;
954 
955 	/* Already processed? */
956 	if (!TransactionIdIsValid(xid) ||
957 		TransactionIdDidCommit(xid) ||
958 		TransactionIdDidAbort(xid))
959 		return;
960 
961 	elog(trace_recovery(DEBUG4),
962 		 "adding recovery lock: db %u rel %u", dbOid, relOid);
963 
964 	/* dbOid is InvalidOid when we are locking a shared relation. */
965 	Assert(OidIsValid(relOid));
966 
967 	/* Create a new list for this xid, if we don't have one already. */
968 	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
969 	if (!found)
970 	{
971 		entry->xid = xid;
972 		entry->locks = NIL;
973 	}
974 
975 	newlock = palloc(sizeof(xl_standby_lock));
976 	newlock->xid = xid;
977 	newlock->dbOid = dbOid;
978 	newlock->relOid = relOid;
979 	entry->locks = lappend(entry->locks, newlock);
980 
981 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
982 
983 	(void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
984 }
985 
986 static void
StandbyReleaseLockList(List * locks)987 StandbyReleaseLockList(List *locks)
988 {
989 	ListCell   *lc;
990 
991 	foreach(lc, locks)
992 	{
993 		xl_standby_lock *lock = (xl_standby_lock *) lfirst(lc);
994 		LOCKTAG		locktag;
995 
996 		elog(trace_recovery(DEBUG4),
997 			 "releasing recovery lock: xid %u db %u rel %u",
998 			 lock->xid, lock->dbOid, lock->relOid);
999 		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
1000 		if (!LockRelease(&locktag, AccessExclusiveLock, true))
1001 		{
1002 			elog(LOG,
1003 				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
1004 				 lock->xid, lock->dbOid, lock->relOid);
1005 			Assert(false);
1006 		}
1007 	}
1008 
1009 	list_free_deep(locks);
1010 }
1011 
1012 static void
StandbyReleaseLocks(TransactionId xid)1013 StandbyReleaseLocks(TransactionId xid)
1014 {
1015 	RecoveryLockListsEntry *entry;
1016 
1017 	if (TransactionIdIsValid(xid))
1018 	{
1019 		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
1020 		{
1021 			StandbyReleaseLockList(entry->locks);
1022 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
1023 		}
1024 	}
1025 	else
1026 		StandbyReleaseAllLocks();
1027 }
1028 
1029 /*
1030  * Release locks for a transaction tree, starting at xid down, from
1031  * RecoveryLockLists.
1032  *
1033  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
1034  * to remove any AccessExclusiveLocks requested by a transaction.
1035  */
1036 void
StandbyReleaseLockTree(TransactionId xid,int nsubxids,TransactionId * subxids)1037 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
1038 {
1039 	int			i;
1040 
1041 	StandbyReleaseLocks(xid);
1042 
1043 	for (i = 0; i < nsubxids; i++)
1044 		StandbyReleaseLocks(subxids[i]);
1045 }
1046 
1047 /*
1048  * Called at end of recovery and when we see a shutdown checkpoint.
1049  */
1050 void
StandbyReleaseAllLocks(void)1051 StandbyReleaseAllLocks(void)
1052 {
1053 	HASH_SEQ_STATUS status;
1054 	RecoveryLockListsEntry *entry;
1055 
1056 	elog(trace_recovery(DEBUG2), "release all standby locks");
1057 
1058 	hash_seq_init(&status, RecoveryLockLists);
1059 	while ((entry = hash_seq_search(&status)))
1060 	{
1061 		StandbyReleaseLockList(entry->locks);
1062 		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
1063 	}
1064 }
1065 
1066 /*
1067  * StandbyReleaseOldLocks
1068  *		Release standby locks held by top-level XIDs that aren't running,
1069  *		as long as they're not prepared transactions.
1070  */
1071 void
StandbyReleaseOldLocks(TransactionId oldxid)1072 StandbyReleaseOldLocks(TransactionId oldxid)
1073 {
1074 	HASH_SEQ_STATUS status;
1075 	RecoveryLockListsEntry *entry;
1076 
1077 	hash_seq_init(&status, RecoveryLockLists);
1078 	while ((entry = hash_seq_search(&status)))
1079 	{
1080 		Assert(TransactionIdIsValid(entry->xid));
1081 
1082 		/* Skip if prepared transaction. */
1083 		if (StandbyTransactionIdIsPrepared(entry->xid))
1084 			continue;
1085 
1086 		/* Skip if >= oldxid. */
1087 		if (!TransactionIdPrecedes(entry->xid, oldxid))
1088 			continue;
1089 
1090 		/* Remove all locks and hash table entry. */
1091 		StandbyReleaseLockList(entry->locks);
1092 		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
1093 	}
1094 }
1095 
1096 /*
1097  * --------------------------------------------------------------------
1098  *		Recovery handling for Rmgr RM_STANDBY_ID
1099  *
1100  * These record types will only be created if XLogStandbyInfoActive()
1101  * --------------------------------------------------------------------
1102  */
1103 
/*
 * Redo routine for RM_STANDBY_ID records: dispatch on the record type and
 * replay its effect on the standby's lock/snapshot state.
 */
void
standby_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	/* Backup blocks are not used in standby records */
	Assert(!XLogRecHasAnyBlockRefs(record));

	/* Do nothing if we're not in hot standby mode */
	if (standbyState == STANDBY_DISABLED)
		return;

	if (info == XLOG_STANDBY_LOCK)
	{
		xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
		int			i;

		/* Re-take each AccessExclusiveLock recorded on the primary. */
		for (i = 0; i < xlrec->nlocks; i++)
			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
											  xlrec->locks[i].dbOid,
											  xlrec->locks[i].relOid);
	}
	else if (info == XLOG_RUNNING_XACTS)
	{
		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
		RunningTransactionsData running;

		/* Unpack the flat WAL record into a RunningTransactionsData. */
		running.xcnt = xlrec->xcnt;
		running.subxcnt = xlrec->subxcnt;
		running.subxid_overflow = xlrec->subxid_overflow;
		running.nextXid = xlrec->nextXid;
		running.latestCompletedXid = xlrec->latestCompletedXid;
		running.oldestRunningXid = xlrec->oldestRunningXid;
		running.xids = xlrec->xids;

		ProcArrayApplyRecoveryInfo(&running);
	}
	else if (info == XLOG_INVALIDATIONS)
	{
		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);

		/* Replay invalidation messages logged by an xid-less commit. */
		ProcessCommittedInvalidationMessages(xlrec->msgs,
											 xlrec->nmsgs,
											 xlrec->relcacheInitFileInval,
											 xlrec->dbId,
											 xlrec->tsId);
	}
	else
		elog(PANIC, "standby_redo: unknown op code %u", info);
}
1154 
1155 /*
1156  * Log details of the current snapshot to WAL. This allows the snapshot state
1157  * to be reconstructed on the standby and for logical decoding.
1158  *
1159  * This is used for Hot Standby as follows:
1160  *
1161  * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
1162  * start from a shutdown checkpoint because we know nothing was running
1163  * at that time and our recovery snapshot is known empty. In the more
1164  * typical case of an online checkpoint we need to jump through a few
1165  * hoops to get a correct recovery snapshot and this requires a two or
1166  * sometimes a three stage process.
1167  *
1168  * The initial snapshot must contain all running xids and all current
1169  * AccessExclusiveLocks at a point in time on the standby. Assembling
1170  * that information while the server is running requires many and
1171  * various LWLocks, so we choose to derive that information piece by
1172  * piece and then re-assemble that info on the standby. When that
1173  * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
1174  *
1175  * Since locking on the primary when we derive the information is not
1176  * strict, we note that there is a time window between the derivation and
1177  * writing to WAL of the derived information. That allows race conditions
1178  * that we must resolve, since xids and locks may enter or leave the
1179  * snapshot during that window. This creates the issue that an xid or
1180  * lock may start *after* the snapshot has been derived yet *before* the
1181  * snapshot is logged in the running xacts WAL record. We resolve this by
1182  * starting to accumulate changes at a point just prior to when we derive
1183  * the snapshot on the primary, then ignore duplicates when we later apply
1184  * the snapshot from the running xacts record. This is implemented during
1185  * CreateCheckpoint() where we use the logical checkpoint location as
1186  * our starting point and then write the running xacts record immediately
1187  * before writing the main checkpoint WAL record. Since we always start
1188  * up from a checkpoint and are immediately at our starting point, we
1189  * unconditionally move to STANDBY_INITIALIZED. After this point we
1190  * must do 4 things:
1191  *	* move shared nextXid forwards as we see new xids
1192  *	* extend the clog and subtrans with each new xid
1193  *	* keep track of uncommitted known assigned xids
1194  *	* keep track of uncommitted AccessExclusiveLocks
1195  *
1196  * When we see a commit/abort we must remove known assigned xids and locks
1197  * from the completing transaction. Attempted removals that cannot locate
1198  * an entry are expected and must not cause an error when we are in state
1199  * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
1200  * KnownAssignedXidsRemove().
1201  *
1202  * Later, when we apply the running xact data we must be careful to ignore
1203  * transactions already committed, since those commits raced ahead when
1204  * making WAL entries.
1205  *
1206  * The loose timing also means that locks may be recorded that have a
1207  * zero xid, since xids are removed from procs before locks are removed.
1208  * So we must prune the lock list down to ensure we hold locks only for
1209  * currently running xids, performed by StandbyReleaseOldLocks().
1210  * Zero xids should no longer be possible, but we may be replaying WAL
1211  * from a time when they were possible.
1212  *
1213  * For logical decoding only the running xacts information is needed;
1214  * there's no need to look at the locking information, but it's logged anyway,
1215  * as there's no independent knob to just enable logical decoding. For
1216  * details of how this is used, check snapbuild.c's introductory comment.
1217  *
1218  *
1219  * Returns the RecPtr of the last inserted record.
1220  */
XLogRecPtr
LogStandbySnapshot(void)
{
	XLogRecPtr	recptr;
	RunningTransactions running;
	xl_standby_lock *locks;
	int			nlocks;

	Assert(XLogStandbyInfoActive());

	/*
	 * Get details of any AccessExclusiveLocks being held at the moment.
	 */
	locks = GetRunningTransactionLocks(&nlocks);
	if (nlocks > 0)
		LogAccessExclusiveLocks(nlocks, locks);
	pfree(locks);				/* array was palloc'd for us */

	/*
	 * Log details of all in-progress transactions. This should be the last
	 * record we write, because standby will open up when it sees this.
	 */
	running = GetRunningTransactionData();

	/*
	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
	 * For Hot Standby this can be done before inserting the WAL record
	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
	 * the clog. For logical decoding, though, the lock can't be released
	 * early because the clog might be "in the future" from the POV of the
	 * historic snapshot. This would allow for situations where we're waiting
	 * for the end of a transaction listed in the xl_running_xacts record
	 * which, according to the WAL, has committed before the xl_running_xacts
	 * record. Fortunately this routine isn't executed frequently, and it's
	 * only a shared lock.
	 */
	if (wal_level < WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	recptr = LogCurrentRunningXacts(running);

	/* Release lock if we kept it longer ... */
	if (wal_level >= WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
	LWLockRelease(XidGenLock);

	return recptr;
}
1271 
1272 /*
1273  * Record an enhanced snapshot of running transactions into WAL.
1274  *
1275  * The definitions of RunningTransactionsData and xl_xact_running_xacts are
1276  * similar. We keep them separate because xl_xact_running_xacts is a
1277  * contiguous chunk of memory and never exists fully until it is assembled in
1278  * WAL. The inserted records are marked as not being important for durability,
1279  * to avoid triggering superfluous checkpoint / archiving activity.
1280  */
static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
{
	xl_running_xacts xlrec;
	XLogRecPtr	recptr;

	/* Copy the summary fields into the fixed-size WAL record header. */
	xlrec.xcnt = CurrRunningXacts->xcnt;
	xlrec.subxcnt = CurrRunningXacts->subxcnt;
	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
	xlrec.nextXid = CurrRunningXacts->nextXid;
	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;

	/* Header */
	XLogBeginInsert();
	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
	XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);

	/* array of TransactionIds */
	if (xlrec.xcnt > 0)
		/* top-level xids and subxids are stored contiguously in xids */
		XLogRegisterData((char *) CurrRunningXacts->xids,
						 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));

	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);

	if (CurrRunningXacts->subxid_overflow)
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt,
			 LSN_FORMAT_ARGS(recptr),
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);
	else
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
			 LSN_FORMAT_ARGS(recptr),
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);

	/*
	 * Ensure running_xacts information is synced to disk not too far in the
	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
	 * so we let the wal writer do it during normal operation.
	 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
	 * and nudge the WALWriter into action if sleeping. Check
	 * XLogBackgroundFlush() for details why a record might not be flushed
	 * without it.
	 */
	XLogSetAsyncXactLSN(recptr);

	return recptr;
}
1336 
1337 /*
1338  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1339  * logged, as described in backend/storage/lmgr/README.
1340  */
static void
LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
{
	xl_standby_locks xlrec;

	xlrec.nlocks = nlocks;

	XLogBeginInsert();
	/* fixed-size header (just the lock count) ... */
	XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
	/* ... followed by the array of per-lock entries */
	XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
	/* unimportant for durability: avoids superfluous checkpoint/archiving */
	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);

	(void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
}
1355 
1356 /*
1357  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1358  */
void
LogAccessExclusiveLock(Oid dbOid, Oid relOid)
{
	xl_standby_lock xlrec;

	/* An xid must exist by now; see LogAccessExclusiveLockPrepare(). */
	xlrec.xid = GetCurrentTransactionId();

	xlrec.dbOid = dbOid;
	xlrec.relOid = relOid;

	LogAccessExclusiveLocks(1, &xlrec);
	/* Record that this transaction acquired an AccessExclusiveLock. */
	MyXactFlags |= XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK;
}
1372 
1373 /*
1374  * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
1375  */
void
LogAccessExclusiveLockPrepare(void)
{
	/*
	 * Ensure that a TransactionId has been assigned to this transaction, for
	 * two reasons, both related to lock release on the standby. First, we
	 * must assign an xid so that RecordTransactionCommit() and
	 * RecordTransactionAbort() do not optimise away the transaction
	 * completion record which recovery relies upon to release locks. It's a
	 * hack, but for a corner case not worth adding code for into the main
	 * commit path. Second, we must assign an xid before the lock is recorded
	 * in shared memory, otherwise a concurrently executing
	 * GetRunningTransactionLocks() might see a lock associated with an
	 * InvalidTransactionId which we later assert cannot happen.
	 */
	(void) GetCurrentTransactionId();	/* result discarded; only the
										 * xid-assignment side effect is
										 * wanted */
}
1393 
1394 /*
1395  * Emit WAL for invalidations. This currently is only used for commits without
1396  * an xid but which contain invalidations.
1397  */
void
LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
						bool relcacheInitFileInval)
{
	xl_invalidations xlrec;

	/* prepare record */
	memset(&xlrec, 0, sizeof(xlrec));
	xlrec.dbId = MyDatabaseId;
	xlrec.tsId = MyDatabaseTableSpace;
	xlrec.relcacheInitFileInval = relcacheInitFileInval;
	xlrec.nmsgs = nmsgs;

	/* perform insertion: fixed-size header, then the message array */
	XLogBeginInsert();
	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
	XLogRegisterData((char *) msgs,
					 nmsgs * sizeof(SharedInvalidationMessage));
	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
}
1418 
1419 /* Return the description of recovery conflict */
1420 static const char *
get_recovery_conflict_desc(ProcSignalReason reason)1421 get_recovery_conflict_desc(ProcSignalReason reason)
1422 {
1423 	const char *reasonDesc = _("unknown reason");
1424 
1425 	switch (reason)
1426 	{
1427 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
1428 			reasonDesc = _("recovery conflict on buffer pin");
1429 			break;
1430 		case PROCSIG_RECOVERY_CONFLICT_LOCK:
1431 			reasonDesc = _("recovery conflict on lock");
1432 			break;
1433 		case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
1434 			reasonDesc = _("recovery conflict on tablespace");
1435 			break;
1436 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
1437 			reasonDesc = _("recovery conflict on snapshot");
1438 			break;
1439 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
1440 			reasonDesc = _("recovery conflict on buffer deadlock");
1441 			break;
1442 		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
1443 			reasonDesc = _("recovery conflict on database");
1444 			break;
1445 		default:
1446 			break;
1447 	}
1448 
1449 	return reasonDesc;
1450 }
1451