1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *	  Misc functions used in Hot Standby mode.
5  *
6  *	All functions for handling RM_STANDBY_ID, which relate to
7  *	AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *	Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *	  src/backend/storage/ipc/standby.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "miscadmin.h"
25 #include "pgstat.h"
26 #include "storage/bufmgr.h"
27 #include "storage/lmgr.h"
28 #include "storage/proc.h"
29 #include "storage/procarray.h"
30 #include "storage/sinvaladt.h"
31 #include "storage/standby.h"
32 #include "utils/hsearch.h"
33 #include "utils/memutils.h"
34 #include "utils/ps_status.h"
35 #include "utils/timeout.h"
36 #include "utils/timestamp.h"
37 
38 /* User-settable GUC parameters */
39 int			vacuum_defer_cleanup_age;
40 int			max_standby_archive_delay = 30 * 1000;
41 int			max_standby_streaming_delay = 30 * 1000;
42 
/*
 * Local hash table, keyed by XID, tracking for each (sub)transaction the
 * list of AccessExclusiveLocks the Startup process holds on its behalf.
 * NULL until InitRecoveryTransactionEnvironment() runs; reset to NULL by
 * ShutdownRecoveryTransactionEnvironment().
 */
static HTAB *RecoveryLockLists;

/* Flags set by timeout handlers */
static volatile sig_atomic_t got_standby_deadlock_timeout = false;
static volatile sig_atomic_t got_standby_lock_timeout = false;

/* Forward declarations of local routines */
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason, bool report_waiting);
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
54 
/*
 * Keep track of all the locks owned by a given transaction.
 */
typedef struct RecoveryLockListsEntry
{
	TransactionId xid;			/* hash key: XID of the original transaction */
	List	   *locks;			/* list of palloc'd xl_standby_lock entries */
} RecoveryLockListsEntry;
63 
64 /*
65  * InitRecoveryTransactionEnvironment
66  *		Initialize tracking of in-progress transactions in master
67  *
68  * We need to issue shared invalidations and hold locks. Holding locks
69  * means others may want to wait on us, so we need to make a lock table
70  * vxact entry like a real transaction. We could create and delete
71  * lock table entries for each transaction but its simpler just to create
72  * one permanent entry and leave it there all the time. Locks are then
73  * acquired and released as needed. Yes, this means you can see the
74  * Startup process in pg_locks once we have run this.
75  */
76 void
InitRecoveryTransactionEnvironment(void)77 InitRecoveryTransactionEnvironment(void)
78 {
79 	VirtualTransactionId vxid;
80 	HASHCTL		hash_ctl;
81 
82 	/*
83 	 * Initialize the hash table for tracking the list of locks held by each
84 	 * transaction.
85 	 */
86 	memset(&hash_ctl, 0, sizeof(hash_ctl));
87 	hash_ctl.keysize = sizeof(TransactionId);
88 	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
89 	RecoveryLockLists = hash_create("RecoveryLockLists",
90 									64,
91 									&hash_ctl,
92 									HASH_ELEM | HASH_BLOBS);
93 
94 	/*
95 	 * Initialize shared invalidation management for Startup process, being
96 	 * careful to register ourselves as a sendOnly process so we don't need to
97 	 * read messages, nor will we get signalled when the queue starts filling
98 	 * up.
99 	 */
100 	SharedInvalBackendInit(true);
101 
102 	/*
103 	 * Lock a virtual transaction id for Startup process.
104 	 *
105 	 * We need to do GetNextLocalTransactionId() because
106 	 * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
107 	 * manager doesn't like that at all.
108 	 *
109 	 * Note that we don't need to run XactLockTableInsert() because nobody
110 	 * needs to wait on xids. That sounds a little strange, but table locks
111 	 * are held by vxids and row level locks are held by xids. All queries
112 	 * hold AccessShareLocks so never block while we write or lock new rows.
113 	 */
114 	vxid.backendId = MyBackendId;
115 	vxid.localTransactionId = GetNextLocalTransactionId();
116 	VirtualXactLockTableInsert(vxid);
117 
118 	standbyState = STANDBY_INITIALIZED;
119 }
120 
/*
 * ShutdownRecoveryTransactionEnvironment
 *		Shut down transaction tracking
 *
 * Prepare to switch from hot standby mode to normal operation. Shut down
 * recovery-time transaction tracking.
 *
 * This must be called even in shutdown of startup process if transaction
 * tracking has been initialized. Otherwise some locks the tracked
 * transactions were holding will not be released and may interfere with
 * the processes still running (but will exit soon later) at the exit of
 * startup process.
 */
