1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *	  Misc functions used in Hot Standby mode.
5  *
6  *	All functions for handling RM_STANDBY_ID, which relate to
7  *	AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *	Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *	  src/backend/storage/ipc/standby.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "miscadmin.h"
25 #include "pgstat.h"
26 #include "storage/bufmgr.h"
27 #include "storage/lmgr.h"
28 #include "storage/proc.h"
29 #include "storage/procarray.h"
30 #include "storage/sinvaladt.h"
31 #include "storage/standby.h"
32 #include "utils/hsearch.h"
33 #include "utils/memutils.h"
34 #include "utils/ps_status.h"
35 #include "utils/timeout.h"
36 #include "utils/timestamp.h"
37 
/* User-settable GUC parameters */
int			vacuum_defer_cleanup_age;
int			max_standby_archive_delay = 30 * 1000;	/* msec; -1 means wait forever */
int			max_standby_streaming_delay = 30 * 1000;	/* msec; -1 means wait forever */

/* Hash table (keyed by xid) of per-transaction lock lists; see below */
static HTAB *RecoveryLockLists;

/* Flags set by timeout handlers */
static volatile sig_atomic_t got_standby_deadlock_timeout = false;
static volatile sig_atomic_t got_standby_lock_timeout = false;

static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
												   ProcSignalReason reason,
												   uint32 wait_event_info,
												   bool report_waiting);
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
56 
/*
 * Keep track of all the locks owned by a given transaction.
 */
typedef struct RecoveryLockListsEntry
{
	TransactionId xid;			/* hash key: XID that requested the locks */
	List	   *locks;			/* list of xl_standby_lock entries it holds */
} RecoveryLockListsEntry;
65 
/*
 * InitRecoveryTransactionEnvironment
 *		Initialize tracking of in-progress transactions in master
 *
 * We need to issue shared invalidations and hold locks. Holding locks
 * means others may want to wait on us, so we need to make a lock table
 * vxact entry like a real transaction. We could create and delete
 * lock table entries for each transaction but its simpler just to create
 * one permanent entry and leave it there all the time. Locks are then
 * acquired and released as needed. Yes, this means you can see the
 * Startup process in pg_locks once we have run this.
 */
void
InitRecoveryTransactionEnvironment(void)
{
	VirtualTransactionId vxid;
	HASHCTL		hash_ctl;

	/*
	 * Initialize the hash table for tracking the list of locks held by each
	 * transaction.  A non-NULL RecoveryLockLists also serves as the "is
	 * initialized" flag tested by ShutdownRecoveryTransactionEnvironment().
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
	RecoveryLockLists = hash_create("RecoveryLockLists",
									64,
									&hash_ctl,
									HASH_ELEM | HASH_BLOBS);

	/*
	 * Initialize shared invalidation management for Startup process, being
	 * careful to register ourselves as a sendOnly process so we don't need to
	 * read messages, nor will we get signaled when the queue starts filling
	 * up.
	 */
	SharedInvalBackendInit(true);

	/*
	 * Lock a virtual transaction id for Startup process.
	 *
	 * We need to do GetNextLocalTransactionId() because
	 * SharedInvalBackendInit() leaves localTransactionId invalid and the lock
	 * manager doesn't like that at all.
	 *
	 * Note that we don't need to run XactLockTableInsert() because nobody
	 * needs to wait on xids. That sounds a little strange, but table locks
	 * are held by vxids and row level locks are held by xids. All queries
	 * hold AccessShareLocks so never block while we write or lock new rows.
	 */
	vxid.backendId = MyBackendId;
	vxid.localTransactionId = GetNextLocalTransactionId();
	VirtualXactLockTableInsert(vxid);

	/* Hot standby startup/redo machinery may now rely on this state */
	standbyState = STANDBY_INITIALIZED;
}
122 
/*
 * ShutdownRecoveryTransactionEnvironment
 *		Shut down transaction tracking
 *
 * Prepare to switch from hot standby mode to normal operation. Shut down
 * recovery-time transaction tracking.
 *
 * This must be called even in shutdown of startup process if transaction
 * tracking has been initialized. Otherwise some locks the tracked
 * transactions were holding will not be released and may interfere with
 * the processes still running (but will exit soon later) at the exit of
 * startup process.
 */
