1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *	  Misc functions used in Hot Standby mode.
5  *
6  *	All functions for handling RM_STANDBY_ID, which relate to
7  *	AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *	Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *	  src/backend/storage/ipc/standby.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "miscadmin.h"
25 #include "pgstat.h"
26 #include "storage/bufmgr.h"
27 #include "storage/lmgr.h"
28 #include "storage/proc.h"
29 #include "storage/procarray.h"
30 #include "storage/sinvaladt.h"
31 #include "storage/standby.h"
32 #include "utils/hsearch.h"
33 #include "utils/memutils.h"
34 #include "utils/ps_status.h"
35 #include "utils/timeout.h"
36 #include "utils/timestamp.h"
37 
38 /* User-settable GUC parameters */
39 int			vacuum_defer_cleanup_age;
40 int			max_standby_archive_delay = 30 * 1000;
41 int			max_standby_streaming_delay = 30 * 1000;
42 
43 static HTAB *RecoveryLockLists;
44 
45 /* Flags set by timeout handlers */
46 static volatile sig_atomic_t got_standby_deadlock_timeout = false;
47 static volatile sig_atomic_t got_standby_lock_timeout = false;
48 
49 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
50 									   ProcSignalReason reason, bool report_waiting);
51 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
52 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
53 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
54 
55 /*
56  * Keep track of all the locks owned by a given transaction.
57  */
typedef struct RecoveryLockListsEntry
{
	TransactionId	xid;		/* hash key: xid that requested the locks */
	List		   *locks;		/* list of xl_standby_lock entries held for xid */
} RecoveryLockListsEntry;
63 
64 /*
65  * InitRecoveryTransactionEnvironment
66  *		Initialize tracking of in-progress transactions in master
67  *
68  * We need to issue shared invalidations and hold locks. Holding locks
69  * means others may want to wait on us, so we need to make a lock table
70  * vxact entry like a real transaction. We could create and delete
71  * lock table entries for each transaction but its simpler just to create
72  * one permanent entry and leave it there all the time. Locks are then
73  * acquired and released as needed. Yes, this means you can see the
74  * Startup process in pg_locks once we have run this.
75  */
void
InitRecoveryTransactionEnvironment(void)
{
	VirtualTransactionId vxid;
	HASHCTL			hash_ctl;

	/*
	 * Initialize the hash table for tracking the list of locks held by each
	 * transaction.  (64 is merely the initial size hint; the table grows as
	 * needed.)
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
	RecoveryLockLists = hash_create("RecoveryLockLists",
									64,
									&hash_ctl,
									HASH_ELEM | HASH_BLOBS);

	/*
	 * Initialize shared invalidation management for Startup process, being
	 * careful to register ourselves as a sendOnly process so we don't need to
	 * read messages, nor will we get signalled when the queue starts filling
	 * up.
	 */
	SharedInvalBackendInit(true);

	/*
	 * Lock a virtual transaction id for Startup process.
	 *
	 * We need to do GetNextLocalTransactionId() because
	 * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
	 * manager doesn't like that at all.
	 *
	 * Note that we don't need to run XactLockTableInsert() because nobody
	 * needs to wait on xids. That sounds a little strange, but table locks
	 * are held by vxids and row level locks are held by xids. All queries
	 * hold AccessShareLocks so never block while we write or lock new rows.
	 */
	vxid.backendId = MyBackendId;
	vxid.localTransactionId = GetNextLocalTransactionId();
	VirtualXactLockTableInsert(vxid);

	/* From now on, recovery-conflict tracking is active. */
	standbyState = STANDBY_INITIALIZED;
}
120 
121 /*
122  * ShutdownRecoveryTransactionEnvironment
123  *		Shut down transaction tracking
124  *
125  * Prepare to switch from hot standby mode to normal operation. Shut down
126  * recovery-time transaction tracking.
127  *
 * This must be called even in shutdown of startup process if transaction
 * tracking has been initialized. Otherwise some locks the tracked
 * transactions were holding will not be released and may interfere with
 * the processes still running (but which will exit soon) at the exit of
 * the startup process.
133  */
void
ShutdownRecoveryTransactionEnvironment(void)
{
	/*
	 * Do nothing if RecoveryLockLists is NULL, which means that transaction
	 * tracking has not yet been initialized or has already been shut down.
	 * This prevents transaction tracking from being shut down unexpectedly
	 * more than once.
	 */
	if (RecoveryLockLists == NULL)
		return;

	/* Mark all tracked in-progress transactions as finished. */
	ExpireAllKnownAssignedTransactionIds();

	/* Release all locks the tracked transactions were holding */
	StandbyReleaseAllLocks();

	/* Destroy the hash table of locks; also marks tracking as shut down. */
	hash_destroy(RecoveryLockLists);
	RecoveryLockLists = NULL;

	/* Cleanup our VirtualTransaction */
	VirtualXactLockTableCleanup();
}
159 
160 
161 /*
162  * -----------------------------------------------------
163  *		Standby wait timers and backend cancel logic
164  * -----------------------------------------------------
165  */
166 
167 /*
168  * Determine the cutoff time at which we want to start canceling conflicting
169  * transactions.  Returns zero (a time safely in the past) if we are willing
170  * to wait forever.
171  */
172 static TimestampTz
GetStandbyLimitTime(void)173 GetStandbyLimitTime(void)
174 {
175 	TimestampTz rtime;
176 	bool		fromStream;
177 
178 	/*
179 	 * The cutoff time is the last WAL data receipt time plus the appropriate
180 	 * delay variable.  Delay of -1 means wait forever.
181 	 */
182 	GetXLogReceiptTime(&rtime, &fromStream);
183 	if (fromStream)
184 	{
185 		if (max_standby_streaming_delay < 0)
186 			return 0;			/* wait forever */
187 		return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
188 	}
189 	else
190 	{
191 		if (max_standby_archive_delay < 0)
192 			return 0;			/* wait forever */
193 		return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
194 	}
195 }
196 
197 #define STANDBY_INITIAL_WAIT_US  1000
198 static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
199 
200 /*
201  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
202  * We wait here for a while then return. If we decide we can't wait any
203  * more then we return true, if we can wait some more return false.
204  */
205 static bool
WaitExceedsMaxStandbyDelay(void)206 WaitExceedsMaxStandbyDelay(void)
207 {
208 	TimestampTz ltime;
209 
210 	CHECK_FOR_INTERRUPTS();
211 
212 	/* Are we past the limit time? */
213 	ltime = GetStandbyLimitTime();
214 	if (ltime && GetCurrentTimestamp() >= ltime)
215 		return true;
216 
217 	/*
218 	 * Sleep a bit (this is essential to avoid busy-waiting).
219 	 */
220 	pg_usleep(standbyWait_us);
221 
222 	/*
223 	 * Progressively increase the sleep times, but not to more than 1s, since
224 	 * pg_usleep isn't interruptable on some platforms.
225 	 */
226 	standbyWait_us *= 2;
227 	if (standbyWait_us > 1000000)
228 		standbyWait_us = 1000000;
229 
230 	return false;
231 }
232 
233 /*
234  * This is the main executioner for any query backend that conflicts with
235  * recovery processing. Judgement has already been passed on it within
236  * a specific rmgr. Here we just issue the orders to the procs. The procs
237  * then throw the required error as instructed.
238  *
239  * If report_waiting is true, "waiting" is reported in PS display if necessary.
240  * If the caller has already reported that, report_waiting should be false.
241  * Otherwise, "waiting" is reported twice unexpectedly.
242  */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason, bool report_waiting)
{
	TimestampTz waitStart = 0;
	char	   *new_status;

	/* Fast exit, to avoid a kernel call if there's no work to be done. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	/* Remember when we started waiting, for the 500ms ps-display threshold. */
	if (report_waiting)
		waitStart = GetCurrentTimestamp();
	new_status = NULL;			/* we haven't changed the ps display */

	/* waitlist is terminated by an invalid VirtualTransactionId */
	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* reset standbyWait_us for each xact we wait for */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/* wait until the virtual xid is gone */
		while (!VirtualXactLock(*waitlist, false))
		{
			/*
			 * Report via ps if we have been waiting for more than 500 msec
			 * (should that be configurable?)
			 */
			if (update_process_title && new_status == NULL && report_waiting &&
				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
										   500))
			{
				const char *old_status;
				int			len;

				old_status = get_ps_display(&len);
				new_status = (char *) palloc(len + 8 + 1);
				memcpy(new_status, old_status, len);
				strcpy(new_status + len, " waiting");
				set_ps_display(new_status, false);
				new_status[len] = '\0'; /* truncate off " waiting" */
			}

			/* Is it time to kill it? */
			if (WaitExceedsMaxStandbyDelay())
			{
				pid_t		pid;

				/*
				 * Now find out who to throw out of the balloon.
				 */
				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * Wait a little bit for it to die so that we avoid flooding
				 * an unresponsive backend when system is heavily loaded.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}
		}

		/* The virtual transaction is gone now, wait for the next one */
		waitlist++;
	}

	/* Reset ps display if we changed it */
	if (new_status)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
