/*-------------------------------------------------------------------------
 *
 * backend_data.c
 *
 *  Infrastructure for managing per-backend data that can be efficiently
 *  accessed by all sessions.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "distributed/pg_version_constants.h"

#include "miscadmin.h"

#include "funcapi.h"
#include "access/htup_details.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "datatype/timestamp.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "nodes/execnodes.h"
#include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "utils/timestamp.h"


#define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();"
#define ACTIVE_TRANSACTION_COLUMN_COUNT 6

/*
 * Each backend's data resides in shared memory, in the
 * BackendManagementShmemData struct.
 */
typedef struct BackendManagementShmemData
{
	int trancheId;
	NamedLWLockTranche namedLockTranche;
	LWLock lock;

	/*
	 * We prefer an atomic integer over a sequence for two reasons:
	 * (i) there is an orders-of-magnitude performance difference and
	 * (ii) it allows read-only replicas to generate ids.
	 */
	pg_atomic_uint64 nextTransactionNumber;

	/*
	 * Total number of client backends that are authenticated.
	 * We only care about activeClientBackendCounter when adaptive
	 * connection management is enabled; otherwise it stays zero.
	 *
	 * Note that the counter does not consider background workers
	 * or the like, it only counts client backends.
	 */
	pg_atomic_uint32 activeClientBackendCounter;

	BackendData backends[FLEXIBLE_ARRAY_MEMBER];
} BackendManagementShmemData;


static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
									   tupleDescriptor);

static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static BackendManagementShmemData *backendManagementShmemData = NULL;
static BackendData *MyBackendData = NULL;


static void BackendManagementShmemInit(void);
static size_t BackendManagementShmemSize(void);


PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
PG_FUNCTION_INFO_V1(get_current_transaction_id);
PG_FUNCTION_INFO_V1(get_global_active_transactions);
PG_FUNCTION_INFO_V1(get_all_active_transactions);


/*
 * assign_distributed_transaction_id updates the shared memory allocated for this backend
 * and sets the initiatorNodeIdentifier, transactionNumber and timestamp fields from the
 * given inputs. The function also sets the database id and process id using the
 * information that Postgres provides.
 *
 * This function is only intended for internal use for managing distributed transactions.
 * Users should not use this function for any purpose.
 */
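/*
 * A minimal usage sketch (for illustration only; the literal values are made
 * up). The argument order follows the PG_GETARG calls below: initiator node
 * identifier, transaction number, transaction timestamp.
 *
 *   SELECT assign_distributed_transaction_id(2, 1234, now());
 */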
Datum
assign_distributed_transaction_id(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	Oid userId = GetUserId();

	/* prepare data before acquiring spinlock to protect against errors */
	int32 initiatorNodeIdentifier = PG_GETARG_INT32(0);
	uint64 transactionNumber = PG_GETARG_INT64(1);
	TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(2);

	/* MyBackendData should always be available, just out of paranoia */
	if (!MyBackendData)
	{
		ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
	}

	/*
	 * Note that we don't need to lock shared memory (i.e., LockBackendSharedMemory()) here
	 * since this function is executed after AssignDistributedTransactionId() issued on the
	 * initiator node, which already takes the required lock to enforce the consistency.
	 */

	SpinLockAcquire(&MyBackendData->mutex);

	/* if an id is already assigned, release the lock and error */
	if (MyBackendData->transactionId.transactionNumber != 0)
	{
		SpinLockRelease(&MyBackendData->mutex);

		ereport(ERROR, (errmsg("the backend has already been assigned a "
							   "transaction id")));
	}

	MyBackendData->databaseId = MyDatabaseId;
	MyBackendData->userId = userId;

	MyBackendData->transactionId.initiatorNodeIdentifier = initiatorNodeIdentifier;
	MyBackendData->transactionId.transactionNumber = transactionNumber;
	MyBackendData->transactionId.timestamp = timestamp;
	MyBackendData->transactionId.transactionOriginator = false;

	MyBackendData->citusBackend.initiatorNodeIdentifier =
		MyBackendData->transactionId.initiatorNodeIdentifier;
	MyBackendData->citusBackend.transactionOriginator = false;

	SpinLockRelease(&MyBackendData->mutex);

	PG_RETURN_VOID();
}


/*
 * get_current_transaction_id returns a tuple with (databaseId, processId,
 * initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the
 * shared memory associated with this backend. Note that if the backend
 * is not in a transaction, the function returns uninitialized data where
 * transactionNumber equals 0.
 */
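/*
 * A quick way to inspect the current backend's state (a sketch; the exact
 * output column names are defined in the accompanying SQL definition):
 *
 *   SELECT * FROM get_current_transaction_id();
 *
 * Outside of a distributed transaction, the transaction number shows up as 0.
 */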
Datum
get_current_transaction_id(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	TupleDesc tupleDescriptor = NULL;

	Datum values[5];
	bool isNulls[5];


	/* build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
	{
		elog(ERROR, "return type must be a row type");
	}

	/* MyBackendData should always be available, just out of paranoia */
	if (!MyBackendData)
	{
		ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
	}

	DistributedTransactionId *distributedTransactionId =
		GetCurrentDistributedTransactionId();

	memset(values, 0, sizeof(values));
	memset(isNulls, false, sizeof(isNulls));

	/* first two fields do not change for this backend, so get directly */
	values[0] = ObjectIdGetDatum(MyDatabaseId);
	values[1] = Int32GetDatum(MyProcPid);

	values[2] = Int32GetDatum(distributedTransactionId->initiatorNodeIdentifier);
	values[3] = UInt64GetDatum(distributedTransactionId->transactionNumber);

	/* provide a better output */
	if (distributedTransactionId->initiatorNodeIdentifier != 0)
	{
		values[4] = TimestampTzGetDatum(distributedTransactionId->timestamp);
	}
	else
	{
		isNulls[4] = true;
	}

	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);

	PG_RETURN_DATUM(HeapTupleGetDatum(heapTuple));
}


/*
 * get_global_active_transactions returns all the available information about all
 * the active backends from each node of the cluster. If you call this function from
 * the coordinator, it also returns the active transactions on the coordinator.
 * However, if you call it from a worker, the result won't include the transactions
 * on the coordinator node, since worker nodes are not aware of the coordinator.
 */
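/*
 * Typically queried from the coordinator to get a cluster-wide snapshot of
 * active distributed transactions (a usage sketch):
 *
 *   SELECT * FROM get_global_active_transactions();
 */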
Datum
get_global_active_transactions(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	TupleDesc tupleDescriptor = NULL;
	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	List *connectionList = NIL;
	StringInfo queryToSend = makeStringInfo();

	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);

	appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY);

	/* add active transactions for local node */
	StoreAllActiveTransactions(tupleStore, tupleDescriptor);

	int32 localGroupId = GetLocalGroupId();

	/* open connections in parallel */
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{
		const char *nodeName = workerNode->workerName;
		int nodePort = workerNode->workerPort;
		int connectionFlags = 0;

		if (workerNode->groupId == localGroupId)
		{
			/* we already get these transactions via StoreAllActiveTransactions() */
			continue;
		}

		MultiConnection *connection = StartNodeConnection(connectionFlags, nodeName,
														  nodePort);

		connectionList = lappend(connectionList, connection);
	}

	FinishConnectionListEstablishment(connectionList);

	/* send commands in parallel */
	MultiConnection *connection = NULL;
	foreach_ptr(connection, connectionList)
	{
		int querySent = SendRemoteCommand(connection, queryToSend->data);
		if (querySent == 0)
		{
			ReportConnectionError(connection, WARNING);
		}
	}

	/* receive query results */
	foreach_ptr(connection, connectionList)
	{
		bool raiseInterrupts = true;
		Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
		bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];

		if (PQstatus(connection->pgConn) != CONNECTION_OK)
		{
			continue;
		}

		PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
		if (!IsResponseOK(result))
		{
			ReportResultError(connection, result, WARNING);
			continue;
		}

		int64 rowCount = PQntuples(result);
		int64 colCount = PQnfields(result);

		/* although not expected, check the column count defensively */
		if (colCount != ACTIVE_TRANSACTION_COLUMN_COUNT)
		{
			ereport(WARNING, (errmsg("unexpected number of columns from "
									 "get_all_active_transactions")));
			continue;
		}

		for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
		{
			memset(values, 0, sizeof(values));
			memset(isNulls, false, sizeof(isNulls));

			values[0] = ParseIntField(result, rowIndex, 0);
			values[1] = ParseIntField(result, rowIndex, 1);
			values[2] = ParseIntField(result, rowIndex, 2);
			values[3] = ParseBoolField(result, rowIndex, 3);
			values[4] = ParseIntField(result, rowIndex, 4);
			values[5] = ParseTimestampTzField(result, rowIndex, 5);

			tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
		}

		PQclear(result);
		ForgetResults(connection);
	}

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupleStore);

	PG_RETURN_VOID();
}


/*
 * get_all_active_transactions returns all the available information about all
 * the active backends.
 */
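/*
 * This is the per-node building block that get_global_active_transactions
 * runs against every worker (see GET_ACTIVE_TRANSACTION_QUERY above), and it
 * can also be queried directly on a single node:
 *
 *   SELECT * FROM get_all_active_transactions();
 */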
Datum
get_all_active_transactions(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	TupleDesc tupleDescriptor = NULL;
	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);

	StoreAllActiveTransactions(tupleStore, tupleDescriptor);

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupleStore);

	PG_RETURN_VOID();
}


/*
 * StoreAllActiveTransactions gets the active transactions on the local node
 * and inserts them into the given tuplestore.
 */
static void
StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
	Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
	bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
	bool showAllTransactions = superuser();
	const Oid userId = GetUserId();

	/*
	 * We don't want to initialize memory while the spinlock is held, so we
	 * prefer to do it here. This initialization is done only for the first
	 * row.
	 */
	memset(values, 0, sizeof(values));
	memset(isNulls, false, sizeof(isNulls));

	if (is_member_of_role(userId, ROLE_PG_MONITOR))
	{
		showAllTransactions = true;
	}

	/* we're reading all distributed transactions, prevent new backends */
	LockBackendSharedMemory(LW_SHARED);

	for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
	{
		BackendData *currentBackend =
			&backendManagementShmemData->backends[backendIndex];

		/* to work on data after releasing the spinlock, to protect against errors */
		int initiatorNodeIdentifier = -1;
		uint64 transactionNumber = 0;

		SpinLockAcquire(&currentBackend->mutex);

		/* we're only interested in backends initiated by Citus */
		if (currentBackend->citusBackend.initiatorNodeIdentifier < 0)
		{
			SpinLockRelease(&currentBackend->mutex);
			continue;
		}

		/*
		 * Unless the user has a role that allows seeing all transactions (superuser,
		 * pg_monitor), skip over transactions belonging to other users.
		 */
		if (!showAllTransactions && currentBackend->userId != userId)
		{
			SpinLockRelease(&currentBackend->mutex);
			continue;
		}

		Oid databaseId = currentBackend->databaseId;
		int backendPid = ProcGlobal->allProcs[backendIndex].pid;
		initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;

		/*
		 * We prefer to use worker_query instead of transactionOriginator in the user facing
		 * functions since it's more intuitive. Thus, we negate the result before returning.
		 *
		 * We prefer to use citusBackend's transactionOriginator field over transactionId's
		 * field with the same name. The reason is that it also covers backends that are not
		 * inside a distributed transaction.
		 */
		bool coordinatorOriginatedQuery =
			currentBackend->citusBackend.transactionOriginator;

		transactionNumber = currentBackend->transactionId.transactionNumber;
		TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp;

		SpinLockRelease(&currentBackend->mutex);

		values[0] = ObjectIdGetDatum(databaseId);
		values[1] = Int32GetDatum(backendPid);
		values[2] = Int32GetDatum(initiatorNodeIdentifier);
		values[3] = !coordinatorOriginatedQuery;
		values[4] = UInt64GetDatum(transactionNumber);
		values[5] = TimestampTzGetDatum(transactionIdTimestamp);

		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);

		/*
		 * We don't want to initialize memory while the spinlock is held, so we
		 * prefer to do it here. This initialization is done for the rows
		 * starting from the second one.
		 */
		memset(values, 0, sizeof(values));
		memset(isNulls, false, sizeof(isNulls));
	}

	UnlockBackendSharedMemory();
}


/*
 * InitializeBackendManagement requests the necessary shared memory
 * from Postgres and sets up the shared memory startup hook.
 */
void
InitializeBackendManagement(void)
{
	/* allocate shared memory */
	if (!IsUnderPostmaster)
	{
		RequestAddinShmemSpace(BackendManagementShmemSize());
	}

	prev_shmem_startup_hook = shmem_startup_hook;
	shmem_startup_hook = BackendManagementShmemInit;
}


/*
 * BackendManagementShmemInit is the callback that is invoked via the shared
 * memory startup hook. The function sets up the necessary shared memory
 * segment for the backend manager.
 */
static void
BackendManagementShmemInit(void)
{
	bool alreadyInitialized = false;

	/* we may update the shmem, acquire lock exclusively */
	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	backendManagementShmemData =
		(BackendManagementShmemData *) ShmemInitStruct(
			"Backend Management Shmem",
			BackendManagementShmemSize(),
			&alreadyInitialized);

	if (!alreadyInitialized)
	{
		char *trancheName = "Backend Management Tranche";

		NamedLWLockTranche *namedLockTranche =
			&backendManagementShmemData->namedLockTranche;

		/* start by zeroing out all the memory */
		memset(backendManagementShmemData, 0,
			   BackendManagementShmemSize());

		namedLockTranche->trancheId = LWLockNewTrancheId();

		LWLockRegisterTranche(namedLockTranche->trancheId, trancheName);
		LWLockInitialize(&backendManagementShmemData->lock,
						 namedLockTranche->trancheId);

		/* start the distributed transaction ids from 1 */
		pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);

		/* there are no active backends yet, so start with zero */
		pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);

		/*
		 * We need to initialize each backend's spinlock before any backend
		 * starts its execution. Note that we iterate over TotalProcs (i.e., not
		 * MaxBackends) since some of the blocking processes could be prepared
		 * transactions, which aren't covered by MaxBackends.
		 *
		 * We also initialize initiatorNodeIdentifier to -1, which can never be
		 * used as a node id.
		 */
		int totalProcs = TotalProcCount();
		for (int backendIndex = 0; backendIndex < totalProcs; ++backendIndex)
		{
			BackendData *backendData =
				&backendManagementShmemData->backends[backendIndex];
			backendData->citusBackend.initiatorNodeIdentifier = -1;
			SpinLockInit(&backendData->mutex);
		}
	}

	LWLockRelease(AddinShmemInitLock);

	if (prev_shmem_startup_hook != NULL)
	{
		prev_shmem_startup_hook();
	}
}


/*
 * BackendManagementShmemSize returns the size that should be allocated
 * on the shared memory for backend management.
 */
static size_t
BackendManagementShmemSize(void)
{
	Size size = 0;
	int totalProcs = TotalProcCount();

	size = add_size(size, sizeof(BackendManagementShmemData));
	size = add_size(size, mul_size(sizeof(BackendData), totalProcs));

	return size;
}


/*
 * TotalProcCount returns the total number of processes that could run via the
 * current postgres server. See the details in the comments within the function.
 *
 * One warning for the reader: Citus enforces being loaded as the first
 * extension in shared_preload_libraries. However, if any other extension
 * overrides MaxConnections, autovacuum_max_workers or max_worker_processes,
 * the reasoning in this function may not work as expected. Given that this is
 * not a usual pattern for extensions, we consider Citus' behaviour good enough
 * for now.
 */
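/*
 * A rough worked example, assuming stock PostgreSQL settings of
 * max_connections = 100, autovacuum_max_workers = 3, max_worker_processes = 8,
 * max_prepared_transactions = 0 and max_wal_senders = 10:
 *
 *   maxBackends = 100 + 3 + 1 + 8 = 112
 *   totalProcs  = 112 + NUM_AUXILIARY_PROCS + 0 + 10
 *
 * The exact value of NUM_AUXILIARY_PROCS depends on the PostgreSQL version.
 */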
int
TotalProcCount(void)
{
	int maxBackends = 0;
	int totalProcs = 0;

#ifdef WIN32

	/* autovacuum_max_workers is not PGDLLIMPORT, so use a high estimate for windows */
	int estimatedMaxAutovacuumWorkers = 30;
	maxBackends =
		MaxConnections + estimatedMaxAutovacuumWorkers + 1 + max_worker_processes;
#else

	/*
	 * We're simply imitating PostgreSQL's InitializeMaxBackends(). Given that all
	 * the items used here are PGC_POSTMASTER, it should be safe to access them at
	 * any time during the execution, even before InitializeMaxBackends() is called.
	 */
	maxBackends = MaxConnections + autovacuum_max_workers + 1 + max_worker_processes;
#endif

	/*
	 * We prefer to maintain space for auxiliary procs and prepared transactions in
	 * the backend space because they could be blocking processes and our current
	 * implementation of distributed deadlock detection could process them
	 * as regular backends. In the future, we could consider changing the deadlock
	 * detection algorithm to ignore auxiliary procs and prepared transactions and
	 * save some space.
	 */
	totalProcs = maxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;

	totalProcs += max_wal_senders;

	return totalProcs;
}


/*
 * InitializeBackendData initialises MyBackendData to point to the shared memory
 * segment belonging to the current backend.
 *
 * The function is called through CitusHasBeenLoaded when we first detect that
 * the Citus extension is present, and after any subsequent invalidation of
 * pg_dist_partition (see InvalidateMetadataSystemCache()).
 *
 * We only need to initialise MyBackendData once. The only goal here is to make
 * sure that we don't use the backend data from a previous backend with the same
 * pgprocno. Resetting the backend data after a distributed transaction happens
 * on COMMIT/ABORT through transaction callbacks.
 */
void
InitializeBackendData(void)
{
	if (MyBackendData != NULL)
	{
		/*
		 * We already initialized MyBackendData before. We definitely should
		 * not initialise it again, because we might be in the middle of a
		 * distributed transaction.
		 */
		return;
	}

	MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno];

	Assert(MyBackendData);

	LockBackendSharedMemory(LW_EXCLUSIVE);

	/* zero out the backend data */
	UnSetDistributedTransactionId();

	UnlockBackendSharedMemory();
}


/*
 * UnSetDistributedTransactionId simply acquires the mutex and resets the backend's
 * distributed transaction data in shared memory to the initial values.
 */
void
UnSetDistributedTransactionId(void)
{
	/* backend does not exist if the extension is not created */
	if (MyBackendData)
	{
		SpinLockAcquire(&MyBackendData->mutex);

		MyBackendData->databaseId = 0;
		MyBackendData->userId = 0;
		MyBackendData->cancelledDueToDeadlock = false;
		MyBackendData->transactionId.initiatorNodeIdentifier = 0;
		MyBackendData->transactionId.transactionOriginator = false;
		MyBackendData->transactionId.transactionNumber = 0;
		MyBackendData->transactionId.timestamp = 0;

		MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
		MyBackendData->citusBackend.transactionOriginator = false;

		SpinLockRelease(&MyBackendData->mutex);
	}
}


/*
 * LockBackendSharedMemory is a simple wrapper around LWLockAcquire on the
 * shared memory lock.
 *
 * We use the backend shared memory lock to prevent new backends from becoming
 * part of a new distributed transaction, or an existing backend from leaving a
 * distributed transaction, while we're reading all backends' data.
 *
 * The primary goal is to provide a consistent view of the current distributed
 * transactions while doing the deadlock detection.
 */
void
LockBackendSharedMemory(LWLockMode lockMode)
{
	LWLockAcquire(&backendManagementShmemData->lock, lockMode);
}


/*
 * UnlockBackendSharedMemory is a simple wrapper around LWLockRelease on the
 * shared memory lock.
 */
void
UnlockBackendSharedMemory(void)
{
	LWLockRelease(&backendManagementShmemData->lock);
}


/*
 * GetCurrentDistributedTransactionId reads the backend's distributed transaction id and
 * returns a copy of it.
 *
 * When called from a parallel worker, it uses the parent's transaction ID per the logic
 * in GetBackendDataForProc.
 */
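/*
 * A minimal caller-side sketch (assuming InitializeBackendData() already ran
 * for this backend): a transactionNumber of 0 in the returned copy means the
 * backend is not inside a distributed transaction.
 *
 *   DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
 *   bool inDistributedTransaction = (transactionId->transactionNumber != 0);
 */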
DistributedTransactionId *
GetCurrentDistributedTransactionId(void)
{
	DistributedTransactionId *currentDistributedTransactionId =
		(DistributedTransactionId *) palloc(sizeof(DistributedTransactionId));
	BackendData backendData;

	GetBackendDataForProc(MyProc, &backendData);

	currentDistributedTransactionId->initiatorNodeIdentifier =
		backendData.transactionId.initiatorNodeIdentifier;
	currentDistributedTransactionId->transactionOriginator =
		backendData.transactionId.transactionOriginator;
	currentDistributedTransactionId->transactionNumber =
		backendData.transactionId.transactionNumber;
	currentDistributedTransactionId->timestamp =
		backendData.transactionId.timestamp;

	return currentDistributedTransactionId;
}


/*
 * AssignDistributedTransactionId generates a new distributed transaction id and
 * sets it for the current backend. It also sets the databaseId and
 * userId fields.
 *
 * This function should only be called from UseCoordinatedTransaction(). Any other
 * caller is very likely to break the distributed transaction management.
 */
void
AssignDistributedTransactionId(void)
{
	pg_atomic_uint64 *transactionNumberSequence =
		&backendManagementShmemData->nextTransactionNumber;

	uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1);
	int32 localGroupId = GetLocalGroupId();
	TimestampTz currentTimestamp = GetCurrentTimestamp();
	Oid userId = GetUserId();

	SpinLockAcquire(&MyBackendData->mutex);

	MyBackendData->databaseId = MyDatabaseId;
	MyBackendData->userId = userId;

	MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId;
	MyBackendData->transactionId.transactionOriginator = true;
	MyBackendData->transactionId.transactionNumber = nextTransactionNumber;
	MyBackendData->transactionId.timestamp = currentTimestamp;

	MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
	MyBackendData->citusBackend.transactionOriginator = true;

	SpinLockRelease(&MyBackendData->mutex);
}


/*
 * MarkCitusInitiatedCoordinatorBackend marks the coordinator backend as
 * initiated by Citus.
 */
void
MarkCitusInitiatedCoordinatorBackend(void)
{
	/*
	 * GetLocalGroupId may throw an exception, which could leave the spinlock
	 * unreleased. We therefore call GetLocalGroupId before acquiring the lock.
	 */
	int32 localGroupId = GetLocalGroupId();

	SpinLockAcquire(&MyBackendData->mutex);

	MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
	MyBackendData->citusBackend.transactionOriginator = true;

	SpinLockRelease(&MyBackendData->mutex);
}


/*
 * CurrentDistributedTransactionNumber returns the transaction number of the
 * current distributed transaction. The caller must make sure a distributed
 * transaction is in progress.
 */
uint64
CurrentDistributedTransactionNumber(void)
{
	Assert(MyBackendData != NULL);

	return MyBackendData->transactionId.transactionNumber;
}


/*
 * GetBackendDataForProc writes the backend data for the given process to
 * result. If the process is part of a lock group (parallel query) it
 * returns the leader's data instead.
 */
void
GetBackendDataForProc(PGPROC *proc, BackendData *result)
{
	int pgprocno = proc->pgprocno;

	if (proc->lockGroupLeader != NULL)
	{
		pgprocno = proc->lockGroupLeader->pgprocno;
	}

	BackendData *backendData = &backendManagementShmemData->backends[pgprocno];

	SpinLockAcquire(&backendData->mutex);

	*result = *backendData;

	SpinLockRelease(&backendData->mutex);
}


/*
 * CancelTransactionDueToDeadlock cancels the input proc and also marks the backend
 * data with this information.
 */
void
CancelTransactionDueToDeadlock(PGPROC *proc)
{
	BackendData *backendData = &backendManagementShmemData->backends[proc->pgprocno];

	/* backend might not have used citus yet and thus not initialized backend data */
	if (!backendData)
	{
		return;
	}

	SpinLockAcquire(&backendData->mutex);

	/* send a SIGINT only if the process is still in a distributed transaction */
	if (backendData->transactionId.transactionNumber != 0)
	{
		backendData->cancelledDueToDeadlock = true;
		SpinLockRelease(&backendData->mutex);

		if (kill(proc->pid, SIGINT) != 0)
		{
			ereport(WARNING,
					(errmsg("attempted to cancel this backend (pid: %d) to resolve a "
							"distributed deadlock but the backend could not "
							"be cancelled", proc->pid)));
		}
	}
	else
	{
		SpinLockRelease(&backendData->mutex);
	}
}


/*
 * MyBackendGotCancelledDueToDeadlock returns whether the current distributed
 * transaction was cancelled due to a deadlock. If the backend is not in a
 * distributed transaction, the function returns false.
 *
 * We keep some session-level state to track whether we were cancelled because
 * of a distributed deadlock. When clearState is true, this function also resets
 * that state, so after calling it with clearState set to true, a second call
 * always returns false.
 */
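/*
 * A call-site sketch (hypothetical usage for illustration, not a reference to
 * an actual caller):
 *
 *   if (MyBackendGotCancelledDueToDeadlock(true))
 *   {
 *       ereport(ERROR, (errmsg("canceling the transaction since it was "
 *                              "involved in a distributed deadlock")));
 *   }
 */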
bool
MyBackendGotCancelledDueToDeadlock(bool clearState)
{
	bool cancelledDueToDeadlock = false;

	/* backend might not have used citus yet and thus not initialized backend data */
	if (!MyBackendData)
	{
		return false;
	}

	SpinLockAcquire(&MyBackendData->mutex);

	if (IsInDistributedTransaction(MyBackendData))
	{
		cancelledDueToDeadlock = MyBackendData->cancelledDueToDeadlock;
	}
	if (clearState)
	{
		MyBackendData->cancelledDueToDeadlock = false;
	}

	SpinLockRelease(&MyBackendData->mutex);

	return cancelledDueToDeadlock;
}


/*
 * MyBackendIsInDisributedTransaction returns true if MyBackendData
 * is in a distributed transaction.
 */
bool
MyBackendIsInDisributedTransaction(void)
{
	/* backend might not have used citus yet and thus not initialized backend data */
	if (!MyBackendData)
	{
		return false;
	}

	return IsInDistributedTransaction(MyBackendData);
}


/*
 * ActiveDistributedTransactionNumbers returns a list of pointers to
 * transaction numbers of distributed transactions that are in progress
 * and were started by the node on which it is called.
 */
List *
ActiveDistributedTransactionNumbers(void)
{
	List *activeTransactionNumberList = NIL;

	/* build list of starting procs */
	for (int curBackend = 0; curBackend < MaxBackends; curBackend++)
	{
		PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
		BackendData currentBackendData;

		if (currentProc->pid == 0)
		{
			/* unused PGPROC slot */
			continue;
		}

		GetBackendDataForProc(currentProc, &currentBackendData);

		if (!IsInDistributedTransaction(&currentBackendData))
		{
			/* not a distributed transaction */
			continue;
		}

		if (!currentBackendData.transactionId.transactionOriginator)
		{
			/* not a coordinator process */
			continue;
		}

		uint64 *transactionNumber = (uint64 *) palloc0(sizeof(uint64));
		*transactionNumber = currentBackendData.transactionId.transactionNumber;

		activeTransactionNumberList = lappend(activeTransactionNumberList,
											  transactionNumber);
	}

	return activeTransactionNumberList;
}


/*
 * GetMyProcLocalTransactionId() is a wrapper for
 * getting the lxid of MyProc.
 */
LocalTransactionId
GetMyProcLocalTransactionId(void)
{
	return MyProc->lxid;
}


/*
 * GetAllActiveClientBackendCount returns activeClientBackendCounter in
 * the shared memory.
 */
int
GetAllActiveClientBackendCount(void)
{
	uint32 activeBackendCount =
		pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);

	return activeBackendCount;
}


/*
 * IncrementClientBackendCounter increments activeClientBackendCounter in
 * the shared memory by one.
 */
void
IncrementClientBackendCounter(void)
{
	pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
}


/*
 * DecrementClientBackendCounter decrements activeClientBackendCounter in
 * the shared memory by one.
 */
void
DecrementClientBackendCounter(void)
{
	pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
}