void
ShutdownRecoveryTransactionEnvironment(void)
{
	/*
	 * Do nothing if RecoveryLockLists is NULL, which means that transaction
	 * tracking has not yet been initialized or has already been shut down.
	 * This prevents transaction tracking from being shut down unexpectedly
	 * more than once.
	 */
	if (RecoveryLockLists == NULL)
		return;

	/* Mark all tracked in-progress transactions as finished. */
	ExpireAllKnownAssignedTransactionIds();

	/* Release all locks the tracked transactions were holding */
	StandbyReleaseAllLocks();

	/* Destroy the hash table of locks. */
	hash_destroy(RecoveryLockLists);
	RecoveryLockLists = NULL;

	/* Cleanup our VirtualTransaction */
	VirtualXactLockTableCleanup();
}
161 
162 
163 /*
164  * -----------------------------------------------------
165  *		Standby wait timers and backend cancel logic
166  * -----------------------------------------------------
167  */
168 
169 /*
170  * Determine the cutoff time at which we want to start canceling conflicting
171  * transactions.  Returns zero (a time safely in the past) if we are willing
172  * to wait forever.
173  */
174 static TimestampTz
GetStandbyLimitTime(void)175 GetStandbyLimitTime(void)
176 {
177 	TimestampTz rtime;
178 	bool		fromStream;
179 
180 	/*
181 	 * The cutoff time is the last WAL data receipt time plus the appropriate
182 	 * delay variable.  Delay of -1 means wait forever.
183 	 */
184 	GetXLogReceiptTime(&rtime, &fromStream);
185 	if (fromStream)
186 	{
187 		if (max_standby_streaming_delay < 0)
188 			return 0;			/* wait forever */
189 		return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
190 	}
191 	else
192 	{
193 		if (max_standby_archive_delay < 0)
194 			return 0;			/* wait forever */
195 		return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
196 	}
197 }
198 
/* Initial sleep, in microseconds, between checks for a conflicting xact */
#define STANDBY_INITIAL_WAIT_US  1000
/* Current sleep period; doubled per retry (capped at 1s), reset per xact */
static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
201 
/*
 * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
 * We wait here for a while then return. If we decide we can't wait any
 * more then we return true, if we can wait some more return false.
 *
 * Note: each call sleeps and then doubles the module-global standbyWait_us,
 * so callers must reset it (to STANDBY_INITIAL_WAIT_US) before each new
 * wait sequence.
 */
static bool
WaitExceedsMaxStandbyDelay(uint32 wait_event_info)
{
	TimestampTz ltime;

	CHECK_FOR_INTERRUPTS();

	/* Are we past the limit time? (ltime == 0 means wait forever) */
	ltime = GetStandbyLimitTime();
	if (ltime && GetCurrentTimestamp() >= ltime)
		return true;

	/*
	 * Sleep a bit (this is essential to avoid busy-waiting).
	 */
	pgstat_report_wait_start(wait_event_info);
	pg_usleep(standbyWait_us);
	pgstat_report_wait_end();

	/*
	 * Progressively increase the sleep times, but not to more than 1s, since
	 * pg_usleep isn't interruptible on some platforms.
	 */
	standbyWait_us *= 2;
	if (standbyWait_us > 1000000)
		standbyWait_us = 1000000;

	return false;
}
236 
/*
 * This is the main executioner for any query backend that conflicts with
 * recovery processing. Judgement has already been passed on it within
 * a specific rmgr. Here we just issue the orders to the procs. The procs
 * then throw the required error as instructed.
 *
 * waitlist is an InvalidVirtualTransactionId-terminated array of vxids to
 * wait for (and eventually cancel); reason is the ProcSignalReason sent to
 * the conflicting backends; wait_event_info is reported while sleeping.
 *
 * If report_waiting is true, "waiting" is reported in PS display if necessary.
 * If the caller has already reported that, report_waiting should be false.
 * Otherwise, "waiting" is reported twice unexpectedly.
 */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason, uint32 wait_event_info,
									   bool report_waiting)
{
	TimestampTz waitStart = 0;
	char	   *new_status;

	/* Fast exit, to avoid a kernel call if there's no work to be done. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	if (report_waiting)
		waitStart = GetCurrentTimestamp();
	new_status = NULL;			/* we haven't changed the ps display */

	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* reset standbyWait_us for each xact we wait for */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/* wait until the virtual xid is gone */
		while (!VirtualXactLock(*waitlist, false))
		{
			/*
			 * Report via ps if we have been waiting for more than 500 msec
			 * (should that be configurable?)
			 */
			if (update_process_title && new_status == NULL && report_waiting &&
				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
										   500))
			{
				const char *old_status;
				int			len;

				old_status = get_ps_display(&len);
				new_status = (char *) palloc(len + 8 + 1);
				memcpy(new_status, old_status, len);
				strcpy(new_status + len, " waiting");
				set_ps_display(new_status);
				new_status[len] = '\0'; /* truncate off " waiting" */
			}

			/* Is it time to kill it? */
			if (WaitExceedsMaxStandbyDelay(wait_event_info))
			{
				pid_t		pid;

				/*
				 * Now find out who to throw out of the balloon.
				 */
				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * Wait a little bit for it to die so that we avoid flooding
				 * an unresponsive backend when system is heavily loaded.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}
		}

		/* The virtual transaction is gone now, wait for the next one */
		waitlist++;
	}

	/* Reset ps display if we changed it (new_status holds the old display) */
	if (new_status)
	{
		set_ps_display(new_status);
		pfree(new_status);
	}
}
321 
/*
 * ResolveRecoveryConflictWithSnapshot
 *		Cancel (after the configured grace period) backends in the given
 *		database whose snapshots conflict with removal of tuples up to
 *		latestRemovedXid.
 */