316 
/*
 * ResolveRecoveryConflictWithSnapshot
 *		Cancel queries in the conflicting database whose snapshots still
 *		need tuples that WAL replay is about to remove; latestRemovedXid is
 *		the newest xid whose effects are being removed.
 */
void
ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
{
	VirtualTransactionId *backends;

	/*
	 * If we get passed InvalidTransactionId then we are a little surprised,
	 * but it is theoretically possible in normal running. It also happens
	 * when replaying already applied WAL records after a standby crash or
	 * restart, or when replaying an XLOG_HEAP2_VISIBLE record that marks as
	 * frozen a page which was already all-visible.  If latestRemovedXid is
	 * invalid then there is no conflict. That rule applies across all record
	 * types that suffer from this conflict.
	 */
	if (!TransactionIdIsValid(latestRemovedXid))
		return;

	/* Only backends in the same database as the cleaned relation conflict. */
	backends = GetConflictingVirtualXIDs(latestRemovedXid,
										 node.dbNode);

	ResolveRecoveryConflictWithVirtualXIDs(backends,
										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
										   true);
}
341 
342 void
ResolveRecoveryConflictWithTablespace(Oid tsid)343 ResolveRecoveryConflictWithTablespace(Oid tsid)
344 {
345 	VirtualTransactionId *temp_file_users;
346 
347 	/*
348 	 * Standby users may be currently using this tablespace for their
349 	 * temporary files. We only care about current users because
350 	 * temp_tablespace parameter will just ignore tablespaces that no longer
351 	 * exist.
352 	 *
353 	 * Ask everybody to cancel their queries immediately so we can ensure no
354 	 * temp files remain and we can remove the tablespace. Nuke the entire
355 	 * site from orbit, it's the only way to be sure.
356 	 *
357 	 * XXX: We could work out the pids of active backends using this
358 	 * tablespace by examining the temp filenames in the directory. We would
359 	 * then convert the pids into VirtualXIDs before attempting to cancel
360 	 * them.
361 	 *
362 	 * We don't wait for commit because drop tablespace is non-transactional.
363 	 */
364 	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
365 												InvalidOid);
366 	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
367 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
368 										   true);
369 }
370 
/*
 * ResolveRecoveryConflictWithDatabase
 *		Kick every backend out of the database being dropped.
 */
