1 /*-------------------------------------------------------------------------
2 *
3 * acquire_lock.c
4 * A dynamic background worker that can help your backend to acquire its locks. This is
5 * an intrusive way of getting your way. The primary use of this will be to allow
6 * master_update_node to make progress during failure. When the system cannot possibly
7 * finish a transaction because the host required to finish the transaction has failed,
8 * it might be better to actively cancel the backend instead of waiting for it to fail.
9 *
10 * This file provides infrastructure for launching exactly one background
11 * worker for every database in which citus is used. That background worker
12 * can then perform work like deadlock detection, prepared transaction
13 * recovery, and cleanup.
14 *
15 * Copyright (c) Citus Data, Inc.
16 *
17 *-------------------------------------------------------------------------
18 */
19
20
21 #include <unistd.h>
22
23 #include "postgres.h"
24
25
26 #include "access/xact.h"
27 #include "catalog/pg_type.h"
28 #include "executor/spi.h"
29 #include "miscadmin.h"
30 #include "pgstat.h"
31 #include "portability/instr_time.h"
32 #include "storage/ipc.h"
33 #include "storage/latch.h"
34 #include "utils/snapmgr.h"
35
36 #include "distributed/citus_acquire_lock.h"
37 #include "distributed/citus_safe_lib.h"
38 #include "distributed/connection_management.h"
39 #include "distributed/version_compat.h"
40
41 /* forward declaration of background worker entrypoint */
42 extern void LockAcquireHelperMain(Datum main_arg);
43
44 /* forward declaration of helper functions */
45 static void lock_acquire_helper_sigterm(SIGNAL_ARGS);
46 static void EnsureStopLockAcquireHelper(void *arg);
47
48 /* LockAcquireHelperArgs contains extra arguments to be used to start the worker */
49 typedef struct LockAcquireHelperArgs
50 {
51 Oid DatabaseId;
52 int32 lock_cooldown;
53 } LockAcquireHelperArgs;
54
/*
 * Flag that is raised by the SIGTERM handler and polled by the main loop of the worker.
 * It is declared volatile sig_atomic_t because that is the only kind of object that can
 * portably be written from within a signal handler, and volatile forces the interrupted
 * code to re-read the flag instead of relying on a cached value.
 */