void
ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
{
	VirtualTransactionId *backends;

	/*
	 * If we get passed InvalidTransactionId then we do nothing (no conflict).
	 *
	 * This can happen when replaying already-applied WAL records after a
	 * standby crash or restart, or when replaying an XLOG_HEAP2_VISIBLE
	 * record that marks as frozen a page which was already all-visible.  It's
	 * also quite common with records generated during index deletion
	 * (original execution of the deletion can reason that a recovery conflict
	 * which is sufficient for the deletion operation must take place before
	 * replay of the deletion record itself).
	 */
	if (!TransactionIdIsValid(latestRemovedXid))
		return;

	backends = GetConflictingVirtualXIDs(latestRemovedXid,
										 node.dbNode);

	ResolveRecoveryConflictWithVirtualXIDs(backends,
										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
										   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
										   true);
}
349 
/*
 * ResolveRecoveryConflictWithTablespace
 *		Cancel all current backends before replaying a tablespace drop, so
 *		no temp files can remain in it.
 */
void
ResolveRecoveryConflictWithTablespace(Oid tsid)
{
	VirtualTransactionId *temp_file_users;

	/*
	 * Standby users may be currently using this tablespace for their
	 * temporary files. We only care about current users because
	 * temp_tablespace parameter will just ignore tablespaces that no longer
	 * exist.
	 *
	 * Ask everybody to cancel their queries immediately so we can ensure no
	 * temp files remain and we can remove the tablespace. Nuke the entire
	 * site from orbit, it's the only way to be sure.
	 *
	 * XXX: We could work out the pids of active backends using this
	 * tablespace by examining the temp filenames in the directory. We would
	 * then convert the pids into VirtualXIDs before attempting to cancel
	 * them.
	 *
	 * We don't wait for commit because drop tablespace is non-transactional.
	 */
	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
												InvalidOid);
	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
										   WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
										   true);
}
379 
/*
 * ResolveRecoveryConflictWithDatabase
 *		Forcibly disconnect all backends connected to the given database,
 *		looping until none remain.  Used when replaying a database drop.
 */
void
ResolveRecoveryConflictWithDatabase(Oid dbid)
{
	/*
	 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
	 * only waits for transactions and completely idle sessions would block
	 * us. This is rare enough that we do this as simply as possible: no wait,
	 * just force them off immediately.
	 *
	 * No locking is required here because we already acquired
	 * AccessExclusiveLock. Anybody trying to connect while we do this will
	 * block during InitPostgres() and then disconnect when they see the
	 * database has been removed.
	 */
	while (CountDBBackends(dbid) > 0)
	{
		CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);

		/*
		 * Wait awhile for them to die so that we avoid flooding an
		 * unresponsive backend when system is heavily loaded.
		 */
		pg_usleep(10000);		/* 10 msec */
	}
}
405 
/*
 * ResolveRecoveryConflictWithLock is called from ProcSleep()
 * to resolve conflicts with other backends holding relation locks.
 *
 * The WaitLatch sleep normally done in ProcSleep()
 * (when not InHotStandby) is performed here, for code clarity.
 *
 * We either resolve conflicts immediately or set a timeout to wake us at
 * the limit of our patience.
 *
 * Resolve conflicts by canceling to all backends holding a conflicting
 * lock.  As we are already queued to be granted the lock, no new lock
 * requests conflicting with ours will be granted in the meantime.
 *
 * We also must check for deadlocks involving the Startup process and
 * hot-standby backend processes. If deadlock_timeout is reached in
 * this function, all the backends holding the conflicting locks are
 * requested to check themselves for deadlocks.
 */