void
ResolveRecoveryConflictWithDatabase(Oid dbid)
{
	/*
	 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
	 * only waits for transactions and completely idle sessions would block
	 * us. This is rare enough that we do this as simply as possible: no wait,
	 * just force them off immediately.
	 *
	 * No locking is required here because we already acquired
	 * AccessExclusiveLock. Anybody trying to connect while we do this will
	 * block during InitPostgres() and then disconnect when they see the
	 * database has been removed.
	 */
	while (CountDBBackends(dbid) > 0)
	{
		CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);

		/*
		 * Wait awhile for them to die so that we avoid flooding an
		 * unresponsive backend when system is heavily loaded.
		 */
		pg_usleep(10000);
	}
}
396 
397 /*
398  * ResolveRecoveryConflictWithLock is called from ProcSleep()
399  * to resolve conflicts with other backends holding relation locks.
400  *
401  * The WaitLatch sleep normally done in ProcSleep()
402  * (when not InHotStandby) is performed here, for code clarity.
403  *
404  * We either resolve conflicts immediately or set a timeout to wake us at
405  * the limit of our patience.
406  *
407  * Resolve conflicts by canceling to all backends holding a conflicting
408  * lock.  As we are already queued to be granted the lock, no new lock
409  * requests conflicting with ours will be granted in the meantime.
410  *
411  * We also must check for deadlocks involving the Startup process and
412  * hot-standby backend processes. If deadlock_timeout is reached in
413  * this function, all the backends holding the conflicting locks are
414  * requested to check themselves for deadlocks.
415  */
416 void
ResolveRecoveryConflictWithLock(LOCKTAG locktag)417 ResolveRecoveryConflictWithLock(LOCKTAG locktag)
418 {
419 	TimestampTz ltime;
420 
421 	Assert(InHotStandby);
422 
423 	ltime = GetStandbyLimitTime();
424 
425 	if (GetCurrentTimestamp() >= ltime && ltime != 0)
426 	{
427 		/*
428 		 * We're already behind, so clear a path as quickly as possible.
429 		 */
430 		VirtualTransactionId *backends;
431 
432 		backends = GetLockConflicts(&locktag, AccessExclusiveLock);
433 
434 		/*
435 		 * Prevent ResolveRecoveryConflictWithVirtualXIDs() from reporting
436 		 * "waiting" in PS display by disabling its argument report_waiting
437 		 * because the caller, WaitOnLock(), has already reported that.
438 		 */
439 		ResolveRecoveryConflictWithVirtualXIDs(backends,
440 											   PROCSIG_RECOVERY_CONFLICT_LOCK,
441 											   false);
442 	}
443 	else
444 	{
445 		/*
446 		 * Wait (or wait again) until ltime, and check for deadlocks as well
447 		 * if we will be waiting longer than deadlock_timeout
448 		 */
449 		EnableTimeoutParams timeouts[2];
450 		int			cnt = 0;
451 
452 		if (ltime != 0)
453 		{
454 			got_standby_lock_timeout = false;
455 			timeouts[cnt].id = STANDBY_LOCK_TIMEOUT;
456 			timeouts[cnt].type = TMPARAM_AT;
457 			timeouts[cnt].fin_time = ltime;
458 			cnt++;
459 		}
460 
461 		got_standby_deadlock_timeout = false;
462 		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
463 		timeouts[cnt].type = TMPARAM_AFTER;
464 		timeouts[cnt].delay_ms = DeadlockTimeout;
465 		cnt++;
466 
467 		enable_timeouts(timeouts, cnt);
468 	}
469 
470 	/* Wait to be signaled by the release of the Relation Lock */
471 	ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);
472 
473 	/*
474 	 * Exit if ltime is reached. Then all the backends holding conflicting
475 	 * locks will be canceled in the next ResolveRecoveryConflictWithLock()
476 	 * call.
477 	 */
478 	if (got_standby_lock_timeout)
479 		goto cleanup;
480 
481 	if (got_standby_deadlock_timeout)
482 	{
483 		VirtualTransactionId *backends;
484 
485 		backends = GetLockConflicts(&locktag, AccessExclusiveLock);
486 
487 		/* Quick exit if there's no work to be done */
488 		if (!VirtualTransactionIdIsValid(*backends))
489 			goto cleanup;
490 
491 		/*
492 		 * Send signals to all the backends holding the conflicting locks, to
493 		 * ask them to check themselves for deadlocks.
494 		 */
495 		while (VirtualTransactionIdIsValid(*backends))
496 		{
497 			SignalVirtualTransaction(*backends,
498 									 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
499 									 false);
500 			backends++;
501 		}
502 
503 		/*
504 		 * Wait again here to be signaled by the release of the Relation Lock,
505 		 * to prevent the subsequent RecoveryConflictWithLock() from causing
506 		 * deadlock_timeout and sending a request for deadlocks check again.
507 		 * Otherwise the request continues to be sent every deadlock_timeout
508 		 * until the relation locks are released or ltime is reached.
509 		 */
510 		got_standby_deadlock_timeout = false;
511 		ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);
512 	}
513 
514 cleanup:
515 
516 	/*
517 	 * Clear any timeout requests established above.  We assume here that the
518 	 * Startup process doesn't have any other outstanding timeouts than those
519 	 * used by this function. If that stops being true, we could cancel the
520 	 * timeouts individually, but that'd be slower.
521 	 */
522 	disable_all_timeouts(false);
523 	got_standby_lock_timeout = false;
524 	got_standby_deadlock_timeout = false;
525 }
526 
527 /*
528  * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
529  * to resolve conflicts with other backends holding buffer pins.
530  *
531  * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
532  * (when not InHotStandby) is performed here, for code clarity.
533  *
534  * We either resolve conflicts immediately or set a timeout to wake us at
535  * the limit of our patience.
536  *
537  * Resolve conflicts by sending a PROCSIG signal to all backends to check if
538  * they hold one of the buffer pins that is blocking Startup process. If so,
539  * those backends will take an appropriate error action, ERROR or FATAL.
540  *
541  * We also must check for deadlocks.  Deadlocks occur because if queries
542  * wait on a lock, that must be behind an AccessExclusiveLock, which can only
543  * be cleared if the Startup process replays a transaction completion record.
544  * If Startup process is also waiting then that is a deadlock. The deadlock
545  * can occur if the query is waiting and then the Startup sleeps, or if
546  * Startup is sleeping and the query waits on a lock. We protect against
547  * only the former sequence here, the latter sequence is checked prior to
548  * the query sleeping, in CheckRecoveryConflictDeadlock().
549  *
550  * Deadlocks are extremely rare, and relatively expensive to check for,
551  * so we don't do a deadlock check right away ... only if we have had to wait
552  * at least deadlock_timeout.
553  */
554 void
ResolveRecoveryConflictWithBufferPin(void)555 ResolveRecoveryConflictWithBufferPin(void)
556 {
557 	TimestampTz ltime;
558 
559 	Assert(InHotStandby);
560 
561 	ltime = GetStandbyLimitTime();
562 
563 	if (GetCurrentTimestamp() >= ltime && ltime != 0)
564 	{
565 		/*
566 		 * We're already behind, so clear a path as quickly as possible.
567 		 */
568 		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
569 	}
570 	else
571 	{
572 		/*
573 		 * Wake up at ltime, and check for deadlocks as well if we will be
574 		 * waiting longer than deadlock_timeout
575 		 */
576 		EnableTimeoutParams timeouts[2];
577 		int			cnt = 0;
578 
579 		if (ltime != 0)
580 		{
581 			timeouts[cnt].id = STANDBY_TIMEOUT;
582 			timeouts[cnt].type = TMPARAM_AT;
583 			timeouts[cnt].fin_time = ltime;
584 			cnt++;
585 		}
586 
587 		got_standby_deadlock_timeout = false;
588 		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
589 		timeouts[cnt].type = TMPARAM_AFTER;
590 		timeouts[cnt].delay_ms = DeadlockTimeout;
591 		cnt++;
592 
593 		enable_timeouts(timeouts, cnt);
594 	}
595 
596 	/* Wait to be signaled by UnpinBuffer() */
597 	ProcWaitForSignal(PG_WAIT_BUFFER_PIN);
598 
599 	if (got_standby_deadlock_timeout)
600 	{
601 		/*
602 		 * Send out a request for hot-standby backends to check themselves for
603 		 * deadlocks.
604 		 *
605 		 * XXX The subsequent ResolveRecoveryConflictWithBufferPin() will wait
606 		 * to be signaled by UnpinBuffer() again and send a request for
607 		 * deadlocks check if deadlock_timeout happens. This causes the
608 		 * request to continue to be sent every deadlock_timeout until the
609 		 * buffer is unpinned or ltime is reached. This would increase the
610 		 * workload in the startup process and backends. In practice it may
611 		 * not be so harmful because the period that the buffer is kept pinned
612 		 * is basically no so long. But we should fix this?
613 		 */
614 		SendRecoveryConflictWithBufferPin(
615 										  PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
616 	}
617 
618 	/*
619 	 * Clear any timeout requests established above.  We assume here that the
620 	 * Startup process doesn't have any other timeouts than what this function
621 	 * uses.  If that stops being true, we could cancel the timeouts
622 	 * individually, but that'd be slower.
623 	 */
624 	disable_all_timeouts(false);
625 	got_standby_deadlock_timeout = false;
626 }
627 
/*
 * SendRecoveryConflictWithBufferPin
 *		Broadcast a buffer-pin-related conflict signal to all backends.
 */
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
	/* Only the two buffer-pin-related conflict reasons are valid here. */
	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

	/*
	 * We send signal to all backends to ask them if they are holding the
	 * buffer pin which is delaying the Startup process. We must not set the
	 * conflict flag yet, since most backends will be innocent. Let the
	 * SIGUSR1 handling in each backend decide their own fate.
	 */
	CancelDBBackends(InvalidOid, reason, false);
}
642 
643 /*
644  * In Hot Standby perform early deadlock detection.  We abort the lock
645  * wait if we are about to sleep while holding the buffer pin that Startup
646  * process is waiting for.
647  *
648  * Note: this code is pessimistic, because there is no way for it to
649  * determine whether an actual deadlock condition is present: the lock we
650  * need to wait for might be unrelated to any held by the Startup process.
651  * Sooner or later, this mechanism should get ripped out in favor of somehow
652  * accounting for buffer locks in DeadLockCheck().  However, errors here
653  * seem to be very low-probability in practice, so for now it's not worth
654  * the trouble.
655  */
void
CheckRecoveryConflictDeadlock(void)
{
	Assert(!InRecovery);		/* do not call in Startup process */

	/* No conflict unless we hold a pin the Startup process is waiting on. */
	if (!HoldingBufferPinThatDelaysRecovery())
		return;

	/*
	 * Error message should match ProcessInterrupts() but we avoid calling
	 * that because we aren't handling an interrupt at this point. Note that
	 * we only cancel the current transaction here, so if we are in a
	 * subtransaction and the pin is held by a parent, then the Startup
	 * process will continue to wait even though we have avoided deadlock.
	 */
	ereport(ERROR,
			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
			 errmsg("canceling statement due to conflict with recovery"),
			 errdetail("User transaction caused buffer deadlock with recovery.")));
}
676 
677 
678 /* --------------------------------
679  *		timeout handler routines
680  * --------------------------------
681  */
682 
/*
 * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
 * occurs before STANDBY_TIMEOUT.
 */
