1 /*-------------------------------------------------------------------------
2  *
3  * acquire_lock.c
 *	  A dynamic background worker that can help your backend to acquire its locks. This is
 *	  an intrusive way of getting your way. The primary use of this will be to allow
 *	  master_update_node to make progress during failure. When the system cannot possibly
 *	  finish a transaction because the host required to finish it has failed, it might be
 *	  better to actively cancel the backend instead of waiting for it to fail.
9  *
 * This file provides infrastructure for launching exactly one background
11  * worker for every database in which citus is used.  That background worker
12  * can then perform work like deadlock detection, prepared transaction
13  * recovery, and cleanup.
14  *
15  * Copyright (c) Citus Data, Inc.
16  *
17  *-------------------------------------------------------------------------
18  */
19 
20 
#include <signal.h>
#include <unistd.h>

#include "postgres.h"


#include "access/xact.h"
#include "catalog/pg_type.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "portability/instr_time.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "utils/snapmgr.h"

#include "distributed/citus_acquire_lock.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/version_compat.h"
40 
41 /* forward declaration of background worker entrypoint */
42 extern void LockAcquireHelperMain(Datum main_arg);
43 
44 /* forward declaration of helper functions */
45 static void lock_acquire_helper_sigterm(SIGNAL_ARGS);
46 static void EnsureStopLockAcquireHelper(void *arg);
47 
48 /* LockAcquireHelperArgs contains extra arguments to be used to start the worker */
49 typedef struct LockAcquireHelperArgs
50 {
51 	Oid DatabaseId;
52 	int32 lock_cooldown;
53 } LockAcquireHelperArgs;
54 
/*
 * Set to true by the SIGTERM handler and polled by the main flow of the worker.
 *
 * The flag is written asynchronously from a signal handler, so it is declared
 * volatile sig_atomic_t: that is the only object type the C standard guarantees can
 * be safely written from a handler, and volatile keeps the compiler from caching the
 * value across the polling reads.
 */