void
ResolveRecoveryConflictWithLock(LOCKTAG locktag)
{
	TimestampTz ltime;

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();

	/* ltime == 0 means "wait forever", so the cancel path is skipped then */
	if (GetCurrentTimestamp() >= ltime && ltime != 0)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 */
		VirtualTransactionId *backends;

		backends = GetLockConflicts(&locktag, AccessExclusiveLock, NULL);

		/*
		 * Prevent ResolveRecoveryConflictWithVirtualXIDs() from reporting
		 * "waiting" in PS display by disabling its argument report_waiting
		 * because the caller, WaitOnLock(), has already reported that.
		 */
		ResolveRecoveryConflictWithVirtualXIDs(backends,
											   PROCSIG_RECOVERY_CONFLICT_LOCK,
											   PG_WAIT_LOCK | locktag.locktag_type,
											   false);
	}
	else
	{
		/*
		 * Wait (or wait again) until ltime, and check for deadlocks as well
		 * if we will be waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];
		int			cnt = 0;

		if (ltime != 0)
		{
			got_standby_lock_timeout = false;
			timeouts[cnt].id = STANDBY_LOCK_TIMEOUT;
			timeouts[cnt].type = TMPARAM_AT;
			timeouts[cnt].fin_time = ltime;
			cnt++;
		}

		got_standby_deadlock_timeout = false;
		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[cnt].type = TMPARAM_AFTER;
		timeouts[cnt].delay_ms = DeadlockTimeout;
		cnt++;

		enable_timeouts(timeouts, cnt);
	}

	/* Wait to be signaled by the release of the Relation Lock */
	ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);

	/*
	 * Exit if ltime is reached. Then all the backends holding conflicting
	 * locks will be canceled in the next ResolveRecoveryConflictWithLock()
	 * call.
	 */
	if (got_standby_lock_timeout)
		goto cleanup;

	if (got_standby_deadlock_timeout)
	{
		VirtualTransactionId *backends;

		backends = GetLockConflicts(&locktag, AccessExclusiveLock, NULL);

		/* Quick exit if there's no work to be done */
		if (!VirtualTransactionIdIsValid(*backends))
			goto cleanup;

		/*
		 * Send signals to all the backends holding the conflicting locks, to
		 * ask them to check themselves for deadlocks.
		 */
		while (VirtualTransactionIdIsValid(*backends))
		{
			SignalVirtualTransaction(*backends,
									 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
									 false);
			backends++;
		}

		/*
		 * Wait again here to be signaled by the release of the Relation Lock,
		 * to prevent the subsequent RecoveryConflictWithLock() from causing
		 * deadlock_timeout and sending a request for deadlocks check again.
		 * Otherwise the request continues to be sent every deadlock_timeout
		 * until the relation locks are released or ltime is reached.
		 */
		got_standby_deadlock_timeout = false;
		ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);
	}

cleanup:

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other outstanding timeouts than those
	 * used by this function. If that stops being true, we could cancel the
	 * timeouts individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
	got_standby_lock_timeout = false;
	got_standby_deadlock_timeout = false;
}
536 
/*
 * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
 * to resolve conflicts with other backends holding buffer pins.
 *
 * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
 * (when not InHotStandby) is performed here, for code clarity.
 *
 * We either resolve conflicts immediately or set a timeout to wake us at
 * the limit of our patience.
 *
 * Resolve conflicts by sending a PROCSIG signal to all backends to check if
 * they hold one of the buffer pins that is blocking Startup process. If so,
 * those backends will take an appropriate error action, ERROR or FATAL.
 *
 * We also must check for deadlocks.  Deadlocks occur because if queries
 * wait on a lock, that must be behind an AccessExclusiveLock, which can only
 * be cleared if the Startup process replays a transaction completion record.
 * If Startup process is also waiting then that is a deadlock. The deadlock
 * can occur if the query is waiting and then the Startup sleeps, or if
 * Startup is sleeping and the query waits on a lock. We protect against
 * only the former sequence here, the latter sequence is checked prior to
 * the query sleeping, in CheckRecoveryConflictDeadlock().
 *
 * Deadlocks are extremely rare, and relatively expensive to check for,
 * so we don't do a deadlock check right away ... only if we have had to wait
 * at least deadlock_timeout.
 */
void
ResolveRecoveryConflictWithBufferPin(void)
{
	TimestampTz ltime;

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();

	/* ltime == 0 means "wait forever", so the cancel path is skipped then */
	if (GetCurrentTimestamp() >= ltime && ltime != 0)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 */
		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
	}
	else
	{
		/*
		 * Wake up at ltime, and check for deadlocks as well if we will be
		 * waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];
		int			cnt = 0;

		if (ltime != 0)
		{
			timeouts[cnt].id = STANDBY_TIMEOUT;
			timeouts[cnt].type = TMPARAM_AT;
			timeouts[cnt].fin_time = ltime;
			cnt++;
		}

		got_standby_deadlock_timeout = false;
		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[cnt].type = TMPARAM_AFTER;
		timeouts[cnt].delay_ms = DeadlockTimeout;
		cnt++;

		enable_timeouts(timeouts, cnt);
	}

	/* Wait to be signaled by UnpinBuffer() */
	ProcWaitForSignal(PG_WAIT_BUFFER_PIN);

	if (got_standby_deadlock_timeout)
	{
		/*
		 * Send out a request for hot-standby backends to check themselves for
		 * deadlocks.
		 *
		 * XXX The subsequent ResolveRecoveryConflictWithBufferPin() will wait
		 * to be signaled by UnpinBuffer() again and send a request for
		 * deadlocks check if deadlock_timeout happens. This causes the
		 * request to continue to be sent every deadlock_timeout until the
		 * buffer is unpinned or ltime is reached. This would increase the
		 * workload in the startup process and backends. In practice it may
		 * not be so harmful because the period that the buffer is kept pinned
		 * is basically no so long. But we should fix this?
		 */
		SendRecoveryConflictWithBufferPin(
										  PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
	}

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other timeouts than what this function
	 * uses.  If that stops being true, we could cancel the timeouts
	 * individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
	got_standby_deadlock_timeout = false;
}
637 
/*
 * Broadcast a buffer-pin (or startup-deadlock) conflict signal to all
 * backends; each recipient decides for itself whether it is affected.
 */
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

	/*
	 * We send signal to all backends to ask them if they are holding the
	 * buffer pin which is delaying the Startup process. We must not set the
	 * conflict flag yet, since most backends will be innocent. Let the
	 * SIGUSR1 handling in each backend decide their own fate.
	 */
	CancelDBBackends(InvalidOid, reason, false);
}
652 
/*
 * In Hot Standby perform early deadlock detection.  We abort the lock
 * wait if we are about to sleep while holding the buffer pin that Startup
 * process is waiting for.  Raises ERROR (never returns) when such a pin is
 * held; otherwise returns normally.
 *
 * Note: this code is pessimistic, because there is no way for it to
 * determine whether an actual deadlock condition is present: the lock we
 * need to wait for might be unrelated to any held by the Startup process.
 * Sooner or later, this mechanism should get ripped out in favor of somehow
 * accounting for buffer locks in DeadLockCheck().  However, errors here
 * seem to be very low-probability in practice, so for now it's not worth
 * the trouble.
 */
