1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *	  Misc functions used in Hot Standby mode.
5  *
6  *	All functions for handling RM_STANDBY_ID, which relate to
7  *	AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *	Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *	  src/backend/storage/ipc/standby.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "miscadmin.h"
25 #include "storage/bufmgr.h"
26 #include "storage/lmgr.h"
27 #include "storage/proc.h"
28 #include "storage/procarray.h"
29 #include "storage/sinvaladt.h"
30 #include "storage/standby.h"
31 #include "utils/hsearch.h"
32 #include "utils/memutils.h"
33 #include "utils/ps_status.h"
34 #include "utils/timeout.h"
35 #include "utils/timestamp.h"
36 
37 /* User-settable GUC parameters */
38 int			vacuum_defer_cleanup_age;
39 int			max_standby_archive_delay = 30 * 1000;
40 int			max_standby_streaming_delay = 30 * 1000;
41 
42 static HTAB *RecoveryLockLists;
43 
44 /* Flags set by timeout handlers */
45 static volatile sig_atomic_t got_standby_deadlock_timeout = false;
46 static volatile sig_atomic_t got_standby_lock_timeout = false;
47 
48 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
49 									   ProcSignalReason reason, bool report_waiting);
50 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
51 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
52 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
53 
/*
 * Keep track of all the locks owned by a given transaction.
 */
