1 /*-------------------------------------------------------------------------
2 *
3 * standby.c
4 * Misc functions used in Hot Standby mode.
5 *
6 * All functions for handling RM_STANDBY_ID, which relate to
7 * AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8 * Plus conflict recovery processing.
9 *
10 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
11 * Portions Copyright (c) 1994, Regents of the University of California
12 *
13 * IDENTIFICATION
14 * src/backend/storage/ipc/standby.c
15 *
16 *-------------------------------------------------------------------------
17 */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "miscadmin.h"
25 #include "storage/bufmgr.h"
26 #include "storage/lmgr.h"
27 #include "storage/proc.h"
28 #include "storage/procarray.h"
29 #include "storage/sinvaladt.h"
30 #include "storage/standby.h"
31 #include "utils/hsearch.h"
32 #include "utils/memutils.h"
33 #include "utils/ps_status.h"
34 #include "utils/timeout.h"
35 #include "utils/timestamp.h"
36
37 /* User-settable GUC parameters */
38 int vacuum_defer_cleanup_age;
39 int max_standby_archive_delay = 30 * 1000;
40 int max_standby_streaming_delay = 30 * 1000;
41
42 static HTAB *RecoveryLockLists;
43
44 /* Flags set by timeout handlers */
45 static volatile sig_atomic_t got_standby_deadlock_timeout = false;
46 static volatile sig_atomic_t got_standby_lock_timeout = false;
47
48 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
49 ProcSignalReason reason, bool report_waiting);
50 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
51 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
52 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
53
54 /*
55 * Keep track of all the locks owned by a given transaction.
56 */
57 typedef struct RecoveryLockListsEntry
58 {
59 TransactionId xid;
60 List *locks;
61 } RecoveryLockListsEntry;
62
63 /*
64 * InitRecoveryTransactionEnvironment
65 * Initialize tracking of in-progress transactions in master
66 *
67 * We need to issue shared invalidations and hold locks. Holding locks
68 * means others may want to wait on us, so we need to make a lock table
69 * vxact entry like a real transaction. We could create and delete
70 * lock table entries for each transaction but its simpler just to create
71 * one permanent entry and leave it there all the time. Locks are then
72 * acquired and released as needed. Yes, this means you can see the
73 * Startup process in pg_locks once we have run this.
74 */
75 void
InitRecoveryTransactionEnvironment(void)76 InitRecoveryTransactionEnvironment(void)
77 {
78 VirtualTransactionId vxid;
79 HASHCTL hash_ctl;
80
81 /*
82 * Initialize the hash table for tracking the list of locks held by each
83 * transaction.
84 */
85 memset(&hash_ctl, 0, sizeof(hash_ctl));
86 hash_ctl.keysize = sizeof(TransactionId);
87 hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
88 RecoveryLockLists = hash_create("RecoveryLockLists",
89 64,
90 &hash_ctl,
91 HASH_ELEM | HASH_BLOBS);
92
93 /*
94 * Initialize shared invalidation management for Startup process, being
95 * careful to register ourselves as a sendOnly process so we don't need to
96 * read messages, nor will we get signalled when the queue starts filling
97 * up.
98 */
99 SharedInvalBackendInit(true);
100
101 /*
102 * Lock a virtual transaction id for Startup process.
103 *
104 * We need to do GetNextLocalTransactionId() because
105 * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
106 * manager doesn't like that at all.
107 *
108 * Note that we don't need to run XactLockTableInsert() because nobody
109 * needs to wait on xids. That sounds a little strange, but table locks
110 * are held by vxids and row level locks are held by xids. All queries
111 * hold AccessShareLocks so never block while we write or lock new rows.
112 */
113 vxid.backendId = MyBackendId;
114 vxid.localTransactionId = GetNextLocalTransactionId();
115 VirtualXactLockTableInsert(vxid);
116
117 standbyState = STANDBY_INITIALIZED;
118 }
119
120 /*
121 * ShutdownRecoveryTransactionEnvironment
122 * Shut down transaction tracking
123 *
124 * Prepare to switch from hot standby mode to normal operation. Shut down
125 * recovery-time transaction tracking.
126 *
127 * This must be called even in shutdown of startup process if transaction
128 * tracking has been initialized. Otherwise some locks the tracked
129 * transactions were holding will not be released and and may interfere with
130 * the processes still running (but will exit soon later) at the exit of
131 * startup process.
132 */
133 void
ShutdownRecoveryTransactionEnvironment(void)134 ShutdownRecoveryTransactionEnvironment(void)
135 {
136 /*
137 * Do nothing if RecoveryLockLists is NULL because which means that
138 * transaction tracking has not been yet initialized or has been already
139 * shutdowned. This prevents transaction tracking from being shutdowned
140 * unexpectedly more than once.
141 */
142 if (RecoveryLockLists == NULL)
143 return;
144
145 /* Mark all tracked in-progress transactions as finished. */
146 ExpireAllKnownAssignedTransactionIds();
147
148 /* Release all locks the tracked transactions were holding */
149 StandbyReleaseAllLocks();
150
151 /* Destroy the hash table of locks. */
152 hash_destroy(RecoveryLockLists);
153 RecoveryLockLists = NULL;
154
155 /* Cleanup our VirtualTransaction */
156 VirtualXactLockTableCleanup();
157 }
158
159
160 /*
161 * -----------------------------------------------------
162 * Standby wait timers and backend cancel logic
163 * -----------------------------------------------------
164 */
165
166 /*
167 * Determine the cutoff time at which we want to start canceling conflicting
168 * transactions. Returns zero (a time safely in the past) if we are willing
169 * to wait forever.
170 */
171 static TimestampTz
GetStandbyLimitTime(void)172 GetStandbyLimitTime(void)
173 {
174 TimestampTz rtime;
175 bool fromStream;
176
177 /*
178 * The cutoff time is the last WAL data receipt time plus the appropriate
179 * delay variable. Delay of -1 means wait forever.
180 */
181 GetXLogReceiptTime(&rtime, &fromStream);
182 if (fromStream)
183 {
184 if (max_standby_streaming_delay < 0)
185 return 0; /* wait forever */
186 return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
187 }
188 else
189 {
190 if (max_standby_archive_delay < 0)
191 return 0; /* wait forever */
192 return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
193 }
194 }
195
196 #define STANDBY_INITIAL_WAIT_US 1000
197 static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
198
199 /*
200 * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
201 * We wait here for a while then return. If we decide we can't wait any
202 * more then we return true, if we can wait some more return false.
203 */
204 static bool
WaitExceedsMaxStandbyDelay(void)205 WaitExceedsMaxStandbyDelay(void)
206 {
207 TimestampTz ltime;
208
209 CHECK_FOR_INTERRUPTS();
210
211 /* Are we past the limit time? */
212 ltime = GetStandbyLimitTime();
213 if (ltime && GetCurrentTimestamp() >= ltime)
214 return true;
215
216 /*
217 * Sleep a bit (this is essential to avoid busy-waiting).
218 */
219 pg_usleep(standbyWait_us);
220
221 /*
222 * Progressively increase the sleep times, but not to more than 1s, since
223 * pg_usleep isn't interruptable on some platforms.
224 */
225 standbyWait_us *= 2;
226 if (standbyWait_us > 1000000)
227 standbyWait_us = 1000000;
228
229 return false;
230 }
231
232 /*
233 * This is the main executioner for any query backend that conflicts with
234 * recovery processing. Judgement has already been passed on it within
235 * a specific rmgr. Here we just issue the orders to the procs. The procs
236 * then throw the required error as instructed.
237 *
238 * If report_waiting is true, "waiting" is reported in PS display if necessary.
239 * If the caller has already reported that, report_waiting should be false.
240 * Otherwise, "waiting" is reported twice unexpectedly.
241 */
242 static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId * waitlist,ProcSignalReason reason,bool report_waiting)243 ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
244 ProcSignalReason reason, bool report_waiting)
245 {
246 TimestampTz waitStart = 0;
247 char *new_status;
248
249 /* Fast exit, to avoid a kernel call if there's no work to be done. */
250 if (!VirtualTransactionIdIsValid(*waitlist))
251 return;
252
253 if (report_waiting)
254 waitStart = GetCurrentTimestamp();
255 new_status = NULL; /* we haven't changed the ps display */
256
257 while (VirtualTransactionIdIsValid(*waitlist))
258 {
259 /* reset standbyWait_us for each xact we wait for */
260 standbyWait_us = STANDBY_INITIAL_WAIT_US;
261
262 /* wait until the virtual xid is gone */
263 while (!VirtualXactLock(*waitlist, false))
264 {
265 /*
266 * Report via ps if we have been waiting for more than 500 msec
267 * (should that be configurable?)
268 */
269 if (update_process_title && new_status == NULL && report_waiting &&
270 TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
271 500))
272 {
273 const char *old_status;
274 int len;
275
276 old_status = get_ps_display(&len);
277 new_status = (char *) palloc(len + 8 + 1);
278 memcpy(new_status, old_status, len);
279 strcpy(new_status + len, " waiting");
280 set_ps_display(new_status, false);
281 new_status[len] = '\0'; /* truncate off " waiting" */
282 }
283
284 /* Is it time to kill it? */
285 if (WaitExceedsMaxStandbyDelay())
286 {
287 pid_t pid;
288
289 /*
290 * Now find out who to throw out of the balloon.
291 */
292 Assert(VirtualTransactionIdIsValid(*waitlist));
293 pid = CancelVirtualTransaction(*waitlist, reason);
294
295 /*
296 * Wait a little bit for it to die so that we avoid flooding
297 * an unresponsive backend when system is heavily loaded.
298 */
299 if (pid != 0)
300 pg_usleep(5000L);
301 }
302 }
303
304 /* The virtual transaction is gone now, wait for the next one */
305 waitlist++;
306 }
307
308 /* Reset ps display if we changed it */
309 if (new_status)
310 {
311 set_ps_display(new_status, false);
312 pfree(new_status);
313 }
314 }
315
316 void
ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,RelFileNode node)317 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
318 {
319 VirtualTransactionId *backends;
320
321 /*
322 * If we get passed InvalidTransactionId then we are a little surprised,
323 * but it is theoretically possible in normal running. It also happens
324 * when replaying already applied WAL records after a standby crash or
325 * restart, or when replaying an XLOG_HEAP2_VISIBLE record that marks as
326 * frozen a page which was already all-visible. If latestRemovedXid is
327 * invalid then there is no conflict. That rule applies across all record
328 * types that suffer from this conflict.
329 */
330 if (!TransactionIdIsValid(latestRemovedXid))
331 return;
332
333 backends = GetConflictingVirtualXIDs(latestRemovedXid,
334 node.dbNode);
335
336 ResolveRecoveryConflictWithVirtualXIDs(backends,
337 PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
338 true);
339 }
340
341 void
ResolveRecoveryConflictWithTablespace(Oid tsid)342 ResolveRecoveryConflictWithTablespace(Oid tsid)
343 {
344 VirtualTransactionId *temp_file_users;
345
346 /*
347 * Standby users may be currently using this tablespace for their
348 * temporary files. We only care about current users because
349 * temp_tablespace parameter will just ignore tablespaces that no longer
350 * exist.
351 *
352 * Ask everybody to cancel their queries immediately so we can ensure no
353 * temp files remain and we can remove the tablespace. Nuke the entire
354 * site from orbit, it's the only way to be sure.
355 *
356 * XXX: We could work out the pids of active backends using this
357 * tablespace by examining the temp filenames in the directory. We would
358 * then convert the pids into VirtualXIDs before attempting to cancel
359 * them.
360 *
361 * We don't wait for commit because drop tablespace is non-transactional.
362 */
363 temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
364 InvalidOid);
365 ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
366 PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
367 true);
368 }
369
370 void
ResolveRecoveryConflictWithDatabase(Oid dbid)371 ResolveRecoveryConflictWithDatabase(Oid dbid)
372 {
373 /*
374 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
375 * only waits for transactions and completely idle sessions would block
376 * us. This is rare enough that we do this as simply as possible: no wait,
377 * just force them off immediately.
378 *
379 * No locking is required here because we already acquired
380 * AccessExclusiveLock. Anybody trying to connect while we do this will
381 * block during InitPostgres() and then disconnect when they see the
382 * database has been removed.
383 */
384 while (CountDBBackends(dbid) > 0)
385 {
386 CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
387
388 /*
389 * Wait awhile for them to die so that we avoid flooding an
390 * unresponsive backend when system is heavily loaded.
391 */
392 pg_usleep(10000);
393 }
394 }
395
396 /*
397 * ResolveRecoveryConflictWithLock is called from ProcSleep()
398 * to resolve conflicts with other backends holding relation locks.
399 *
400 * The WaitLatch sleep normally done in ProcSleep()
401 * (when not InHotStandby) is performed here, for code clarity.
402 *
403 * We either resolve conflicts immediately or set a timeout to wake us at
404 * the limit of our patience.
405 *
406 * Resolve conflicts by canceling to all backends holding a conflicting
407 * lock. As we are already queued to be granted the lock, no new lock
408 * requests conflicting with ours will be granted in the meantime.
409 *
410 * We also must check for deadlocks involving the Startup process and
411 * hot-standby backend processes. If deadlock_timeout is reached in
412 * this function, all the backends holding the conflicting locks are
413 * requested to check themselves for deadlocks.
414 */
415 void
ResolveRecoveryConflictWithLock(LOCKTAG locktag)416 ResolveRecoveryConflictWithLock(LOCKTAG locktag)
417 {
418 TimestampTz ltime;
419
420 Assert(InHotStandby);
421
422 ltime = GetStandbyLimitTime();
423
424 if (GetCurrentTimestamp() >= ltime && ltime != 0)
425 {
426 /*
427 * We're already behind, so clear a path as quickly as possible.
428 */
429 VirtualTransactionId *backends;
430
431 backends = GetLockConflicts(&locktag, AccessExclusiveLock);
432
433 /*
434 * Prevent ResolveRecoveryConflictWithVirtualXIDs() from reporting
435 * "waiting" in PS display by disabling its argument report_waiting
436 * because the caller, WaitOnLock(), has already reported that.
437 */
438 ResolveRecoveryConflictWithVirtualXIDs(backends,
439 PROCSIG_RECOVERY_CONFLICT_LOCK,
440 false);
441 }
442 else
443 {
444 /*
445 * Wait (or wait again) until ltime, and check for deadlocks as well
446 * if we will be waiting longer than deadlock_timeout
447 */
448 EnableTimeoutParams timeouts[2];
449 int cnt = 0;
450
451 if (ltime != 0)
452 {
453 got_standby_lock_timeout = false;
454 timeouts[cnt].id = STANDBY_LOCK_TIMEOUT;
455 timeouts[cnt].type = TMPARAM_AT;
456 timeouts[cnt].fin_time = ltime;
457 cnt++;
458 }
459
460 got_standby_deadlock_timeout = false;
461 timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
462 timeouts[cnt].type = TMPARAM_AFTER;
463 timeouts[cnt].delay_ms = DeadlockTimeout;
464 cnt++;
465
466 enable_timeouts(timeouts, cnt);
467 }
468
469 /* Wait to be signaled by the release of the Relation Lock */
470 ProcWaitForSignal();
471
472 /*
473 * Exit if ltime is reached. Then all the backends holding conflicting
474 * locks will be canceled in the next ResolveRecoveryConflictWithLock()
475 * call.
476 */
477 if (got_standby_lock_timeout)
478 goto cleanup;
479
480 if (got_standby_deadlock_timeout)
481 {
482 VirtualTransactionId *backends;
483
484 backends = GetLockConflicts(&locktag, AccessExclusiveLock);
485
486 /* Quick exit if there's no work to be done */
487 if (!VirtualTransactionIdIsValid(*backends))
488 goto cleanup;
489
490 /*
491 * Send signals to all the backends holding the conflicting locks, to
492 * ask them to check themselves for deadlocks.
493 */
494 while (VirtualTransactionIdIsValid(*backends))
495 {
496 SignalVirtualTransaction(*backends,
497 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
498 false);
499 backends++;
500 }
501
502 /*
503 * Wait again here to be signaled by the release of the Relation Lock,
504 * to prevent the subsequent RecoveryConflictWithLock() from causing
505 * deadlock_timeout and sending a request for deadlocks check again.
506 * Otherwise the request continues to be sent every deadlock_timeout
507 * until the relation locks are released or ltime is reached.
508 */
509 got_standby_deadlock_timeout = false;
510 ProcWaitForSignal();
511 }
512
513 cleanup:
514
515 /*
516 * Clear any timeout requests established above. We assume here that the
517 * Startup process doesn't have any other outstanding timeouts than those
518 * used by this function. If that stops being true, we could cancel the
519 * timeouts individually, but that'd be slower.
520 */
521 disable_all_timeouts(false);
522 got_standby_lock_timeout = false;
523 got_standby_deadlock_timeout = false;
524 }
525
526 /*
527 * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
528 * to resolve conflicts with other backends holding buffer pins.
529 *
530 * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
531 * (when not InHotStandby) is performed here, for code clarity.
532 *
533 * We either resolve conflicts immediately or set a timeout to wake us at
534 * the limit of our patience.
535 *
536 * Resolve conflicts by sending a PROCSIG signal to all backends to check if
537 * they hold one of the buffer pins that is blocking Startup process. If so,
538 * those backends will take an appropriate error action, ERROR or FATAL.
539 *
540 * We also must check for deadlocks. Deadlocks occur because if queries
541 * wait on a lock, that must be behind an AccessExclusiveLock, which can only
542 * be cleared if the Startup process replays a transaction completion record.
543 * If Startup process is also waiting then that is a deadlock. The deadlock
544 * can occur if the query is waiting and then the Startup sleeps, or if
545 * Startup is sleeping and the query waits on a lock. We protect against
546 * only the former sequence here, the latter sequence is checked prior to
547 * the query sleeping, in CheckRecoveryConflictDeadlock().
548 *
549 * Deadlocks are extremely rare, and relatively expensive to check for,
550 * so we don't do a deadlock check right away ... only if we have had to wait
551 * at least deadlock_timeout.
552 */
553 void
ResolveRecoveryConflictWithBufferPin(void)554 ResolveRecoveryConflictWithBufferPin(void)
555 {
556 TimestampTz ltime;
557
558 Assert(InHotStandby);
559
560 ltime = GetStandbyLimitTime();
561
562 if (GetCurrentTimestamp() >= ltime && ltime != 0)
563 {
564 /*
565 * We're already behind, so clear a path as quickly as possible.
566 */
567 SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
568 }
569 else
570 {
571 /*
572 * Wake up at ltime, and check for deadlocks as well if we will be
573 * waiting longer than deadlock_timeout
574 */
575 EnableTimeoutParams timeouts[2];
576 int cnt = 0;
577
578 if (ltime != 0)
579 {
580 timeouts[cnt].id = STANDBY_TIMEOUT;
581 timeouts[cnt].type = TMPARAM_AT;
582 timeouts[cnt].fin_time = ltime;
583 cnt++;
584 }
585
586 got_standby_deadlock_timeout = false;
587 timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
588 timeouts[cnt].type = TMPARAM_AFTER;
589 timeouts[cnt].delay_ms = DeadlockTimeout;
590 cnt++;
591
592 enable_timeouts(timeouts, cnt);
593 }
594
595 /* Wait to be signaled by UnpinBuffer() */
596 ProcWaitForSignal();
597
598 if (got_standby_deadlock_timeout)
599 {
600 /*
601 * Send out a request for hot-standby backends to check themselves for
602 * deadlocks.
603 *
604 * XXX The subsequent ResolveRecoveryConflictWithBufferPin() will wait
605 * to be signaled by UnpinBuffer() again and send a request for
606 * deadlocks check if deadlock_timeout happens. This causes the
607 * request to continue to be sent every deadlock_timeout until the
608 * buffer is unpinned or ltime is reached. This would increase the
609 * workload in the startup process and backends. In practice it may
610 * not be so harmful because the period that the buffer is kept pinned
611 * is basically no so long. But we should fix this?
612 */
613 SendRecoveryConflictWithBufferPin(
614 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
615 }
616
617 /*
618 * Clear any timeout requests established above. We assume here that the
619 * Startup process doesn't have any other timeouts than what this function
620 * uses. If that stops being true, we could cancel the timeouts
621 * individually, but that'd be slower.
622 */
623 disable_all_timeouts(false);
624 got_standby_deadlock_timeout = false;
625 }
626
627 static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)628 SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
629 {
630 Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
631 reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
632
633 /*
634 * We send signal to all backends to ask them if they are holding the
635 * buffer pin which is delaying the Startup process. We must not set the
636 * conflict flag yet, since most backends will be innocent. Let the
637 * SIGUSR1 handling in each backend decide their own fate.
638 */
639 CancelDBBackends(InvalidOid, reason, false);
640 }
641
642 /*
643 * In Hot Standby perform early deadlock detection. We abort the lock
644 * wait if we are about to sleep while holding the buffer pin that Startup
645 * process is waiting for.
646 *
647 * Note: this code is pessimistic, because there is no way for it to
648 * determine whether an actual deadlock condition is present: the lock we
649 * need to wait for might be unrelated to any held by the Startup process.
650 * Sooner or later, this mechanism should get ripped out in favor of somehow
651 * accounting for buffer locks in DeadLockCheck(). However, errors here
652 * seem to be very low-probability in practice, so for now it's not worth
653 * the trouble.
654 */
655 void
CheckRecoveryConflictDeadlock(void)656 CheckRecoveryConflictDeadlock(void)
657 {
658 Assert(!InRecovery); /* do not call in Startup process */
659
660 if (!HoldingBufferPinThatDelaysRecovery())
661 return;
662
663 /*
664 * Error message should match ProcessInterrupts() but we avoid calling
665 * that because we aren't handling an interrupt at this point. Note that
666 * we only cancel the current transaction here, so if we are in a
667 * subtransaction and the pin is held by a parent, then the Startup
668 * process will continue to wait even though we have avoided deadlock.
669 */
670 ereport(ERROR,
671 (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
672 errmsg("canceling statement due to conflict with recovery"),
673 errdetail("User transaction caused buffer deadlock with recovery.")));
674 }
675
676
677 /* --------------------------------
678 * timeout handler routines
679 * --------------------------------
680 */
681
682 /*
683 * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
684 * occurs before STANDBY_TIMEOUT.
685 */
686 void
StandbyDeadLockHandler(void)687 StandbyDeadLockHandler(void)
688 {
689 got_standby_deadlock_timeout = true;
690 }
691
692 /*
693 * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
694 * Send out a request to release conflicting buffer pins unconditionally,
695 * so we can press ahead with applying changes in recovery.
696 */
697 void
StandbyTimeoutHandler(void)698 StandbyTimeoutHandler(void)
699 {
700 /* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
701 disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);
702
703 SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
704 }
705
706 /*
707 * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is exceeded.
708 */
709 void
StandbyLockTimeoutHandler(void)710 StandbyLockTimeoutHandler(void)
711 {
712 got_standby_lock_timeout = true;
713 }
714
715 /*
716 * -----------------------------------------------------
717 * Locking in Recovery Mode
718 * -----------------------------------------------------
719 *
720 * All locks are held by the Startup process using a single virtual
721 * transaction. This implementation is both simpler and in some senses,
722 * more correct. The locks held mean "some original transaction held
723 * this lock, so query access is not allowed at this time". So the Startup
724 * process is the proxy by which the original locks are implemented.
725 *
726 * We only keep track of AccessExclusiveLocks, which are only ever held by
727 * one transaction on one relation.
728 *
729 * We keep a hash table of lists of locks in local memory keyed by xid,
730 * RecoveryLockLists, so we can keep track of the various entries made by
731 * the Startup process's virtual xid in the shared lock table.
732 *
733 * We record the lock against the top-level xid, rather than individual
734 * subtransaction xids. This means AccessExclusiveLocks held by aborted
735 * subtransactions are not released as early as possible on standbys.
736 *
737 * List elements use type xl_standby_lock, since the WAL record type exactly
738 * matches the information that we need to keep track of.
739 *
740 * We use session locks rather than normal locks so we don't need
741 * ResourceOwners.
742 */
743
744
745 void
StandbyAcquireAccessExclusiveLock(TransactionId xid,Oid dbOid,Oid relOid)746 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
747 {
748 RecoveryLockListsEntry *entry;
749 xl_standby_lock *newlock;
750 LOCKTAG locktag;
751 bool found;
752
753 /* Already processed? */
754 if (!TransactionIdIsValid(xid) ||
755 TransactionIdDidCommit(xid) ||
756 TransactionIdDidAbort(xid))
757 return;
758
759 elog(trace_recovery(DEBUG4),
760 "adding recovery lock: db %u rel %u", dbOid, relOid);
761
762 /* dbOid is InvalidOid when we are locking a shared relation. */
763 Assert(OidIsValid(relOid));
764
765 /* Create a new list for this xid, if we don't have one already. */
766 entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
767 if (!found)
768 {
769 entry->xid = xid;
770 entry->locks = NIL;
771 }
772
773 newlock = palloc(sizeof(xl_standby_lock));
774 newlock->xid = xid;
775 newlock->dbOid = dbOid;
776 newlock->relOid = relOid;
777 entry->locks = lappend(entry->locks, newlock);
778
779 SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
780
781 (void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
782 }
783
784 static void
StandbyReleaseLockList(List * locks)785 StandbyReleaseLockList(List *locks)
786 {
787 while (locks)
788 {
789 xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
790 LOCKTAG locktag;
791 elog(trace_recovery(DEBUG4),
792 "releasing recovery lock: xid %u db %u rel %u",
793 lock->xid, lock->dbOid, lock->relOid);
794 SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
795 if (!LockRelease(&locktag, AccessExclusiveLock, true))
796 {
797 elog(LOG,
798 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
799 lock->xid, lock->dbOid, lock->relOid);
800 Assert(false);
801 }
802 pfree(lock);
803 locks = list_delete_first(locks);
804 }
805 }
806
807 static void
StandbyReleaseLocks(TransactionId xid)808 StandbyReleaseLocks(TransactionId xid)
809 {
810 RecoveryLockListsEntry *entry;
811
812 if (TransactionIdIsValid(xid))
813 {
814 if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
815 {
816 StandbyReleaseLockList(entry->locks);
817 hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
818 }
819 }
820 else
821 StandbyReleaseAllLocks();
822 }
823
824 /*
825 * Release locks for a transaction tree, starting at xid down, from
826 * RecoveryLockLists.
827 *
828 * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
829 * to remove any AccessExclusiveLocks requested by a transaction.
830 */
831 void
StandbyReleaseLockTree(TransactionId xid,int nsubxids,TransactionId * subxids)832 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
833 {
834 int i;
835
836 StandbyReleaseLocks(xid);
837
838 for (i = 0; i < nsubxids; i++)
839 StandbyReleaseLocks(subxids[i]);
840 }
841
842 /*
843 * Called at end of recovery and when we see a shutdown checkpoint.
844 */
845 void
StandbyReleaseAllLocks(void)846 StandbyReleaseAllLocks(void)
847 {
848 HASH_SEQ_STATUS status;
849 RecoveryLockListsEntry *entry;
850
851 elog(trace_recovery(DEBUG2), "release all standby locks");
852
853 hash_seq_init(&status, RecoveryLockLists);
854 while ((entry = hash_seq_search(&status)))
855 {
856 StandbyReleaseLockList(entry->locks);
857 hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
858 }
859 }
860
861 /*
862 * StandbyReleaseOldLocks
863 * Release standby locks held by top-level XIDs that aren't running,
864 * as long as they're not prepared transactions.
865 */
866 void
StandbyReleaseOldLocks(int nxids,TransactionId * xids)867 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
868 {
869 HASH_SEQ_STATUS status;
870 RecoveryLockListsEntry *entry;
871
872 hash_seq_init(&status, RecoveryLockLists);
873 while ((entry = hash_seq_search(&status)))
874 {
875 bool remove = false;
876
877 Assert(TransactionIdIsValid(entry->xid));
878
879 if (StandbyTransactionIdIsPrepared(entry->xid))
880 remove = false;
881 else
882 {
883 int i;
884 bool found = false;
885
886 for (i = 0; i < nxids; i++)
887 {
888 if (entry->xid == xids[i])
889 {
890 found = true;
891 break;
892 }
893 }
894
895 /*
896 * If its not a running transaction, remove it.
897 */
898 if (!found)
899 remove = true;
900 }
901
902 if (remove)
903 {
904 StandbyReleaseLockList(entry->locks);
905 hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
906 }
907 }
908 }
909
910 /*
911 * --------------------------------------------------------------------
912 * Recovery handling for Rmgr RM_STANDBY_ID
913 *
914 * These record types will only be created if XLogStandbyInfoActive()
915 * --------------------------------------------------------------------
916 */
917
918 void
standby_redo(XLogReaderState * record)919 standby_redo(XLogReaderState *record)
920 {
921 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
922
923 /* Backup blocks are not used in standby records */
924 Assert(!XLogRecHasAnyBlockRefs(record));
925
926 /* Do nothing if we're not in hot standby mode */
927 if (standbyState == STANDBY_DISABLED)
928 return;
929
930 if (info == XLOG_STANDBY_LOCK)
931 {
932 xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
933 int i;
934
935 for (i = 0; i < xlrec->nlocks; i++)
936 StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
937 xlrec->locks[i].dbOid,
938 xlrec->locks[i].relOid);
939 }
940 else if (info == XLOG_RUNNING_XACTS)
941 {
942 xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
943 RunningTransactionsData running;
944
945 running.xcnt = xlrec->xcnt;
946 running.subxcnt = xlrec->subxcnt;
947 running.subxid_overflow = xlrec->subxid_overflow;
948 running.nextXid = xlrec->nextXid;
949 running.latestCompletedXid = xlrec->latestCompletedXid;
950 running.oldestRunningXid = xlrec->oldestRunningXid;
951 running.xids = xlrec->xids;
952
953 ProcArrayApplyRecoveryInfo(&running);
954 }
955 else if (info == XLOG_INVALIDATIONS)
956 {
957 xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
958
959 ProcessCommittedInvalidationMessages(xlrec->msgs,
960 xlrec->nmsgs,
961 xlrec->relcacheInitFileInval,
962 xlrec->dbId,
963 xlrec->tsId);
964 }
965 else
966 elog(PANIC, "standby_redo: unknown op code %u", info);
967 }
968
969 /*
970 * Log details of the current snapshot to WAL. This allows the snapshot state
971 * to be reconstructed on the standby and for logical decoding.
972 *
973 * This is used for Hot Standby as follows:
974 *
975 * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
976 * start from a shutdown checkpoint because we know nothing was running
977 * at that time and our recovery snapshot is known empty. In the more
978 * typical case of an online checkpoint we need to jump through a few
979 * hoops to get a correct recovery snapshot and this requires a two or
980 * sometimes a three stage process.
981 *
982 * The initial snapshot must contain all running xids and all current
983 * AccessExclusiveLocks at a point in time on the standby. Assembling
984 * that information while the server is running requires many and
985 * various LWLocks, so we choose to derive that information piece by
986 * piece and then re-assemble that info on the standby. When that
987 * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
988 *
989 * Since locking on the primary when we derive the information is not
990 * strict, we note that there is a time window between the derivation and
991 * writing to WAL of the derived information. That allows race conditions
992 * that we must resolve, since xids and locks may enter or leave the
993 * snapshot during that window. This creates the issue that an xid or
994 * lock may start *after* the snapshot has been derived yet *before* the
995 * snapshot is logged in the running xacts WAL record. We resolve this by
996 * starting to accumulate changes at a point just prior to when we derive
997 * the snapshot on the primary, then ignore duplicates when we later apply
998 * the snapshot from the running xacts record. This is implemented during
999 * CreateCheckpoint() where we use the logical checkpoint location as
1000 * our starting point and then write the running xacts record immediately
1001 * before writing the main checkpoint WAL record. Since we always start
1002 * up from a checkpoint and are immediately at our starting point, we
1003 * unconditionally move to STANDBY_INITIALIZED. After this point we
1004 * must do 4 things:
1005 * * move shared nextXid forwards as we see new xids
1006 * * extend the clog and subtrans with each new xid
1007 * * keep track of uncommitted known assigned xids
1008 * * keep track of uncommitted AccessExclusiveLocks
1009 *
1010 * When we see a commit/abort we must remove known assigned xids and locks
1011 * from the completing transaction. Attempted removals that cannot locate
1012 * an entry are expected and must not cause an error when we are in state
1013 * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
1014 * KnownAssignedXidsRemove().
1015 *
1016 * Later, when we apply the running xact data we must be careful to ignore
1017 * transactions already committed, since those commits raced ahead when
1018 * making WAL entries.
1019 *
1020 * The loose timing also means that locks may be recorded that have a
1021 * zero xid, since xids are removed from procs before locks are removed.
1022 * So we must prune the lock list down to ensure we hold locks only for
1023 * currently running xids, performed by StandbyReleaseOldLocks().
1024 * Zero xids should no longer be possible, but we may be replaying WAL
1025 * from a time when they were possible.
1026 *
1027 * For logical decoding only the running xacts information is needed;
1028 * there's no need to look at the locking information, but it's logged anyway,
1029 * as there's no independent knob to just enable logical decoding. For
1030 * details of how this is used, check snapbuild.c's introductory comment.
1031 *
1032 *
1033 * Returns the RecPtr of the last inserted record.
1034 */
1035 XLogRecPtr
LogStandbySnapshot(void)1036 LogStandbySnapshot(void)
1037 {
1038 XLogRecPtr recptr;
1039 RunningTransactions running;
1040 xl_standby_lock *locks;
1041 int nlocks;
1042
1043 Assert(XLogStandbyInfoActive());
1044
1045 /*
1046 * Get details of any AccessExclusiveLocks being held at the moment.
1047 */
1048 locks = GetRunningTransactionLocks(&nlocks);
1049 if (nlocks > 0)
1050 LogAccessExclusiveLocks(nlocks, locks);
1051 pfree(locks);
1052
1053 /*
1054 * Log details of all in-progress transactions. This should be the last
1055 * record we write, because standby will open up when it sees this.
1056 */
1057 running = GetRunningTransactionData();
1058
1059 /*
1060 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
1061 * For Hot Standby this can be done before inserting the WAL record
1062 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
1063 * the clog. For logical decoding, though, the lock can't be released
1064 * early because the clog might be "in the future" from the POV of the
1065 * historic snapshot. This would allow for situations where we're waiting
1066 * for the end of a transaction listed in the xl_running_xacts record
1067 * which, according to the WAL, has committed before the xl_running_xacts
1068 * record. Fortunately this routine isn't executed frequently, and it's
1069 * only a shared lock.
1070 */
1071 if (wal_level < WAL_LEVEL_LOGICAL)
1072 LWLockRelease(ProcArrayLock);
1073
1074 recptr = LogCurrentRunningXacts(running);
1075
1076 /* Release lock if we kept it longer ... */
1077 if (wal_level >= WAL_LEVEL_LOGICAL)
1078 LWLockRelease(ProcArrayLock);
1079
1080 /* GetRunningTransactionData() acquired XidGenLock, we must release it */
1081 LWLockRelease(XidGenLock);
1082
1083 return recptr;
1084 }
1085
1086 /*
1087 * Record an enhanced snapshot of running transactions into WAL.
1088 *
1089 * The definitions of RunningTransactionsData and xl_xact_running_xacts
1090 * are similar. We keep them separate because xl_xact_running_xacts
1091 * is a contiguous chunk of memory and never exists fully until it is
1092 * assembled in WAL.
1093 */
1094 static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)1095 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
1096 {
1097 xl_running_xacts xlrec;
1098 XLogRecPtr recptr;
1099
1100 xlrec.xcnt = CurrRunningXacts->xcnt;
1101 xlrec.subxcnt = CurrRunningXacts->subxcnt;
1102 xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
1103 xlrec.nextXid = CurrRunningXacts->nextXid;
1104 xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
1105 xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
1106
1107 /* Header */
1108 XLogBeginInsert();
1109 XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);
1110
1111 /* array of TransactionIds */
1112 if (xlrec.xcnt > 0)
1113 XLogRegisterData((char *) CurrRunningXacts->xids,
1114 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));
1115
1116 recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);
1117
1118 if (CurrRunningXacts->subxid_overflow)
1119 elog(trace_recovery(DEBUG2),
1120 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1121 CurrRunningXacts->xcnt,
1122 (uint32) (recptr >> 32), (uint32) recptr,
1123 CurrRunningXacts->oldestRunningXid,
1124 CurrRunningXacts->latestCompletedXid,
1125 CurrRunningXacts->nextXid);
1126 else
1127 elog(trace_recovery(DEBUG2),
1128 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1129 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
1130 (uint32) (recptr >> 32), (uint32) recptr,
1131 CurrRunningXacts->oldestRunningXid,
1132 CurrRunningXacts->latestCompletedXid,
1133 CurrRunningXacts->nextXid);
1134
1135 /*
1136 * Ensure running_xacts information is synced to disk not too far in the
1137 * future. We don't want to stall anything though (i.e. use XLogFlush()),
1138 * so we let the wal writer do it during normal operation.
1139 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
1140 * and nudge the WALWriter into action if sleeping. Check
1141 * XLogBackgroundFlush() for details why a record might not be flushed
1142 * without it.
1143 */
1144 XLogSetAsyncXactLSN(recptr);
1145
1146 return recptr;
1147 }
1148
1149 /*
1150 * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1151 * logged, as described in backend/storage/lmgr/README.
1152 */
1153 static void
LogAccessExclusiveLocks(int nlocks,xl_standby_lock * locks)1154 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
1155 {
1156 xl_standby_locks xlrec;
1157
1158 xlrec.nlocks = nlocks;
1159
1160 XLogBeginInsert();
1161 XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
1162 XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
1163
1164 (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
1165 }
1166
1167 /*
1168 * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1169 */
1170 void
LogAccessExclusiveLock(Oid dbOid,Oid relOid)1171 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
1172 {
1173 xl_standby_lock xlrec;
1174
1175 xlrec.xid = GetTopTransactionId();
1176
1177 /*
1178 * Decode the locktag back to the original values, to avoid sending lots
1179 * of empty bytes with every message. See lock.h to check how a locktag
1180 * is defined for LOCKTAG_RELATION
1181 */
1182 xlrec.dbOid = dbOid;
1183 xlrec.relOid = relOid;
1184
1185 LogAccessExclusiveLocks(1, &xlrec);
1186 }
1187
/*
 * Get ready to log an AccessExclusiveLock; called during LockAcquire().
 *
 * All this does is force assignment of a TransactionId to the current
 * transaction.  That matters for lock release on the standby in two ways:
 *
 * 1. With an xid assigned, RecordTransactionCommit() and
 *	  RecordTransactionAbort() will not optimise away the transaction
 *	  completion record, which recovery depends on to release locks.  It
 *	  is a hack, but the corner case does not justify extra code in the
 *	  main commit path.
 *
 * 2. The xid must exist before the lock shows up in shared memory;
 *	  otherwise a concurrent GetRunningTransactionLocks() could see a lock
 *	  tied to InvalidTransactionId, which we later assert cannot happen.
 */
void
LogAccessExclusiveLockPrepare(void)
{
	/* called only for its side effect; the xid itself is not needed here */
	(void) GetTopTransactionId();
}
1208
1209 /*
1210 * Emit WAL for invalidations. This currently is only used for commits without
1211 * an xid but which contain invalidations.
1212 */
1213 void
LogStandbyInvalidations(int nmsgs,SharedInvalidationMessage * msgs,bool relcacheInitFileInval)1214 LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
1215 bool relcacheInitFileInval)
1216 {
1217 xl_invalidations xlrec;
1218
1219 /* prepare record */
1220 memset(&xlrec, 0, sizeof(xlrec));
1221 xlrec.dbId = MyDatabaseId;
1222 xlrec.tsId = MyDatabaseTableSpace;
1223 xlrec.relcacheInitFileInval = relcacheInitFileInval;
1224 xlrec.nmsgs = nmsgs;
1225
1226 /* perform insertion */
1227 XLogBeginInsert();
1228 XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
1229 XLogRegisterData((char *) msgs,
1230 nmsgs * sizeof(SharedInvalidationMessage));
1231 XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
1232 }
1233