1 /*-------------------------------------------------------------------------
2  *
3  * placement_connection.c
4  *   Per placement connection handling.
5  *
6  * Copyright (c) Citus Data, Inc.
7  *
8  *-------------------------------------------------------------------------
9  */
10 
11 
12 #include "postgres.h"
13 
14 #include "distributed/pg_version_constants.h"
15 
16 #include "access/hash.h"
17 #include "distributed/colocation_utils.h"
18 #include "distributed/connection_management.h"
19 #include "distributed/hash_helpers.h"
20 #include "distributed/listutils.h"
21 #include "distributed/coordinator_protocol.h"
22 #include "distributed/metadata_cache.h"
23 #include "distributed/multi_executor.h"
24 #include "distributed/distributed_planner.h"
25 #include "distributed/multi_partitioning_utils.h"
26 #include "distributed/placement_connection.h"
27 #include "distributed/relation_access_tracking.h"
28 #include "utils/hsearch.h"
29 #if PG_VERSION_NUM >= PG_VERSION_13
30 #include "common/hashfn.h"
31 #endif
32 #include "utils/memutils.h"
33 
34 
35 /*
36  * A connection reference is used to register that a connection has been used
37  * to read or modify either a) a shard placement as a particular user b) a
38  * group of colocated placements (which depend on whether the reference is
39  * from ConnectionPlacementHashEntry or ColocatedPlacementHashEntry).
40  */
41 typedef struct ConnectionReference
42 {
43 	/*
44 	 * The user used to read/modify the placement. We cannot reuse connections
45 	 * that were performed using a different role, since it would not have the
46 	 * right permissions.
47 	 */
48 	const char *userName;
49 
50 	/* the connection */
51 	MultiConnection *connection;
52 
53 	/*
54 	 * Information about what the connection is used for. There can only be
55 	 * one connection executing DDL/DML for a placement to avoid deadlock
56 	 * issues/read-your-own-writes violations.  The difference between DDL/DML
57 	 * currently is only used to emit more precise error messages.
58 	 */
59 	bool hadDML;
60 	bool hadDDL;
61 
62 	/* colocation group of the placement, if any */
63 	uint32 colocationGroupId;
64 	uint32 representativeValue;
65 
66 	/* placementId of the placement, used only for append distributed tables */
67 	uint64 placementId;
68 
69 	/* membership in MultiConnection->referencedPlacements */
70 	dlist_node connectionNode;
71 } ConnectionReference;
72 
73 
74 struct ColocatedPlacementsHashEntry;
75 
76 
77 /*
78  * Hash table mapping placements to a list of connections.
79  *
80  * This stores a list of connections for each placement, because multiple
81  * connections to the same placement may exist at the same time. E.g. an
82  * adaptive executor query may reference the same placement in several
83  * sub-tasks.
84  *
85  * We keep track about a connection having executed DML or DDL, since we can
86  * only ever allow a single transaction to do either to prevent deadlocks and
87  * consistency violations (e.g. read-your-own-writes).
88  */
89 
90 /* hash key */
91 typedef struct ConnectionPlacementHashKey
92 {
93 	uint64 placementId;
94 } ConnectionPlacementHashKey;
95 
96 /* hash entry */
97 typedef struct ConnectionPlacementHashEntry
98 {
99 	ConnectionPlacementHashKey key;
100 
101 	/* did any remote transactions fail? */
102 	bool failed;
103 
104 	/* primary connection used to access the placement */
105 	ConnectionReference *primaryConnection;
106 
107 	/* are any other connections reading from the placements? */
108 	bool hasSecondaryConnections;
109 
110 	/* entry for the set of co-located placements */
111 	struct ColocatedPlacementsHashEntry *colocatedEntry;
112 
113 	/* membership in ConnectionShardHashEntry->placementConnections */
114 	dlist_node shardNode;
115 } ConnectionPlacementHashEntry;
116 
117 /* hash table */
118 static HTAB *ConnectionPlacementHash;
119 
120 
121 /*
122  * A hash-table mapping colocated placements to connections. Colocated
123  * placements being the set of placements on a single node that represent the
124  * same value range. This is needed because connections for colocated
125  * placements (i.e. the corresponding placements for different colocated
126  * distributed tables) need to share connections.  Otherwise things like
127  * foreign keys can very easily lead to unprincipled deadlocks.  This means
128  * that there can only be one DML/DDL connection for a set of colocated
129  * placements.
130  *
131  * A set of colocated placements is identified, besides node identifying
132  * information, by the associated colocation group id and the placement's
133  * 'representativeValue' which currently is the lower boundary of it's
134  * hash-range.
135  *
136  * Note that this hash-table only contains entries for hash-partitioned
137  * tables, because others so far don't support colocation.
138  */
139 
140 /* hash key */
141 typedef struct ColocatedPlacementsHashKey
142 {
143 	/* to identify host - database can't differ */
144 	uint32 nodeId;
145 
146 	/* colocation group, or invalid */
147 	uint32 colocationGroupId;
148 
149 	/* to represent the value range */
150 	uint32 representativeValue;
151 } ColocatedPlacementsHashKey;
152 
153 /* hash entry */
154 typedef struct ColocatedPlacementsHashEntry
155 {
156 	ColocatedPlacementsHashKey key;
157 
158 	/* primary connection used to access the co-located placements */
159 	ConnectionReference *primaryConnection;
160 
161 	/* are any other connections reading from the placements? */
162 	bool hasSecondaryConnections;
163 }  ColocatedPlacementsHashEntry;
164 
165 static HTAB *ColocatedPlacementsHash;
166 
167 
168 /*
169  * Hash table mapping shard ids to placements.
170  *
171  * This is used to track whether placements of a shard have to be marked
172  * invalid after a failure, or whether a coordinated transaction has to be
173  * aborted, to avoid all placements of a shard to be marked invalid.
174  */
175 
176 /* hash key */
177 typedef struct ConnectionShardHashKey
178 {
179 	uint64 shardId;
180 } ConnectionShardHashKey;
181 
182 /* hash entry */
183 typedef struct ConnectionShardHashEntry
184 {
185 	ConnectionShardHashKey key;
186 	dlist_head placementConnections;
187 } ConnectionShardHashEntry;
188 
189 /* hash table */
190 static HTAB *ConnectionShardHash;
191 
192 
193 static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList,
194 													 const char *userName);
195 static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
196 	ShardPlacement *placement);
197 static bool CanUseExistingConnection(uint32 flags, const char *userName,
198 									 ConnectionReference *placementConnection);
199 static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,
200 												 ShardPlacement *placement);
201 static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
202 										ShardPlacement *placement);
203 static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
204 static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
205 static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
206 
207 
208 /*
209  * GetPlacementConnection establishes a connection for a placement.
210  *
211  * See StartPlacementConnection for details.
212  */
213 MultiConnection *
GetPlacementConnection(uint32 flags,ShardPlacement * placement,const char * userName)214 GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
215 {
216 	MultiConnection *connection = StartPlacementConnection(flags, placement, userName);
217 
218 	if (connection == NULL)
219 	{
220 		/* connection can only be NULL for optional connections */
221 		Assert((flags & OPTIONAL_CONNECTION));
222 
223 		return NULL;
224 	}
225 
226 	FinishConnectionEstablishment(connection);
227 	return connection;
228 }
229 
230 
231 /*
232  * StartPlacementConnection initiates a connection to a remote node,
233  * associated with the placement and transaction.
234  *
235  * The connection is established for the current database. If userName is NULL
236  * the current user is used, otherwise the provided one.
237  *
238  * See StartNodeUserDatabaseConnection for details.
239  *
240  * Flags have the corresponding meaning from StartNodeUserDatabaseConnection,
241  * except that two additional flags have an effect:
242  * - FOR_DML - signal that connection is going to be used for DML (modifications)
243  * - FOR_DDL - signal that connection is going to be used for DDL
244  *
245  * Only one connection associated with the placement may have FOR_DML or
246  * FOR_DDL set. For hash-partitioned tables only one connection for a set of
247  * colocated placements may have FOR_DML/DDL set.  This restriction prevents
248  * deadlocks and wrong results due to in-progress transactions.
249  */
250 MultiConnection *
StartPlacementConnection(uint32 flags,ShardPlacement * placement,const char * userName)251 StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
252 {
253 	ShardPlacementAccess *placementAccess =
254 		(ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
255 
256 	placementAccess->placement = placement;
257 
258 	if (flags & FOR_DDL)
259 	{
260 		placementAccess->accessType = PLACEMENT_ACCESS_DDL;
261 	}
262 	else if (flags & FOR_DML)
263 	{
264 		placementAccess->accessType = PLACEMENT_ACCESS_DML;
265 	}
266 	else
267 	{
268 		placementAccess->accessType = PLACEMENT_ACCESS_SELECT;
269 	}
270 
271 	return StartPlacementListConnection(flags, list_make1(placementAccess), userName);
272 }
273 
274 
275 /*
276  * StartPlacementListConnection returns a connection to a remote node suitable for
277  * a placement accesses (SELECT, DML, DDL) or throws an error if no suitable
278  * connection can be established if would cause a self-deadlock or consistency
279  * violation.
280  */
281 MultiConnection *
StartPlacementListConnection(uint32 flags,List * placementAccessList,const char * userName)282 StartPlacementListConnection(uint32 flags, List *placementAccessList,
283 							 const char *userName)
284 {
285 	char *freeUserName = NULL;
286 
287 	if (userName == NULL)
288 	{
289 		userName = freeUserName = CurrentUserName();
290 	}
291 
292 	MultiConnection *chosenConnection = FindPlacementListConnection(flags,
293 																	placementAccessList,
294 																	userName);
295 	if (chosenConnection == NULL)
296 	{
297 		/* use the first placement from the list to extract nodename and nodeport */
298 		ShardPlacementAccess *placementAccess =
299 			(ShardPlacementAccess *) linitial(placementAccessList);
300 		ShardPlacement *placement = placementAccess->placement;
301 		char *nodeName = placement->nodeName;
302 		int nodePort = placement->nodePort;
303 
304 		/*
305 		 * No suitable connection in the placement->connection mapping, get one from
306 		 * the node->connection pool.
307 		 */
308 		chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort,
309 														   userName, NULL);
310 		if (chosenConnection == NULL)
311 		{
312 			/* connection can only be NULL for optional connections */
313 			Assert((flags & OPTIONAL_CONNECTION));
314 
315 			return NULL;
316 		}
317 
318 		if ((flags & REQUIRE_CLEAN_CONNECTION) &&
319 			ConnectionAccessedDifferentPlacement(chosenConnection, placement))
320 		{
321 			/*
322 			 * Cached connection accessed a non-co-located placement in the same
323 			 * table or co-location group, while the caller asked for a clean
324 			 * connection. Open a new connection instead.
325 			 *
326 			 * We use this for situations in which we want to use a different
327 			 * connection for every placement, such as COPY. If we blindly returned
328 			 * a cached connection that already modified a different, non-co-located
329 			 * placement B in the same table or in a table with the same co-location
330 			 * ID as the current placement, then we'd no longer able to write to
331 			 * placement B later in the COPY.
332 			 */
333 			chosenConnection = StartNodeUserDatabaseConnection(flags |
334 															   FORCE_NEW_CONNECTION,
335 															   nodeName, nodePort,
336 															   userName, NULL);
337 
338 			if (chosenConnection == NULL)
339 			{
340 				/* connection can only be NULL for optional connections */
341 				Assert((flags & OPTIONAL_CONNECTION));
342 
343 				return NULL;
344 			}
345 
346 			Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement));
347 		}
348 	}
349 
350 	/* remember which connection we're going to use to access the placements */
351 	AssignPlacementListToConnection(placementAccessList, chosenConnection);
352 
353 	if (freeUserName)
354 	{
355 		pfree(freeUserName);
356 	}
357 
358 	return chosenConnection;
359 }
360 
361 
362 /*
363  * AssignPlacementListToConnection assigns a set of shard placement accesses to a
364  * given connection, meaning that connection must be used for all (conflicting)
365  * accesses of the same shard placements to make sure reads see writes and to
366  * make sure we don't take conflicting locks.
367  */
368 void
AssignPlacementListToConnection(List * placementAccessList,MultiConnection * connection)369 AssignPlacementListToConnection(List *placementAccessList, MultiConnection *connection)
370 {
371 	const char *userName = connection->user;
372 
373 	ShardPlacementAccess *placementAccess = NULL;
374 	foreach_ptr(placementAccess, placementAccessList)
375 	{
376 		ShardPlacement *placement = placementAccess->placement;
377 		ShardPlacementAccessType accessType = placementAccess->accessType;
378 
379 
380 		if (placement->shardId == INVALID_SHARD_ID)
381 		{
382 			/*
383 			 * When a SELECT prunes down to 0 shard, we use a dummy placement
384 			 * which is only used to route the query to a worker node, but
385 			 * the SELECT doesn't actually access any shard placement.
386 			 *
387 			 * FIXME: this can be removed if we evaluate empty SELECTs locally.
388 			 */
389 			continue;
390 		}
391 
392 		ConnectionPlacementHashEntry *placementEntry = FindOrCreatePlacementEntry(
393 			placement);
394 		ConnectionReference *placementConnection = placementEntry->primaryConnection;
395 
396 		if (placementConnection->connection == connection)
397 		{
398 			/* using the connection that was already assigned to the placement */
399 		}
400 		else if (placementConnection->connection == NULL)
401 		{
402 			/* placement does not have a connection assigned yet */
403 			placementConnection->connection = connection;
404 			placementConnection->hadDDL = false;
405 			placementConnection->hadDML = false;
406 			placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
407 																userName);
408 			placementConnection->placementId = placementAccess->placement->placementId;
409 
410 			/* record association with connection */
411 			dlist_push_tail(&connection->referencedPlacements,
412 							&placementConnection->connectionNode);
413 		}
414 		else
415 		{
416 			/* using a different connection than the one assigned to the placement */
417 
418 			if (accessType != PLACEMENT_ACCESS_SELECT)
419 			{
420 				/*
421 				 * We previously read from the placement, but now we're writing to
422 				 * it (if we had written to the placement, we would have either chosen
423 				 * the same connection, or errored out). Update the connection reference
424 				 * to point to the connection used for writing. We don't need to remember
425 				 * the existing connection since we won't be able to reuse it for
426 				 * accessing the placement. However, we do register that it exists in
427 				 * hasSecondaryConnections.
428 				 */
429 				placementConnection->connection = connection;
430 				placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
431 																	userName);
432 
433 				Assert(!placementConnection->hadDDL);
434 				Assert(!placementConnection->hadDML);
435 
436 				/* record association with connection */
437 				dlist_push_tail(&connection->referencedPlacements,
438 								&placementConnection->connectionNode);
439 			}
440 
441 			/*
442 			 * There are now multiple connections that read from the placement
443 			 * and DDL commands are forbidden.
444 			 */
445 			placementEntry->hasSecondaryConnections = true;
446 
447 			if (placementEntry->colocatedEntry != NULL)
448 			{
449 				/* we also remember this for co-located placements */
450 				placementEntry->colocatedEntry->hasSecondaryConnections = true;
451 			}
452 		}
453 
454 		/*
455 		 * Remember that we used the current connection for writes.
456 		 */
457 		if (accessType == PLACEMENT_ACCESS_DDL)
458 		{
459 			placementConnection->hadDDL = true;
460 		}
461 
462 		if (accessType == PLACEMENT_ACCESS_DML)
463 		{
464 			placementConnection->hadDML = true;
465 		}
466 
467 		/* record the relation access */
468 		Oid relationId = RelationIdForShard(placement->shardId);
469 		RecordRelationAccessIfNonDistTable(relationId, accessType);
470 	}
471 }
472 
473 
474 /*
475  * GetConnectionIfPlacementAccessedInXact returns the connection over which
476  * the placement has been access in the transaction. If not found, returns
477  * NULL.
478  */
479 MultiConnection *
GetConnectionIfPlacementAccessedInXact(int flags,List * placementAccessList,const char * userName)480 GetConnectionIfPlacementAccessedInXact(int flags, List *placementAccessList,
481 									   const char *userName)
482 {
483 	char *freeUserName = NULL;
484 
485 	if (userName == NULL)
486 	{
487 		userName = freeUserName = CurrentUserName();
488 	}
489 
490 	MultiConnection *connection = FindPlacementListConnection(flags, placementAccessList,
491 															  userName);
492 
493 	if (freeUserName != NULL)
494 	{
495 		pfree(freeUserName);
496 	}
497 
498 	return connection;
499 }
500 
501 
502 /*
503  * FindPlacementListConnection determines whether there is a connection that must
504  * be used to perform the given placement accesses.
505  *
506  * If a placement was only read in this transaction, then the same connection must
507  * be used for DDL to prevent self-deadlock. If a placement was modified in this
508  * transaction, then the same connection must be used for all subsequent accesses
509  * to ensure read-your-writes consistency and prevent self-deadlock. If those
510  * conditions cannot be met, because a connection is in use or the placements in
511  * the placement access list were modified over multiple connections, then this
512  * function throws an error.
513  *
514  * The function returns the connection that needs to be used, if such a connection
515  * exists.
516  */
517 static MultiConnection *
FindPlacementListConnection(int flags,List * placementAccessList,const char * userName)518 FindPlacementListConnection(int flags, List *placementAccessList, const char *userName)
519 {
520 	bool foundModifyingConnection = false;
521 	MultiConnection *chosenConnection = NULL;
522 
523 	/*
524 	 * Go through all placement accesses to find a suitable connection.
525 	 *
526 	 * If none of the placements have been accessed in this transaction, connection
527 	 * remains NULL.
528 	 *
529 	 * If one or more of the placements have been modified in this transaction, then
530 	 * use the connection that performed the write. If placements have been written
531 	 * over multiple connections or the connection is not available, error out.
532 	 *
533 	 * If placements have only been read in this transaction, then use the last
534 	 * suitable connection found for a placement in the placementAccessList.
535 	 */
536 	ShardPlacementAccess *placementAccess = NULL;
537 	foreach_ptr(placementAccess, placementAccessList)
538 	{
539 		ShardPlacement *placement = placementAccess->placement;
540 		ShardPlacementAccessType accessType = placementAccess->accessType;
541 
542 
543 		if (placement->shardId == INVALID_SHARD_ID)
544 		{
545 			/*
546 			 * When a SELECT prunes down to 0 shard, we use a dummy placement.
547 			 * In that case, we can fall back to the default connection.
548 			 *
549 			 * FIXME: this can be removed if we evaluate empty SELECTs locally.
550 			 */
551 			continue;
552 		}
553 
554 		ConnectionPlacementHashEntry *placementEntry = FindOrCreatePlacementEntry(
555 			placement);
556 		ColocatedPlacementsHashEntry *colocatedEntry = placementEntry->colocatedEntry;
557 		ConnectionReference *placementConnection = placementEntry->primaryConnection;
558 
559 		/* note: the Asserts below are primarily for clarifying the conditions */
560 
561 		if (placementConnection->connection == NULL)
562 		{
563 			/* no connection has been chosen for the placement */
564 		}
565 		else if (accessType == PLACEMENT_ACCESS_DDL &&
566 				 placementEntry->hasSecondaryConnections)
567 		{
568 			/*
569 			 * If a placement has been read over multiple connections (typically as
570 			 * a result of a reference table join) then a DDL command on the placement
571 			 * would create a self-deadlock.
572 			 */
573 
574 			Assert(placementConnection != NULL);
575 
576 			ereport(ERROR,
577 					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
578 					 errmsg("cannot perform DDL on placement " UINT64_FORMAT
579 							", which has been read over multiple connections",
580 							placement->placementId)));
581 		}
582 		else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL &&
583 				 colocatedEntry->hasSecondaryConnections)
584 		{
585 			/*
586 			 * If a placement has been read over multiple (uncommitted) connections
587 			 * then a DDL command on a co-located placement may create a self-deadlock
588 			 * if there exist some relationship between the co-located placements
589 			 * (e.g. foreign key, partitioning).
590 			 */
591 
592 			Assert(placementConnection != NULL);
593 
594 			ereport(ERROR,
595 					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
596 					 errmsg("cannot perform DDL on placement " UINT64_FORMAT
597 							" since a co-located placement has been read over multiple connections",
598 							placement->placementId)));
599 		}
600 		else if (foundModifyingConnection)
601 		{
602 			/*
603 			 * We already found a connection that performed writes on of the placements
604 			 * and must use it.
605 			 */
606 
607 			if ((placementConnection->hadDDL || placementConnection->hadDML) &&
608 				placementConnection->connection != chosenConnection)
609 			{
610 				/*
611 				 * The current placement may have been modified over a different
612 				 * connection. Neither connection is guaranteed to see all uncomitted
613 				 * writes and therefore we cannot proceed.
614 				 */
615 				ereport(ERROR,
616 						(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
617 						 errmsg("cannot perform query with placements that were "
618 								"modified over multiple connections")));
619 			}
620 		}
621 		else if (accessType == PLACEMENT_ACCESS_SELECT &&
622 				 placementEntry->hasSecondaryConnections &&
623 				 !placementConnection->hadDDL && !placementConnection->hadDML)
624 		{
625 			/*
626 			 * Two separate connections have already selected from this placement
627 			 * and it was not modified. There is no benefit to using this connection.
628 			 */
629 		}
630 		else if (CanUseExistingConnection(flags, userName, placementConnection))
631 		{
632 			/*
633 			 * There is an existing connection for the placement and we can use it.
634 			 */
635 
636 			Assert(placementConnection != NULL);
637 			chosenConnection = placementConnection->connection;
638 
639 			if (placementConnection->hadDDL || placementConnection->hadDML)
640 			{
641 				/* this connection performed writes, we must use it */
642 				foundModifyingConnection = true;
643 			}
644 		}
645 		else if (placementConnection->hadDDL || placementConnection->hadDML)
646 		{
647 			if (strcmp(placementConnection->userName, userName) != 0)
648 			{
649 				ereport(ERROR,
650 						(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
651 						 errmsg("cannot perform query on placements that were "
652 								"modified in this transaction by a different "
653 								"user")));
654 			}
655 			ereport(ERROR,
656 					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
657 					 errmsg("cannot perform query, because modifications were "
658 							"made over a connection that cannot be used at "
659 							"this time. This is most likely a Citus bug so "
660 							"please report it"
661 							)));
662 		}
663 	}
664 
665 	return chosenConnection;
666 }
667 
668 
669 /*
670  * FindOrCreatePlacementEntry finds a placement entry in either the
671  * placement->connection hash or the co-located placements->connection hash,
672  * or adds a new entry if the placement has not yet been accessed in the
673  * current transaction.
674  */
675 static ConnectionPlacementHashEntry *
FindOrCreatePlacementEntry(ShardPlacement * placement)676 FindOrCreatePlacementEntry(ShardPlacement *placement)
677 {
678 	ConnectionPlacementHashKey connKey;
679 	bool found = false;
680 
681 	connKey.placementId = placement->placementId;
682 
683 	ConnectionPlacementHashEntry *placementEntry = hash_search(ConnectionPlacementHash,
684 															   &connKey, HASH_ENTER,
685 															   &found);
686 	if (!found)
687 	{
688 		/* no connection has been chosen for this placement */
689 		placementEntry->failed = false;
690 		placementEntry->primaryConnection = NULL;
691 		placementEntry->hasSecondaryConnections = false;
692 		placementEntry->colocatedEntry = NULL;
693 
694 		if (placement->partitionMethod == DISTRIBUTE_BY_HASH ||
695 			placement->partitionMethod == DISTRIBUTE_BY_NONE)
696 		{
697 			ColocatedPlacementsHashKey coloKey;
698 
699 			coloKey.nodeId = placement->nodeId;
700 			coloKey.colocationGroupId = placement->colocationGroupId;
701 			coloKey.representativeValue = placement->representativeValue;
702 
703 			/* look for a connection assigned to co-located placements */
704 			ColocatedPlacementsHashEntry *colocatedEntry = hash_search(
705 				ColocatedPlacementsHash, &coloKey, HASH_ENTER,
706 				&found);
707 			if (!found)
708 			{
709 				void *conRef = MemoryContextAllocZero(TopTransactionContext,
710 													  sizeof(ConnectionReference));
711 
712 				ConnectionReference *connectionReference = (ConnectionReference *) conRef;
713 
714 				/*
715 				 * Store the co-location group information such that we can later
716 				 * determine whether a connection accessed different placements
717 				 * of the same co-location group.
718 				 */
719 				connectionReference->colocationGroupId = placement->colocationGroupId;
720 				connectionReference->representativeValue = placement->representativeValue;
721 
722 				/*
723 				 * Create a connection reference that can be used for the entire
724 				 * set of co-located placements.
725 				 */
726 				colocatedEntry->primaryConnection = connectionReference;
727 
728 				colocatedEntry->hasSecondaryConnections = false;
729 			}
730 
731 			/*
732 			 * Assign the connection reference for the set of co-located placements
733 			 * to the current placement.
734 			 */
735 			placementEntry->primaryConnection = colocatedEntry->primaryConnection;
736 			placementEntry->colocatedEntry = colocatedEntry;
737 		}
738 		else
739 		{
740 			void *conRef = MemoryContextAllocZero(TopTransactionContext,
741 												  sizeof(ConnectionReference));
742 
743 			placementEntry->primaryConnection = (ConnectionReference *) conRef;
744 		}
745 	}
746 
747 	/* record association with shard, for invalidation */
748 	AssociatePlacementWithShard(placementEntry, placement);
749 
750 	return placementEntry;
751 }
752 
753 
754 /*
755  * CanUseExistingConnection is a helper function for CheckExistingConnections()
756  * that checks whether an existing connection can be reused.
757  */
758 static bool
CanUseExistingConnection(uint32 flags,const char * userName,ConnectionReference * connectionReference)759 CanUseExistingConnection(uint32 flags, const char *userName,
760 						 ConnectionReference *connectionReference)
761 {
762 	MultiConnection *connection = connectionReference->connection;
763 
764 	if (!connection)
765 	{
766 		/* if already closed connection obviously not usable */
767 		return false;
768 	}
769 	else if (connection->claimedExclusively)
770 	{
771 		/* already used */
772 		return false;
773 	}
774 	else if (flags & FORCE_NEW_CONNECTION)
775 	{
776 		/* no connection reuse desired */
777 		return false;
778 	}
779 	else if (strcmp(connectionReference->userName, userName) != 0)
780 	{
781 		/* connection for different user, check for conflict */
782 		return false;
783 	}
784 	else
785 	{
786 		return true;
787 	}
788 }
789 
790 
791 /*
792  * ConnectionAccessedDifferentPlacement returns true if the connection accessed another
793  * placement in the same colocation group with a different representative value,
794  * meaning it's not strictly colocated.
795  */
796 static bool
ConnectionAccessedDifferentPlacement(MultiConnection * connection,ShardPlacement * placement)797 ConnectionAccessedDifferentPlacement(MultiConnection *connection,
798 									 ShardPlacement *placement)
799 {
800 	dlist_iter placementIter;
801 
802 	dlist_foreach(placementIter, &connection->referencedPlacements)
803 	{
804 		ConnectionReference *connectionReference =
805 			dlist_container(ConnectionReference, connectionNode, placementIter.cur);
806 
807 		/* handle append and range distributed tables */
808 		if (placement->partitionMethod != DISTRIBUTE_BY_HASH &&
809 			placement->placementId != connectionReference->placementId)
810 		{
811 			return true;
812 		}
813 
814 		/* handle hash distributed tables */
815 		if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
816 			placement->colocationGroupId == connectionReference->colocationGroupId &&
817 			placement->representativeValue != connectionReference->representativeValue)
818 		{
819 			/* non-co-located placements from the same co-location group */
820 			return true;
821 		}
822 	}
823 
824 	return false;
825 }
826 
827 
828 /*
829  * ConnectionModifiedPlacement returns true if any DML or DDL is executed over
830  * the connection on any placement/table.
831  */
832 bool
ConnectionModifiedPlacement(MultiConnection * connection)833 ConnectionModifiedPlacement(MultiConnection *connection)
834 {
835 	dlist_iter placementIter;
836 
837 	if (connection->remoteTransaction.transactionState == REMOTE_TRANS_NOT_STARTED)
838 	{
839 		/*
840 		 * When StartPlacementListConnection() is called, we set the
841 		 * hadDDL/hadDML even before the actual command is sent to
842 		 * remote nodes. And, if this function is called at that
843 		 * point, we should not assume that the connection has already
844 		 * done any modifications.
845 		 */
846 		return false;
847 	}
848 
849 	if (dlist_is_empty(&connection->referencedPlacements))
850 	{
851 		/*
852 		 * When referencesPlacements are empty, it means that we come here
853 		 * from an API that uses a node connection (e.g., not placement connection),
854 		 * which doesn't set placements.
855 		 * In that case, the command sent could be either write or read, so we assume
856 		 * it is write to be on the safe side.
857 		 */
858 		return true;
859 	}
860 
861 	dlist_foreach(placementIter, &connection->referencedPlacements)
862 	{
863 		ConnectionReference *connectionReference =
864 			dlist_container(ConnectionReference, connectionNode, placementIter.cur);
865 
866 		if (connectionReference->hadDDL || connectionReference->hadDML)
867 		{
868 			return true;
869 		}
870 	}
871 
872 	return false;
873 }
874 
875 
876 /*
877  * AssociatePlacementWithShard records shard->placement relation in
878  * ConnectionShardHash.
879  *
880  * That association is later used, in CheckForFailedPlacements, to invalidate
881  * shard placements if necessary.
882  */
883 static void
AssociatePlacementWithShard(ConnectionPlacementHashEntry * placementEntry,ShardPlacement * placement)884 AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
885 							ShardPlacement *placement)
886 {
887 	ConnectionShardHashKey shardKey;
888 	bool found = false;
889 	dlist_iter placementIter;
890 
891 	shardKey.shardId = placement->shardId;
892 	ConnectionShardHashEntry *shardEntry = hash_search(ConnectionShardHash, &shardKey,
893 													   HASH_ENTER, &found);
894 	if (!found)
895 	{
896 		dlist_init(&shardEntry->placementConnections);
897 	}
898 
899 	/*
900 	 * Check if placement is already associated with shard (happens if there's
901 	 * multiple connections for a placement).  There'll usually only be few
902 	 * placement per shard, so the price of iterating isn't large.
903 	 */
904 	dlist_foreach(placementIter, &shardEntry->placementConnections)
905 	{
906 		ConnectionPlacementHashEntry *currPlacementEntry =
907 			dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
908 
909 		if (currPlacementEntry->key.placementId == placement->placementId)
910 		{
911 			return;
912 		}
913 	}
914 
915 	/* otherwise add */
916 	dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode);
917 }
918 
919 
920 /*
921  * CloseShardPlacementAssociation handles a connection being closed before
922  * transaction end.
923  *
924  * This should only be called by connection_management.c.
925  */
926 void
CloseShardPlacementAssociation(struct MultiConnection * connection)927 CloseShardPlacementAssociation(struct MultiConnection *connection)
928 {
929 	dlist_iter placementIter;
930 
931 	/* set connection to NULL for all references to the connection */
932 	dlist_foreach(placementIter, &connection->referencedPlacements)
933 	{
934 		ConnectionReference *reference =
935 			dlist_container(ConnectionReference, connectionNode, placementIter.cur);
936 
937 		reference->connection = NULL;
938 
939 		/*
940 		 * Note that we don't reset ConnectionPlacementHashEntry's
941 		 * primaryConnection here, that'd be more complicated than it seems
942 		 * worth.  That means we'll error out spuriously if a DML/DDL
943 		 * executing connection is closed earlier in a transaction.
944 		 */
945 	}
946 }
947 
948 
949 /*
950  * ResetShardPlacementAssociation resets the association of connections to
951  * shard placements at the end of a transaction.
952  *
953  * This should only be called by connection_management.c.
954  */
955 void
ResetShardPlacementAssociation(struct MultiConnection * connection)956 ResetShardPlacementAssociation(struct MultiConnection *connection)
957 {
958 	dlist_init(&connection->referencedPlacements);
959 }
960 
961 
962 /*
963  * ResetPlacementConnectionManagement() disassociates connections from
964  * placements and shards. This will be called at the end of XACT_EVENT_COMMIT
965  * and XACT_EVENT_ABORT.
966  */
967 void
ResetPlacementConnectionManagement(void)968 ResetPlacementConnectionManagement(void)
969 {
970 	/* Simply delete all entries */
971 	hash_delete_all(ConnectionPlacementHash);
972 	hash_delete_all(ConnectionShardHash);
973 	hash_delete_all(ColocatedPlacementsHash);
974 	ResetRelationAccessHash();
975 
976 	/*
977 	 * NB: memory for ConnectionReference structs and subordinate data is
978 	 * deleted by virtue of being allocated in TopTransactionContext.
979 	 */
980 }
981 
982 
983 /*
984  * MarkFailedShardPlacements looks through every connection in the connection shard hash
985  * and marks the placements associated with failed connections invalid.
986  *
987  * Every shard must have at least one placement connection which did not fail. If all
988  * modifying connections for a shard failed then the transaction will be aborted.
989  *
990  * This will be called just before commit, so we can abort before executing remote
991  * commits. It should also be called after modification statements, to ensure that we
992  * don't run future statements against placements which are not up to date.
993  */
994 void
MarkFailedShardPlacements()995 MarkFailedShardPlacements()
996 {
997 	HASH_SEQ_STATUS status;
998 	ConnectionShardHashEntry *shardEntry = NULL;
999 
1000 	hash_seq_init(&status, ConnectionShardHash);
1001 	while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
1002 	{
1003 		if (!CheckShardPlacements(shardEntry))
1004 		{
1005 			ereport(ERROR,
1006 					(errmsg("could not make changes to shard " INT64_FORMAT
1007 							" on any node",
1008 							shardEntry->key.shardId)));
1009 		}
1010 	}
1011 }
1012 
1013 
1014 /*
1015  * PostCommitMarkFailedShardPlacements marks placements invalid and checks whether
1016  * sufficiently many placements have failed to abort the entire coordinated
1017  * transaction.
1018  *
1019  * This will be called just after a coordinated commit so we can handle remote
1020  * transactions which failed during commit.
1021  *
1022  * When using2PC is set as least one placement must succeed per shard. If all placements
1023  * fail for a shard the entire transaction is aborted. If using2PC is not set then a only
1024  * a warning will be emitted; we cannot abort because some remote transactions might have
1025  * already been committed.
1026  */
1027 void
PostCommitMarkFailedShardPlacements(bool using2PC)1028 PostCommitMarkFailedShardPlacements(bool using2PC)
1029 {
1030 	HASH_SEQ_STATUS status;
1031 	ConnectionShardHashEntry *shardEntry = NULL;
1032 	int successes = 0;
1033 	int attempts = 0;
1034 
1035 	int elevel = using2PC ? ERROR : WARNING;
1036 
1037 	hash_seq_init(&status, ConnectionShardHash);
1038 	while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
1039 	{
1040 		attempts++;
1041 		if (CheckShardPlacements(shardEntry))
1042 		{
1043 			successes++;
1044 		}
1045 		else
1046 		{
1047 			/*
1048 			 * Only error out if we're using 2PC. If we're not using 2PC we can't error
1049 			 * out otherwise we can end up with a state where some shard modifications
1050 			 * have already committed successfully.
1051 			 */
1052 			ereport(elevel,
1053 					(errmsg("could not commit transaction for shard " INT64_FORMAT
1054 							" on any active node",
1055 							shardEntry->key.shardId)));
1056 		}
1057 	}
1058 
1059 	/*
1060 	 * If no shards could be modified at all, error out. Doesn't matter whether
1061 	 * we're post-commit - there's nothing to invalidate.
1062 	 */
1063 	if (attempts > 0 && successes == 0)
1064 	{
1065 		ereport(ERROR, (errmsg("could not commit transaction on any active node")));
1066 	}
1067 }
1068 
1069 
1070 /*
1071  * CheckShardPlacements is a helper function for CheckForFailedPlacements that
1072  * performs the per-shard work.
1073  */
1074 static bool
CheckShardPlacements(ConnectionShardHashEntry * shardEntry)1075 CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
1076 {
1077 	int failures = 0;
1078 	int successes = 0;
1079 	dlist_iter placementIter;
1080 
1081 	dlist_foreach(placementIter, &shardEntry->placementConnections)
1082 	{
1083 		ConnectionPlacementHashEntry *placementEntry =
1084 			dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
1085 		ConnectionReference *primaryConnection = placementEntry->primaryConnection;
1086 
1087 		/* we only consider shards that are modified */
1088 		if (primaryConnection == NULL ||
1089 			!(primaryConnection->hadDDL || primaryConnection->hadDML))
1090 		{
1091 			continue;
1092 		}
1093 
1094 		MultiConnection *connection = primaryConnection->connection;
1095 
1096 		if (!connection || connection->remoteTransaction.transactionFailed)
1097 		{
1098 			placementEntry->failed = true;
1099 			failures++;
1100 		}
1101 		else
1102 		{
1103 			successes++;
1104 		}
1105 	}
1106 
1107 	/*
1108 	 * If there were any failures we want to bail on a commit in two situations:
1109 	 * there were no successes, or there was a failure with a reference table shard.
1110 	 * Ideally issues with a reference table will've errored out earlier,
1111 	 * but if not, we abort now to avoid an unhealthy reference table shard.
1112 	 */
1113 	if (failures > 0 &&
1114 		(successes == 0 || ReferenceTableShardId(shardEntry->key.shardId)))
1115 	{
1116 		return false;
1117 	}
1118 
1119 	/* mark all failed placements invalid */
1120 	dlist_foreach(placementIter, &shardEntry->placementConnections)
1121 	{
1122 		ConnectionPlacementHashEntry *placementEntry =
1123 			dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
1124 
1125 		if (placementEntry->failed)
1126 		{
1127 			uint64 shardId = shardEntry->key.shardId;
1128 			uint64 placementId = placementEntry->key.placementId;
1129 			ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
1130 
1131 			/*
1132 			 * We only set shard state if it currently is SHARD_STATE_ACTIVE, which
1133 			 * prevents overwriting shard state if it was already set somewhere else.
1134 			 */
1135 			if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
1136 			{
1137 				MarkShardPlacementInactive(shardPlacement);
1138 			}
1139 		}
1140 	}
1141 
1142 	return true;
1143 }
1144 
1145 
1146 /*
1147  * InitPlacementConnectionManagement performs initialization of the
1148  * infrastructure in this file at server start.
1149  */
1150 void
InitPlacementConnectionManagement(void)1151 InitPlacementConnectionManagement(void)
1152 {
1153 	HASHCTL info;
1154 
1155 	/* create (placementId) -> [ConnectionReference] hash */
1156 	memset(&info, 0, sizeof(info));
1157 	info.keysize = sizeof(ConnectionPlacementHashKey);
1158 	info.entrysize = sizeof(ConnectionPlacementHashEntry);
1159 	info.hash = tag_hash;
1160 	info.hcxt = ConnectionContext;
1161 	uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1162 
1163 	ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
1164 										  64, &info, hashFlags);
1165 
1166 	/* create (colocated placement identity) -> [ConnectionReference] hash */
1167 	memset(&info, 0, sizeof(info));
1168 	info.keysize = sizeof(ColocatedPlacementsHashKey);
1169 	info.entrysize = sizeof(ColocatedPlacementsHashEntry);
1170 	info.hash = ColocatedPlacementsHashHash;
1171 	info.match = ColocatedPlacementsHashCompare;
1172 	info.hcxt = ConnectionContext;
1173 	hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
1174 
1175 	ColocatedPlacementsHash = hash_create("citus connection cache (colocated placements)",
1176 										  64, &info, hashFlags);
1177 
1178 	/* create (shardId) -> [ConnectionShardHashEntry] hash */
1179 	memset(&info, 0, sizeof(info));
1180 	info.keysize = sizeof(ConnectionShardHashKey);
1181 	info.entrysize = sizeof(ConnectionShardHashEntry);
1182 	info.hash = tag_hash;
1183 	info.hcxt = ConnectionContext;
1184 	hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1185 
1186 	ConnectionShardHash = hash_create("citus connection cache (shardid)",
1187 									  64, &info, hashFlags);
1188 
1189 	/* (relationId) = [relationAccessMode] hash */
1190 	AllocateRelationAccessHash();
1191 }
1192 
1193 
1194 /*
1195  * UseConnectionPerPlacement returns whether we should use as separate connection
1196  * per placement even if another connection is idle. We mostly use this in testing
1197  * scenarios.
1198  */
1199 bool
UseConnectionPerPlacement(void)1200 UseConnectionPerPlacement(void)
1201 {
1202 	return ForceMaxQueryParallelization &&
1203 		   MultiShardConnectionType != SEQUENTIAL_CONNECTION;
1204 }
1205 
1206 
1207 static uint32
ColocatedPlacementsHashHash(const void * key,Size keysize)1208 ColocatedPlacementsHashHash(const void *key, Size keysize)
1209 {
1210 	ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
1211 
1212 	uint32 hash = hash_uint32(entry->nodeId);
1213 	hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
1214 	hash = hash_combine(hash, hash_uint32(entry->representativeValue));
1215 
1216 	return hash;
1217 }
1218 
1219 
1220 static int
ColocatedPlacementsHashCompare(const void * a,const void * b,Size keysize)1221 ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
1222 {
1223 	ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a;
1224 	ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b;
1225 
1226 	if (ca->nodeId != cb->nodeId ||
1227 		ca->colocationGroupId != cb->colocationGroupId ||
1228 		ca->representativeValue != cb->representativeValue)
1229 	{
1230 		return 1;
1231 	}
1232 	else
1233 	{
1234 		return 0;
1235 	}
1236 }
1237