void
StandbyDeadLockHandler(void)
{
	/* Just set a flag; the waiting code checks it after ProcWaitForSignal. */
	got_standby_deadlock_timeout = true;
}
692 
/*
 * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
 * Send out a request to release conflicting buffer pins unconditionally,
 * so we can press ahead with applying changes in recovery.
 */
void
StandbyTimeoutHandler(void)
{
	/* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
706 
/*
 * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is exceeded.
 */
void
StandbyLockTimeoutHandler(void)
{
	/* Just set a flag; the waiting code checks it after ProcWaitForSignal. */
	got_standby_lock_timeout = true;
}
715 
716 /*
717  * -----------------------------------------------------
718  * Locking in Recovery Mode
719  * -----------------------------------------------------
720  *
721  * All locks are held by the Startup process using a single virtual
722  * transaction. This implementation is both simpler and in some senses,
723  * more correct. The locks held mean "some original transaction held
724  * this lock, so query access is not allowed at this time". So the Startup
725  * process is the proxy by which the original locks are implemented.
726  *
727  * We only keep track of AccessExclusiveLocks, which are only ever held by
728  * one transaction on one relation.
729  *
730  * We keep a hash table of lists of locks in local memory keyed by xid,
731  * RecoveryLockLists, so we can keep track of the various entries made by
732  * the Startup process's virtual xid in the shared lock table.
733  *
734  * List elements use type xl_standby_lock, since the WAL record type exactly
735  * matches the information that we need to keep track of.
736  *
737  * We use session locks rather than normal locks so we don't need
738  * ResourceOwners.
739  */
740 
741 
/*
 * StandbyAcquireAccessExclusiveLock
 *		Acquire, on behalf of transaction xid, an AccessExclusiveLock that
 *		was logged by the master, remembering it in RecoveryLockLists so it
 *		can be released when the transaction completes.
 */
void
StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
{
	RecoveryLockListsEntry *entry;
	xl_standby_lock *newlock;
	LOCKTAG		locktag;
	bool		found;

	/*
	 * Already processed?  If xid is invalid, or already known committed or
	 * aborted, the lock is of no further interest.
	 */
	if (!TransactionIdIsValid(xid) ||
		TransactionIdDidCommit(xid) ||
		TransactionIdDidAbort(xid))
		return;

	elog(trace_recovery(DEBUG4),
		 "adding recovery lock: db %u rel %u", dbOid, relOid);

	/* dbOid is InvalidOid when we are locking a shared relation. */
	Assert(OidIsValid(relOid));

	/* Create a new list for this xid, if we don't have one already. */
	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
	if (!found)
	{
		entry->xid = xid;
		entry->locks = NIL;
	}

	/* Record the lock so StandbyReleaseLockList() can release it later. */
	newlock = palloc(sizeof(xl_standby_lock));
	newlock->xid = xid;
	newlock->dbOid = dbOid;
	newlock->relOid = relOid;
	entry->locks = lappend(entry->locks, newlock);

	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);

	/* Take a session lock, so no ResourceOwner is needed. */
	(void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
}
780 
/*
 * StandbyReleaseLockList
 *		Release every lock in the given list, freeing each element and each
 *		list cell as we go.
 */
static void
StandbyReleaseLockList(List *locks)
{
	while (locks)
	{
		xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
		LOCKTAG		locktag;
		elog(trace_recovery(DEBUG4),
			 "releasing recovery lock: xid %u db %u rel %u",
			 lock->xid, lock->dbOid, lock->relOid);
		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
		if (!LockRelease(&locktag, AccessExclusiveLock, true))
		{
			/*
			 * Shouldn't happen: log (and Assert in debug builds), but keep
			 * releasing the rest of the list in production.
			 */
			elog(LOG,
				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
				 lock->xid, lock->dbOid, lock->relOid);
			Assert(false);
		}
		pfree(lock);
		locks = list_delete_first(locks);
	}
}
803 
804 static void
StandbyReleaseLocks(TransactionId xid)805 StandbyReleaseLocks(TransactionId xid)
806 {
807 	RecoveryLockListsEntry *entry;
808 
809 	if (TransactionIdIsValid(xid))
810 	{
811 		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
812 		{
813 			StandbyReleaseLockList(entry->locks);
814 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
815 		}
816 	}
817 	else
818 		StandbyReleaseAllLocks();
819 }
820 
821 /*
822  * Release locks for a transaction tree, starting at xid down, from
823  * RecoveryLockLists.
824  *
825  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
826  * to remove any AccessExclusiveLocks requested by a transaction.
827  */
828 void
StandbyReleaseLockTree(TransactionId xid,int nsubxids,TransactionId * subxids)829 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
830 {
831 	int			i;
832 
833 	StandbyReleaseLocks(xid);
834 
835 	for (i = 0; i < nsubxids; i++)
836 		StandbyReleaseLocks(subxids[i]);
837 }
838 
839 /*
840  * Called at end of recovery and when we see a shutdown checkpoint.
841  */
842 void
StandbyReleaseAllLocks(void)843 StandbyReleaseAllLocks(void)
844 {
845 	HASH_SEQ_STATUS	status;
846 	RecoveryLockListsEntry *entry;
847 
848 	elog(trace_recovery(DEBUG2), "release all standby locks");
849 
850 	hash_seq_init(&status, RecoveryLockLists);
851 	while ((entry = hash_seq_search(&status)))
852 	{
853 		StandbyReleaseLockList(entry->locks);
854 		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
855 	}
856 }
857 
858 /*
859  * StandbyReleaseOldLocks
860  *		Release standby locks held by top-level XIDs that aren't running,
861  *		as long as they're not prepared transactions.
862  */
863 void
StandbyReleaseOldLocks(int nxids,TransactionId * xids)864 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
865 {
866 	HASH_SEQ_STATUS status;
867 	RecoveryLockListsEntry *entry;
868 
869 	hash_seq_init(&status, RecoveryLockLists);
870 	while ((entry = hash_seq_search(&status)))
871 	{
872 		bool		remove = false;
873 
874 		Assert(TransactionIdIsValid(entry->xid));
875 
876 		if (StandbyTransactionIdIsPrepared(entry->xid))
877 			remove = false;
878 		else
879 		{
880 			int			i;
881 			bool		found = false;
882 
883 			for (i = 0; i < nxids; i++)
884 			{
885 				if (entry->xid == xids[i])
886 				{
887 					found = true;
888 					break;
889 				}
890 			}
891 
892 			/*
893 			 * If its not a running transaction, remove it.
894 			 */
895 			if (!found)
896 				remove = true;
897 		}
898 
899 		if (remove)
900 		{
901 			StandbyReleaseLockList(entry->locks);
902 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
903 		}
904 	}
905 }
906 
/*
 * --------------------------------------------------------------------
 *		Recovery handling for Rmgr RM_STANDBY_ID
 *
 * These record types will only be created if XLogStandbyInfoActive()
 * is true on the primary.
 * --------------------------------------------------------------------
 */
914 
915 void
standby_redo(XLogReaderState * record)916 standby_redo(XLogReaderState *record)
917 {
918 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
919 
920 	/* Backup blocks are not used in standby records */
921 	Assert(!XLogRecHasAnyBlockRefs(record));
922 
923 	/* Do nothing if we're not in hot standby mode */
924 	if (standbyState == STANDBY_DISABLED)
925 		return;
926 
927 	if (info == XLOG_STANDBY_LOCK)
928 	{
929 		xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
930 		int			i;
931 
932 		for (i = 0; i < xlrec->nlocks; i++)
933 			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
934 											  xlrec->locks[i].dbOid,
935 											  xlrec->locks[i].relOid);
936 	}
937 	else if (info == XLOG_RUNNING_XACTS)
938 	{
939 		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
940 		RunningTransactionsData running;
941 
942 		running.xcnt = xlrec->xcnt;
943 		running.subxcnt = xlrec->subxcnt;
944 		running.subxid_overflow = xlrec->subxid_overflow;
945 		running.nextXid = xlrec->nextXid;
946 		running.latestCompletedXid = xlrec->latestCompletedXid;
947 		running.oldestRunningXid = xlrec->oldestRunningXid;
948 		running.xids = xlrec->xids;
949 
950 		ProcArrayApplyRecoveryInfo(&running);
951 	}
952 	else if (info == XLOG_INVALIDATIONS)
953 	{
954 		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
955 
956 		ProcessCommittedInvalidationMessages(xlrec->msgs,
957 											 xlrec->nmsgs,
958 											 xlrec->relcacheInitFileInval,
959 											 xlrec->dbId,
960 											 xlrec->tsId);
961 	}
962 	else
963 		elog(PANIC, "standby_redo: unknown op code %u", info);
964 }
965 
/*
 * Log details of the current snapshot to WAL. This allows the snapshot state
 * to be reconstructed on the standby and for logical decoding.
 *
 * This is used for Hot Standby as follows:
 *
 * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
 * start from a shutdown checkpoint because we know nothing was running
 * at that time and our recovery snapshot is known empty. In the more
 * typical case of an online checkpoint we need to jump through a few
 * hoops to get a correct recovery snapshot and this requires a two or
 * sometimes a three stage process.
 *
 * The initial snapshot must contain all running xids and all current
 * AccessExclusiveLocks at a point in time on the standby. Assembling
 * that information while the server is running requires many and
 * various LWLocks, so we choose to derive that information piece by
 * piece and then re-assemble that info on the standby. When that
 * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
 *
 * Since locking on the primary when we derive the information is not
 * strict, we note that there is a time window between the derivation and
 * writing to WAL of the derived information. That allows race conditions
 * that we must resolve, since xids and locks may enter or leave the
 * snapshot during that window. This creates the issue that an xid or
 * lock may start *after* the snapshot has been derived yet *before* the
 * snapshot is logged in the running xacts WAL record. We resolve this by
 * starting to accumulate changes at a point just prior to when we derive
 * the snapshot on the primary, then ignore duplicates when we later apply
 * the snapshot from the running xacts record. This is implemented during
 * CreateCheckpoint() where we use the logical checkpoint location as
 * our starting point and then write the running xacts record immediately
 * before writing the main checkpoint WAL record. Since we always start
 * up from a checkpoint and are immediately at our starting point, we
 * unconditionally move to STANDBY_INITIALIZED. After this point we
 * must do 4 things:
 *	* move shared nextXid forwards as we see new xids
 *	* extend the clog and subtrans with each new xid
 *	* keep track of uncommitted known assigned xids
 *	* keep track of uncommitted AccessExclusiveLocks
 *
 * When we see a commit/abort we must remove known assigned xids and locks
 * from the completing transaction. Attempted removals that cannot locate
 * an entry are expected and must not cause an error when we are in state
 * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
 * KnownAssignedXidsRemove().
 *
 * Later, when we apply the running xact data we must be careful to ignore
 * transactions already committed, since those commits raced ahead when
 * making WAL entries.
 *
 * The loose timing also means that locks may be recorded that have a
 * zero xid, since xids are removed from procs before locks are removed.
 * So we must prune the lock list down to ensure we hold locks only for
 * currently running xids, performed by StandbyReleaseOldLocks().
 * Zero xids should no longer be possible, but we may be replaying WAL
 * from a time when they were possible.
 *
 * For logical decoding only the running xacts information is needed;
 * there's no need to look at the locking information, but it's logged anyway,
 * as there's no independent knob to just enable logical decoding. For
 * details of how this is used, check snapbuild.c's introductory comment.
 *
 *
 * Returns the RecPtr of the last inserted record.
 */