void
ShutdownRecoveryTransactionEnvironment(void)
{
	/*
	 * Do nothing if RecoveryLockLists is NULL, which means that transaction
	 * tracking has not yet been initialized or has already been shut down.
	 * This prevents transaction tracking from being shut down unexpectedly
	 * more than once.
	 */
	if (RecoveryLockLists == NULL)
		return;

	/* Mark all tracked in-progress transactions as finished. */
	ExpireAllKnownAssignedTransactionIds();

	/* Release all locks the tracked transactions were holding */
	StandbyReleaseAllLocks();

	/* Destroy the hash table of locks. */
	hash_destroy(RecoveryLockLists);
	RecoveryLockLists = NULL;

	/* Cleanup our VirtualTransaction */
	VirtualXactLockTableCleanup();
}
159 
160 
161 /*
162  * -----------------------------------------------------
163  *		Standby wait timers and backend cancel logic
164  * -----------------------------------------------------
165  */
166 
167 /*
168  * Determine the cutoff time at which we want to start canceling conflicting
169  * transactions.  Returns zero (a time safely in the past) if we are willing
170  * to wait forever.
171  */
172 static TimestampTz
GetStandbyLimitTime(void)173 GetStandbyLimitTime(void)
174 {
175 	TimestampTz rtime;
176 	bool		fromStream;
177 
178 	/*
179 	 * The cutoff time is the last WAL data receipt time plus the appropriate
180 	 * delay variable.  Delay of -1 means wait forever.
181 	 */
182 	GetXLogReceiptTime(&rtime, &fromStream);
183 	if (fromStream)
184 	{
185 		if (max_standby_streaming_delay < 0)
186 			return 0;			/* wait forever */
187 		return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
188 	}
189 	else
190 	{
191 		if (max_standby_archive_delay < 0)
192 			return 0;			/* wait forever */
193 		return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
194 	}
195 }
196 
/*
 * Current sleep interval between checks while waiting for a conflicting
 * vxid to finish; starts at 1ms and is doubled on each retry by
 * WaitExceedsMaxStandbyDelay(), capped at 1s.
 */
#define STANDBY_INITIAL_WAIT_US  1000
static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
199 
200 /*
201  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
202  * We wait here for a while then return. If we decide we can't wait any
203  * more then we return true, if we can wait some more return false.
204  */
205 static bool
WaitExceedsMaxStandbyDelay(void)206 WaitExceedsMaxStandbyDelay(void)
207 {
208 	TimestampTz ltime;
209 
210 	CHECK_FOR_INTERRUPTS();
211 
212 	/* Are we past the limit time? */
213 	ltime = GetStandbyLimitTime();
214 	if (ltime && GetCurrentTimestamp() >= ltime)
215 		return true;
216 
217 	/*
218 	 * Sleep a bit (this is essential to avoid busy-waiting).
219 	 */
220 	pg_usleep(standbyWait_us);
221 
222 	/*
223 	 * Progressively increase the sleep times, but not to more than 1s, since
224 	 * pg_usleep isn't interruptable on some platforms.
225 	 */
226 	standbyWait_us *= 2;
227 	if (standbyWait_us > 1000000)
228 		standbyWait_us = 1000000;
229 
230 	return false;
231 }
232 
/*
 * This is the main executioner for any query backend that conflicts with
 * recovery processing. Judgement has already been passed on it within
 * a specific rmgr. Here we just issue the orders to the procs. The procs
 * then throw the required error as instructed.
 *
 * The waitlist is an array terminated by an invalid VirtualTransactionId.
 *
 * If report_waiting is true, "waiting" is reported in PS display if necessary.
 * If the caller has already reported that, report_waiting should be false.
 * Otherwise, "waiting" is reported twice unexpectedly.
 */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason, bool report_waiting)
{
	TimestampTz waitStart = 0;
	char	   *new_status;

	/* Fast exit, to avoid a kernel call if there's no work to be done. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	/* Remember when we started waiting, to drive the 500ms ps report below. */
	if (report_waiting)
		waitStart = GetCurrentTimestamp();
	new_status = NULL;			/* we haven't changed the ps display */

	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* reset standbyWait_us for each xact we wait for */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/* wait until the virtual xid is gone */
		while (!VirtualXactLock(*waitlist, false))
		{
			/*
			 * Report via ps if we have been waiting for more than 500 msec
			 * (should that be configurable?)
			 */
			if (update_process_title && new_status == NULL && report_waiting &&
				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
										   500))
			{
				const char *old_status;
				int			len;

				/* Append " waiting" to the current ps display string. */
				old_status = get_ps_display(&len);
				new_status = (char *) palloc(len + 8 + 1);
				memcpy(new_status, old_status, len);
				strcpy(new_status + len, " waiting");
				set_ps_display(new_status, false);
				/* keep the un-suffixed string around so we can restore it */
				new_status[len] = '\0'; /* truncate off " waiting" */
			}

			/* Is it time to kill it? */
			if (WaitExceedsMaxStandbyDelay())
			{
				pid_t		pid;

				/*
				 * Now find out who to throw out of the balloon.
				 */
				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * Wait a little bit for it to die so that we avoid flooding
				 * an unresponsive backend when system is heavily loaded.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}
		}

		/* The virtual transaction is gone now, wait for the next one */
		waitlist++;
	}

	/* Reset ps display if we changed it */
	if (new_status)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
