1 /*-------------------------------------------------------------------------
2 *
3 * backend_data.c
4 *
5 * Infrastructure for managing per-backend data that can be efficiently
6 * accessed by all sessions.
7 *
8 * Copyright (c) Citus Data, Inc.
9 *
10 *-------------------------------------------------------------------------
11 */
12
13 #include "postgres.h"
14
15 #include "distributed/pg_version_constants.h"
16
17 #include "miscadmin.h"
18
19 #include "funcapi.h"
20 #include "access/htup_details.h"
21 #include "catalog/pg_authid.h"
22 #include "catalog/pg_type.h"
23 #include "datatype/timestamp.h"
24 #include "distributed/backend_data.h"
25 #include "distributed/connection_management.h"
26 #include "distributed/listutils.h"
27 #include "distributed/lock_graph.h"
28 #include "distributed/metadata_cache.h"
29 #include "distributed/remote_commands.h"
30 #include "distributed/shared_connection_stats.h"
31 #include "distributed/transaction_identifier.h"
32 #include "distributed/tuplestore.h"
33 #include "nodes/execnodes.h"
34 #include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
35 #include "replication/walsender.h"
36 #include "storage/ipc.h"
37 #include "storage/lmgr.h"
38 #include "storage/lwlock.h"
39 #include "storage/proc.h"
40 #include "storage/spin.h"
41 #include "storage/s_lock.h"
42 #include "utils/timestamp.h"
43
44
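/*
 * Query used to fetch active transactions from remote nodes, and the number
 * of columns it returns. The column count must match the values that
 * StoreAllActiveTransactions() puts into the tuplestore.
 */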
45 #define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();"
46 #define ACTIVE_TRANSACTION_COLUMN_COUNT 6
47
48 /*
49 * Each backend's data resides in shared memory,
50 * in the BackendManagementShmemData struct.
51 */
52 typedef struct BackendManagementShmemData
53 {
54 int trancheId;
55 NamedLWLockTranche namedLockTranche;
56 LWLock lock;
57
58 /*
59 * We prefer to use an atomic integer over sequences for two
60 * reasons: (i) there is an orders-of-magnitude performance difference
61 * and (ii) it allows read-only replicas to generate ids.
62 */
63 pg_atomic_uint64 nextTransactionNumber;
64
65 /*
66 * Total number of client backends that are authenticated.
67 * We only care about activeClientBackendCounter when adaptive
68 * connection management is enabled; otherwise it is always zero.
69 *
70 * Note that the counter does not include background workers
71 * and the like; it only counts client backends.
72 */
73 pg_atomic_uint32 activeClientBackendCounter;
74
75 BackendData backends[FLEXIBLE_ARRAY_MEMBER];
76 } BackendManagementShmemData;
77
78
79 static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
80 tupleDescriptor);
81
82 static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
83 static BackendManagementShmemData *backendManagementShmemData = NULL;
84 static BackendData *MyBackendData = NULL;
85
86
87 static void BackendManagementShmemInit(void);
88 static size_t BackendManagementShmemSize(void);
89
90
91 PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
92 PG_FUNCTION_INFO_V1(get_current_transaction_id);
93 PG_FUNCTION_INFO_V1(get_global_active_transactions);
94 PG_FUNCTION_INFO_V1(get_all_active_transactions);
95
96
97 /*
98 * assign_distributed_transaction_id updates the shared memory allocated for this backend
99 * and sets the initiatorNodeIdentifier, transactionNumber and timestamp fields to the given
100 * inputs. The function also sets the database id and process id using the information that
101 * Postgres provides.
102 *
103 * This function is only intended for internal use for managing distributed transactions.
104 * Users should not use this function for any purpose.
105 */
106 Datum
107 assign_distributed_transaction_id(PG_FUNCTION_ARGS)
108 {
109 CheckCitusVersion(ERROR);
110
111 Oid userId = GetUserId();
112
113 /* prepare data before acquiring spinlock to protect against errors */
114 int32 initiatorNodeIdentifier = PG_GETARG_INT32(0);
115 uint64 transactionNumber = PG_GETARG_INT64(1);
116 TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(2);
117
118 /* MyBackendData should always be available; check it just out of paranoia */
119 if (!MyBackendData)
120 {
121 ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
122 }
123
124 /*
125 * Note that we don't need to lock shared memory (i.e., LockBackendSharedMemory()) here
126 * since this function is executed after AssignDistributedTransactionId() issued on the
127 * initiator node, which already takes the required lock to enforce consistency.
128 */
129
130 SpinLockAcquire(&MyBackendData->mutex);
131
132 /* if an id is already assigned, release the lock and error */
133 if (MyBackendData->transactionId.transactionNumber != 0)
134 {
135 SpinLockRelease(&MyBackendData->mutex);
136
137 ereport(ERROR, (errmsg("the backend has already been assigned a "
138 "transaction id")));
139 }
140
141 MyBackendData->databaseId = MyDatabaseId;
142 MyBackendData->userId = userId;
143
144 MyBackendData->transactionId.initiatorNodeIdentifier = initiatorNodeIdentifier;
145 MyBackendData->transactionId.transactionNumber = transactionNumber;
146 MyBackendData->transactionId.timestamp = timestamp;
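/*
 * This function runs on a worker on behalf of the initiator node, so this
 * backend is never the transaction originator.
 */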
147 MyBackendData->transactionId.transactionOriginator = false;
148
149 MyBackendData->citusBackend.initiatorNodeIdentifier =
150 MyBackendData->transactionId.initiatorNodeIdentifier;
151 MyBackendData->citusBackend.transactionOriginator = false;
152
153 SpinLockRelease(&MyBackendData->mutex);
154
155 PG_RETURN_VOID();
156 }
157
158
159 /*
160 * get_current_transaction_id returns a tuple with (databaseId, processId,
161 * initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the
162 * shared memory associated with this backend. Note that if the backend
163 * is not in a transaction, the function returns uninitialized data where
164 * transactionNumber equals 0.
165 */
166 Datum
167 get_current_transaction_id(PG_FUNCTION_ARGS)
168 {
169 CheckCitusVersion(ERROR);
170
171 TupleDesc tupleDescriptor = NULL;
172
173 Datum values[5];
174 bool isNulls[5];
175
176
177 /* build a tuple descriptor for our result type */
178 if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
179 {
180 elog(ERROR, "return type must be a row type");
181 }
182
183 /* MyBackendData should always be available; check it just out of paranoia */
184 if (!MyBackendData)
185 {
186 ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
187 }
188
189 DistributedTransactionId *distributedTransctionId =
190 GetCurrentDistributedTransactionId();
191
192 memset(values, 0, sizeof(values));
193 memset(isNulls, false, sizeof(isNulls));
194
195 /* first two fields do not change for this backend, so get directly */
196 values[0] = ObjectIdGetDatum(MyDatabaseId);
197 values[1] = Int32GetDatum(MyProcPid);
198
199 values[2] = Int32GetDatum(distributedTransctionId->initiatorNodeIdentifier);
200 values[3] = UInt64GetDatum(distributedTransctionId->transactionNumber);
201
202 /* provide a better output: return a NULL timestamp when no distributed transaction id is assigned */
203 if (distributedTransctionId->initiatorNodeIdentifier != 0)
204 {
205 values[4] = TimestampTzGetDatum(distributedTransctionId->timestamp);
206 }
207 else
208 {
209 isNulls[4] = true;
210 }
211
212 HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
213
214 PG_RETURN_DATUM(HeapTupleGetDatum(heapTuple));
215 }
216
217
218 /*
219 * get_global_active_transactions returns all the available information about all
220 * the active backends from each node of the cluster. If you call this function from
221 * the coordinator, it returns the active transactions on the coordinator as
222 * well. However, if you call it from a worker, the result won't include the transactions
223 * on the coordinator node, since worker nodes are not aware of the coordinator.
224 */
225 Datum
226 get_global_active_transactions(PG_FUNCTION_ARGS)
227 {
228 CheckCitusVersion(ERROR);
229
230 TupleDesc tupleDescriptor = NULL;
231 List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
232 List *connectionList = NIL;
233 StringInfo queryToSend = makeStringInfo();
234
235 Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
236
237 appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY);
238
239 /* add active transactions for local node */
240 StoreAllActiveTransactions(tupleStore, tupleDescriptor);
241
242 int32 localGroupId = GetLocalGroupId();
243
244 /* open connections in parallel */
245 WorkerNode *workerNode = NULL;
246 foreach_ptr(workerNode, workerNodeList)
247 {
248 const char *nodeName = workerNode->workerName;
249 int nodePort = workerNode->workerPort;
250 int connectionFlags = 0;
251
252 if (workerNode->groupId == localGroupId)
253 {
254 /* we already fetched these transactions via StoreAllActiveTransactions() above */
255 continue;
256 }
257
258 MultiConnection *connection = StartNodeConnection(connectionFlags, nodeName,
259 nodePort);
260
261 connectionList = lappend(connectionList, connection);
262 }
263
264 FinishConnectionListEstablishment(connectionList);
265
266 /* send commands in parallel */
267 MultiConnection *connection = NULL;
268 foreach_ptr(connection, connectionList)
269 {
270 int querySent = SendRemoteCommand(connection, queryToSend->data);
271 if (querySent == 0)
272 {
273 ReportConnectionError(connection, WARNING);
274 }
275 }
276
277 /* receive query results */
278 foreach_ptr(connection, connectionList)
279 {
280 bool raiseInterrupts = true;
281 Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
282 bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
283
284 if (PQstatus(connection->pgConn) != CONNECTION_OK)
285 {
286 continue;
287 }
288
289 PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
290 if (!IsResponseOK(result))
291 {
292 ReportResultError(connection, result, WARNING);
293 continue;
294 }
295
296 int64 rowCount = PQntuples(result);
297 int64 colCount = PQnfields(result);
298
299 /* this is not expected, but guard against a mismatched column count */
300 if (colCount != ACTIVE_TRANSACTION_COLUMN_COUNT)
301 {
302 ereport(WARNING, (errmsg("unexpected number of columns from "
303 "get_all_active_transactions")));
304 continue;
305 }
306
307 for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
308 {
309 memset(values, 0, sizeof(values));
310 memset(isNulls, false, sizeof(isNulls));
311
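/*
 * Column order matches StoreAllActiveTransactions(): database id, process id,
 * initiator node identifier, worker-query flag, transaction number and
 * transaction timestamp.
 */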
312 values[0] = ParseIntField(result, rowIndex, 0);
313 values[1] = ParseIntField(result, rowIndex, 1);
314 values[2] = ParseIntField(result, rowIndex, 2);
315 values[3] = ParseBoolField(result, rowIndex, 3);
316 values[4] = ParseIntField(result, rowIndex, 4);
317 values[5] = ParseTimestampTzField(result, rowIndex, 5);
318
319 tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
320 }
321
322 PQclear(result);
323 ForgetResults(connection);
324 }
325
326 /* clean up and return the tuplestore */
327 tuplestore_donestoring(tupleStore);
328
329 PG_RETURN_VOID();
330 }
331
332
333 /*
334 * get_all_active_transactions returns all the available information about all
335 * the active backends.
336 */
337 Datum
338 get_all_active_transactions(PG_FUNCTION_ARGS)
339 {
340 CheckCitusVersion(ERROR);
341
342 TupleDesc tupleDescriptor = NULL;
343 Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
344
345 StoreAllActiveTransactions(tupleStore, tupleDescriptor);
346
347 /* clean up and return the tuplestore */
348 tuplestore_donestoring(tupleStore);
349
350 PG_RETURN_VOID();
351 }
352
353
354 /*
355 * StoreAllActiveTransactions gets the active transactions from the local node and inserts
356 * them into the given tuplestore.
357 */
358 static void
359 StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
360 {
361 Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
362 bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
363 bool showAllTransactions = superuser();
364 const Oid userId = GetUserId();
365
366 /*
367 * We don't want to initialize memory while spinlock is held so we
368 * prefer to do it here. This initialization is done only for the first
369 * row.
370 */
371 memset(values, 0, sizeof(values));
372 memset(isNulls, false, sizeof(isNulls));
373
374 if (is_member_of_role(userId, ROLE_PG_MONITOR))
375 {
376 showAllTransactions = true;
377 }
378
379 /* we're reading all distributed transactions, prevent new backends */
380 LockBackendSharedMemory(LW_SHARED);
381
382 for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
383 {
384 BackendData *currentBackend =
385 &backendManagementShmemData->backends[backendIndex];
386
387 /* copy data to work on after releasing the spinlock, to protect against errors */
388 int initiatorNodeIdentifier = -1;
389 uint64 transactionNumber = 0;
390
391 SpinLockAcquire(&currentBackend->mutex);
392
393 /* we're only interested in backends initiated by Citus */
394 if (currentBackend->citusBackend.initiatorNodeIdentifier < 0)
395 {
396 SpinLockRelease(&currentBackend->mutex);
397 continue;
398 }
399
400 /*
401 * Unless the user has a role that allows seeing all transactions (superuser,
402 * pg_monitor), skip over transactions belonging to other users.
403 */
404 if (!showAllTransactions && currentBackend->userId != userId)
405 {
406 SpinLockRelease(&currentBackend->mutex);
407 continue;
408 }
409
410 Oid databaseId = currentBackend->databaseId;
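/*
 * The backends array is indexed by pgprocno (see InitializeBackendData()),
 * so the PGPROC at the same index in allProcs belongs to this backend.
 */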
411 int backendPid = ProcGlobal->allProcs[backendIndex].pid;
412 initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
413
414 /*
415 * We prefer to use worker_query instead of transactionOriginator in the user-facing
416 * functions since it's more intuitive. Thus, we negate the result before returning.
417 *
418 * We prefer to use citusBackend's transactionOriginator field over transactionId's
419 * field with the same name. The reason is that it also covers backends that are not
420 * inside a distributed transaction.
421 */
422 bool coordinatorOriginatedQuery =
423 currentBackend->citusBackend.transactionOriginator;
424
425 transactionNumber = currentBackend->transactionId.transactionNumber;
426 TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp;
427
428 SpinLockRelease(&currentBackend->mutex);
429
430 values[0] = ObjectIdGetDatum(databaseId);
431 values[1] = Int32GetDatum(backendPid);
432 values[2] = Int32GetDatum(initiatorNodeIdentifier);
433 values[3] = !coordinatorOriginatedQuery;
434 values[4] = UInt64GetDatum(transactionNumber);
435 values[5] = TimestampTzGetDatum(transactionIdTimestamp);
436
437 tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
438
439 /*
440 * We don't want to initialize memory while spinlock is held so we
441 * prefer to do it here. This initialization is done for the rows
442 * starting from the second one.
443 */
444 memset(values, 0, sizeof(values));
445 memset(isNulls, false, sizeof(isNulls));
446 }
447
448 UnlockBackendSharedMemory();
449 }
450
451
452 /*
453 * InitializeBackendManagement requests the necessary shared memory
454 * from Postgres and sets up the shared memory startup hook.
455 */
456 void
457 InitializeBackendManagement(void)
458 {
459 /* allocate shared memory */
460 if (!IsUnderPostmaster)
461 {
462 RequestAddinShmemSpace(BackendManagementShmemSize());
463 }
464
465 prev_shmem_startup_hook = shmem_startup_hook;
466 shmem_startup_hook = BackendManagementShmemInit;
467 }
468
469
470 /*
471 * BackendManagementShmemInit is the callback registered as the shared memory
472 * startup hook. The function sets up the necessary shared memory
473 * segment for the backend manager.
474 */
475 static void
476 BackendManagementShmemInit(void)
477 {
478 bool alreadyInitialized = false;
479
480 /* we may update the shmem, acquire lock exclusively */
481 LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
482
483 backendManagementShmemData =
484 (BackendManagementShmemData *) ShmemInitStruct(
485 "Backend Management Shmem",
486 BackendManagementShmemSize(),
487 &alreadyInitialized);
488
489 if (!alreadyInitialized)
490 {
491 char *trancheName = "Backend Management Tranche";
492
493 NamedLWLockTranche *namedLockTranche =
494 &backendManagementShmemData->namedLockTranche;
495
496 /* start by zeroing out all the memory */
497 memset(backendManagementShmemData, 0,
498 BackendManagementShmemSize());
499
500 namedLockTranche->trancheId = LWLockNewTrancheId();
501
502 LWLockRegisterTranche(namedLockTranche->trancheId, trancheName);
503 LWLockInitialize(&backendManagementShmemData->lock,
504 namedLockTranche->trancheId);
505
506 /* start the distributed transaction ids from 1 */
507 pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
508
509 /* there are no active backends yet, so start with zero */
510 pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);
511
512 /*
513 * We need to initialize each backend's spinlock before any backend
514 * starts executing. Note that we initialize TotalProcs slots (not just
515 * MaxBackends) since some of the blocking processes could be prepared
516 * transactions, which aren't covered by MaxBackends.
517 *
518 * We also initialize initiatorNodeIdentifier to -1, which can never be
519 * used as a node id.
520 */
521 int totalProcs = TotalProcCount();
522 for (int backendIndex = 0; backendIndex < totalProcs; ++backendIndex)
523 {
524 BackendData *backendData =
525 &backendManagementShmemData->backends[backendIndex];
526 backendData->citusBackend.initiatorNodeIdentifier = -1;
527 SpinLockInit(&backendData->mutex);
528 }
529 }
530
531 LWLockRelease(AddinShmemInitLock);
532
533 if (prev_shmem_startup_hook != NULL)
534 {
535 prev_shmem_startup_hook();
536 }
537 }
538
539
540 /*
541 * BackendManagementShmemSize returns the size that should be allocated
542 * on the shared memory for backend management.
543 */
544 static size_t
545 BackendManagementShmemSize(void)
546 {
547 Size size = 0;
548 int totalProcs = TotalProcCount();
549
550 size = add_size(size, sizeof(BackendManagementShmemData));
551 size = add_size(size, mul_size(sizeof(BackendData), totalProcs));
552
553 return size;
554 }
555
556
557 /*
558 * TotalProcCount returns the total number of processes that could run via the
559 * current postgres server. See the details in the function comments.
560 *
561 * One caveat for the reader: Citus enforces that it is loaded as the first
562 * extension in shared_preload_libraries. However, if any other
563 * extension overrides MaxConnections, autovacuum_max_workers or
564 * max_worker_processes, the reasoning in this function may not work as expected.
565 * Given that this is not a usual pattern for extensions, we consider Citus'
566 * behaviour good enough for now.
567 */
568 int
569 TotalProcCount(void)
570 {
571 int maxBackends = 0;
572 int totalProcs = 0;
573
574 #ifdef WIN32
575
576 /* autovacuum_max_workers is not PGDLLIMPORT, so use a high estimate for windows */
577 int estimatedMaxAutovacuumWorkers = 30;
578 maxBackends =
579 MaxConnections + estimatedMaxAutovacuumWorkers + 1 + max_worker_processes;
580 #else
581
582 /*
583 * We're simply imitating PostgreSQL's InitializeMaxBackends(). Given that all
584 * the GUCs used here are PGC_POSTMASTER, it should be safe to access them
585 * at any time during execution, even before InitializeMaxBackends() is called.
586 */
587 maxBackends = MaxConnections + autovacuum_max_workers + 1 + max_worker_processes;
588 #endif
589
590 /*
591 * We prefer to maintain space for auxiliary procs and prepared transactions in
592 * the backend space because they could be blocking processes and our current
593 * implementation of distributed deadlock detection could process them
594 * as regular backends. In the future, we could consider changing the deadlock
595 * detection algorithm to ignore auxiliary procs or prepared transactions and
596 * save some space.
597 */
598 totalProcs = maxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
599
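/*
 * Wal sender processes get their own PGPROC slots on top of the backends
 * counted above, so reserve space for them as well.
 */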
600 totalProcs += max_wal_senders;
601
602 return totalProcs;
603 }
604
605
606 /*
607 * InitializeBackendData initialises MyBackendData to the shared memory segment
608 * belonging to the current backend.
609 *
610 * The function is called through CitusHasBeenLoaded when we first detect that
611 * the Citus extension is present, and after any subsequent invalidation of
612 * pg_dist_partition (see InvalidateMetadataSystemCache()).
613 *
614 * We only need to initialise MyBackendData once. The only goal here is to make
615 * sure that we don't use the backend data from a previous backend with the same
616 * pgprocno. Resetting the backend data after a distributed transaction happens
617 * on COMMIT/ABORT through transaction callbacks.
618 */
619 void
620 InitializeBackendData(void)
621 {
622 if (MyBackendData != NULL)
623 {
624 /*
625 * We already initialized MyBackendData before. We definitely should
626 * not initialise it again, because we might be in the middle of a
627 * distributed transaction.
628 */
629 return;
630 }
631
632 MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno];
633
634 Assert(MyBackendData);
635
636 LockBackendSharedMemory(LW_EXCLUSIVE);
637
638 /* zero out the backend data */
639 UnSetDistributedTransactionId();
640
641 UnlockBackendSharedMemory();
642 }
643
644
645 /*
646 * UnSetDistributedTransactionId simply acquires the mutex and resets the backend's
647 * distributed transaction data in shared memory to the initial values.
648 */
649 void
650 UnSetDistributedTransactionId(void)
651 {
652 /* backend does not exist if the extension is not created */
653 if (MyBackendData)
654 {
655 SpinLockAcquire(&MyBackendData->mutex);
656
657 MyBackendData->databaseId = 0;
658 MyBackendData->userId = 0;
659 MyBackendData->cancelledDueToDeadlock = false;
660 MyBackendData->transactionId.initiatorNodeIdentifier = 0;
661 MyBackendData->transactionId.transactionOriginator = false;
662 MyBackendData->transactionId.transactionNumber = 0;
663 MyBackendData->transactionId.timestamp = 0;
664
665 MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
666 MyBackendData->citusBackend.transactionOriginator = false;
667
668 SpinLockRelease(&MyBackendData->mutex);
669 }
670 }
671
672
673 /*
674 * LockBackendSharedMemory is a simple wrapper around LWLockAcquire on the
675 * shared memory lock.
676 *
677 * We use the backend shared memory lock to prevent new backends from becoming
678 * part of a new distributed transaction or an existing backend from leaving a
679 * distributed transaction while we're reading all backends' data.
680 *
681 * The primary goal is to provide a consistent view of the current distributed
682 * transactions while doing deadlock detection.
683 */
684 void
685 LockBackendSharedMemory(LWLockMode lockMode)
686 {
687 LWLockAcquire(&backendManagementShmemData->lock, lockMode);
688 }
689
690
691 /*
692 * UnlockBackendSharedMemory is a simple wrapper around LWLockRelease on the
693 * shared memory lock.
694 */
695 void
696 UnlockBackendSharedMemory(void)
697 {
698 LWLockRelease(&backendManagementShmemData->lock);
699 }
700
701
702 /*
703 * GetCurrentDistributedTransactionId reads the backend's distributed transaction id and
704 * returns a copy of it.
705 *
706 * When called from a parallel worker, it uses the parent's transaction ID per the logic
707 * in GetBackendDataForProc.
708 */
709 DistributedTransactionId *
710 GetCurrentDistributedTransactionId(void)
711 {
712 DistributedTransactionId *currentDistributedTransactionId =
713 (DistributedTransactionId *) palloc(sizeof(DistributedTransactionId));
714 BackendData backendData;
715
716 GetBackendDataForProc(MyProc, &backendData);
717
718 currentDistributedTransactionId->initiatorNodeIdentifier =
719 backendData.transactionId.initiatorNodeIdentifier;
720 currentDistributedTransactionId->transactionOriginator =
721 backendData.transactionId.transactionOriginator;
722 currentDistributedTransactionId->transactionNumber =
723 backendData.transactionId.transactionNumber;
724 currentDistributedTransactionId->timestamp =
725 backendData.transactionId.timestamp;
726
727 return currentDistributedTransactionId;
728 }
729
730
731 /*
732 * AssignDistributedTransactionId generates a new distributed transaction id and
733 * sets it for the current backend. It also sets the databaseId and
734 * processId fields.
735 *
736 * This function should only be called from UseCoordinatedTransaction(). Any other
737 * caller is very likely to break distributed transaction management.
738 */
739 void
740 AssignDistributedTransactionId(void)
741 {
742 pg_atomic_uint64 *transactionNumberSequence =
743 &backendManagementShmemData->nextTransactionNumber;
744
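/*
 * pg_atomic_fetch_add_u64 returns the value before the increment, so the
 * first assigned transaction number is 1 (the counter starts at 1).
 */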
745 uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1);
746 int32 localGroupId = GetLocalGroupId();
747 TimestampTz currentTimestamp = GetCurrentTimestamp();
748 Oid userId = GetUserId();
749
750 SpinLockAcquire(&MyBackendData->mutex);
751
752 MyBackendData->databaseId = MyDatabaseId;
753 MyBackendData->userId = userId;
754
755 MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId;
756 MyBackendData->transactionId.transactionOriginator = true;
757 MyBackendData->transactionId.transactionNumber = nextTransactionNumber;
758 MyBackendData->transactionId.timestamp = currentTimestamp;
759
760 MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
761 MyBackendData->citusBackend.transactionOriginator = true;
762
763 SpinLockRelease(&MyBackendData->mutex);
764 }
765
766
767 /*
768 * MarkCitusInitiatedCoordinatorBackend marks the current backend as a
769 * coordinator backend initiated by Citus.
770 */
771 void
772 MarkCitusInitiatedCoordinatorBackend(void)
773 {
774 /*
775 * GetLocalGroupId may throw an exception, which would leave the spinlock
776 * unreleased. We therefore call GetLocalGroupId before acquiring the lock.
777 */
778 int32 localGroupId = GetLocalGroupId();
779
780 SpinLockAcquire(&MyBackendData->mutex);
781
782 MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
783 MyBackendData->citusBackend.transactionOriginator = true;
784
785 SpinLockRelease(&MyBackendData->mutex);
786 }
787
788
789 /*
790 * CurrentDistributedTransactionNumber returns the transaction number of the
791 * current distributed transaction. The caller must make sure a distributed
792 * transaction is in progress.
793 */
794 uint64
795 CurrentDistributedTransactionNumber(void)
796 {
797 Assert(MyBackendData != NULL);
798
799 return MyBackendData->transactionId.transactionNumber;
800 }
801
802
803 /*
804 * GetBackendDataForProc writes the backend data for the given process to
805 * result. If the process is part of a lock group (parallel query) it
806 * returns the leader data instead.
807 */
808 void
809 GetBackendDataForProc(PGPROC *proc, BackendData *result)
810 {
811 int pgprocno = proc->pgprocno;
812
813 if (proc->lockGroupLeader != NULL)
814 {
815 pgprocno = proc->lockGroupLeader->pgprocno;
816 }
817
818 BackendData *backendData = &backendManagementShmemData->backends[pgprocno];
819
820 SpinLockAcquire(&backendData->mutex);
821
822 *result = *backendData;
823
824 SpinLockRelease(&backendData->mutex);
825 }
826
827
828 /*
829 * CancelTransactionDueToDeadlock cancels the input proc and also marks the backend
830 * data with this information.
831 */
832 void
833 CancelTransactionDueToDeadlock(PGPROC *proc)
834 {
835 BackendData *backendData = &backendManagementShmemData->backends[proc->pgprocno];
836
837 /* backend might not have used citus yet and thus not initialized backend data */
838 if (!backendData)
839 {
840 return;
841 }
842
843 SpinLockAcquire(&backendData->mutex);
844
845 /* send a SIGINT only if the process is still in a distributed transaction */
846 if (backendData->transactionId.transactionNumber != 0)
847 {
848 backendData->cancelledDueToDeadlock = true;
849 SpinLockRelease(&backendData->mutex);
850
851 if (kill(proc->pid, SIGINT) != 0)
852 {
853 ereport(WARNING,
854 (errmsg("attempted to cancel this backend (pid: %d) to resolve a "
855 "distributed deadlock but the backend could not "
856 "be cancelled", proc->pid)));
857 }
858 }
859 else
860 {
861 SpinLockRelease(&backendData->mutex);
862 }
863 }
864
865
866 /*
867 * MyBackendGotCancelledDueToDeadlock returns whether the current distributed
868 * transaction was cancelled due to a deadlock. If the backend is not in a
869 * distributed transaction, the function returns false.
870 * We keep some session-level state to track whether we were cancelled
871 * because of a distributed deadlock. When clearState is true, this function
872 * also resets that state, so after calling this function with clearState set
873 * to true, a second call would always return false.
874 */
875 bool
876 MyBackendGotCancelledDueToDeadlock(bool clearState)
877 {
878 bool cancelledDueToDeadlock = false;
879
880 /* backend might not have used citus yet and thus not initialized backend data */
881 if (!MyBackendData)
882 {
883 return false;
884 }
885
886 SpinLockAcquire(&MyBackendData->mutex);
887
888 if (IsInDistributedTransaction(MyBackendData))
889 {
890 cancelledDueToDeadlock = MyBackendData->cancelledDueToDeadlock;
891 }
892 if (clearState)
893 {
894 MyBackendData->cancelledDueToDeadlock = false;
895 }
896
897 SpinLockRelease(&MyBackendData->mutex);
898
899 return cancelledDueToDeadlock;
900 }
901
902
903 /*
904 * MyBackendIsInDisributedTransaction returns true if MyBackendData
905 * is in a distributed transaction.
906 */
907 bool
908 MyBackendIsInDisributedTransaction(void)
909 {
910 /* backend might not have used citus yet and thus not initialized backend data */
911 if (!MyBackendData)
912 {
913 return false;
914 }
915
916 return IsInDistributedTransaction(MyBackendData);
917 }
918
919
920 /*
921 * ActiveDistributedTransactionNumbers returns a list of pointers to
922 * transaction numbers of distributed transactions that are in progress
923 * and were started by the node on which it is called.
924 */
925 List *
926 ActiveDistributedTransactionNumbers(void)
927 {
928 List *activeTransactionNumberList = NIL;
929
930 /* build list of starting procs */
931 for (int curBackend = 0; curBackend < MaxBackends; curBackend++)
932 {
933 PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
934 BackendData currentBackendData;
935
936 if (currentProc->pid == 0)
937 {
938 /* unused PGPROC slot */
939 continue;
940 }
941
942 GetBackendDataForProc(currentProc, &currentBackendData);
943
944 if (!IsInDistributedTransaction(&currentBackendData))
945 {
946 /* not a distributed transaction */
947 continue;
948 }
949
950 if (!currentBackendData.transactionId.transactionOriginator)
951 {
952 /* not a coordinator process */
953 continue;
954 }
955
956 uint64 *transactionNumber = (uint64 *) palloc0(sizeof(uint64));
957 *transactionNumber = currentBackendData.transactionId.transactionNumber;
958
959 activeTransactionNumberList = lappend(activeTransactionNumberList,
960 transactionNumber);
961 }
962
963 return activeTransactionNumberList;
964 }
965
966
967 /*
968 * GetMyProcLocalTransactionId() is a wrapper for getting
969 * the lxid of MyProc.
970 */
971 LocalTransactionId
972 GetMyProcLocalTransactionId(void)
973 {
974 return MyProc->lxid;
975 }
976
977
978 /*
979 * GetAllActiveClientBackendCount returns activeClientBackendCounter in
980 * the shared memory.
981 */
982 int
983 GetAllActiveClientBackendCount(void)
984 {
985 uint32 activeBackendCount =
986 pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);
987
988 return activeBackendCount;
989 }
990
991
992 /*
993 * IncrementClientBackendCounter increments activeClientBackendCounter in
994 * the shared memory by one.
995 */
996 void
997 IncrementClientBackendCounter(void)
998 {
999 pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
1000 }
1001
1002
1003 /*
1004 * DecrementClientBackendCounter decrements activeClientBackendCounter in
1005 * the shared memory by one.
1006 */
1007 void
1008 DecrementClientBackendCounter(void)
1009 {
1010 pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
1011 }
1012