/*-------------------------------------------------------------------------
 *
 * lock_graph.c
 *
 *  Functions for obtaining local and global lock graphs in which each
 *  node is a distributed transaction, and an edge represents a waiting-for
 *  relationship.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"

#include "access/hash.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"


/*
 * PROCStack is a stack of PGPROC pointers used to perform a depth-first search
 * through the lock graph. It also keeps track of which processes have been
 * added to the stack to avoid visiting the same process multiple times.
 */
typedef struct PROCStack
{
	int procCount;
	PGPROC **procs;
	bool *procAdded;
} PROCStack;


static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildLocalWaitGraph(void);
static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
static void LockLockData(void);
static void UnlockLockData(void);
static void AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc,
								 PROCStack *remaining);
static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc,
								 PROCStack *remaining);
static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
						PROCStack *remaining);
static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
static void AddProcToVisit(PROCStack *remaining, PGPROC *proc);
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
static bool IsConflictingLockMask(int holdMask, int conflictMask);


PG_FUNCTION_INFO_V1(dump_local_wait_edges);
PG_FUNCTION_INFO_V1(dump_global_wait_edges);


/*
 * dump_global_wait_edges returns global wait edges for distributed transactions
 * originating from the node on which it is started.
 */
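/*
 * For example (illustrative only), the resulting graph can be inspected
 * from SQL with:
 *
 *   SELECT waiting_pid, blocking_pid, blocking_transaction_waiting
 *   FROM dump_global_wait_edges();
 */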
Datum
dump_global_wait_edges(PG_FUNCTION_ARGS)
{
	WaitGraph *waitGraph = BuildGlobalWaitGraph();

	ReturnWaitGraph(waitGraph, fcinfo);

	return (Datum) 0;
}


/*
 * BuildGlobalWaitGraph builds a wait graph for distributed transactions
 * that originate from this node, including edges from all (other) worker
 * nodes.
 */
WaitGraph *
BuildGlobalWaitGraph(void)
{
	List *workerNodeList = ActiveReadableNodeList();
	char *nodeUser = CitusExtensionOwnerName();
	List *connectionList = NIL;
	int32 localGroupId = GetLocalGroupId();

	WaitGraph *waitGraph = BuildLocalWaitGraph();

	/* open connections in parallel */
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{
		const char *nodeName = workerNode->workerName;
		int nodePort = workerNode->workerPort;
		int connectionFlags = 0;

		if (workerNode->groupId == localGroupId)
		{
			/* we already have local wait edges */
			continue;
		}

		MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
																	   nodeName, nodePort,
																	   nodeUser, NULL);

		connectionList = lappend(connectionList, connection);
	}

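	/* wait for all connection attempts to complete */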
	FinishConnectionListEstablishment(connectionList);

	/* send commands in parallel */
	MultiConnection *connection = NULL;
	foreach_ptr(connection, connectionList)
	{
		const char *command = "SELECT * FROM dump_local_wait_edges()";

		int querySent = SendRemoteCommand(connection, command);
		if (querySent == 0)
		{
			ReportConnectionError(connection, WARNING);
		}
	}

	/* receive dump_local_wait_edges results */
	foreach_ptr(connection, connectionList)
	{
		bool raiseInterrupts = true;

		PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
		if (!IsResponseOK(result))
		{
			ReportResultError(connection, result, WARNING);
			continue;
		}

		int64 rowCount = PQntuples(result);
		int64 colCount = PQnfields(result);

		if (colCount != 9)
		{
			ereport(WARNING, (errmsg("unexpected number of columns from "
									 "dump_local_wait_edges")));
			continue;
		}

		for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
		{
			AddWaitEdgeFromResult(waitGraph, result, rowIndex);
		}

		PQclear(result);
		ForgetResults(connection);
	}

	return waitGraph;
}


/*
 * AddWaitEdgeFromResult adds an edge to the wait graph that is read from
 * a PGresult.
 */
static void
AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
{
	WaitEdge *waitEdge = AllocWaitEdge(waitGraph);

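	/* column order matches the output of dump_local_wait_edges (see ReturnWaitGraph) */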
	waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
	waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
	waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
	waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
	waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
	waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
	waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
	waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
	waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
}


/*
 * ParseIntField parses an int64 from a remote result or returns 0 if the
 * result is NULL.
 */
int64
ParseIntField(PGresult *result, int rowIndex, int colIndex)
{
	if (PQgetisnull(result, rowIndex, colIndex))
	{
		return 0;
	}

	char *resultString = PQgetvalue(result, rowIndex, colIndex);

	return pg_strtouint64(resultString, NULL, 10);
}


/*
 * ParseBoolField parses a bool from a remote result or returns false if the
 * result is NULL.
 */
bool
ParseBoolField(PGresult *result, int rowIndex, int colIndex)
{
	if (PQgetisnull(result, rowIndex, colIndex))
	{
		return false;
	}

	char *resultString = PQgetvalue(result, rowIndex, colIndex);
	if (strlen(resultString) != 1)
	{
		return false;
	}

	return resultString[0] == 't';
}


/*
 * ParseTimestampTzField parses a timestamptz from a remote result or returns
 * DT_NOBEGIN if the result is NULL.
 */
TimestampTz
ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
{
	if (PQgetisnull(result, rowIndex, colIndex))
	{
		return DT_NOBEGIN;
	}

	char *resultString = PQgetvalue(result, rowIndex, colIndex);
	Datum resultStringDatum = CStringGetDatum(resultString);
	Datum timestampDatum = DirectFunctionCall3(timestamptz_in, resultStringDatum, 0, -1);

	return DatumGetTimestampTz(timestampDatum);
}


/*
 * dump_local_wait_edges returns wait edges for distributed transactions
 * running on the node on which it is called, which originate from the source node.
 */
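/*
 * dump_global_wait_edges() gathers these edges from the other worker nodes
 * by running "SELECT * FROM dump_local_wait_edges()" over regular connections.
 */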
Datum
dump_local_wait_edges(PG_FUNCTION_ARGS)
{
	WaitGraph *waitGraph = BuildLocalWaitGraph();
	ReturnWaitGraph(waitGraph, fcinfo);

	return (Datum) 0;
}


/*
 * ReturnWaitGraph returns a wait graph for a set returning function.
 */
static void
ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
{
	TupleDesc tupleDesc;
	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);

	/*
	 * Columns:
	 * 00: waiting_pid
	 * 01: waiting_node_id
	 * 02: waiting_transaction_num
	 * 03: waiting_transaction_stamp
	 * 04: blocking_pid
	 * 05: blocking_node_id
	 * 06: blocking_transaction_num
	 * 07: blocking_transaction_stamp
	 * 08: blocking_transaction_waiting
	 */
	for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
	{
		Datum values[9];
		bool nulls[9];
		WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];

		memset(values, 0, sizeof(values));
		memset(nulls, 0, sizeof(nulls));

		values[0] = Int32GetDatum(curEdge->waitingPid);
		values[1] = Int32GetDatum(curEdge->waitingNodeId);
		if (curEdge->waitingTransactionNum != 0)
		{
			values[2] = Int64GetDatum(curEdge->waitingTransactionNum);
			values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
		}
		else
		{
			nulls[2] = true;
			nulls[3] = true;
		}

		values[4] = Int32GetDatum(curEdge->blockingPid);
		values[5] = Int32GetDatum(curEdge->blockingNodeId);
		if (curEdge->blockingTransactionNum != 0)
		{
			values[6] = Int64GetDatum(curEdge->blockingTransactionNum);
			values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
		}
		else
		{
			nulls[6] = true;
			nulls[7] = true;
		}
		values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);

		tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
	}

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupleStore);
}


/*
 * BuildLocalWaitGraph builds a wait graph for distributed transactions
 * that originate from the local node.
 */
static WaitGraph *
BuildLocalWaitGraph(void)
{
	PROCStack remaining;
	int totalProcs = TotalProcCount();

	/*
	 * Try hard to avoid allocations while holding lock. Thus we pre-allocate
	 * space for wait edges in large batches - for common scenarios this should
	 * be more than enough space to build the list of wait edges without a
	 * single allocation.
	 */
	WaitGraph *waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
	waitGraph->localNodeId = GetLocalGroupId();
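	/* room for three edges per backend; AllocWaitEdge doubles the array when full */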
	waitGraph->allocatedSize = totalProcs * 3;
	waitGraph->edgeCount = 0;
	waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));

	remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * totalProcs);
	remaining.procAdded = (bool *) palloc0(sizeof(bool) * totalProcs);
	remaining.procCount = 0;

	LockLockData();

	/*
	 * Build lock-graph.  We do so by first finding all procs which we are
	 * interested in (in a distributed transaction, and blocked).  Once
	 * those are collected, do depth first search over all procs blocking
	 * those.
	 */

	/* build list of starting procs */
	for (int curBackend = 0; curBackend < totalProcs; curBackend++)
	{
		PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
		BackendData currentBackendData;

		/* skip if the PGPROC slot is unused */
		if (currentProc->pid == 0)
		{
			continue;
		}

		GetBackendDataForProc(currentProc, &currentBackendData);

		/*
		 * Only start searching from distributed transactions, since we only
		 * care about distributed transactions for the purpose of distributed
		 * deadlock detection.
		 */
		if (!IsInDistributedTransaction(&currentBackendData))
		{
			continue;
		}

		/* skip if the process is not blocked */
		if (!IsProcessWaitingForLock(currentProc))
		{
			continue;
		}

		/* skip if the process is blocked on a safe operation such as relation extension */
		if (IsProcessWaitingForSafeOperations(currentProc))
		{
			continue;
		}

		AddProcToVisit(&remaining, currentProc);
	}

	while (remaining.procCount > 0)
	{
		PGPROC *waitingProc = remaining.procs[--remaining.procCount];

		/* only blocked processes result in wait edges */
		if (!IsProcessWaitingForLock(waitingProc))
		{
			continue;
		}

		/* skip if the process is blocked on a safe operation such as relation extension */
		if (IsProcessWaitingForSafeOperations(waitingProc))
		{
			continue;
		}

		/*
		 * Record an edge for everyone already holding the lock in a
		 * conflicting manner ("hard edges" in postgres parlance).
		 */
		AddEdgesForLockWaits(waitGraph, waitingProc, &remaining);

		/*
		 * Record an edge for everyone in front of us in the wait-queue
		 * for the lock ("soft edges" in postgres parlance).
		 */
		AddEdgesForWaitQueue(waitGraph, waitingProc, &remaining);
	}

	UnlockLockData();

	return waitGraph;
}


/*
 * IsProcessWaitingForSafeOperations returns true if the given PROC is
 * waiting on relation extension locks, page locks or speculative insertion
 * locks.
 *
 * The function also returns true if the waiting process is an autovacuum
 * process, given that autovacuum cannot contribute to any distributed
 * deadlocks.
 *
 * In general, for the purpose of distributed deadlock detection we should
 * skip processes that are blocked on locks which cannot be part of a deadlock.
 * Such locks are held for a short duration while the relation or the index
 * is actually extended on disk and are released as soon as the extension is
 * done, even before the execution of the command that triggered the extension
 * finishes. Thus, recording such waits in our lock graphs could lead to
 * detecting false distributed deadlocks.
 */
static bool
IsProcessWaitingForSafeOperations(PGPROC *proc)
{
	if (proc->waitStatus != PROC_WAIT_STATUS_WAITING)
	{
		return false;
	}

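	/* autovacuum cannot contribute to distributed deadlocks */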
	if (pgproc_statusflags_compat(proc) & PROC_IS_AUTOVACUUM)
	{
		return true;
	}

	PROCLOCK *waitProcLock = proc->waitProcLock;
	LOCK *waitLock = waitProcLock->tag.myLock;

	return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND ||
		   waitLock->tag.locktag_type == LOCKTAG_PAGE ||
		   waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN;
}


/*
 * LockLockData takes locks on the shared lock data structure, which prevents
 * concurrent lock acquisitions/releases.
 *
 * The function also acquires a lock on the backend shared memory to prevent
 * new backends from starting.
 */
static void
LockLockData(void)
{
	LockBackendSharedMemory(LW_SHARED);

	for (int partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++)
	{
		LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED);
	}
}


/*
 * UnlockLockData unlocks the locks on the shared lock data structure in reverse
 * order since LWLockRelease searches the given lock from the end of the
 * held_lwlocks array.
 *
 * The function also releases the shared memory lock to allow new backends to
 * start.
 */
static void
UnlockLockData(void)
{
	for (int partitionNum = NUM_LOCK_PARTITIONS - 1; partitionNum >= 0; partitionNum--)
	{
		LWLockRelease(LockHashPartitionLockByIndex(partitionNum));
	}

	UnlockBackendSharedMemory();
}


/*
 * AddEdgesForLockWaits adds an edge to the wait graph for every process that
 * was already granted the lock waitingProc is waiting for in a conflicting
 * mode.
 *
 * This function iterates over the procLocks data structure in shared memory,
 * which also contains entries for locks which have not been granted yet, but
 * it does not reflect the order of the wait queue. We therefore handle the
 * wait queue separately.
 */
static void
AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining)
{
	/* the lock for which this process is waiting */
	LOCK *waitLock = waitingProc->waitLock;

	/* determine the conflict mask for the lock level used by the process */
	LockMethod lockMethodTable = GetLocksMethodTable(waitLock);
	int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode];

	/* iterate through the list of processes that hold or await the lock */
	SHM_QUEUE *procLocks = &waitLock->procLocks;
	PROCLOCK *procLock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
												   offsetof(PROCLOCK, lockLink));

	while (procLock != NULL)
	{
		PGPROC *currentProc = procLock->tag.myProc;

		/*
		 * Skip processes from the same lock group, processes that don't conflict,
		 * and processes that are waiting on safe operations.
		 */
		if (!IsSameLockGroup(waitingProc, currentProc) &&
			IsConflictingLockMask(procLock->holdMask, conflictMask) &&
			!IsProcessWaitingForSafeOperations(currentProc))
		{
			AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
		}

		procLock = (PROCLOCK *) SHMQueueNext(procLocks, &procLock->lockLink,
											 offsetof(PROCLOCK, lockLink));
	}
}


/*
 * AddEdgesForWaitQueue adds an edge to the wait graph for processes in front of
 * waitingProc in the wait queue that are trying to acquire a conflicting lock.
 */
static void
AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining)
{
	/* the lock for which this process is waiting */
	LOCK *waitLock = waitingProc->waitLock;

	/* determine the conflict mask for the lock level used by the process */
	LockMethod lockMethodTable = GetLocksMethodTable(waitLock);
	int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode];

	/* iterate through the wait queue */
	PROC_QUEUE *waitQueue = &(waitLock->waitProcs);
	int queueSize = waitQueue->size;
	PGPROC *currentProc = (PGPROC *) waitQueue->links.next;

	/*
	 * Iterate through the queue from the start until we encounter waitingProc,
	 * since we only care about processes in front of waitingProc in the queue.
	 */
	while (queueSize-- > 0 && currentProc != waitingProc)
	{
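		/* the lock mode this queued process is trying to acquire, as a bitmask */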
		int awaitMask = LOCKBIT_ON(currentProc->waitLockMode);

		/*
		 * Skip processes from the same lock group, processes that don't conflict,
		 * and processes that are waiting on safe operations.
		 */
		if (!IsSameLockGroup(waitingProc, currentProc) &&
			IsConflictingLockMask(awaitMask, conflictMask) &&
			!IsProcessWaitingForSafeOperations(currentProc))
		{
			AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
		}

		currentProc = (PGPROC *) currentProc->links.next;
	}
}


/*
 * AddWaitEdge adds a new wait edge to a wait graph. The nodes in the graph are
 * transactions and an edge indicates the "waiting" process is blocked on a lock
 * held by the "blocking" process.
 *
 * If the blocking process is itself waiting then it is added to the remaining
 * stack.
 */
static void
AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
			PROCStack *remaining)
{
	WaitEdge *curEdge = AllocWaitEdge(waitGraph);
	BackendData waitingBackendData;
	BackendData blockingBackendData;

	GetBackendDataForProc(waitingProc, &waitingBackendData);
	GetBackendDataForProc(blockingProc, &blockingBackendData);

	curEdge->isBlockingXactWaiting =
		IsProcessWaitingForLock(blockingProc) &&
		!IsProcessWaitingForSafeOperations(blockingProc);
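	/* also continue the depth-first search from the blocking process */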
	if (curEdge->isBlockingXactWaiting)
	{
		AddProcToVisit(remaining, blockingProc);
	}

	curEdge->waitingPid = waitingProc->pid;

	if (IsInDistributedTransaction(&waitingBackendData))
	{
		DistributedTransactionId *waitingTransactionId =
			&waitingBackendData.transactionId;

		curEdge->waitingNodeId = waitingTransactionId->initiatorNodeIdentifier;
		curEdge->waitingTransactionNum = waitingTransactionId->transactionNumber;
		curEdge->waitingTransactionStamp = waitingTransactionId->timestamp;
	}
	else
	{
		curEdge->waitingNodeId = waitGraph->localNodeId;
		curEdge->waitingTransactionNum = 0;
		curEdge->waitingTransactionStamp = 0;
	}

	curEdge->blockingPid = blockingProc->pid;

	if (IsInDistributedTransaction(&blockingBackendData))
	{
		DistributedTransactionId *blockingTransactionId =
			&blockingBackendData.transactionId;

		curEdge->blockingNodeId = blockingTransactionId->initiatorNodeIdentifier;
		curEdge->blockingTransactionNum = blockingTransactionId->transactionNumber;
		curEdge->blockingTransactionStamp = blockingTransactionId->timestamp;
	}
	else
	{
		curEdge->blockingNodeId = waitGraph->localNodeId;
		curEdge->blockingTransactionNum = 0;
		curEdge->blockingTransactionStamp = 0;
	}
}


/*
 * AllocWaitEdge allocates a wait edge as part of the given wait graph.
 * If the wait graph has insufficient space its size is doubled using
 * repalloc.
 */
static WaitEdge *
AllocWaitEdge(WaitGraph *waitGraph)
{
	/* ensure space for new edge */
	if (waitGraph->allocatedSize == waitGraph->edgeCount)
	{
		waitGraph->allocatedSize *= 2;
		waitGraph->edges = (WaitEdge *)
						   repalloc(waitGraph->edges, sizeof(WaitEdge) *
									waitGraph->allocatedSize);
	}

	return &waitGraph->edges[waitGraph->edgeCount++];
}


/*
 * AddProcToVisit adds a process to the stack of processes to visit
 * in the depth-first search, unless it was already added.
 */
static void
AddProcToVisit(PROCStack *remaining, PGPROC *proc)
{
	if (remaining->procAdded[proc->pgprocno])
	{
		return;
	}

	Assert(remaining->procCount < TotalProcCount());

	remaining->procs[remaining->procCount++] = proc;
	remaining->procAdded[proc->pgprocno] = true;
}


/*
 * IsProcessWaitingForLock returns whether a given process is waiting for a lock.
 */
bool
IsProcessWaitingForLock(PGPROC *proc)
{
	return proc->waitStatus == PROC_WAIT_STATUS_WAITING;
}


/*
 * IsSameLockGroup returns whether two processes are part of the same lock group,
 * meaning they are either the same process, or have the same lock group leader.
 */
static bool
IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc)
{
	return leftProc == rightProc ||
		   (leftProc->lockGroupLeader != NULL &&
			leftProc->lockGroupLeader == rightProc->lockGroupLeader);
}


/*
 * IsConflictingLockMask returns whether the given conflict mask conflicts with
 * the holdMask.
 *
 * holdMask is a bitmask with the i-th bit turned on if lock mode i is held.
 *
 * conflictMask is a bitmask with the i-th bit turned on if lock mode i
 * conflicts with the requested lock mode.
 */
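/*
 * For example (illustrative), if holdMask only has the AccessShareLock bit set
 * and conflictMask is conflictTab[AccessExclusiveLock], the masks overlap and
 * the function returns true, since AccessShareLock conflicts with
 * AccessExclusiveLock.
 */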
static bool
IsConflictingLockMask(int holdMask, int conflictMask)
{
	return (holdMask & conflictMask) != 0;
}


/*
 * IsInDistributedTransaction returns whether the given backend is in a
 * distributed transaction.
 */
bool
IsInDistributedTransaction(BackendData *backendData)
{
	return backendData->transactionId.transactionNumber != 0;
}