316 
317 void
ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,RelFileNode node)318 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
319 {
320 	VirtualTransactionId *backends;
321 
322 	/*
323 	 * If we get passed InvalidTransactionId then we are a little surprised,
324 	 * but it is theoretically possible in normal running. It also happens
325 	 * when replaying already applied WAL records after a standby crash or
326 	 * restart, or when replaying an XLOG_HEAP2_VISIBLE record that marks as
327 	 * frozen a page which was already all-visible.  If latestRemovedXid is
328 	 * invalid then there is no conflict. That rule applies across all record
329 	 * types that suffer from this conflict.
330 	 */
331 	if (!TransactionIdIsValid(latestRemovedXid))
332 		return;
333 
334 	backends = GetConflictingVirtualXIDs(latestRemovedXid,
335 										 node.dbNode);
336 
337 	ResolveRecoveryConflictWithVirtualXIDs(backends,
338 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
339 										   true);
340 }
341 
342 void
ResolveRecoveryConflictWithTablespace(Oid tsid)343 ResolveRecoveryConflictWithTablespace(Oid tsid)
344 {
345 	VirtualTransactionId *temp_file_users;
346 
347 	/*
348 	 * Standby users may be currently using this tablespace for their
349 	 * temporary files. We only care about current users because
350 	 * temp_tablespace parameter will just ignore tablespaces that no longer
351 	 * exist.
352 	 *
353 	 * Ask everybody to cancel their queries immediately so we can ensure no
354 	 * temp files remain and we can remove the tablespace. Nuke the entire
355 	 * site from orbit, it's the only way to be sure.
356 	 *
357 	 * XXX: We could work out the pids of active backends using this
358 	 * tablespace by examining the temp filenames in the directory. We would
359 	 * then convert the pids into VirtualXIDs before attempting to cancel
360 	 * them.
361 	 *
362 	 * We don't wait for commit because drop tablespace is non-transactional.
363 	 */
364 	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
365 												InvalidOid);
366 	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
367 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
368 										   true);
369 }
370 
371 void
ResolveRecoveryConflictWithDatabase(Oid dbid)372 ResolveRecoveryConflictWithDatabase(Oid dbid)
373 {
374 	/*
375 	 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
376 	 * only waits for transactions and completely idle sessions would block
377 	 * us. This is rare enough that we do this as simply as possible: no wait,
378 	 * just force them off immediately.
379 	 *
380 	 * No locking is required here because we already acquired
381 	 * AccessExclusiveLock. Anybody trying to connect while we do this will
382 	 * block during InitPostgres() and then disconnect when they see the
383 	 * database has been removed.
384 	 */
385 	while (CountDBBackends(dbid) > 0)
386 	{
387 		CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
388 
389 		/*
390 		 * Wait awhile for them to die so that we avoid flooding an
391 		 * unresponsive backend when system is heavily loaded.
392 		 */
393 		pg_usleep(10000);
394 	}
395 }
396 
/*
 * ResolveRecoveryConflictWithLock is called from ProcSleep()
 * to resolve conflicts with other backends holding relation locks.
 *
 * The WaitLatch sleep normally done in ProcSleep()
 * (when not InHotStandby) is performed here, for code clarity.
 *
 * We either resolve conflicts immediately or set a timeout to wake us at
 * the limit of our patience.
 *
 * Resolve conflicts by canceling all backends holding a conflicting
 * lock.  As we are already queued to be granted the lock, no new lock
 * requests conflicting with ours will be granted in the meantime.
 *
 * We also must check for deadlocks involving the Startup process and
 * hot-standby backend processes. If deadlock_timeout is reached in
 * this function, all the backends holding the conflicting locks are
 * requested to check themselves for deadlocks.
 */
void
ResolveRecoveryConflictWithLock(LOCKTAG locktag)
{
	TimestampTz ltime;

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();

	/* ltime == 0 means "wait forever", so never takes the cancel path */
	if (GetCurrentTimestamp() >= ltime && ltime != 0)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 */
		VirtualTransactionId *backends;

		backends = GetLockConflicts(&locktag, AccessExclusiveLock);

		/*
		 * Prevent ResolveRecoveryConflictWithVirtualXIDs() from reporting
		 * "waiting" in PS display by disabling its argument report_waiting
		 * because the caller, WaitOnLock(), has already reported that.
		 */
		ResolveRecoveryConflictWithVirtualXIDs(backends,
											   PROCSIG_RECOVERY_CONFLICT_LOCK,
											   false);
	}
	else
	{
		/*
		 * Wait (or wait again) until ltime, and check for deadlocks as well
		 * if we will be waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];
		int			cnt = 0;

		/* Only arm the lock timeout when there is a finite limit time. */
		if (ltime != 0)
		{
			got_standby_lock_timeout = false;
			timeouts[cnt].id = STANDBY_LOCK_TIMEOUT;
			timeouts[cnt].type = TMPARAM_AT;
			timeouts[cnt].fin_time = ltime;
			cnt++;
		}

		got_standby_deadlock_timeout = false;
		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[cnt].type = TMPARAM_AFTER;
		timeouts[cnt].delay_ms = DeadlockTimeout;
		cnt++;

		enable_timeouts(timeouts, cnt);
	}

	/* Wait to be signaled by the release of the Relation Lock */
	ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);

	/*
	 * Exit if ltime is reached. Then all the backends holding conflicting
	 * locks will be canceled in the next ResolveRecoveryConflictWithLock()
	 * call.
	 */
	if (got_standby_lock_timeout)
		goto cleanup;

	if (got_standby_deadlock_timeout)
	{
		VirtualTransactionId *backends;

		backends = GetLockConflicts(&locktag, AccessExclusiveLock);

		/* Quick exit if there's no work to be done */
		if (!VirtualTransactionIdIsValid(*backends))
			goto cleanup;

		/*
		 * Send signals to all the backends holding the conflicting locks, to
		 * ask them to check themselves for deadlocks.
		 */
		while (VirtualTransactionIdIsValid(*backends))
		{
			SignalVirtualTransaction(*backends,
									 PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
									 false);
			backends++;
		}

		/*
		 * Wait again here to be signaled by the release of the Relation Lock,
		 * to prevent the subsequent ResolveRecoveryConflictWithLock() from
		 * causing deadlock_timeout and sending a request for deadlocks check
		 * again. Otherwise the request continues to be sent every
		 * deadlock_timeout until the relation locks are released or ltime is
		 * reached.
		 */
		got_standby_deadlock_timeout = false;
		ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);
	}

