1 /*-------------------------------------------------------------------------
2 *
3 * lock_graph.c
4 *
 * Functions for obtaining local and global lock graphs in which each
 * node is a distributed transaction, and an edge represents a waiting-for
 * relationship.
8 *
9 * Copyright (c) Citus Data, Inc.
10 *
11 *-------------------------------------------------------------------------
12 */
13
14 #include "postgres.h"
15
16 #include "funcapi.h"
17 #include "libpq-fe.h"
18 #include "miscadmin.h"
19
20 #include "access/hash.h"
21 #include "distributed/backend_data.h"
22 #include "distributed/connection_management.h"
23 #include "distributed/hash_helpers.h"
24 #include "distributed/listutils.h"
25 #include "distributed/lock_graph.h"
26 #include "distributed/metadata_cache.h"
27 #include "distributed/remote_commands.h"
28 #include "distributed/tuplestore.h"
29 #include "storage/proc.h"
30 #include "utils/builtins.h"
31 #include "utils/hsearch.h"
32 #include "utils/timestamp.h"
33
34
35 /*
36 * PROCStack is a stack of PGPROC pointers used to perform a depth-first search
37 * through the lock graph. It also keeps track of which processes have been
38 * added to the stack to avoid visiting the same process multiple times.
39 */
40 typedef struct PROCStack
41 {
42 int procCount;
43 PGPROC **procs;
44 bool *procAdded;
45 } PROCStack;
46
47
48 static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
49 static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
50 static WaitGraph * BuildLocalWaitGraph(void);
51 static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
52 static void LockLockData(void);
53 static void UnlockLockData(void);
54 static void AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc,
55 PROCStack *remaining);
56 static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc,
57 PROCStack *remaining);
58 static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
59 PROCStack *remaining);
60 static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
61 static void AddProcToVisit(PROCStack *remaining, PGPROC *proc);
62 static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
63 static bool IsConflictingLockMask(int holdMask, int conflictMask);
64
65
66 PG_FUNCTION_INFO_V1(dump_local_wait_edges);
67 PG_FUNCTION_INFO_V1(dump_global_wait_edges);
68
69
70 /*
71 * dump_global_wait_edges returns global wait edges for distributed transactions
72 * originating from the node on which it is started.
73 */
74 Datum
dump_global_wait_edges(PG_FUNCTION_ARGS)75 dump_global_wait_edges(PG_FUNCTION_ARGS)
76 {
77 WaitGraph *waitGraph = BuildGlobalWaitGraph();
78
79 ReturnWaitGraph(waitGraph, fcinfo);
80
81 return (Datum) 0;
82 }
83
84
85 /*
86 * BuildGlobalWaitGraph builds a wait graph for distributed transactions
87 * that originate from this node, including edges from all (other) worker
88 * nodes.
89 */
90 WaitGraph *
BuildGlobalWaitGraph(void)91 BuildGlobalWaitGraph(void)
92 {
93 List *workerNodeList = ActiveReadableNodeList();
94 char *nodeUser = CitusExtensionOwnerName();
95 List *connectionList = NIL;
96 int32 localGroupId = GetLocalGroupId();
97
98 WaitGraph *waitGraph = BuildLocalWaitGraph();
99
100 /* open connections in parallel */
101 WorkerNode *workerNode = NULL;
102 foreach_ptr(workerNode, workerNodeList)
103 {
104 const char *nodeName = workerNode->workerName;
105 int nodePort = workerNode->workerPort;
106 int connectionFlags = 0;
107
108 if (workerNode->groupId == localGroupId)
109 {
110 /* we already have local wait edges */
111 continue;
112 }
113
114 MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
115 nodeName, nodePort,
116 nodeUser, NULL);
117
118 connectionList = lappend(connectionList, connection);
119 }
120
121 FinishConnectionListEstablishment(connectionList);
122
123 /* send commands in parallel */
124 MultiConnection *connection = NULL;
125 foreach_ptr(connection, connectionList)
126 {
127 const char *command = "SELECT * FROM dump_local_wait_edges()";
128
129 int querySent = SendRemoteCommand(connection, command);
130 if (querySent == 0)
131 {
132 ReportConnectionError(connection, WARNING);
133 }
134 }
135
136 /* receive dump_local_wait_edges results */
137 foreach_ptr(connection, connectionList)
138 {
139 bool raiseInterrupts = true;
140
141 PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
142 if (!IsResponseOK(result))
143 {
144 ReportResultError(connection, result, WARNING);
145 continue;
146 }
147
148 int64 rowCount = PQntuples(result);
149 int64 colCount = PQnfields(result);
150
151 if (colCount != 9)
152 {
153 ereport(WARNING, (errmsg("unexpected number of columns from "
154 "dump_local_wait_edges")));
155 continue;
156 }
157
158 for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
159 {
160 AddWaitEdgeFromResult(waitGraph, result, rowIndex);
161 }
162
163 PQclear(result);
164 ForgetResults(connection);
165 }
166
167 return waitGraph;
168 }
169
170
171 /*
172 * AddWaitEdgeFromResult adds an edge to the wait graph that is read from
173 * a PGresult.
174 */
175 static void
AddWaitEdgeFromResult(WaitGraph * waitGraph,PGresult * result,int rowIndex)176 AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
177 {
178 WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
179
180 waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
181 waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
182 waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
183 waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
184 waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
185 waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
186 waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
187 waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
188 waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
189 }
190
191
192 /*
193 * ParseIntField parses a int64 from a remote result or returns 0 if the
194 * result is NULL.
195 */
196 int64
ParseIntField(PGresult * result,int rowIndex,int colIndex)197 ParseIntField(PGresult *result, int rowIndex, int colIndex)
198 {
199 if (PQgetisnull(result, rowIndex, colIndex))
200 {
201 return 0;
202 }
203
204 char *resultString = PQgetvalue(result, rowIndex, colIndex);
205
206 return pg_strtouint64(resultString, NULL, 10);
207 }
208
209
210 /*
211 * ParseBoolField parses a bool from a remote result or returns false if the
212 * result is NULL.
213 */
214 bool
ParseBoolField(PGresult * result,int rowIndex,int colIndex)215 ParseBoolField(PGresult *result, int rowIndex, int colIndex)
216 {
217 if (PQgetisnull(result, rowIndex, colIndex))
218 {
219 return false;
220 }
221
222 char *resultString = PQgetvalue(result, rowIndex, colIndex);
223 if (strlen(resultString) != 1)
224 {
225 return false;
226 }
227
228 return resultString[0] == 't';
229 }
230
231
232 /*
233 * ParseTimestampTzField parses a timestamptz from a remote result or returns
234 * 0 if the result is NULL.
235 */
236 TimestampTz
ParseTimestampTzField(PGresult * result,int rowIndex,int colIndex)237 ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
238 {
239 if (PQgetisnull(result, rowIndex, colIndex))
240 {
241 return DT_NOBEGIN;
242 }
243
244 char *resultString = PQgetvalue(result, rowIndex, colIndex);
245 Datum resultStringDatum = CStringGetDatum(resultString);
246 Datum timestampDatum = DirectFunctionCall3(timestamptz_in, resultStringDatum, 0, -1);
247
248 return DatumGetTimestampTz(timestampDatum);
249 }
250
251
252 /*
253 * dump_local_wait_edges returns wait edges for distributed transactions
254 * running on the node on which it is called, which originate from the source node.
255 */
256 Datum
dump_local_wait_edges(PG_FUNCTION_ARGS)257 dump_local_wait_edges(PG_FUNCTION_ARGS)
258 {
259 WaitGraph *waitGraph = BuildLocalWaitGraph();
260 ReturnWaitGraph(waitGraph, fcinfo);
261
262 return (Datum) 0;
263 }
264
265
266 /*
267 * ReturnWaitGraph returns a wait graph for a set returning function.
268 */
269 static void
ReturnWaitGraph(WaitGraph * waitGraph,FunctionCallInfo fcinfo)270 ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
271 {
272 TupleDesc tupleDesc;
273 Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
274
275 /*
276 * Columns:
277 * 00: waiting_pid
278 * 01: waiting_node_id
279 * 02: waiting_transaction_num
280 * 03: waiting_transaction_stamp
281 * 04: blocking_pid
282 * 05: blocking__node_id
283 * 06: blocking_transaction_num
284 * 07: blocking_transaction_stamp
285 * 08: blocking_transaction_waiting
286 */
287 for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
288 {
289 Datum values[9];
290 bool nulls[9];
291 WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
292
293 memset(values, 0, sizeof(values));
294 memset(nulls, 0, sizeof(nulls));
295
296 values[0] = Int32GetDatum(curEdge->waitingPid);
297 values[1] = Int32GetDatum(curEdge->waitingNodeId);
298 if (curEdge->waitingTransactionNum != 0)
299 {
300 values[2] = Int64GetDatum(curEdge->waitingTransactionNum);
301 values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
302 }
303 else
304 {
305 nulls[2] = true;
306 nulls[3] = true;
307 }
308
309 values[4] = Int32GetDatum(curEdge->blockingPid);
310 values[5] = Int32GetDatum(curEdge->blockingNodeId);
311 if (curEdge->blockingTransactionNum != 0)
312 {
313 values[6] = Int64GetDatum(curEdge->blockingTransactionNum);
314 values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
315 }
316 else
317 {
318 nulls[6] = true;
319 nulls[7] = true;
320 }
321 values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);
322
323 tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
324 }
325
326 /* clean up and return the tuplestore */
327 tuplestore_donestoring(tupleStore);
328 }
329
330
331 /*
332 * BuildLocalWaitGraph builds a wait graph for distributed transactions
333 * that originate from the local node.
334 */
335 static WaitGraph *
BuildLocalWaitGraph(void)336 BuildLocalWaitGraph(void)
337 {
338 PROCStack remaining;
339 int totalProcs = TotalProcCount();
340
341 /*
342 * Try hard to avoid allocations while holding lock. Thus we pre-allocate
343 * space for locks in large batches - for common scenarios this should be
344 * more than enough space to build the list of wait edges without a single
345 * allocation.
346 */
347 WaitGraph *waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
348 waitGraph->localNodeId = GetLocalGroupId();
349 waitGraph->allocatedSize = totalProcs * 3;
350 waitGraph->edgeCount = 0;
351 waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
352
353 remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * totalProcs);
354 remaining.procAdded = (bool *) palloc0(sizeof(bool *) * totalProcs);
355 remaining.procCount = 0;
356
357 LockLockData();
358
359 /*
360 * Build lock-graph. We do so by first finding all procs which we are
361 * interested in (in a distributed transaction, and blocked). Once
362 * those are collected, do depth first search over all procs blocking
363 * those.
364 */
365
366 /* build list of starting procs */
367 for (int curBackend = 0; curBackend < totalProcs; curBackend++)
368 {
369 PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
370 BackendData currentBackendData;
371
372 /* skip if the PGPROC slot is unused */
373 if (currentProc->pid == 0)
374 {
375 continue;
376 }
377
378 GetBackendDataForProc(currentProc, ¤tBackendData);
379
380 /*
381 * Only start searching from distributed transactions, since we only
382 * care about distributed transactions for the purpose of distributed
383 * deadlock detection.
384 */
385 if (!IsInDistributedTransaction(¤tBackendData))
386 {
387 continue;
388 }
389
390 /* skip if the process is not blocked */
391 if (!IsProcessWaitingForLock(currentProc))
392 {
393 continue;
394 }
395
396 /* skip if the process is blocked for relation extension */
397 if (IsProcessWaitingForSafeOperations(currentProc))
398 {
399 continue;
400 }
401
402 AddProcToVisit(&remaining, currentProc);
403 }
404
405 while (remaining.procCount > 0)
406 {
407 PGPROC *waitingProc = remaining.procs[--remaining.procCount];
408
409 /* only blocked processes result in wait edges */
410 if (!IsProcessWaitingForLock(waitingProc))
411 {
412 continue;
413 }
414
415 /* skip if the process is blocked for relation extension */
416 if (IsProcessWaitingForSafeOperations(waitingProc))
417 {
418 continue;
419 }
420
421 /*
422 * Record an edge for everyone already holding the lock in a
423 * conflicting manner ("hard edges" in postgres parlance).
424 */
425 AddEdgesForLockWaits(waitGraph, waitingProc, &remaining);
426
427 /*
428 * Record an edge for everyone in front of us in the wait-queue
429 * for the lock ("soft edges" in postgres parlance).
430 */
431 AddEdgesForWaitQueue(waitGraph, waitingProc, &remaining);
432 }
433
434 UnlockLockData();
435
436 return waitGraph;
437 }
438
439
440 /*
441 * IsProcessWaitingForSafeOperations returns true if the given PROC
442 * waiting on relation extension locks, page locks or speculative locks.
443 *
444 * The function also returns true if the waiting process is an autovacuum
445 * process given that autovacuum cannot contribute to any distributed
446 * deadlocks.
447 *
448 * In general for the purpose of distributed deadlock detection, we should
449 * skip if the process blocked on the locks that may not be part of deadlocks.
450 * Those locks are held for a short duration while the relation or the index
451 * is actually extended on the disk and released as soon as the extension is
452 * done, even before the execution of the command that triggered the extension
453 * finishes. Thus, recording such waits on our lock graphs could yield detecting
454 * wrong distributed deadlocks.
455 */
456 static bool
IsProcessWaitingForSafeOperations(PGPROC * proc)457 IsProcessWaitingForSafeOperations(PGPROC *proc)
458 {
459 if (proc->waitStatus != PROC_WAIT_STATUS_WAITING)
460 {
461 return false;
462 }
463
464 if (pgproc_statusflags_compat(proc) & PROC_IS_AUTOVACUUM)
465 {
466 return true;
467 }
468
469 PROCLOCK *waitProcLock = proc->waitProcLock;
470 LOCK *waitLock = waitProcLock->tag.myLock;
471
472 return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND ||
473 waitLock->tag.locktag_type == LOCKTAG_PAGE ||
474 waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN;
475 }
476
477
478 /*
479 * LockLockData takes locks the shared lock data structure, which prevents
480 * concurrent lock acquisitions/releases.
481 *
482 * The function also acquires lock on the backend shared memory to prevent
483 * new backends to start.
484 */
485 static void
LockLockData(void)486 LockLockData(void)
487 {
488 LockBackendSharedMemory(LW_SHARED);
489
490 for (int partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++)
491 {
492 LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED);
493 }
494 }
495
496
497 /*
498 * UnlockLockData unlocks the locks on the shared lock data structure in reverse
499 * order since LWLockRelease searches the given lock from the end of the
500 * held_lwlocks array.
501 *
502 * The function also releases the shared memory lock to allow new backends to
503 * start.
504 */
505 static void
UnlockLockData(void)506 UnlockLockData(void)
507 {
508 for (int partitionNum = NUM_LOCK_PARTITIONS - 1; partitionNum >= 0; partitionNum--)
509 {
510 LWLockRelease(LockHashPartitionLockByIndex(partitionNum));
511 }
512
513 UnlockBackendSharedMemory();
514 }
515
516
517 /*
518 * AddEdgesForLockWaits adds an edge to the wait graph for every granted lock
519 * that waitingProc is waiting for.
520 *
521 * This function iterates over the procLocks data structure in shared memory,
522 * which also contains entries for locks which have not been granted yet, but
523 * it does not reflect the order of the wait queue. We therefore handle the
524 * wait queue separately.
525 */
526 static void
AddEdgesForLockWaits(WaitGraph * waitGraph,PGPROC * waitingProc,PROCStack * remaining)527 AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining)
528 {
529 /* the lock for which this process is waiting */
530 LOCK *waitLock = waitingProc->waitLock;
531
532 /* determine the conflict mask for the lock level used by the process */
533 LockMethod lockMethodTable = GetLocksMethodTable(waitLock);
534 int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode];
535
536 /* iterate through the queue of processes holding the lock */
537 SHM_QUEUE *procLocks = &waitLock->procLocks;
538 PROCLOCK *procLock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
539 offsetof(PROCLOCK, lockLink));
540
541 while (procLock != NULL)
542 {
543 PGPROC *currentProc = procLock->tag.myProc;
544
545 /*
546 * Skip processes from the same lock group, processes that don't conflict,
547 * and processes that are waiting on safe operations.
548 */
549 if (!IsSameLockGroup(waitingProc, currentProc) &&
550 IsConflictingLockMask(procLock->holdMask, conflictMask) &&
551 !IsProcessWaitingForSafeOperations(currentProc))
552 {
553 AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
554 }
555
556 procLock = (PROCLOCK *) SHMQueueNext(procLocks, &procLock->lockLink,
557 offsetof(PROCLOCK, lockLink));
558 }
559 }
560
561
562 /*
563 * AddEdgesForWaitQueue adds an edge to the wait graph for processes in front of
564 * waitingProc in the wait queue that are trying to acquire a conflicting lock.
565 */
566 static void
AddEdgesForWaitQueue(WaitGraph * waitGraph,PGPROC * waitingProc,PROCStack * remaining)567 AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining)
568 {
569 /* the lock for which this process is waiting */
570 LOCK *waitLock = waitingProc->waitLock;
571
572 /* determine the conflict mask for the lock level used by the process */
573 LockMethod lockMethodTable = GetLocksMethodTable(waitLock);
574 int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode];
575
576 /* iterate through the wait queue */
577 PROC_QUEUE *waitQueue = &(waitLock->waitProcs);
578 int queueSize = waitQueue->size;
579 PGPROC *currentProc = (PGPROC *) waitQueue->links.next;
580
581 /*
582 * Iterate through the queue from the start until we encounter waitingProc,
583 * since we only care about processes in front of waitingProc in the queue.
584 */
585 while (queueSize-- > 0 && currentProc != waitingProc)
586 {
587 int awaitMask = LOCKBIT_ON(currentProc->waitLockMode);
588
589 /*
590 * Skip processes from the same lock group, processes that don't conflict,
591 * and processes that are waiting on safe operations.
592 */
593 if (!IsSameLockGroup(waitingProc, currentProc) &&
594 IsConflictingLockMask(awaitMask, conflictMask) &&
595 !IsProcessWaitingForSafeOperations(currentProc))
596 {
597 AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
598 }
599
600 currentProc = (PGPROC *) currentProc->links.next;
601 }
602 }
603
604
605 /*
606 * AddWaitEdge adds a new wait edge to a wait graph. The nodes in the graph are
607 * transactions and an edge indicates the "waiting" process is blocked on a lock
608 * held by the "blocking" process.
609 *
610 * If the blocking process is itself waiting then it is added to the remaining
611 * stack.
612 */
613 static void
AddWaitEdge(WaitGraph * waitGraph,PGPROC * waitingProc,PGPROC * blockingProc,PROCStack * remaining)614 AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
615 PROCStack *remaining)
616 {
617 WaitEdge *curEdge = AllocWaitEdge(waitGraph);
618 BackendData waitingBackendData;
619 BackendData blockingBackendData;
620
621 GetBackendDataForProc(waitingProc, &waitingBackendData);
622 GetBackendDataForProc(blockingProc, &blockingBackendData);
623
624 curEdge->isBlockingXactWaiting =
625 IsProcessWaitingForLock(blockingProc) &&
626 !IsProcessWaitingForSafeOperations(blockingProc);
627 if (curEdge->isBlockingXactWaiting)
628 {
629 AddProcToVisit(remaining, blockingProc);
630 }
631
632 curEdge->waitingPid = waitingProc->pid;
633
634 if (IsInDistributedTransaction(&waitingBackendData))
635 {
636 DistributedTransactionId *waitingTransactionId =
637 &waitingBackendData.transactionId;
638
639 curEdge->waitingNodeId = waitingTransactionId->initiatorNodeIdentifier;
640 curEdge->waitingTransactionNum = waitingTransactionId->transactionNumber;
641 curEdge->waitingTransactionStamp = waitingTransactionId->timestamp;
642 }
643 else
644 {
645 curEdge->waitingNodeId = waitGraph->localNodeId;
646 curEdge->waitingTransactionNum = 0;
647 curEdge->waitingTransactionStamp = 0;
648 }
649
650 curEdge->blockingPid = blockingProc->pid;
651
652 if (IsInDistributedTransaction(&blockingBackendData))
653 {
654 DistributedTransactionId *blockingTransactionId =
655 &blockingBackendData.transactionId;
656
657 curEdge->blockingNodeId = blockingTransactionId->initiatorNodeIdentifier;
658 curEdge->blockingTransactionNum = blockingTransactionId->transactionNumber;
659 curEdge->blockingTransactionStamp = blockingTransactionId->timestamp;
660 }
661 else
662 {
663 curEdge->blockingNodeId = waitGraph->localNodeId;
664 curEdge->blockingTransactionNum = 0;
665 curEdge->blockingTransactionStamp = 0;
666 }
667 }
668
669
670 /*
671 * AllocWaitEdge allocates a wait edge as part of the given wait graph.
672 * If the wait graph has insufficient space its size is doubled using
673 * repalloc.
674 */
675 static WaitEdge *
AllocWaitEdge(WaitGraph * waitGraph)676 AllocWaitEdge(WaitGraph *waitGraph)
677 {
678 /* ensure space for new edge */
679 if (waitGraph->allocatedSize == waitGraph->edgeCount)
680 {
681 waitGraph->allocatedSize *= 2;
682 waitGraph->edges = (WaitEdge *)
683 repalloc(waitGraph->edges, sizeof(WaitEdge) *
684 waitGraph->allocatedSize);
685 }
686
687 return &waitGraph->edges[waitGraph->edgeCount++];
688 }
689
690
691 /*
692 * AddProcToVisit adds a process to the stack of processes to visit
693 * in the depth-first search, unless it was already added.
694 */
695 static void
AddProcToVisit(PROCStack * remaining,PGPROC * proc)696 AddProcToVisit(PROCStack *remaining, PGPROC *proc)
697 {
698 if (remaining->procAdded[proc->pgprocno])
699 {
700 return;
701 }
702
703 Assert(remaining->procCount < TotalProcCount());
704
705 remaining->procs[remaining->procCount++] = proc;
706 remaining->procAdded[proc->pgprocno] = true;
707 }
708
709
710 /*
711 * IsProcessWaitingForLock returns whether a given process is waiting for a lock.
712 */
713 bool
IsProcessWaitingForLock(PGPROC * proc)714 IsProcessWaitingForLock(PGPROC *proc)
715 {
716 return proc->waitStatus == PROC_WAIT_STATUS_WAITING;
717 }
718
719
720 /*
721 * IsSameLockGroup returns whether two processes are part of the same lock group,
722 * meaning they are either the same process, or have the same lock group leader.
723 */
724 static bool
IsSameLockGroup(PGPROC * leftProc,PGPROC * rightProc)725 IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc)
726 {
727 return leftProc == rightProc ||
728 (leftProc->lockGroupLeader != NULL &&
729 leftProc->lockGroupLeader == rightProc->lockGroupLeader);
730 }
731
732
/*
 * IsConflictingLockMask returns whether holdMask and conflictMask have at
 * least one bit in common.
 *
 * holdMask is a bitmask with the i-th bit turned on if lock mode i is held
 * (or awaited) by a process.
 *
 * conflictMask is a bitmask with the j-th bit turned on if lock mode j
 * conflicts with the lock mode that is being requested.
 */
static bool
IsConflictingLockMask(int holdMask, int conflictMask)
{
	int conflictingModes = holdMask & conflictMask;

	return conflictingModes != 0;
}
747
748
749 /*
750 * IsInDistributedTransaction returns whether the given backend is in a
751 * distributed transaction.
752 */
753 bool
IsInDistributedTransaction(BackendData * backendData)754 IsInDistributedTransaction(BackendData *backendData)
755 {
756 return backendData->transactionId.transactionNumber != 0;
757 }
758