typedef struct RecoveryLockListsEntry
{
	TransactionId	xid;		/* hash key: top-level xid that "holds" the locks */
	List		   *locks;		/* list of xl_standby_lock entries acquired for it */
} RecoveryLockListsEntry;
62 
63 /*
64  * InitRecoveryTransactionEnvironment
65  *		Initialize tracking of in-progress transactions in master
66  *
67  * We need to issue shared invalidations and hold locks. Holding locks
68  * means others may want to wait on us, so we need to make a lock table
69  * vxact entry like a real transaction. We could create and delete
70  * lock table entries for each transaction but its simpler just to create
71  * one permanent entry and leave it there all the time. Locks are then
72  * acquired and released as needed. Yes, this means you can see the
73  * Startup process in pg_locks once we have run this.
74  */
void
InitRecoveryTransactionEnvironment(void)
{
	VirtualTransactionId vxid;
	HASHCTL			hash_ctl;

	/*
	 * Initialize the hash table for tracking the list of locks held by each
	 * transaction.  The initial size (64) is only a hint; dynahash grows the
	 * table as needed.
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
	RecoveryLockLists = hash_create("RecoveryLockLists",
									64,
									&hash_ctl,
									HASH_ELEM | HASH_BLOBS);

	/*
	 * Initialize shared invalidation management for Startup process, being
	 * careful to register ourselves as a sendOnly process so we don't need to
	 * read messages, nor will we get signalled when the queue starts filling
	 * up.
	 */
	SharedInvalBackendInit(true);

	/*
	 * Lock a virtual transaction id for Startup process.
	 *
	 * We need to do GetNextLocalTransactionId() because
	 * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
	 * manager doesn't like that at all.
	 *
	 * Note that we don't need to run XactLockTableInsert() because nobody
	 * needs to wait on xids. That sounds a little strange, but table locks
	 * are held by vxids and row level locks are held by xids. All queries
	 * hold AccessShareLocks so never block while we write or lock new rows.
	 */
	vxid.backendId = MyBackendId;
	vxid.localTransactionId = GetNextLocalTransactionId();
	VirtualXactLockTableInsert(vxid);

	/* Only now is it safe for other code to assume tracking is running. */
	standbyState = STANDBY_INITIALIZED;
}
119 
120 /*
121  * ShutdownRecoveryTransactionEnvironment
122  *		Shut down transaction tracking
123  *
124  * Prepare to switch from hot standby mode to normal operation. Shut down
125  * recovery-time transaction tracking.
126  *
127  * This must be called even in shutdown of startup process if transaction
128  * tracking has been initialized. Otherwise some locks the tracked
 * transactions were holding will not be released and may interfere with
 * the processes still running (but will exit soon) at the exit of the
131  * startup process.
132  */
void
ShutdownRecoveryTransactionEnvironment(void)
{
	/*
	 * Do nothing if RecoveryLockLists is NULL, which means that transaction
	 * tracking has not yet been initialized or has already been shut down.
	 * This prevents transaction tracking from being shut down unexpectedly
	 * more than once.
	 */
	if (RecoveryLockLists == NULL)
		return;

	/* Mark all tracked in-progress transactions as finished. */
	ExpireAllKnownAssignedTransactionIds();

	/* Release all locks the tracked transactions were holding */
	StandbyReleaseAllLocks();

	/* Destroy the hash table of locks. */
	hash_destroy(RecoveryLockLists);
	/* NULL marks tracking as shut down, making a repeated call a no-op. */
	RecoveryLockLists = NULL;

	/* Cleanup our VirtualTransaction */
	VirtualXactLockTableCleanup();
}
158 
159 
160 /*
161  * -----------------------------------------------------
162  *		Standby wait timers and backend cancel logic
163  * -----------------------------------------------------
164  */
165 
166 /*
167  * Determine the cutoff time at which we want to start canceling conflicting
168  * transactions.  Returns zero (a time safely in the past) if we are willing
169  * to wait forever.
170  */
171 static TimestampTz
GetStandbyLimitTime(void)172 GetStandbyLimitTime(void)
173 {
174 	TimestampTz rtime;
175 	bool		fromStream;
176 
177 	/*
178 	 * The cutoff time is the last WAL data receipt time plus the appropriate
179 	 * delay variable.  Delay of -1 means wait forever.
180 	 */
181 	GetXLogReceiptTime(&rtime, &fromStream);
182 	if (fromStream)
183 	{
184 		if (max_standby_streaming_delay < 0)
185 			return 0;			/* wait forever */
186 		return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
187 	}
188 	else
189 	{
190 		if (max_standby_archive_delay < 0)
191 			return 0;			/* wait forever */
192 		return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
193 	}
194 }
195 
#define STANDBY_INITIAL_WAIT_US  1000
/* Current sleep length; doubled on each retry by WaitExceedsMaxStandbyDelay() */
static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
198 
199 /*
200  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
201  * We wait here for a while then return. If we decide we can't wait any
202  * more then we return true, if we can wait some more return false.
203  */
204 static bool
WaitExceedsMaxStandbyDelay(void)205 WaitExceedsMaxStandbyDelay(void)
206 {
207 	TimestampTz ltime;
208 
209 	CHECK_FOR_INTERRUPTS();
210 
211 	/* Are we past the limit time? */
212 	ltime = GetStandbyLimitTime();
213 	if (ltime && GetCurrentTimestamp() >= ltime)
214 		return true;
215 
216 	/*
217 	 * Sleep a bit (this is essential to avoid busy-waiting).
218 	 */
219 	pg_usleep(standbyWait_us);
220 
221 	/*
222 	 * Progressively increase the sleep times, but not to more than 1s, since
223 	 * pg_usleep isn't interruptable on some platforms.
224 	 */
225 	standbyWait_us *= 2;
226 	if (standbyWait_us > 1000000)
227 		standbyWait_us = 1000000;
228 
229 	return false;
230 }
231 
232 /*
233  * This is the main executioner for any query backend that conflicts with
234  * recovery processing. Judgement has already been passed on it within
235  * a specific rmgr. Here we just issue the orders to the procs. The procs
236  * then throw the required error as instructed.
237  *
238  * If report_waiting is true, "waiting" is reported in PS display if necessary.
239  * If the caller has already reported that, report_waiting should be false.
240  * Otherwise, "waiting" is reported twice unexpectedly.
241  */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason, bool report_waiting)
{
	TimestampTz waitStart = 0;
	char	   *new_status;

	/* Fast exit, to avoid a kernel call if there's no work to be done. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	/* waitStart is only used to decide when to update the PS display */
	if (report_waiting)
		waitStart = GetCurrentTimestamp();
	new_status = NULL;			/* we haven't changed the ps display */

	/* waitlist is terminated by an invalid VirtualTransactionId */
	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* reset standbyWait_us for each xact we wait for */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/* wait until the virtual xid is gone */
		while (!VirtualXactLock(*waitlist, false))
		{
			/*
			 * Report via ps if we have been waiting for more than 500 msec
			 * (should that be configurable?)
			 */
			if (update_process_title && new_status == NULL && report_waiting &&
				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
										   500))
			{
				const char *old_status;
				int			len;

				old_status = get_ps_display(&len);
				/* room for old status + " waiting" + NUL */
				new_status = (char *) palloc(len + 8 + 1);
				memcpy(new_status, old_status, len);
				strcpy(new_status + len, " waiting");
				set_ps_display(new_status, false);
				/* keep only the old status in new_status for restoring later */
				new_status[len] = '\0'; /* truncate off " waiting" */
			}

			/* Is it time to kill it? */
			if (WaitExceedsMaxStandbyDelay())
			{
				pid_t		pid;

				/*
				 * Now find out who to throw out of the balloon.
				 */
				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * Wait a little bit for it to die so that we avoid flooding
				 * an unresponsive backend when system is heavily loaded.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}
		}

		/* The virtual transaction is gone now, wait for the next one */
		waitlist++;
	}

	/* Reset ps display if we changed it (new_status holds the old string) */
	if (new_status)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