void
CheckRecoveryConflictDeadlock(void)
{
	Assert(!InRecovery);		/* do not call in Startup process */

	if (!HoldingBufferPinThatDelaysRecovery())
		return;

	/*
	 * Error message should match ProcessInterrupts() but we avoid calling
	 * that because we aren't handling an interrupt at this point. Note that
	 * we only cancel the current transaction here, so if we are in a
	 * subtransaction and the pin is held by a parent, then the Startup
	 * process will continue to wait even though we have avoided deadlock.
	 */
	ereport(ERROR,
			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
			 errmsg("canceling statement due to conflict with recovery"),
			 errdetail("User transaction caused buffer deadlock with recovery.")));
}
686 
687 
688 /* --------------------------------
689  *		timeout handler routines
690  * --------------------------------
691  */
692 
/*
 * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
 * occurs before STANDBY_TIMEOUT.  Runs in timeout-interrupt context, so it
 * only sets a flag for the mainline code to act on.
 */
void
StandbyDeadLockHandler(void)
{
	got_standby_deadlock_timeout = true;
}
702 
/*
 * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
 * Send out a request to release conflicting buffer pins unconditionally,
 * so we can press ahead with applying changes in recovery.
 */
void
StandbyTimeoutHandler(void)
{
	/* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
716 
/*
 * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is
 * exceeded.  Like StandbyDeadLockHandler(), it only sets a flag.
 */
void
StandbyLockTimeoutHandler(void)
{
	got_standby_lock_timeout = true;
}
725 
726 /*
727  * -----------------------------------------------------
728  * Locking in Recovery Mode
729  * -----------------------------------------------------
730  *
731  * All locks are held by the Startup process using a single virtual
732  * transaction. This implementation is both simpler and in some senses,
733  * more correct. The locks held mean "some original transaction held
734  * this lock, so query access is not allowed at this time". So the Startup
735  * process is the proxy by which the original locks are implemented.
736  *
737  * We only keep track of AccessExclusiveLocks, which are only ever held by
738  * one transaction on one relation.
739  *
740  * We keep a hash table of lists of locks in local memory keyed by xid,
741  * RecoveryLockLists, so we can keep track of the various entries made by
742  * the Startup process's virtual xid in the shared lock table.
743  *
744  * List elements use type xl_standby_lock, since the WAL record type exactly
745  * matches the information that we need to keep track of.
746  *
747  * We use session locks rather than normal locks so we don't need
748  * ResourceOwners.
749  */
750 
751 
/*
 * StandbyAcquireAccessExclusiveLock
 *		Re-acquire, as the Startup process, an AccessExclusiveLock that
 *		transaction xid took on the master, and record it in
 *		RecoveryLockLists so it can be released when the xid completes.
 *
 * Does nothing if xid is invalid or is already known committed/aborted
 * (i.e. the record is being replayed again).
 */
void
StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
{
	RecoveryLockListsEntry *entry;
	xl_standby_lock *newlock;
	LOCKTAG		locktag;
	bool		found;

	/* Already processed? */
	if (!TransactionIdIsValid(xid) ||
		TransactionIdDidCommit(xid) ||
		TransactionIdDidAbort(xid))
		return;

	elog(trace_recovery(DEBUG4),
		 "adding recovery lock: db %u rel %u", dbOid, relOid);

	/* dbOid is InvalidOid when we are locking a shared relation. */
	Assert(OidIsValid(relOid));

	/* Create a new list for this xid, if we don't have one already. */
	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
	if (!found)
	{
		entry->xid = xid;
		entry->locks = NIL;
	}

	newlock = palloc(sizeof(xl_standby_lock));
	newlock->xid = xid;
	newlock->dbOid = dbOid;
	newlock->relOid = relOid;
	entry->locks = lappend(entry->locks, newlock);

	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);

	/* Session lock (not transactional), dontWait = false */
	(void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
}
790 
/*
 * Release every lock named in the given list of xl_standby_lock entries,
 * then free the list (deeply, since the entries were palloc'd).
 */
static void
StandbyReleaseLockList(List *locks)
{
	ListCell   *lc;

	foreach(lc, locks)
	{
		xl_standby_lock *lock = (xl_standby_lock *) lfirst(lc);
		LOCKTAG		locktag;

		elog(trace_recovery(DEBUG4),
			 "releasing recovery lock: xid %u db %u rel %u",
			 lock->xid, lock->dbOid, lock->relOid);
		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
		if (!LockRelease(&locktag, AccessExclusiveLock, true))
		{
			/* Shouldn't happen; complain but soldier on in production */
			elog(LOG,
				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
				 lock->xid, lock->dbOid, lock->relOid);
			Assert(false);
		}
	}

	list_free_deep(locks);
}
816 
817 static void
StandbyReleaseLocks(TransactionId xid)818 StandbyReleaseLocks(TransactionId xid)
819 {
820 	RecoveryLockListsEntry *entry;
821 
822 	if (TransactionIdIsValid(xid))
823 	{
824 		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
825 		{
826 			StandbyReleaseLockList(entry->locks);
827 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
828 		}
829 	}
830 	else
831 		StandbyReleaseAllLocks();
832 }
833 
834 /*
835  * Release locks for a transaction tree, starting at xid down, from
836  * RecoveryLockLists.
837  *
838  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
839  * to remove any AccessExclusiveLocks requested by a transaction.
840  */
841 void
StandbyReleaseLockTree(TransactionId xid,int nsubxids,TransactionId * subxids)842 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
843 {
844 	int			i;
845 
846 	StandbyReleaseLocks(xid);
847 
848 	for (i = 0; i < nsubxids; i++)
849 		StandbyReleaseLocks(subxids[i]);
850 }
851 
/*
 * Called at end of recovery and when we see a shutdown checkpoint.
 * Releases every tracked lock and empties the RecoveryLockLists hash
 * table (the table itself survives).
 */
void
StandbyReleaseAllLocks(void)
{
	HASH_SEQ_STATUS status;
	RecoveryLockListsEntry *entry;

	elog(trace_recovery(DEBUG2), "release all standby locks");

	hash_seq_init(&status, RecoveryLockLists);
	while ((entry = hash_seq_search(&status)))
	{
		StandbyReleaseLockList(entry->locks);
		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
	}
}
870 
/*
 * StandbyReleaseOldLocks
 *		Release standby locks held by top-level XIDs that aren't running,
 *		as long as they're not prepared transactions.
 *
 * oldxid is the cutoff: only entries whose xid precedes it are released.
 */
void
StandbyReleaseOldLocks(TransactionId oldxid)
{
	HASH_SEQ_STATUS status;
	RecoveryLockListsEntry *entry;

	hash_seq_init(&status, RecoveryLockLists);
	while ((entry = hash_seq_search(&status)))
	{
		Assert(TransactionIdIsValid(entry->xid));

		/* Skip if prepared transaction. */
		if (StandbyTransactionIdIsPrepared(entry->xid))
			continue;

		/* Skip if >= oldxid. */
		if (!TransactionIdPrecedes(entry->xid, oldxid))
			continue;

		/* Remove all locks and hash table entry. */
		StandbyReleaseLockList(entry->locks);
		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
	}
}
900 
901 /*
902  * --------------------------------------------------------------------
903  *		Recovery handling for Rmgr RM_STANDBY_ID
904  *
905  * These record types will only be created if XLogStandbyInfoActive()
906  * --------------------------------------------------------------------
907  */
908 
909 void
standby_redo(XLogReaderState * record)910 standby_redo(XLogReaderState *record)
911 {
912 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
913 
914 	/* Backup blocks are not used in standby records */
915 	Assert(!XLogRecHasAnyBlockRefs(record));
916 
917 	/* Do nothing if we're not in hot standby mode */
918 	if (standbyState == STANDBY_DISABLED)
919 		return;
920 
921 	if (info == XLOG_STANDBY_LOCK)
922 	{
923 		xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
924 		int			i;
925 
926 		for (i = 0; i < xlrec->nlocks; i++)
927 			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
928 											  xlrec->locks[i].dbOid,
929 											  xlrec->locks[i].relOid);
930 	}
931 	else if (info == XLOG_RUNNING_XACTS)
932 	{
933 		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
934 		RunningTransactionsData running;
935 
936 		running.xcnt = xlrec->xcnt;
937 		running.subxcnt = xlrec->subxcnt;
938 		running.subxid_overflow = xlrec->subxid_overflow;
939 		running.nextXid = xlrec->nextXid;
940 		running.latestCompletedXid = xlrec->latestCompletedXid;
941 		running.oldestRunningXid = xlrec->oldestRunningXid;
942 		running.xids = xlrec->xids;
943 
944 		ProcArrayApplyRecoveryInfo(&running);
945 	}
946 	else if (info == XLOG_INVALIDATIONS)
947 	{
948 		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
949 
950 		ProcessCommittedInvalidationMessages(xlrec->msgs,
951 											 xlrec->nmsgs,
952 											 xlrec->relcacheInitFileInval,
953 											 xlrec->dbId,
954 											 xlrec->tsId);
955 	}
956 	else
957 		elog(PANIC, "standby_redo: unknown op code %u", info);
958 }
959 
/*
 * Log details of the current snapshot to WAL. This allows the snapshot state
 * to be reconstructed on the standby and for logical decoding.
 *
 * This is used for Hot Standby as follows:
 *
 * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
 * start from a shutdown checkpoint because we know nothing was running
 * at that time and our recovery snapshot is known empty. In the more
 * typical case of an online checkpoint we need to jump through a few
 * hoops to get a correct recovery snapshot and this requires a two or
 * sometimes a three stage process.
 *
 * The initial snapshot must contain all running xids and all current
 * AccessExclusiveLocks at a point in time on the standby. Assembling
 * that information while the server is running requires many and
 * various LWLocks, so we choose to derive that information piece by
 * piece and then re-assemble that info on the standby. When that
 * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
 *
 * Since locking on the primary when we derive the information is not
 * strict, we note that there is a time window between the derivation and
 * writing to WAL of the derived information. That allows race conditions
 * that we must resolve, since xids and locks may enter or leave the
 * snapshot during that window. This creates the issue that an xid or
 * lock may start *after* the snapshot has been derived yet *before* the
 * snapshot is logged in the running xacts WAL record. We resolve this by
 * starting to accumulate changes at a point just prior to when we derive
 * the snapshot on the primary, then ignore duplicates when we later apply
 * the snapshot from the running xacts record. This is implemented during
 * CreateCheckpoint() where we use the logical checkpoint location as
 * our starting point and then write the running xacts record immediately
 * before writing the main checkpoint WAL record. Since we always start
 * up from a checkpoint and are immediately at our starting point, we
 * unconditionally move to STANDBY_INITIALIZED. After this point we
 * must do 4 things:
 *	* move shared nextFullXid forwards as we see new xids
 *	* extend the clog and subtrans with each new xid
 *	* keep track of uncommitted known assigned xids
 *	* keep track of uncommitted AccessExclusiveLocks
 *
 * When we see a commit/abort we must remove known assigned xids and locks
 * from the completing transaction. Attempted removals that cannot locate
 * an entry are expected and must not cause an error when we are in state
 * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
 * KnownAssignedXidsRemove().
 *
 * Later, when we apply the running xact data we must be careful to ignore
 * transactions already committed, since those commits raced ahead when
 * making WAL entries.
 *
 * The loose timing also means that locks may be recorded that have a
 * zero xid, since xids are removed from procs before locks are removed.
 * So we must prune the lock list down to ensure we hold locks only for
 * currently running xids, performed by StandbyReleaseOldLocks().
 * Zero xids should no longer be possible, but we may be replaying WAL
 * from a time when they were possible.
 *
 * For logical decoding only the running xacts information is needed;
 * there's no need to look at the locking information, but it's logged anyway,
 * as there's no independent knob to just enable logical decoding. For
 * details of how this is used, check snapbuild.c's introductory comment.
 *
 *
 * Returns the RecPtr of the last inserted record.
 */