cleanup:

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other outstanding timeouts than those
	 * used by this function. If that stops being true, we could cancel the
	 * timeouts individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
	got_standby_lock_timeout = false;
	got_standby_deadlock_timeout = false;
}
526 
/*
 * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
 * to resolve conflicts with other backends holding buffer pins.
 *
 * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
 * (when not InHotStandby) is performed here, for code clarity.
 *
 * We either resolve conflicts immediately or set a timeout to wake us at
 * the limit of our patience.
 *
 * Resolve conflicts by sending a PROCSIG signal to all backends to check if
 * they hold one of the buffer pins that is blocking Startup process. If so,
 * those backends will take an appropriate error action, ERROR or FATAL.
 *
 * We also must check for deadlocks.  Deadlocks occur because if queries
 * wait on a lock, that must be behind an AccessExclusiveLock, which can only
 * be cleared if the Startup process replays a transaction completion record.
 * If Startup process is also waiting then that is a deadlock. The deadlock
 * can occur if the query is waiting and then the Startup sleeps, or if
 * Startup is sleeping and the query waits on a lock. We protect against
 * only the former sequence here, the latter sequence is checked prior to
 * the query sleeping, in CheckRecoveryConflictDeadlock().
 *
 * Deadlocks are extremely rare, and relatively expensive to check for,
 * so we don't do a deadlock check right away ... only if we have had to wait
 * at least deadlock_timeout.
 */
void
ResolveRecoveryConflictWithBufferPin(void)
{
	TimestampTz ltime;

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();

	/* ltime == 0 means "wait forever", so never takes the cancel path */
	if (GetCurrentTimestamp() >= ltime && ltime != 0)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 */
		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
	}
	else
	{
		/*
		 * Wake up at ltime, and check for deadlocks as well if we will be
		 * waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];
		int			cnt = 0;

		/* Only arm STANDBY_TIMEOUT when there is a finite limit time. */
		if (ltime != 0)
		{
			timeouts[cnt].id = STANDBY_TIMEOUT;
			timeouts[cnt].type = TMPARAM_AT;
			timeouts[cnt].fin_time = ltime;
			cnt++;
		}

		got_standby_deadlock_timeout = false;
		timeouts[cnt].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[cnt].type = TMPARAM_AFTER;
		timeouts[cnt].delay_ms = DeadlockTimeout;
		cnt++;

		enable_timeouts(timeouts, cnt);
	}

	/* Wait to be signaled by UnpinBuffer() */
	ProcWaitForSignal(PG_WAIT_BUFFER_PIN);

	if (got_standby_deadlock_timeout)
	{
		/*
		 * Send out a request for hot-standby backends to check themselves for
		 * deadlocks.
		 *
		 * XXX The subsequent ResolveRecoveryConflictWithBufferPin() will wait
		 * to be signaled by UnpinBuffer() again and send a request for
		 * deadlocks check if deadlock_timeout happens. This causes the
		 * request to continue to be sent every deadlock_timeout until the
		 * buffer is unpinned or ltime is reached. This would increase the
		 * workload in the startup process and backends. In practice it may
		 * not be so harmful because the period that the buffer is kept pinned
		 * is basically not so long. But we should fix this?
		 */
		SendRecoveryConflictWithBufferPin(
										  PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
	}

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other timeouts than what this function
	 * uses.  If that stops being true, we could cancel the timeouts
	 * individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
	got_standby_deadlock_timeout = false;
}
627 
628 static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)629 SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
630 {
631 	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
632 		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
633 
634 	/*
635 	 * We send signal to all backends to ask them if they are holding the
636 	 * buffer pin which is delaying the Startup process. We must not set the
637 	 * conflict flag yet, since most backends will be innocent. Let the
638 	 * SIGUSR1 handling in each backend decide their own fate.
639 	 */
640 	CancelDBBackends(InvalidOid, reason, false);
641 }
642 
643 /*
644  * In Hot Standby perform early deadlock detection.  We abort the lock
645  * wait if we are about to sleep while holding the buffer pin that Startup
646  * process is waiting for.
647  *
648  * Note: this code is pessimistic, because there is no way for it to
649  * determine whether an actual deadlock condition is present: the lock we
650  * need to wait for might be unrelated to any held by the Startup process.
651  * Sooner or later, this mechanism should get ripped out in favor of somehow
652  * accounting for buffer locks in DeadLockCheck().  However, errors here
653  * seem to be very low-probability in practice, so for now it's not worth
654  * the trouble.
655  */
656 void
CheckRecoveryConflictDeadlock(void)657 CheckRecoveryConflictDeadlock(void)
658 {
659 	Assert(!InRecovery);		/* do not call in Startup process */
660 
661 	if (!HoldingBufferPinThatDelaysRecovery())
662 		return;
663 
664 	/*
665 	 * Error message should match ProcessInterrupts() but we avoid calling
666 	 * that because we aren't handling an interrupt at this point. Note that
667 	 * we only cancel the current transaction here, so if we are in a
668 	 * subtransaction and the pin is held by a parent, then the Startup
669 	 * process will continue to wait even though we have avoided deadlock.
670 	 */
671 	ereport(ERROR,
672 			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
673 			 errmsg("canceling statement due to conflict with recovery"),
674 			 errdetail("User transaction caused buffer deadlock with recovery.")));
675 }
676 
677 
678 /* --------------------------------
679  *		timeout handler routines
680  * --------------------------------
681  */
682 
/*
 * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
 * occurs before STANDBY_TIMEOUT.
 */