static volatile sig_atomic_t got_sigterm = false;
56 
57 
58 /*
59  * StartLockAcquireHelperBackgroundWorker creates a background worker that will help the
60  * backend passed in as an argument to complete. The worker that is started will be
61  * terminated once the current memory context gets reset, to make sure it is cleaned up in
62  * all situations. It is however advised to call TerminateBackgroundWorker on the handle
63  * returned on the first possible moment the help is no longer required.
64  */
65 BackgroundWorkerHandle *
StartLockAcquireHelperBackgroundWorker(int backendToHelp,int32 lock_cooldown)66 StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
67 {
68 	BackgroundWorkerHandle *handle = NULL;
69 	LockAcquireHelperArgs args;
70 	BackgroundWorker worker;
71 	memset(&args, 0, sizeof(args));
72 	memset(&worker, 0, sizeof(worker));
73 
74 	/* collect the extra arguments required for the background worker */
75 	args.DatabaseId = MyDatabaseId;
76 	args.lock_cooldown = lock_cooldown;
77 
78 	/* construct the background worker and start it */
79 	SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
80 				 "Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId);
81 	strcpy_s(worker.bgw_type, sizeof(worker.bgw_type), "citus_lock_aqcuire");
82 
83 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
84 	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
85 	worker.bgw_restart_time = BGW_NEVER_RESTART;
86 
87 	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
88 	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
89 			 "LockAcquireHelperMain");
90 	worker.bgw_main_arg = Int32GetDatum(backendToHelp);
91 	worker.bgw_notify_pid = 0;
92 
93 	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &args, sizeof(args));
94 
95 	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
96 	{
97 		return NULL;
98 	}
99 
100 	MemoryContextCallback *workerCleanup = palloc0(sizeof(MemoryContextCallback));
101 	workerCleanup->func = EnsureStopLockAcquireHelper;
102 	workerCleanup->arg = handle;
103 
104 	MemoryContextRegisterResetCallback(CurrentMemoryContext, workerCleanup);
105 
106 	return handle;
107 }
108 
109 
110 /*
111  * EnsureStopLockAcquireHelper is designed to be called as a MemoryContextCallback. It
112  * takes a handle to the background worker and Terminates it. It is safe to be called on a
113  * handle that has already been terminated due to the guard around the generation number
114  * implemented in the handle by postgres.
115  */
116 static void
EnsureStopLockAcquireHelper(void * arg)117 EnsureStopLockAcquireHelper(void *arg)
118 {
119 	BackgroundWorkerHandle *handle = (BackgroundWorkerHandle *) arg;
120 	TerminateBackgroundWorker(handle);
121 }
122 
123 
124 /*
125  * Signal handler for SIGTERM
126  *		Set a flag to let the main loop to terminate, and set our latch to wake
127  *		it up.
128  */
129 static void
lock_acquire_helper_sigterm(SIGNAL_ARGS)130 lock_acquire_helper_sigterm(SIGNAL_ARGS)
131 {
132 	int save_errno = errno;
133 
134 	got_sigterm = true;
135 	SetLatch(MyLatch);
136 
137 	errno = save_errno;
138 }
139 
140 
141 /*
142  * ShouldAcquireLock tests if our backend should still proceed with acquiring the lock,
143  * and thus keep terminating conflicting backends. This function returns true until a
144  * SIGTERM, background worker termination signal, has been received.
145  *
146  * The function blocks for at most sleepms when called. During operation without being
147  * terminated this is the time between invocations to the backend termination logic.
148  */
149 static bool
ShouldAcquireLock(long sleepms)150 ShouldAcquireLock(long sleepms)
151 {
152 	/* early escape in case we already got the signal to stop acquiring the lock */
153 	if (got_sigterm)
154 	{
155 		return false;
156 	}
157 
158 	int rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
159 					   sleepms * 1L, PG_WAIT_EXTENSION);
160 	ResetLatch(MyLatch);
161 
162 	/* emergency bailout if postmaster has died */
163 	if (rc & WL_POSTMASTER_DEATH)
164 	{
165 		proc_exit(1);
166 	}
167 
168 	CHECK_FOR_INTERRUPTS();
169 
170 	return !got_sigterm;
171 }
172 
173 
174 /*
175  * LockAcquireHelperMain runs in a dynamic background worker to help master_update_node to
176  * acquire its locks.
177  */
178 void
LockAcquireHelperMain(Datum main_arg)179 LockAcquireHelperMain(Datum main_arg)
180 {
181 	int backendPid = DatumGetInt32(main_arg);
182 	StringInfoData sql;
183 	LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra;
184 	long timeout = 0;
185 	instr_time connectionStart;
186 	INSTR_TIME_SET_CURRENT(connectionStart);
187 
188 	/* parameters for sql query to be executed */
189 	const int paramCount = 1;
190 	Oid paramTypes[1] = { INT4OID };
191 	Datum paramValues[1];
192 
193 	pqsignal(SIGTERM, lock_acquire_helper_sigterm);
194 
195 	BackgroundWorkerUnblockSignals();
196 
197 	elog(LOG, "lock acquiring backend started for backend %d (cooldown %dms)", backendPid,
198 		 args->lock_cooldown);
199 
200 	/*
201 	 * this loop waits till the deadline is reached (eg. lock_cooldown has passed) OR we
202 	 * no longer need to acquire the lock due to the termination of this backend.
203 	 * Only after the timeout the code will continue with the section that will acquire
204 	 * the lock.
205 	 */
206 	do {
207 		timeout = MillisecondsToTimeout(connectionStart, args->lock_cooldown);
208 	} while (timeout > 0 && ShouldAcquireLock(timeout));
209 
210 	/* connecting to the database */
211 	BackgroundWorkerInitializeConnectionByOid(args->DatabaseId, InvalidOid, 0);
212 
213 	/*
214 	 * The query below does a self join on pg_locks to find backends that are granted a
215 	 * lock our target backend (backendPid) is waiting for. Once it found such a backend
216 	 * it terminates the backend with pg_terminate_pid.
217 	 *
218 	 * The result is are rows of pid,bool indicating backend that is terminated and the
219 	 * success of the termination. These will be logged accordingly below for an
220 	 * administrator to correlate in the logs with the termination message.
221 	 */
222 	initStringInfo(&sql);
223 	appendStringInfo(&sql,
224 					 "SELECT \n"
225 					 "    DISTINCT conflicting.pid,\n"
226 					 "    pg_terminate_backend(conflicting.pid)\n"
227 					 "  FROM pg_locks AS blocked\n"
228 					 "       JOIN pg_locks AS conflicting\n"
229 					 "         ON (conflicting.database = blocked.database\n"
230 					 "             AND conflicting.objid = blocked.objid)\n"
231 					 " WHERE conflicting.granted = true\n"
232 					 "   AND blocked.granted = false\n"
233 					 "   AND blocked.pid = $1;");
234 	paramValues[0] = Int32GetDatum(backendPid);
235 
236 	while (ShouldAcquireLock(100))
237 	{
238 		elog(LOG, "canceling competing backends for backend %d", backendPid);
239 
240 		/*
241 		 * Begin our transaction
242 		 */
243 		SetCurrentStatementStartTimestamp();
244 		StartTransactionCommand();
245 		SPI_connect();
246 		PushActiveSnapshot(GetTransactionSnapshot());
247 		pgstat_report_activity(STATE_RUNNING, sql.data);
248 
249 		int spiStatus = SPI_execute_with_args(sql.data, paramCount, paramTypes,
250 											  paramValues,
251 											  NULL, false, 0);
252 
253 		if (spiStatus == SPI_OK_SELECT)
254 		{
255 			for (uint64 row = 0; row < SPI_processed; row++)
256 			{
257 				bool isnull = false;
258 
259 				int terminatedPid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
260 																SPI_tuptable->tupdesc,
261 																1, &isnull));
262 
263 				bool isTerminated = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0],
264 															   SPI_tuptable->tupdesc,
265 															   2, &isnull));
266 
267 				if (isTerminated)
268 				{
269 					elog(WARNING, "terminated conflicting backend %d", terminatedPid);
270 				}
271 				else
272 				{
273 					elog(INFO,
274 						 "attempt to terminate conflicting backend %d was unsuccessful",
275 						 terminatedPid);
276 				}
277 			}
278 		}
279 		else
280 		{
281 			elog(FATAL, "cannot cancel competing backends for backend %d", backendPid);
282 		}
283 
284 		/*
285 		 * And finish our transaction.
286 		 */
287 		SPI_finish();
288 		PopActiveSnapshot();
289 		CommitTransactionCommand();
290 		pgstat_report_stat(false);
291 		pgstat_report_activity(STATE_IDLE, NULL);
292 	}
293 
294 
295 	elog(LOG, "lock acquiring backend finished for backend %d", backendPid);
296 
297 	/* safely got to the end, exit without problem */
298 	proc_exit(0);
299 }
300