XLogRecPtr
LogStandbySnapshot(void)
{
	XLogRecPtr	recptr;
	RunningTransactions running;
	xl_standby_lock *locks;
	int			nlocks;

	/* Only meaningful when standby info is being emitted to WAL. */
	Assert(XLogStandbyInfoActive());

	/*
	 * Get details of any AccessExclusiveLocks being held at the moment.
	 */
	locks = GetRunningTransactionLocks(&nlocks);
	if (nlocks > 0)
		LogAccessExclusiveLocks(nlocks, locks);
	/* We own the array returned by GetRunningTransactionLocks(). */
	pfree(locks);

	/*
	 * Log details of all in-progress transactions. This should be the last
	 * record we write, because standby will open up when it sees this.
	 */
	running = GetRunningTransactionData();

	/*
	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
	 * For Hot Standby this can be done before inserting the WAL record
	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
	 * the clog. For logical decoding, though, the lock can't be released
	 * early because the clog might be "in the future" from the POV of the
	 * historic snapshot. This would allow for situations where we're waiting
	 * for the end of a transaction listed in the xl_running_xacts record
	 * which, according to the WAL, has committed before the xl_running_xacts
	 * record. Fortunately this routine isn't executed frequently, and it's
	 * only a shared lock.
	 */
	if (wal_level < WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	recptr = LogCurrentRunningXacts(running);

	/* Release lock if we kept it longer ... */
	if (wal_level >= WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
	LWLockRelease(XidGenLock);

	return recptr;
}
1076 
/*
 * Record an enhanced snapshot of running transactions into WAL.
 *
 * The definitions of RunningTransactionsData and xl_xact_running_xacts are
 * similar. We keep them separate because xl_xact_running_xacts is a
 * contiguous chunk of memory and never exists fully until it is assembled in
 * WAL. The inserted records are marked as not being important for durability,
 * to avoid triggering superfluous checkpoint / archiving activity.
 */
static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
{
	xl_running_xacts xlrec;
	XLogRecPtr	recptr;

	/* Copy the fixed-size fields into the WAL record header struct. */
	xlrec.xcnt = CurrRunningXacts->xcnt;
	xlrec.subxcnt = CurrRunningXacts->subxcnt;
	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
	xlrec.nextXid = CurrRunningXacts->nextXid;
	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;

	/* Header */
	XLogBeginInsert();
	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
	XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);

	/* array of TransactionIds */
	if (xlrec.xcnt > 0)
		XLogRegisterData((char *) CurrRunningXacts->xids,
						 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));

	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);

	/* Two message variants: the subxid count is unknown when overflowed. */
	if (CurrRunningXacts->subxid_overflow)
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt,
			 (uint32) (recptr >> 32), (uint32) recptr,
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);
	else
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
			 (uint32) (recptr >> 32), (uint32) recptr,
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);

	/*
	 * Ensure running_xacts information is synced to disk not too far in the
	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
	 * so we let the wal writer do it during normal operation.
	 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
	 * and nudge the WALWriter into action if sleeping. Check
	 * XLogBackgroundFlush() for details why a record might not be flushed
	 * without it.
	 */
	XLogSetAsyncXactLSN(recptr);

	return recptr;
}
1141 
1142 /*
1143  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1144  * logged, as described in backend/storage/lmgr/README.
1145  */
1146 static void
LogAccessExclusiveLocks(int nlocks,xl_standby_lock * locks)1147 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
1148 {
1149 	xl_standby_locks xlrec;
1150 
1151 	xlrec.nlocks = nlocks;
1152 
1153 	XLogBeginInsert();
1154 	XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
1155 	XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
1156 	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
1157 
1158 	(void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
1159 }
1160 
1161 /*
1162  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1163  */
1164 void
LogAccessExclusiveLock(Oid dbOid,Oid relOid)1165 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
1166 {
1167 	xl_standby_lock xlrec;
1168 
1169 	xlrec.xid = GetCurrentTransactionId();
1170 
1171 	xlrec.dbOid = dbOid;
1172 	xlrec.relOid = relOid;
1173 
1174 	LogAccessExclusiveLocks(1, &xlrec);
1175 	MyXactFlags |= XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK;
1176 }
1177 
/*
 * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
 */
void
LogAccessExclusiveLockPrepare(void)
{
	/*
	 * Ensure that a TransactionId has been assigned to this transaction, for
	 * two reasons, both related to lock release on the standby. First, we
	 * must assign an xid so that RecordTransactionCommit() and
	 * RecordTransactionAbort() do not optimise away the transaction
	 * completion record which recovery relies upon to release locks. It's a
	 * hack, but for a corner case not worth adding code for into the main
	 * commit path. Second, we must assign an xid before the lock is recorded
	 * in shared memory, otherwise a concurrently executing
	 * GetRunningTransactionLocks() might see a lock associated with an
	 * InvalidTransactionId which we later assert cannot happen.
	 */
	/* Result deliberately discarded: only the assignment side effect matters. */
	(void) GetCurrentTransactionId();
}
1198 
1199 /*
1200  * Emit WAL for invalidations. This currently is only used for commits without
1201  * an xid but which contain invalidations.
1202  */
1203 void
LogStandbyInvalidations(int nmsgs,SharedInvalidationMessage * msgs,bool relcacheInitFileInval)1204 LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
1205 						bool relcacheInitFileInval)
1206 {
1207 	xl_invalidations xlrec;
1208 
1209 	/* prepare record */
1210 	memset(&xlrec, 0, sizeof(xlrec));
1211 	xlrec.dbId = MyDatabaseId;
1212 	xlrec.tsId = MyDatabaseTableSpace;
1213 	xlrec.relcacheInitFileInval = relcacheInitFileInval;
1214 	xlrec.nmsgs = nmsgs;
1215 
1216 	/* perform insertion */
1217 	XLogBeginInsert();
1218 	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
1219 	XLogRegisterData((char *) msgs,
1220 					 nmsgs * sizeof(SharedInvalidationMessage));
1221 	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
1222 }
1223