void
StandbyDeadLockHandler(void)
{
	/* Just note that the timeout fired; the wait loops act on the flag. */
	got_standby_deadlock_timeout = true;
}
692 
/*
 * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
 * Send out a request to release conflicting buffer pins unconditionally,
 * so we can press ahead with applying changes in recovery.
 */
void
StandbyTimeoutHandler(void)
{
	/* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
706 
/*
 * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is exceeded.
 */
void
StandbyLockTimeoutHandler(void)
{
	/* Just note that the timeout fired; ResolveRecoveryConflictWithLock() acts on it. */
	got_standby_lock_timeout = true;
}
715 
716 /*
717  * -----------------------------------------------------
718  * Locking in Recovery Mode
719  * -----------------------------------------------------
720  *
721  * All locks are held by the Startup process using a single virtual
722  * transaction. This implementation is both simpler and in some senses,
723  * more correct. The locks held mean "some original transaction held
724  * this lock, so query access is not allowed at this time". So the Startup
725  * process is the proxy by which the original locks are implemented.
726  *
727  * We only keep track of AccessExclusiveLocks, which are only ever held by
728  * one transaction on one relation.
729  *
730  * We keep a hash table of lists of locks in local memory keyed by xid,
731  * RecoveryLockLists, so we can keep track of the various entries made by
732  * the Startup process's virtual xid in the shared lock table.
733  *
734  * List elements use type xl_standby_lock, since the WAL record type exactly
735  * matches the information that we need to keep track of.
736  *
737  * We use session locks rather than normal locks so we don't need
738  * ResourceOwners.
739  */
740 
741 
742 void
StandbyAcquireAccessExclusiveLock(TransactionId xid,Oid dbOid,Oid relOid)743 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
744 {
745 	RecoveryLockListsEntry *entry;
746 	xl_standby_lock *newlock;
747 	LOCKTAG		locktag;
748 	bool		found;
749 
750 	/* Already processed? */
751 	if (!TransactionIdIsValid(xid) ||
752 		TransactionIdDidCommit(xid) ||
753 		TransactionIdDidAbort(xid))
754 		return;
755 
756 	elog(trace_recovery(DEBUG4),
757 		 "adding recovery lock: db %u rel %u", dbOid, relOid);
758 
759 	/* dbOid is InvalidOid when we are locking a shared relation. */
760 	Assert(OidIsValid(relOid));
761 
762 	/* Create a new list for this xid, if we don't have one already. */
763 	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
764 	if (!found)
765 	{
766 		entry->xid = xid;
767 		entry->locks = NIL;
768 	}
769 
770 	newlock = palloc(sizeof(xl_standby_lock));
771 	newlock->xid = xid;
772 	newlock->dbOid = dbOid;
773 	newlock->relOid = relOid;
774 	entry->locks = lappend(entry->locks, newlock);
775 
776 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
777 
778 	(void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
779 }
780 
/*
 * Release every lock in the given list, consuming (and freeing) the list
 * and its xl_standby_lock entries as we go.
 */
static void
StandbyReleaseLockList(List *locks)
{
	while (locks)
	{
		xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
		LOCKTAG		locktag;

		elog(trace_recovery(DEBUG4),
			 "releasing recovery lock: xid %u db %u rel %u",
			 lock->xid, lock->dbOid, lock->relOid);
		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
		if (!LockRelease(&locktag, AccessExclusiveLock, true))
		{
			/* Should never happen: complain, and abort in assert builds. */
			elog(LOG,
				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
				 lock->xid, lock->dbOid, lock->relOid);
			Assert(false);
		}
		pfree(lock);
		locks = list_delete_first(locks);
	}
}
804 
805 static void
StandbyReleaseLocks(TransactionId xid)806 StandbyReleaseLocks(TransactionId xid)
807 {
808 	RecoveryLockListsEntry *entry;
809 
810 	if (TransactionIdIsValid(xid))
811 	{
812 		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
813 		{
814 			StandbyReleaseLockList(entry->locks);
815 			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
816 		}
817 	}
818 	else
819 		StandbyReleaseAllLocks();
820 }
821 
822 /*
823  * Release locks for a transaction tree, starting at xid down, from
824  * RecoveryLockLists.
825  *
826  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
827  * to remove any AccessExclusiveLocks requested by a transaction.
828  */
829 void
StandbyReleaseLockTree(TransactionId xid,int nsubxids,TransactionId * subxids)830 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
831 {
832 	int			i;
833 
834 	StandbyReleaseLocks(xid);
835 
836 	for (i = 0; i < nsubxids; i++)
837 		StandbyReleaseLocks(subxids[i]);
838 }
839 
/*
 * Called at end of recovery and when we see a shutdown checkpoint.
 */
void
StandbyReleaseAllLocks(void)
{
	HASH_SEQ_STATUS status;
	RecoveryLockListsEntry *entry;

	elog(trace_recovery(DEBUG2), "release all standby locks");

	/* Scan the whole table, releasing each entry's locks and removing it. */
	hash_seq_init(&status, RecoveryLockLists);
	while ((entry = hash_seq_search(&status)))
	{
		StandbyReleaseLockList(entry->locks);
		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
	}
}
858 
859 /*
860  * StandbyReleaseOldLocks
861  *		Release standby locks held by top-level XIDs that aren't running,
862  *		as long as they're not prepared transactions.
863  */
864 void
StandbyReleaseOldLocks(TransactionId oldxid)865 StandbyReleaseOldLocks(TransactionId oldxid)
866 {
867 	HASH_SEQ_STATUS status;
868 	RecoveryLockListsEntry *entry;
869 
870 	hash_seq_init(&status, RecoveryLockLists);
871 	while ((entry = hash_seq_search(&status)))
872 	{
873 		Assert(TransactionIdIsValid(entry->xid));
874 
875 		/* Skip if prepared transaction. */
876 		if (StandbyTransactionIdIsPrepared(entry->xid))
877 			continue;
878 
879 		/* Skip if >= oldxid. */
880 		if (!TransactionIdPrecedes(entry->xid, oldxid))
881 			continue;
882 
883 		/* Remove all locks and hash table entry. */
884 		StandbyReleaseLockList(entry->locks);
885 		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
886 	}
887 }
888 
889 /*
890  * --------------------------------------------------------------------
891  *		Recovery handling for Rmgr RM_STANDBY_ID
892  *
893  * These record types will only be created if XLogStandbyInfoActive()
894  * --------------------------------------------------------------------
895  */
896 
/*
 * Redo handler for RM_STANDBY_ID records: re-acquire logged
 * AccessExclusiveLocks, apply running-xacts snapshots, and replay
 * invalidation messages.  No-op unless hot standby tracking is active.
 */
void
standby_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	/* Backup blocks are not used in standby records */
	Assert(!XLogRecHasAnyBlockRefs(record));

	/* Do nothing if we're not in hot standby mode */
	if (standbyState == STANDBY_DISABLED)
		return;

	if (info == XLOG_STANDBY_LOCK)
	{
		/* Re-take each AccessExclusiveLock recorded by the master. */
		xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
		int			i;

		for (i = 0; i < xlrec->nlocks; i++)
			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
											  xlrec->locks[i].dbOid,
											  xlrec->locks[i].relOid);
	}
	else if (info == XLOG_RUNNING_XACTS)
	{
		/* Unpack the record into the in-memory form ProcArray expects. */
		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
		RunningTransactionsData running;

		running.xcnt = xlrec->xcnt;
		running.subxcnt = xlrec->subxcnt;
		running.subxid_overflow = xlrec->subxid_overflow;
		running.nextXid = xlrec->nextXid;
		running.latestCompletedXid = xlrec->latestCompletedXid;
		running.oldestRunningXid = xlrec->oldestRunningXid;
		running.xids = xlrec->xids;

		ProcArrayApplyRecoveryInfo(&running);
	}
	else if (info == XLOG_INVALIDATIONS)
	{
		/* Replay invalidation messages logged at command end. */
		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);

		ProcessCommittedInvalidationMessages(xlrec->msgs,
											 xlrec->nmsgs,
											 xlrec->relcacheInitFileInval,
											 xlrec->dbId,
											 xlrec->tsId);
	}
	else
		elog(PANIC, "standby_redo: unknown op code %u", info);
}
947 
/*
 * Log details of the current snapshot to WAL. This allows the snapshot state
 * to be reconstructed on the standby and for logical decoding.
 *
 * This is used for Hot Standby as follows:
 *
 * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
 * start from a shutdown checkpoint because we know nothing was running
 * at that time and our recovery snapshot is known empty. In the more
 * typical case of an online checkpoint we need to jump through a few
 * hoops to get a correct recovery snapshot and this requires a two or
 * sometimes a three stage process.
 *
 * The initial snapshot must contain all running xids and all current
 * AccessExclusiveLocks at a point in time on the standby. Assembling
 * that information while the server is running requires many and
 * various LWLocks, so we choose to derive that information piece by
 * piece and then re-assemble that info on the standby. When that
 * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
 *
 * Since locking on the primary when we derive the information is not
 * strict, we note that there is a time window between the derivation and
 * writing to WAL of the derived information. That allows race conditions
 * that we must resolve, since xids and locks may enter or leave the
 * snapshot during that window. This creates the issue that an xid or
 * lock may start *after* the snapshot has been derived yet *before* the
 * snapshot is logged in the running xacts WAL record. We resolve this by
 * starting to accumulate changes at a point just prior to when we derive
 * the snapshot on the primary, then ignore duplicates when we later apply
 * the snapshot from the running xacts record. This is implemented during
 * CreateCheckpoint() where we use the logical checkpoint location as
 * our starting point and then write the running xacts record immediately
 * before writing the main checkpoint WAL record. Since we always start
 * up from a checkpoint and are immediately at our starting point, we
 * unconditionally move to STANDBY_INITIALIZED. After this point we
 * must do 4 things:
 *	* move shared nextXid forwards as we see new xids
 *	* extend the clog and subtrans with each new xid
 *	* keep track of uncommitted known assigned xids
 *	* keep track of uncommitted AccessExclusiveLocks
 *
 * When we see a commit/abort we must remove known assigned xids and locks
 * from the completing transaction. Attempted removals that cannot locate
 * an entry are expected and must not cause an error when we are in state
 * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
 * KnownAssignedXidsRemove().
 *
 * Later, when we apply the running xact data we must be careful to ignore
 * transactions already committed, since those commits raced ahead when
 * making WAL entries.
 *
 * The loose timing also means that locks may be recorded that have a
 * zero xid, since xids are removed from procs before locks are removed.
 * So we must prune the lock list down to ensure we hold locks only for
 * currently running xids, performed by StandbyReleaseOldLocks().
 * Zero xids should no longer be possible, but we may be replaying WAL
 * from a time when they were possible.
 *
 * For logical decoding only the running xacts information is needed;
 * there's no need to look at the locking information, but it's logged anyway,
 * as there's no independent knob to just enable logical decoding. For
 * details of how this is used, check snapbuild.c's introductory comment.
 *
 *
 * Returns the RecPtr of the last inserted record.
 */