315 
316 void
ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,RelFileNode node)317 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
318 {
319 	VirtualTransactionId *backends;
320 
321 	/*
322 	 * If we get passed InvalidTransactionId then we are a little surprised,
323 	 * but it is theoretically possible in normal running. It also happens
324 	 * when replaying already applied WAL records after a standby crash or
325 	 * restart, or when replaying an XLOG_HEAP2_VISIBLE record that marks as
326 	 * frozen a page which was already all-visible.  If latestRemovedXid is
327 	 * invalid then there is no conflict. That rule applies across all record
328 	 * types that suffer from this conflict.
329 	 */
330 	if (!TransactionIdIsValid(latestRemovedXid))
331 		return;
332 
333 	backends = GetConflictingVirtualXIDs(latestRemovedXid,
334 										 node.dbNode);
335 
336 	ResolveRecoveryConflictWithVirtualXIDs(backends,
337 										 PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
338 										 true);
339 }
340 
341 void
ResolveRecoveryConflictWithTablespace(Oid tsid)342 ResolveRecoveryConflictWithTablespace(Oid tsid)
343 {
344 	VirtualTransactionId *temp_file_users;
345 
346 	/*
347 	 * Standby users may be currently using this tablespace for their
348 	 * temporary files. We only care about current users because
349 	 * temp_tablespace parameter will just ignore tablespaces that no longer
350 	 * exist.
351 	 *
352 	 * Ask everybody to cancel their queries immediately so we can ensure no
353 	 * temp files remain and we can remove the tablespace. Nuke the entire
354 	 * site from orbit, it's the only way to be sure.
355 	 *
356 	 * XXX: We could work out the pids of active backends using this
357 	 * tablespace by examining the temp filenames in the directory. We would
358 	 * then convert the pids into VirtualXIDs before attempting to cancel
359 	 * them.
360 	 *
361 	 * We don't wait for commit because drop tablespace is non-transactional.
362 	 */
363 	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
364 												InvalidOid);
365 	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
366 									   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
367 									   true);
368 }
369 
370 void
ResolveRecoveryConflictWithDatabase(Oid dbid)371 ResolveRecoveryConflictWithDatabase(Oid dbid)
372 {
373 	/*
374 	 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
375 	 * only waits for transactions and completely idle sessions would block
376 	 * us. This is rare enough that we do this as simply as possible: no wait,
377 	 * just force them off immediately.
378 	 *
379 	 * No locking is required here because we already acquired
380 	 * AccessExclusiveLock. Anybody trying to connect while we do this will
381 	 * block during InitPostgres() and then disconnect when they see the
382 	 * database has been removed.
383 	 */
384 	while (CountDBBackends(dbid) > 0)
385 	{
386 		CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
387 
388 		/*
389 		 * Wait awhile for them to die so that we avoid flooding an
390 		 * unresponsive backend when system is heavily loaded.
391 		 */
392 		pg_usleep(10000);
393 	}
394 }
395 
396 /*
397  * ResolveRecoveryConflictWithLock is called from ProcSleep()
398  * to resolve conflicts with other backends holding relation locks.
399  *
400  * The WaitLatch sleep normally done in ProcSleep()
401  * (when not InHotStandby) is performed here, for code clarity.
402  *
403  * We either resolve conflicts immediately or set a timeout to wake us at
404  * the limit of our patience.
405  *
 * Resolve conflicts by canceling all backends holding a conflicting
407  * lock.  As we are already queued to be granted the lock, no new lock
408  * requests conflicting with ours will be granted in the meantime.
409  *
410  * We also must check for deadlocks involving the Startup process and
411  * hot-standby backend processes. If deadlock_timeout is reached in
412  * this function, all the backends holding the conflicting locks are
413  * requested to check themselves for deadlocks.
414  */
415 void
ResolveRecoveryConflictWithLock(LOCKTAG locktag)416 ResolveRecoveryConflictWithLock(LOCKTAG locktag)
417 {
418 	TimestampTz ltime;
419 
420 	Assert(InHotStandby);
421 
422 	ltime = GetStandbyLimitTime();
423 
424 	if (GetCurrentTimestamp() >= ltime && ltime != 0)
425 	{
426 		/*
427 		 * We're already behind, so clear a path as quickly as possible.
428 		 */
429 		VirtualTransactionId *backends;
430 
431 		backends = GetLockConflicts(&locktag, AccessExclusiveLock);
432 
433 		/*
434 		 * Prevent ResolveRecoveryConflictWithVirtualXIDs() from reporting
435 		 * "waiting" in PS display by disabling its argument report_waiting
436 		 * because the caller, WaitOnLock(), has already reported that.
437 		 */
438 		ResolveRecoveryConflictWithVirtualXIDs(backends,
439 											 PROCSIG_RECOVERY_CONFLICT_LOCK,
440 											 false);
441 	}
442 	else
443 	{
444 		/*
445 		 * Wait (or wait again) until ltime, and check for deadlocks as well
446 		 * if we will be waiting longer than deadlock_timeout
447 		 */
448 		EnableTimeoutParams timeouts[2];
449 		int			cnt = 0;
450 
451 		if (ltime != 0)
452 		{
453 			got_standby_lock_timeout = false;
454 			timeouts[cnt].id = STANDBY_LOCK_TIMEOUT;
455 			timeouts[cnt].type = TMPARAM_AT;
456 			timeouts[cnt].fin_time = ltime;
457 			cnt++;
458 		}
459 
460 		got_standby_deadlock_timeout = false;
461 		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
462 		timeouts[cnt].type = TMPARAM_AFTER;
463 		timeouts[cnt].delay_ms = DeadlockTimeout;
464 		cnt++;
465 
466 		enable_timeouts(timeouts, cnt);
467 	}
468 
469 	/* Wait to be signaled by the release of the Relation Lock */
470 	ProcWaitForSignal();
471 
472 	/*
473 	 * Exit if ltime is reached. Then all the backends holding conflicting
474 	 * locks will be canceled in the next ResolveRecoveryConflictWithLock()
475 	 * call.
476 	 */
477 	if (got_standby_lock_timeout)
478 		goto cleanup;
479 
480 	if (got_standby_deadlock_timeout)
481 	{
482 		VirtualTransactionId *backends;
483 
484 		backends = GetLockConflicts(&locktag, AccessExclusiveLock);
485 
486 		/* Quick exit if there's no work to be done */
487 		if (!VirtualTransactionIdIsValid(*backends))
488 			goto cleanup;
489 
490 		/*
491 		 * Send signals to all the backends holding the conflicting locks, to
492 		 * ask them to check themselves for deadlocks.
493 		 */
494 		while (VirtualTransactionIdIsValid(*backends))
495 		{
496 			SignalVirtualTransaction(*backends,
497 									 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
498 									 false);
499 			backends++;
500 		}
501 
502 		/*
503 		 * Wait again here to be signaled by the release of the Relation Lock,
504 		 * to prevent the subsequent RecoveryConflictWithLock() from causing
505 		 * deadlock_timeout and sending a request for deadlocks check again.
506 		 * Otherwise the request continues to be sent every deadlock_timeout
507 		 * until the relation locks are released or ltime is reached.
508 		 */
509 		got_standby_deadlock_timeout = false;
510 		ProcWaitForSignal();
511 	}
512 
513 cleanup:
514 
515 	/*
516 	 * Clear any timeout requests established above.  We assume here that the
517 	 * Startup process doesn't have any other outstanding timeouts than those
518 	 * used by this function. If that stops being true, we could cancel the
519 	 * timeouts individually, but that'd be slower.
520 	 */
521 	disable_all_timeouts(false);
522 	got_standby_lock_timeout = false;
523 	got_standby_deadlock_timeout = false;
524 }
525 
526 /*
527  * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
528  * to resolve conflicts with other backends holding buffer pins.
529  *
530  * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
531  * (when not InHotStandby) is performed here, for code clarity.
532  *
533  * We either resolve conflicts immediately or set a timeout to wake us at
534  * the limit of our patience.
535  *
536  * Resolve conflicts by sending a PROCSIG signal to all backends to check if
537  * they hold one of the buffer pins that is blocking Startup process. If so,
538  * those backends will take an appropriate error action, ERROR or FATAL.
539  *
540  * We also must check for deadlocks.  Deadlocks occur because if queries
541  * wait on a lock, that must be behind an AccessExclusiveLock, which can only
542  * be cleared if the Startup process replays a transaction completion record.
543  * If Startup process is also waiting then that is a deadlock. The deadlock
544  * can occur if the query is waiting and then the Startup sleeps, or if
545  * Startup is sleeping and the query waits on a lock. We protect against
546  * only the former sequence here, the latter sequence is checked prior to
547  * the query sleeping, in CheckRecoveryConflictDeadlock().
548  *
549  * Deadlocks are extremely rare, and relatively expensive to check for,
550  * so we don't do a deadlock check right away ... only if we have had to wait
551  * at least deadlock_timeout.
552  */
553 void
ResolveRecoveryConflictWithBufferPin(void)554 ResolveRecoveryConflictWithBufferPin(void)
555 {
556 	TimestampTz ltime;
557 
558 	Assert(InHotStandby);
559 
560 	ltime = GetStandbyLimitTime();
561 
562 	if (GetCurrentTimestamp() >= ltime && ltime != 0)
563 	{
564 		/*
565 		 * We're already behind, so clear a path as quickly as possible.
566 		 */
567 		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
568 	}
569 	else
570 	{
571 		/*
572 		 * Wake up at ltime, and check for deadlocks as well if we will be
573 		 * waiting longer than deadlock_timeout
574 		 */
575 		EnableTimeoutParams timeouts[2];
576 		int			cnt = 0;
577 
578 		if (ltime != 0)
579 		{
580 			timeouts[cnt].id = STANDBY_TIMEOUT;
581 			timeouts[cnt].type = TMPARAM_AT;
582 			timeouts[cnt].fin_time = ltime;
583 			cnt++;
584 		}
585 
586 		got_standby_deadlock_timeout = false;
587 		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
588 		timeouts[cnt].type = TMPARAM_AFTER;
589 		timeouts[cnt].delay_ms = DeadlockTimeout;
590 		cnt++;
591 
592 		enable_timeouts(timeouts, cnt);
593 	}
594 
595 	/* Wait to be signaled by UnpinBuffer() */
596 	ProcWaitForSignal();
597 
598 	if (got_standby_deadlock_timeout)
599 	{
600 		/*
601 		 * Send out a request for hot-standby backends to check themselves for
602 		 * deadlocks.
603 		 *
604 		 * XXX The subsequent ResolveRecoveryConflictWithBufferPin() will wait
605 		 * to be signaled by UnpinBuffer() again and send a request for
606 		 * deadlocks check if deadlock_timeout happens. This causes the
607 		 * request to continue to be sent every deadlock_timeout until the
608 		 * buffer is unpinned or ltime is reached. This would increase the
609 		 * workload in the startup process and backends. In practice it may
610 		 * not be so harmful because the period that the buffer is kept pinned
611 		 * is basically no so long. But we should fix this?
612 		 */
613 		SendRecoveryConflictWithBufferPin(
614 										  PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
615 	}
616 
617 	/*
618 	 * Clear any timeout requests established above.  We assume here that the
619 	 * Startup process doesn't have any other timeouts than what this function
620 	 * uses.  If that stops being true, we could cancel the timeouts
621 	 * individually, but that'd be slower.
622 	 */
623 	disable_all_timeouts(false);
624 	got_standby_deadlock_timeout = false;
625 }
626 
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
	/* Only the buffer-pin and startup-deadlock reasons are sent this way. */
	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

	/*
	 * We send signal to all backends to ask them if they are holding the
	 * buffer pin which is delaying the Startup process. We must not set the
	 * conflict flag yet, since most backends will be innocent. Let the
	 * SIGUSR1 handling in each backend decide their own fate.
	 */
	CancelDBBackends(InvalidOid, reason, false);
}
641 
642 /*
643  * In Hot Standby perform early deadlock detection.  We abort the lock
644  * wait if we are about to sleep while holding the buffer pin that Startup
645  * process is waiting for.
646  *
647  * Note: this code is pessimistic, because there is no way for it to
648  * determine whether an actual deadlock condition is present: the lock we
649  * need to wait for might be unrelated to any held by the Startup process.
650  * Sooner or later, this mechanism should get ripped out in favor of somehow
651  * accounting for buffer locks in DeadLockCheck().  However, errors here
652  * seem to be very low-probability in practice, so for now it's not worth
653  * the trouble.
654  */
655 void
CheckRecoveryConflictDeadlock(void)656 CheckRecoveryConflictDeadlock(void)
657 {
658 	Assert(!InRecovery);		/* do not call in Startup process */
659 
660 	if (!HoldingBufferPinThatDelaysRecovery())
661 		return;
662 
663 	/*
664 	 * Error message should match ProcessInterrupts() but we avoid calling
665 	 * that because we aren't handling an interrupt at this point. Note that
666 	 * we only cancel the current transaction here, so if we are in a
667 	 * subtransaction and the pin is held by a parent, then the Startup
668 	 * process will continue to wait even though we have avoided deadlock.
669 	 */
670 	ereport(ERROR,
671 			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
672 			 errmsg("canceling statement due to conflict with recovery"),
673 	   errdetail("User transaction caused buffer deadlock with recovery.")));
674 }
675 
676 
677 /* --------------------------------
678  *		timeout handler routines
679  * --------------------------------
680  */
681 
682 /*
683  * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
684  * occurs before STANDBY_TIMEOUT.
685  */
void
StandbyDeadLockHandler(void)
{
	/* Just set a flag; the waiting code in this file acts on it after waking. */
	got_standby_deadlock_timeout = true;
}
691 
692 /*
693  * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
694  * Send out a request to release conflicting buffer pins unconditionally,
695  * so we can press ahead with applying changes in recovery.
696  */
void
StandbyTimeoutHandler(void)
{
	/* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
	/*
	 * NOTE(review): this handler does non-trivial work (disable_timeout and
	 * signalling backends) rather than just setting a flag like the other
	 * handlers here — presumably safe in timeout-handler context, but worth
	 * confirming against the timeout.c contract.
	 */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
705 
706 /*
707  * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is exceeded.
708  */
void
StandbyLockTimeoutHandler(void)
{
	/* Just set a flag; ResolveRecoveryConflictWithLock() checks it on wakeup. */
	got_standby_lock_timeout = true;
}
714 
715 /*
716  * -----------------------------------------------------
717  * Locking in Recovery Mode
718  * -----------------------------------------------------
719  *
720  * All locks are held by the Startup process using a single virtual
721  * transaction. This implementation is both simpler and in some senses,
722  * more correct. The locks held mean "some original transaction held
723  * this lock, so query access is not allowed at this time". So the Startup
724  * process is the proxy by which the original locks are implemented.
725  *
726  * We only keep track of AccessExclusiveLocks, which are only ever held by
727  * one transaction on one relation.
728  *
729  * We keep a hash table of lists of locks in local memory keyed by xid,
730  * RecoveryLockLists, so we can keep track of the various entries made by
731  * the Startup process's virtual xid in the shared lock table.
732  *
733  * We record the lock against the top-level xid, rather than individual
734  * subtransaction xids. This means AccessExclusiveLocks held by aborted
735  * subtransactions are not released as early as possible on standbys.
736  *
737  * List elements use type xl_standby_lock, since the WAL record type exactly
738  * matches the information that we need to keep track of.
739  *
740  * We use session locks rather than normal locks so we don't need
741  * ResourceOwners.
742  */
743 
744 
745 void
StandbyAcquireAccessExclusiveLock(TransactionId xid,Oid dbOid,Oid relOid)746 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
747 {
748 	RecoveryLockListsEntry *entry;
749 	xl_standby_lock *newlock;
750 	LOCKTAG		locktag;
751 	bool		found;
752 
753 	/* Already processed? */
754 	if (!TransactionIdIsValid(xid) ||
755 		TransactionIdDidCommit(xid) ||
756 		TransactionIdDidAbort(xid))
757 		return;
758 
759 	elog(trace_recovery(DEBUG4),
760 		 "adding recovery lock: db %u rel %u", dbOid, relOid);
761 
762 	/* dbOid is InvalidOid when we are locking a shared relation. */
763 	Assert(OidIsValid(relOid));
764 
765 	/* Create a new list for this xid, if we don't have one already. */
766 	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
767 	if (!found)
768 	{
769 		entry->xid = xid;
770 		entry->locks = NIL;
771 	}
772 
773 	newlock = palloc(sizeof(xl_standby_lock));
774 	newlock->xid = xid;
775 	newlock->dbOid = dbOid;
776 	newlock->relOid = relOid;
777 	entry->locks = lappend(entry->locks, newlock);
778 
779 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
780 
781 	(void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
782 }
783 
784 static void
StandbyReleaseLockList(List * locks)785 StandbyReleaseLockList(List *locks)
786 {
787 	while (locks)
788 	{
789 		xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
790 		LOCKTAG		locktag;
791 		elog(trace_recovery(DEBUG4),
792 			 "releasing recovery lock: xid %u db %u rel %u",
793 			 lock->xid, lock->dbOid, lock->relOid);
794 		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
795 		if (!LockRelease(&locktag, AccessExclusiveLock, true))
796 		{
797 			elog(LOG,
798 				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
799 				 lock->xid, lock->dbOid, lock->relOid);
800 			Assert(false);
801 		}
802 		pfree(lock);
803 		locks = list_delete_first(locks);
804 	}
805 }
806 
807 static void
StandbyReleaseLocks(TransactionId xid)808 StandbyReleaseLocks(TransactionId xid)
809 {
810 	RecoveryLockListsEntry *entry;
811 
812 	if (TransactionIdIsValid(xid))
813 	{
814 		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
815 		{
816 			StandbyReleaseLockList(entry->locks);
817 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
818 		}
819 	}
820 	else
821 		StandbyReleaseAllLocks();
822 }
823 
824 /*
825  * Release locks for a transaction tree, starting at xid down, from
826  * RecoveryLockLists.
827  *
828  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
829  * to remove any AccessExclusiveLocks requested by a transaction.
830  */
831 void
StandbyReleaseLockTree(TransactionId xid,int nsubxids,TransactionId * subxids)832 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
833 {
834 	int			i;
835 
836 	StandbyReleaseLocks(xid);
837 
838 	for (i = 0; i < nsubxids; i++)
839 		StandbyReleaseLocks(subxids[i]);
840 }
841 
842 /*
843  * Called at end of recovery and when we see a shutdown checkpoint.
844  */
845 void
StandbyReleaseAllLocks(void)846 StandbyReleaseAllLocks(void)
847 {
848 	HASH_SEQ_STATUS	status;
849 	RecoveryLockListsEntry *entry;
850 
851 	elog(trace_recovery(DEBUG2), "release all standby locks");
852 
853 	hash_seq_init(&status, RecoveryLockLists);
854 	while ((entry = hash_seq_search(&status)))
855 	{
856 		StandbyReleaseLockList(entry->locks);
857 		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
858 	}
859 }
860 
861 /*
862  * StandbyReleaseOldLocks
863  *		Release standby locks held by top-level XIDs that aren't running,
864  *		as long as they're not prepared transactions.
865  */
866 void
StandbyReleaseOldLocks(int nxids,TransactionId * xids)867 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
868 {
869 	HASH_SEQ_STATUS status;
870 	RecoveryLockListsEntry *entry;
871 
872 	hash_seq_init(&status, RecoveryLockLists);
873 	while ((entry = hash_seq_search(&status)))
874 	{
875 		bool		remove = false;
876 
877 		Assert(TransactionIdIsValid(entry->xid));
878 
879 		if (StandbyTransactionIdIsPrepared(entry->xid))
880 			remove = false;
881 		else
882 		{
883 			int			i;
884 			bool		found = false;
885 
886 			for (i = 0; i < nxids; i++)
887 			{
888 				if (entry->xid == xids[i])
889 				{
890 					found = true;
891 					break;
892 				}
893 			}
894 
895 			/*
896 			 * If its not a running transaction, remove it.
897 			 */
898 			if (!found)
899 				remove = true;
900 		}
901 
902 		if (remove)
903 		{
904 			StandbyReleaseLockList(entry->locks);
905 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
906 		}
907 	}
908 }
909 
910 /*
911  * --------------------------------------------------------------------
912  *		Recovery handling for Rmgr RM_STANDBY_ID
913  *
914  * These record types will only be created if XLogStandbyInfoActive()
915  * --------------------------------------------------------------------
916  */
917 
918 void
standby_redo(XLogReaderState * record)919 standby_redo(XLogReaderState *record)
920 {
921 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
922 
923 	/* Backup blocks are not used in standby records */
924 	Assert(!XLogRecHasAnyBlockRefs(record));
925 
926 	/* Do nothing if we're not in hot standby mode */
927 	if (standbyState == STANDBY_DISABLED)
928 		return;
929 
930 	if (info == XLOG_STANDBY_LOCK)
931 	{
932 		xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
933 		int			i;
934 
935 		for (i = 0; i < xlrec->nlocks; i++)
936 			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
937 											  xlrec->locks[i].dbOid,
938 											  xlrec->locks[i].relOid);
939 	}
940 	else if (info == XLOG_RUNNING_XACTS)
941 	{
942 		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
943 		RunningTransactionsData running;
944 
945 		running.xcnt = xlrec->xcnt;
946 		running.subxcnt = xlrec->subxcnt;
947 		running.subxid_overflow = xlrec->subxid_overflow;
948 		running.nextXid = xlrec->nextXid;
949 		running.latestCompletedXid = xlrec->latestCompletedXid;
950 		running.oldestRunningXid = xlrec->oldestRunningXid;
951 		running.xids = xlrec->xids;
952 
953 		ProcArrayApplyRecoveryInfo(&running);
954 	}
955 	else if (info == XLOG_INVALIDATIONS)
956 	{
957 		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
958 
959 		ProcessCommittedInvalidationMessages(xlrec->msgs,
960 											 xlrec->nmsgs,
961 											 xlrec->relcacheInitFileInval,
962 											 xlrec->dbId,
963 											 xlrec->tsId);
964 	}
965 	else
966 		elog(PANIC, "standby_redo: unknown op code %u", info);
967 }
968 
969 /*
970  * Log details of the current snapshot to WAL. This allows the snapshot state
971  * to be reconstructed on the standby and for logical decoding.
972  *
973  * This is used for Hot Standby as follows:
974  *
975  * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
976  * start from a shutdown checkpoint because we know nothing was running
977  * at that time and our recovery snapshot is known empty. In the more
978  * typical case of an online checkpoint we need to jump through a few
979  * hoops to get a correct recovery snapshot and this requires a two or
980  * sometimes a three stage process.
981  *
982  * The initial snapshot must contain all running xids and all current
983  * AccessExclusiveLocks at a point in time on the standby. Assembling
984  * that information while the server is running requires many and
985  * various LWLocks, so we choose to derive that information piece by
986  * piece and then re-assemble that info on the standby. When that
987  * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
988  *
989  * Since locking on the primary when we derive the information is not
990  * strict, we note that there is a time window between the derivation and
991  * writing to WAL of the derived information. That allows race conditions
992  * that we must resolve, since xids and locks may enter or leave the
993  * snapshot during that window. This creates the issue that an xid or
994  * lock may start *after* the snapshot has been derived yet *before* the
995  * snapshot is logged in the running xacts WAL record. We resolve this by
996  * starting to accumulate changes at a point just prior to when we derive
997  * the snapshot on the primary, then ignore duplicates when we later apply
998  * the snapshot from the running xacts record. This is implemented during
999  * CreateCheckpoint() where we use the logical checkpoint location as
1000  * our starting point and then write the running xacts record immediately
1001  * before writing the main checkpoint WAL record. Since we always start
1002  * up from a checkpoint and are immediately at our starting point, we
1003  * unconditionally move to STANDBY_INITIALIZED. After this point we
1004  * must do 4 things:
1005  *	* move shared nextXid forwards as we see new xids
1006  *	* extend the clog and subtrans with each new xid
1007  *	* keep track of uncommitted known assigned xids
1008  *	* keep track of uncommitted AccessExclusiveLocks
1009  *
1010  * When we see a commit/abort we must remove known assigned xids and locks
1011  * from the completing transaction. Attempted removals that cannot locate
1012  * an entry are expected and must not cause an error when we are in state
1013  * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
1014  * KnownAssignedXidsRemove().
1015  *
1016  * Later, when we apply the running xact data we must be careful to ignore
1017  * transactions already committed, since those commits raced ahead when
1018  * making WAL entries.
1019  *
1020  * The loose timing also means that locks may be recorded that have a
1021  * zero xid, since xids are removed from procs before locks are removed.
1022  * So we must prune the lock list down to ensure we hold locks only for
1023  * currently running xids, performed by StandbyReleaseOldLocks().
1024  * Zero xids should no longer be possible, but we may be replaying WAL
1025  * from a time when they were possible.
1026  *
1027  * For logical decoding only the running xacts information is needed;
1028  * there's no need to look at the locking information, but it's logged anyway,
1029  * as there's no independent knob to just enable logical decoding. For
1030  * details of how this is used, check snapbuild.c's introductory comment.
1031  *
1032  *
1033  * Returns the RecPtr of the last inserted record.
1034  */
1035 XLogRecPtr
LogStandbySnapshot(void)1036 LogStandbySnapshot(void)
1037 {
1038 	XLogRecPtr	recptr;
1039 	RunningTransactions running;
1040 	xl_standby_lock *locks;
1041 	int			nlocks;
1042 
1043 	Assert(XLogStandbyInfoActive());
1044 
1045 	/*
1046 	 * Get details of any AccessExclusiveLocks being held at the moment.
1047 	 */
1048 	locks = GetRunningTransactionLocks(&nlocks);
1049 	if (nlocks > 0)
1050 		LogAccessExclusiveLocks(nlocks, locks);
1051 	pfree(locks);
1052 
1053 	/*
1054 	 * Log details of all in-progress transactions. This should be the last
1055 	 * record we write, because standby will open up when it sees this.
1056 	 */
1057 	running = GetRunningTransactionData();
1058 
1059 	/*
1060 	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
1061 	 * For Hot Standby this can be done before inserting the WAL record
1062 	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
1063 	 * the clog. For logical decoding, though, the lock can't be released
1064 	 * early because the clog might be "in the future" from the POV of the
1065 	 * historic snapshot. This would allow for situations where we're waiting
1066 	 * for the end of a transaction listed in the xl_running_xacts record
1067 	 * which, according to the WAL, has committed before the xl_running_xacts
1068 	 * record. Fortunately this routine isn't executed frequently, and it's
1069 	 * only a shared lock.
1070 	 */
1071 	if (wal_level < WAL_LEVEL_LOGICAL)
1072 		LWLockRelease(ProcArrayLock);
1073 
1074 	recptr = LogCurrentRunningXacts(running);
1075 
1076 	/* Release lock if we kept it longer ... */
1077 	if (wal_level >= WAL_LEVEL_LOGICAL)
1078 		LWLockRelease(ProcArrayLock);
1079 
1080 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
1081 	LWLockRelease(XidGenLock);
1082 
1083 	return recptr;
1084 }
1085 
1086 /*
1087  * Record an enhanced snapshot of running transactions into WAL.
1088  *
1089  * The definitions of RunningTransactionsData and xl_xact_running_xacts
1090  * are similar. We keep them separate because xl_xact_running_xacts
1091  * is a contiguous chunk of memory and never exists fully until it is
1092  * assembled in WAL.
1093  */
1094 static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)1095 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
1096 {
1097 	xl_running_xacts xlrec;
1098 	XLogRecPtr	recptr;
1099 
1100 	xlrec.xcnt = CurrRunningXacts->xcnt;
1101 	xlrec.subxcnt = CurrRunningXacts->subxcnt;
1102 	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
1103 	xlrec.nextXid = CurrRunningXacts->nextXid;
1104 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
1105 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
1106 
1107 	/* Header */
1108 	XLogBeginInsert();
1109 	XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);
1110 
1111 	/* array of TransactionIds */
1112 	if (xlrec.xcnt > 0)
1113 		XLogRegisterData((char *) CurrRunningXacts->xids,
1114 					   (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));
1115 
1116 	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);
1117 
1118 	if (CurrRunningXacts->subxid_overflow)
1119 		elog(trace_recovery(DEBUG2),
1120 			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1121 			 CurrRunningXacts->xcnt,
1122 			 (uint32) (recptr >> 32), (uint32) recptr,
1123 			 CurrRunningXacts->oldestRunningXid,
1124 			 CurrRunningXacts->latestCompletedXid,
1125 			 CurrRunningXacts->nextXid);
1126 	else
1127 		elog(trace_recovery(DEBUG2),
1128 			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1129 			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
1130 			 (uint32) (recptr >> 32), (uint32) recptr,
1131 			 CurrRunningXacts->oldestRunningXid,
1132 			 CurrRunningXacts->latestCompletedXid,
1133 			 CurrRunningXacts->nextXid);
1134 
1135 	/*
1136 	 * Ensure running_xacts information is synced to disk not too far in the
1137 	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
1138 	 * so we let the wal writer do it during normal operation.
1139 	 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
1140 	 * and nudge the WALWriter into action if sleeping. Check
1141 	 * XLogBackgroundFlush() for details why a record might not be flushed
1142 	 * without it.
1143 	 */
1144 	XLogSetAsyncXactLSN(recptr);
1145 
1146 	return recptr;
1147 }
1148 
1149 /*
1150  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1151  * logged, as described in backend/storage/lmgr/README.
1152  */
1153 static void
LogAccessExclusiveLocks(int nlocks,xl_standby_lock * locks)1154 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
1155 {
1156 	xl_standby_locks xlrec;
1157 
1158 	xlrec.nlocks = nlocks;
1159 
1160 	XLogBeginInsert();
1161 	XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
1162 	XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
1163 
1164 	(void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
1165 }
1166 
1167 /*
1168  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1169  */
1170 void
LogAccessExclusiveLock(Oid dbOid,Oid relOid)1171 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
1172 {
1173 	xl_standby_lock xlrec;
1174 
1175 	xlrec.xid = GetTopTransactionId();
1176 
1177 	/*
1178 	 * Decode the locktag back to the original values, to avoid sending lots
1179 	 * of empty bytes with every message.  See lock.h to check how a locktag
1180 	 * is defined for LOCKTAG_RELATION
1181 	 */
1182 	xlrec.dbOid = dbOid;
1183 	xlrec.relOid = relOid;
1184 
1185 	LogAccessExclusiveLocks(1, &xlrec);
1186 }
1187 
/*
 * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
 */
void
LogAccessExclusiveLockPrepare(void)
{
	/*
	 * Ensure that a TransactionId has been assigned to this transaction, for
	 * two reasons, both related to lock release on the standby. First, we
	 * must assign an xid so that RecordTransactionCommit() and
	 * RecordTransactionAbort() do not optimise away the transaction
	 * completion record which recovery relies upon to release locks. It's a
	 * hack, but for a corner case not worth adding code for into the main
	 * commit path. Second, we must assign an xid before the lock is recorded
	 * in shared memory, otherwise a concurrently executing
	 * GetRunningTransactionLocks() might see a lock associated with an
	 * InvalidTransactionId which we later assert cannot happen.
	 */
	(void) GetTopTransactionId();
}

1209 /*
1210  * Emit WAL for invalidations. This currently is only used for commits without
1211  * an xid but which contain invalidations.
1212  */
1213 void
LogStandbyInvalidations(int nmsgs,SharedInvalidationMessage * msgs,bool relcacheInitFileInval)1214 LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
1215 						bool relcacheInitFileInval)
1216 {
1217 	xl_invalidations xlrec;
1218 
1219 	/* prepare record */
1220 	memset(&xlrec, 0, sizeof(xlrec));
1221 	xlrec.dbId = MyDatabaseId;
1222 	xlrec.tsId = MyDatabaseTableSpace;
1223 	xlrec.relcacheInitFileInval = relcacheInitFileInval;
1224 	xlrec.nmsgs = nmsgs;
1225 
1226 	/* perform insertion */
1227 	XLogBeginInsert();
1228 	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
1229 	XLogRegisterData((char *) msgs,
1230 					 nmsgs * sizeof(SharedInvalidationMessage));
1231 	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
1232 }
1233