static volatile sig_atomic_t got_sigterm = false;
56
57
58 /*
59 * StartLockAcquireHelperBackgroundWorker creates a background worker that will help the
60 * backend passed in as an argument to complete. The worker that is started will be
61 * terminated once the current memory context gets reset, to make sure it is cleaned up in
62 * all situations. It is however advised to call TerminateBackgroundWorker on the handle
63 * returned on the first possible moment the help is no longer required.
64 */
65 BackgroundWorkerHandle *
StartLockAcquireHelperBackgroundWorker(int backendToHelp,int32 lock_cooldown)66 StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
67 {
68 BackgroundWorkerHandle *handle = NULL;
69 LockAcquireHelperArgs args;
70 BackgroundWorker worker;
71 memset(&args, 0, sizeof(args));
72 memset(&worker, 0, sizeof(worker));
73
74 /* collect the extra arguments required for the background worker */
75 args.DatabaseId = MyDatabaseId;
76 args.lock_cooldown = lock_cooldown;
77
78 /* construct the background worker and start it */
79 SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
80 "Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId);
81 strcpy_s(worker.bgw_type, sizeof(worker.bgw_type), "citus_lock_aqcuire");
82
83 worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
84 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
85 worker.bgw_restart_time = BGW_NEVER_RESTART;
86
87 strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
88 strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
89 "LockAcquireHelperMain");
90 worker.bgw_main_arg = Int32GetDatum(backendToHelp);
91 worker.bgw_notify_pid = 0;
92
93 memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &args, sizeof(args));
94
95 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
96 {
97 return NULL;
98 }
99
100 MemoryContextCallback *workerCleanup = palloc0(sizeof(MemoryContextCallback));
101 workerCleanup->func = EnsureStopLockAcquireHelper;
102 workerCleanup->arg = handle;
103
104 MemoryContextRegisterResetCallback(CurrentMemoryContext, workerCleanup);
105
106 return handle;
107 }
108
109
110 /*
111 * EnsureStopLockAcquireHelper is designed to be called as a MemoryContextCallback. It
112 * takes a handle to the background worker and Terminates it. It is safe to be called on a
113 * handle that has already been terminated due to the guard around the generation number
114 * implemented in the handle by postgres.
115 */
116 static void
EnsureStopLockAcquireHelper(void * arg)117 EnsureStopLockAcquireHelper(void *arg)
118 {
119 BackgroundWorkerHandle *handle = (BackgroundWorkerHandle *) arg;
120 TerminateBackgroundWorker(handle);
121 }
122
123
124 /*
125 * Signal handler for SIGTERM
126 * Set a flag to let the main loop to terminate, and set our latch to wake
127 * it up.
128 */
129 static void
lock_acquire_helper_sigterm(SIGNAL_ARGS)130 lock_acquire_helper_sigterm(SIGNAL_ARGS)
131 {
132 int save_errno = errno;
133
134 got_sigterm = true;
135 SetLatch(MyLatch);
136
137 errno = save_errno;
138 }
139
140
141 /*
142 * ShouldAcquireLock tests if our backend should still proceed with acquiring the lock,
143 * and thus keep terminating conflicting backends. This function returns true until a
144 * SIGTERM, background worker termination signal, has been received.
145 *
146 * The function blocks for at most sleepms when called. During operation without being
147 * terminated this is the time between invocations to the backend termination logic.
148 */
149 static bool
ShouldAcquireLock(long sleepms)150 ShouldAcquireLock(long sleepms)
151 {
152 /* early escape in case we already got the signal to stop acquiring the lock */
153 if (got_sigterm)
154 {
155 return false;
156 }
157
158 int rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
159 sleepms * 1L, PG_WAIT_EXTENSION);
160 ResetLatch(MyLatch);
161
162 /* emergency bailout if postmaster has died */
163 if (rc & WL_POSTMASTER_DEATH)
164 {
165 proc_exit(1);
166 }
167
168 CHECK_FOR_INTERRUPTS();
169
170 return !got_sigterm;
171 }
172
173
174 /*
175 * LockAcquireHelperMain runs in a dynamic background worker to help master_update_node to
176 * acquire its locks.
177 */
178 void
LockAcquireHelperMain(Datum main_arg)179 LockAcquireHelperMain(Datum main_arg)
180 {
181 int backendPid = DatumGetInt32(main_arg);
182 StringInfoData sql;
183 LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra;
184 long timeout = 0;
185 instr_time connectionStart;
186 INSTR_TIME_SET_CURRENT(connectionStart);
187
188 /* parameters for sql query to be executed */
189 const int paramCount = 1;
190 Oid paramTypes[1] = { INT4OID };
191 Datum paramValues[1];
192
193 pqsignal(SIGTERM, lock_acquire_helper_sigterm);
194
195 BackgroundWorkerUnblockSignals();
196
197 elog(LOG, "lock acquiring backend started for backend %d (cooldown %dms)", backendPid,
198 args->lock_cooldown);
199
200 /*
201 * this loop waits till the deadline is reached (eg. lock_cooldown has passed) OR we
202 * no longer need to acquire the lock due to the termination of this backend.
203 * Only after the timeout the code will continue with the section that will acquire
204 * the lock.
205 */
206 do {
207 timeout = MillisecondsToTimeout(connectionStart, args->lock_cooldown);
208 } while (timeout > 0 && ShouldAcquireLock(timeout));
209
210 /* connecting to the database */
211 BackgroundWorkerInitializeConnectionByOid(args->DatabaseId, InvalidOid, 0);
212
213 /*
214 * The query below does a self join on pg_locks to find backends that are granted a
215 * lock our target backend (backendPid) is waiting for. Once it found such a backend
216 * it terminates the backend with pg_terminate_pid.
217 *
218 * The result is are rows of pid,bool indicating backend that is terminated and the
219 * success of the termination. These will be logged accordingly below for an
220 * administrator to correlate in the logs with the termination message.
221 */
222 initStringInfo(&sql);
223 appendStringInfo(&sql,
224 "SELECT \n"
225 " DISTINCT conflicting.pid,\n"
226 " pg_terminate_backend(conflicting.pid)\n"
227 " FROM pg_locks AS blocked\n"
228 " JOIN pg_locks AS conflicting\n"
229 " ON (conflicting.database = blocked.database\n"
230 " AND conflicting.objid = blocked.objid)\n"
231 " WHERE conflicting.granted = true\n"
232 " AND blocked.granted = false\n"
233 " AND blocked.pid = $1;");
234 paramValues[0] = Int32GetDatum(backendPid);
235
236 while (ShouldAcquireLock(100))
237 {
238 elog(LOG, "canceling competing backends for backend %d", backendPid);
239
240 /*
241 * Begin our transaction
242 */
243 SetCurrentStatementStartTimestamp();
244 StartTransactionCommand();
245 SPI_connect();
246 PushActiveSnapshot(GetTransactionSnapshot());
247 pgstat_report_activity(STATE_RUNNING, sql.data);
248
249 int spiStatus = SPI_execute_with_args(sql.data, paramCount, paramTypes,
250 paramValues,
251 NULL, false, 0);
252
253 if (spiStatus == SPI_OK_SELECT)
254 {
255 for (uint64 row = 0; row < SPI_processed; row++)
256 {
257 bool isnull = false;
258
259 int terminatedPid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
260 SPI_tuptable->tupdesc,
261 1, &isnull));
262
263 bool isTerminated = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0],
264 SPI_tuptable->tupdesc,
265 2, &isnull));
266
267 if (isTerminated)
268 {
269 elog(WARNING, "terminated conflicting backend %d", terminatedPid);
270 }
271 else
272 {
273 elog(INFO,
274 "attempt to terminate conflicting backend %d was unsuccessful",
275 terminatedPid);
276 }
277 }
278 }
279 else
280 {
281 elog(FATAL, "cannot cancel competing backends for backend %d", backendPid);
282 }
283
284 /*
285 * And finish our transaction.
286 */
287 SPI_finish();
288 PopActiveSnapshot();
289 CommitTransactionCommand();
290 pgstat_report_stat(false);
291 pgstat_report_activity(STATE_IDLE, NULL);
292 }
293
294
295 elog(LOG, "lock acquiring backend finished for backend %d", backendPid);
296
297 /* safely got to the end, exit without problem */
298 proc_exit(0);
299 }
300