XLogRecPtr
LogStandbySnapshot(void)
{
	XLogRecPtr	recptr;
	RunningTransactions running;
	xl_standby_lock *locks;
	int			nlocks;

	/* Only called when standby info is being generated for WAL */
	Assert(XLogStandbyInfoActive());

	/*
	 * Get details of any AccessExclusiveLocks being held at the moment.
	 */
	locks = GetRunningTransactionLocks(&nlocks);
	if (nlocks > 0)
		LogAccessExclusiveLocks(nlocks, locks);
	/* GetRunningTransactionLocks() palloc'd the array; we own it */
	pfree(locks);

	/*
	 * Log details of all in-progress transactions. This should be the last
	 * record we write, because standby will open up when it sees this.
	 */
	running = GetRunningTransactionData();

	/*
	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
	 * For Hot Standby this can be done before inserting the WAL record
	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
	 * the clog. For logical decoding, though, the lock can't be released
	 * early because the clog might be "in the future" from the POV of the
	 * historic snapshot. This would allow for situations where we're waiting
	 * for the end of a transaction listed in the xl_running_xacts record
	 * which, according to the WAL, has committed before the xl_running_xacts
	 * record. Fortunately this routine isn't executed frequently, and it's
	 * only a shared lock.
	 */
	if (wal_level < WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	recptr = LogCurrentRunningXacts(running);

	/* Release lock if we kept it longer ... */
	if (wal_level >= WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
	LWLockRelease(XidGenLock);

	return recptr;
}
1082 
/*
 * Record an enhanced snapshot of running transactions into WAL.
 *
 * The definitions of RunningTransactionsData and xl_running_xacts are
 * similar. We keep them separate because xl_running_xacts is a
 * contiguous chunk of memory and never exists fully until it is assembled in
 * WAL. The inserted records are marked as not being important for durability,
 * to avoid triggering superfluous checkpoint / archiving activity.
 */
static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
{
	xl_running_xacts xlrec;
	XLogRecPtr	recptr;

	/* Copy the fixed-size fields into the WAL record header struct */
	xlrec.xcnt = CurrRunningXacts->xcnt;
	xlrec.subxcnt = CurrRunningXacts->subxcnt;
	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
	xlrec.nextXid = CurrRunningXacts->nextXid;
	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;

	/* Header */
	XLogBeginInsert();
	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
	XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);

	/* array of TransactionIds: top-level xids followed by subxids */
	if (xlrec.xcnt > 0)
		XLogRegisterData((char *) CurrRunningXacts->xids,
						 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));

	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);

	if (CurrRunningXacts->subxid_overflow)
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt,
			 (uint32) (recptr >> 32), (uint32) recptr,
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);
	else
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
			 (uint32) (recptr >> 32), (uint32) recptr,
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);

	/*
	 * Ensure running_xacts information is synced to disk not too far in the
	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
	 * so we let the wal writer do it during normal operation.
	 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
	 * and nudge the WALWriter into action if sleeping. Check
	 * XLogBackgroundFlush() for details why a record might not be flushed
	 * without it.
	 */
	XLogSetAsyncXactLSN(recptr);

	return recptr;
}
1147 
1148 /*
1149  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1150  * logged, as described in backend/storage/lmgr/README.
1151  */
1152 static void
LogAccessExclusiveLocks(int nlocks,xl_standby_lock * locks)1153 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
1154 {
1155 	xl_standby_locks xlrec;
1156 
1157 	xlrec.nlocks = nlocks;
1158 
1159 	XLogBeginInsert();
1160 	XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
1161 	XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
1162 	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
1163 
1164 	(void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
1165 }
1166 
1167 /*
1168  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1169  */
1170 void
LogAccessExclusiveLock(Oid dbOid,Oid relOid)1171 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
1172 {
1173 	xl_standby_lock xlrec;
1174 
1175 	xlrec.xid = GetCurrentTransactionId();
1176 
1177 	/*
1178 	 * Decode the locktag back to the original values, to avoid sending lots
1179 	 * of empty bytes with every message.  See lock.h to check how a locktag
1180 	 * is defined for LOCKTAG_RELATION
1181 	 */
1182 	xlrec.dbOid = dbOid;
1183 	xlrec.relOid = relOid;
1184 
1185 	LogAccessExclusiveLocks(1, &xlrec);
1186 	MyXactFlags |= XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK;
1187 }
1188 
/*
 * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
 */
void
LogAccessExclusiveLockPrepare(void)
{
	/*
	 * Ensure that a TransactionId has been assigned to this transaction, for
	 * two reasons, both related to lock release on the standby. First, we
	 * must assign an xid so that RecordTransactionCommit() and
	 * RecordTransactionAbort() do not optimise away the transaction
	 * completion record which recovery relies upon to release locks. It's a
	 * hack, but for a corner case not worth adding code for into the main
	 * commit path. Second, we must assign an xid before the lock is recorded
	 * in shared memory, otherwise a concurrently executing
	 * GetRunningTransactionLocks() might see a lock associated with an
	 * InvalidTransactionId which we later assert cannot happen.
	 */
	(void) GetCurrentTransactionId();
}
1209 
1210 /*
1211  * Emit WAL for invalidations. This currently is only used for commits without
1212  * an xid but which contain invalidations.
1213  */
1214 void
LogStandbyInvalidations(int nmsgs,SharedInvalidationMessage * msgs,bool relcacheInitFileInval)1215 LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
1216 						bool relcacheInitFileInval)
1217 {
1218 	xl_invalidations xlrec;
1219 
1220 	/* prepare record */
1221 	memset(&xlrec, 0, sizeof(xlrec));
1222 	xlrec.dbId = MyDatabaseId;
1223 	xlrec.tsId = MyDatabaseTableSpace;
1224 	xlrec.relcacheInitFileInval = relcacheInitFileInval;
1225 	xlrec.nmsgs = nmsgs;
1226 
1227 	/* perform insertion */
1228 	XLogBeginInsert();
1229 	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
1230 	XLogRegisterData((char *) msgs,
1231 					 nmsgs * sizeof(SharedInvalidationMessage));
1232 	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
1233 }
1234