1014 XLogRecPtr
LogStandbySnapshot(void)1015 LogStandbySnapshot(void)
1016 {
1017 	XLogRecPtr	recptr;
1018 	RunningTransactions running;
1019 	xl_standby_lock *locks;
1020 	int			nlocks;
1021 
1022 	Assert(XLogStandbyInfoActive());
1023 
1024 	/*
1025 	 * Get details of any AccessExclusiveLocks being held at the moment.
1026 	 */
1027 	locks = GetRunningTransactionLocks(&nlocks);
1028 	if (nlocks > 0)
1029 		LogAccessExclusiveLocks(nlocks, locks);
1030 	pfree(locks);
1031 
1032 	/*
1033 	 * Log details of all in-progress transactions. This should be the last
1034 	 * record we write, because standby will open up when it sees this.
1035 	 */
1036 	running = GetRunningTransactionData();
1037 
1038 	/*
1039 	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
1040 	 * For Hot Standby this can be done before inserting the WAL record
1041 	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
1042 	 * the clog. For logical decoding, though, the lock can't be released
1043 	 * early because the clog might be "in the future" from the POV of the
1044 	 * historic snapshot. This would allow for situations where we're waiting
1045 	 * for the end of a transaction listed in the xl_running_xacts record
1046 	 * which, according to the WAL, has committed before the xl_running_xacts
1047 	 * record. Fortunately this routine isn't executed frequently, and it's
1048 	 * only a shared lock.
1049 	 */
1050 	if (wal_level < WAL_LEVEL_LOGICAL)
1051 		LWLockRelease(ProcArrayLock);
1052 
1053 	recptr = LogCurrentRunningXacts(running);
1054 
1055 	/* Release lock if we kept it longer ... */
1056 	if (wal_level >= WAL_LEVEL_LOGICAL)
1057 		LWLockRelease(ProcArrayLock);
1058 
1059 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
1060 	LWLockRelease(XidGenLock);
1061 
1062 	return recptr;
1063 }
1064 
1065 /*
1066  * Record an enhanced snapshot of running transactions into WAL.
1067  *
1068  * The definitions of RunningTransactionsData and xl_xact_running_xacts are
1069  * similar. We keep them separate because xl_xact_running_xacts is a
1070  * contiguous chunk of memory and never exists fully until it is assembled in
1071  * WAL. The inserted records are marked as not being important for durability,
1072  * to avoid triggering superfluous checkpoint / archiving activity.
1073  */
1074 static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)1075 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
1076 {
1077 	xl_running_xacts xlrec;
1078 	XLogRecPtr	recptr;
1079 
1080 	xlrec.xcnt = CurrRunningXacts->xcnt;
1081 	xlrec.subxcnt = CurrRunningXacts->subxcnt;
1082 	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
1083 	xlrec.nextXid = CurrRunningXacts->nextXid;
1084 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
1085 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
1086 
1087 	/* Header */
1088 	XLogBeginInsert();
1089 	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
1090 	XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);
1091 
1092 	/* array of TransactionIds */
1093 	if (xlrec.xcnt > 0)
1094 		XLogRegisterData((char *) CurrRunningXacts->xids,
1095 						 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));
1096 
1097 	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);
1098 
1099 	if (CurrRunningXacts->subxid_overflow)
1100 		elog(trace_recovery(DEBUG2),
1101 			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1102 			 CurrRunningXacts->xcnt,
1103 			 (uint32) (recptr >> 32), (uint32) recptr,
1104 			 CurrRunningXacts->oldestRunningXid,
1105 			 CurrRunningXacts->latestCompletedXid,
1106 			 CurrRunningXacts->nextXid);
1107 	else
1108 		elog(trace_recovery(DEBUG2),
1109 			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1110 			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
1111 			 (uint32) (recptr >> 32), (uint32) recptr,
1112 			 CurrRunningXacts->oldestRunningXid,
1113 			 CurrRunningXacts->latestCompletedXid,
1114 			 CurrRunningXacts->nextXid);
1115 
1116 	/*
1117 	 * Ensure running_xacts information is synced to disk not too far in the
1118 	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
1119 	 * so we let the wal writer do it during normal operation.
1120 	 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
1121 	 * and nudge the WALWriter into action if sleeping. Check
1122 	 * XLogBackgroundFlush() for details why a record might not be flushed
1123 	 * without it.
1124 	 */
1125 	XLogSetAsyncXactLSN(recptr);
1126 
1127 	return recptr;
1128 }
1129 
1130 /*
1131  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1132  * logged, as described in backend/storage/lmgr/README.
1133  */
1134 static void
LogAccessExclusiveLocks(int nlocks,xl_standby_lock * locks)1135 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
1136 {
1137 	xl_standby_locks xlrec;
1138 
1139 	xlrec.nlocks = nlocks;
1140 
1141 	XLogBeginInsert();
1142 	XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
1143 	XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
1144 	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
1145 
1146 	(void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
1147 }
1148 
1149 /*
1150  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1151  */
1152 void
LogAccessExclusiveLock(Oid dbOid,Oid relOid)1153 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
1154 {
1155 	xl_standby_lock xlrec;
1156 
1157 	xlrec.xid = GetCurrentTransactionId();
1158 
1159 	xlrec.dbOid = dbOid;
1160 	xlrec.relOid = relOid;
1161 
1162 	LogAccessExclusiveLocks(1, &xlrec);
1163 	MyXactFlags |= XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK;
1164 }
1165 
/*
 * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
 */
void
LogAccessExclusiveLockPrepare(void)
{
	/*
	 * Force assignment of a TransactionId to this transaction, for two
	 * reasons that both concern lock release on the standby.  First, with an
	 * xid assigned, RecordTransactionCommit()/RecordTransactionAbort() can't
	 * optimise away the completion record that recovery relies on to release
	 * locks; that's a hack, but the corner case isn't worth extra code in
	 * the main commit path.  Second, the xid must exist before the lock
	 * appears in shared memory, or a concurrent
	 * GetRunningTransactionLocks() could observe a lock tagged with
	 * InvalidTransactionId, which we later assert cannot happen.
	 */
	(void) GetCurrentTransactionId();
}

1187 /*
1188  * Emit WAL for invalidations. This currently is only used for commits without
1189  * an xid but which contain invalidations.
1190  */
1191 void
LogStandbyInvalidations(int nmsgs,SharedInvalidationMessage * msgs,bool relcacheInitFileInval)1192 LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
1193 						bool relcacheInitFileInval)
1194 {
1195 	xl_invalidations xlrec;
1196 
1197 	/* prepare record */
1198 	memset(&xlrec, 0, sizeof(xlrec));
1199 	xlrec.dbId = MyDatabaseId;
1200 	xlrec.tsId = MyDatabaseTableSpace;
1201 	xlrec.relcacheInitFileInval = relcacheInitFileInval;
1202 	xlrec.nmsgs = nmsgs;
1203 
1204 	/* perform insertion */
1205 	XLogBeginInsert();
1206 	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
1207 	XLogRegisterData((char *) msgs,
1208 					 nmsgs * sizeof(SharedInvalidationMessage));
1209 	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
1210 }
1211