1 /*-------------------------------------------------------------------------
2 *
3 * placement_connection.c
4 * Per placement connection handling.
5 *
6 * Copyright (c) Citus Data, Inc.
7 *
8 *-------------------------------------------------------------------------
9 */
10
11
12 #include "postgres.h"
13
14 #include "distributed/pg_version_constants.h"
15
16 #include "access/hash.h"
17 #include "distributed/colocation_utils.h"
18 #include "distributed/connection_management.h"
19 #include "distributed/hash_helpers.h"
20 #include "distributed/listutils.h"
21 #include "distributed/coordinator_protocol.h"
22 #include "distributed/metadata_cache.h"
23 #include "distributed/multi_executor.h"
24 #include "distributed/distributed_planner.h"
25 #include "distributed/multi_partitioning_utils.h"
26 #include "distributed/placement_connection.h"
27 #include "distributed/relation_access_tracking.h"
28 #include "utils/hsearch.h"
29 #if PG_VERSION_NUM >= PG_VERSION_13
30 #include "common/hashfn.h"
31 #endif
32 #include "utils/memutils.h"
33
34
/*
 * A connection reference is used to register that a connection has been used
 * to read or modify either a) a shard placement as a particular user, or b) a
 * group of co-located placements. Which of the two it describes depends on
 * whether the reference is private to a ConnectionPlacementHashEntry or shared
 * through a ColocatedPlacementsHashEntry.
 *
 * References are allocated with MemoryContextAllocZero(TopTransactionContext)
 * and are therefore released at the end of the transaction.
 */
typedef struct ConnectionReference
{
	/*
	 * The user used to read/modify the placement. We cannot reuse connections
	 * that were performed using a different role, since it would not have the
	 * right permissions. The string is copied into TopTransactionContext.
	 */
	const char *userName;

	/*
	 * The connection, or NULL if none has been assigned yet or if the
	 * connection was closed before transaction end (the reference is cleared
	 * by CloseShardPlacementAssociation in that case).
	 */
	MultiConnection *connection;

	/*
	 * Information about what the connection is used for. There can only be
	 * one connection executing DDL/DML for a placement to avoid deadlock
	 * issues/read-your-own-writes violations. The difference between DDL/DML
	 * currently is only used to emit more precise error messages.
	 */
	bool hadDML;
	bool hadDDL;

	/*
	 * Colocation group of the placement, if any. Only filled in for references
	 * shared by co-located placements; zero otherwise (zero-allocated).
	 */
	uint32 colocationGroupId;
	uint32 representativeValue;

	/*
	 * placementId of the placement the reference was first assigned for. It is
	 * compared against when checking non-hash-distributed placements for
	 * "accessed a different placement".
	 */
	uint64 placementId;

	/* membership in MultiConnection->referencedPlacements */
	dlist_node connectionNode;
} ConnectionReference;
72
73
/* forward declaration; the full definition follows below */
struct ColocatedPlacementsHashEntry;


/*
 * Hash table mapping placements to a list of connections.
 *
 * This stores a list of connections for each placement, because multiple
 * connections to the same placement may exist at the same time. E.g. an
 * adaptive executor query may reference the same placement in several
 * sub-tasks.
 *
 * We keep track of whether a connection has executed DML or DDL, since we can
 * only ever allow a single transaction to do either, to prevent deadlocks and
 * consistency violations (e.g. read-your-own-writes).
 */

/* hash key */
typedef struct ConnectionPlacementHashKey
{
	uint64 placementId;
} ConnectionPlacementHashKey;

/* hash entry */
typedef struct ConnectionPlacementHashEntry
{
	ConnectionPlacementHashKey key;

	/* did any remote transactions fail? */
	bool failed;

	/*
	 * Primary connection used to access the placement. For co-located
	 * placements this points to the reference shared through colocatedEntry.
	 */
	ConnectionReference *primaryConnection;

	/* are any other connections reading from the placements? */
	bool hasSecondaryConnections;

	/* entry for the set of co-located placements, or NULL */
	struct ColocatedPlacementsHashEntry *colocatedEntry;

	/* membership in ConnectionShardHashEntry->placementConnections */
	dlist_node shardNode;
} ConnectionPlacementHashEntry;

/* hash table */
static HTAB *ConnectionPlacementHash;
119
120
/*
 * A hash-table mapping colocated placements to connections. Colocated
 * placements are the set of placements on a single node that represent the
 * same value range. This is needed because connections for colocated
 * placements (i.e. the corresponding placements for different colocated
 * distributed tables) need to share connections. Otherwise things like
 * foreign keys can very easily lead to unprincipled deadlocks. This means
 * that there can only be one DML/DDL connection for a set of colocated
 * placements.
 *
 * A set of colocated placements is identified, besides node identifying
 * information, by the associated colocation group id and the placement's
 * 'representativeValue', which currently is the lower boundary of its
 * hash-range.
 *
 * Note that this hash-table only contains entries for placements whose
 * partition method is DISTRIBUTE_BY_HASH or DISTRIBUTE_BY_NONE (see
 * FindOrCreatePlacementEntry); other tables so far don't support colocation.
 */

/* hash key */
typedef struct ColocatedPlacementsHashKey
{
	/* to identify host - database can't differ */
	uint32 nodeId;

	/* colocation group, or invalid */
	uint32 colocationGroupId;

	/* to represent the value range */
	uint32 representativeValue;
} ColocatedPlacementsHashKey;

/* hash entry */
typedef struct ColocatedPlacementsHashEntry
{
	ColocatedPlacementsHashKey key;

	/* primary connection used to access the co-located placements */
	ConnectionReference *primaryConnection;

	/* are any other connections reading from the placements? */
	bool hasSecondaryConnections;
} ColocatedPlacementsHashEntry;

static HTAB *ColocatedPlacementsHash;
166
167
/*
 * Hash table mapping shard ids to placements.
 *
 * This is used to track whether placements of a shard have to be marked
 * invalid after a failure, or whether a coordinated transaction has to be
 * aborted, to avoid all placements of a shard being marked invalid.
 */

/* hash key */
typedef struct ConnectionShardHashKey
{
	uint64 shardId;
} ConnectionShardHashKey;

/* hash entry */
typedef struct ConnectionShardHashEntry
{
	ConnectionShardHashKey key;

	/* ConnectionPlacementHashEntry items, linked through their shardNode */
	dlist_head placementConnections;
} ConnectionShardHashEntry;

/* hash table */
static HTAB *ConnectionShardHash;
191
192
/* forward declarations of file-local helpers */
static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList,
													 const char *userName);
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
	ShardPlacement *placement);
static bool CanUseExistingConnection(uint32 flags, const char *userName,
									 ConnectionReference *placementConnection);
static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,
												 ShardPlacement *placement);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
										ShardPlacement *placement);
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
206
207
208 /*
209 * GetPlacementConnection establishes a connection for a placement.
210 *
211 * See StartPlacementConnection for details.
212 */
213 MultiConnection *
GetPlacementConnection(uint32 flags,ShardPlacement * placement,const char * userName)214 GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
215 {
216 MultiConnection *connection = StartPlacementConnection(flags, placement, userName);
217
218 if (connection == NULL)
219 {
220 /* connection can only be NULL for optional connections */
221 Assert((flags & OPTIONAL_CONNECTION));
222
223 return NULL;
224 }
225
226 FinishConnectionEstablishment(connection);
227 return connection;
228 }
229
230
231 /*
232 * StartPlacementConnection initiates a connection to a remote node,
233 * associated with the placement and transaction.
234 *
235 * The connection is established for the current database. If userName is NULL
236 * the current user is used, otherwise the provided one.
237 *
238 * See StartNodeUserDatabaseConnection for details.
239 *
240 * Flags have the corresponding meaning from StartNodeUserDatabaseConnection,
241 * except that two additional flags have an effect:
242 * - FOR_DML - signal that connection is going to be used for DML (modifications)
243 * - FOR_DDL - signal that connection is going to be used for DDL
244 *
245 * Only one connection associated with the placement may have FOR_DML or
246 * FOR_DDL set. For hash-partitioned tables only one connection for a set of
247 * colocated placements may have FOR_DML/DDL set. This restriction prevents
248 * deadlocks and wrong results due to in-progress transactions.
249 */
250 MultiConnection *
StartPlacementConnection(uint32 flags,ShardPlacement * placement,const char * userName)251 StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
252 {
253 ShardPlacementAccess *placementAccess =
254 (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
255
256 placementAccess->placement = placement;
257
258 if (flags & FOR_DDL)
259 {
260 placementAccess->accessType = PLACEMENT_ACCESS_DDL;
261 }
262 else if (flags & FOR_DML)
263 {
264 placementAccess->accessType = PLACEMENT_ACCESS_DML;
265 }
266 else
267 {
268 placementAccess->accessType = PLACEMENT_ACCESS_SELECT;
269 }
270
271 return StartPlacementListConnection(flags, list_make1(placementAccess), userName);
272 }
273
274
275 /*
276 * StartPlacementListConnection returns a connection to a remote node suitable for
277 * a placement accesses (SELECT, DML, DDL) or throws an error if no suitable
278 * connection can be established if would cause a self-deadlock or consistency
279 * violation.
280 */
281 MultiConnection *
StartPlacementListConnection(uint32 flags,List * placementAccessList,const char * userName)282 StartPlacementListConnection(uint32 flags, List *placementAccessList,
283 const char *userName)
284 {
285 char *freeUserName = NULL;
286
287 if (userName == NULL)
288 {
289 userName = freeUserName = CurrentUserName();
290 }
291
292 MultiConnection *chosenConnection = FindPlacementListConnection(flags,
293 placementAccessList,
294 userName);
295 if (chosenConnection == NULL)
296 {
297 /* use the first placement from the list to extract nodename and nodeport */
298 ShardPlacementAccess *placementAccess =
299 (ShardPlacementAccess *) linitial(placementAccessList);
300 ShardPlacement *placement = placementAccess->placement;
301 char *nodeName = placement->nodeName;
302 int nodePort = placement->nodePort;
303
304 /*
305 * No suitable connection in the placement->connection mapping, get one from
306 * the node->connection pool.
307 */
308 chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort,
309 userName, NULL);
310 if (chosenConnection == NULL)
311 {
312 /* connection can only be NULL for optional connections */
313 Assert((flags & OPTIONAL_CONNECTION));
314
315 return NULL;
316 }
317
318 if ((flags & REQUIRE_CLEAN_CONNECTION) &&
319 ConnectionAccessedDifferentPlacement(chosenConnection, placement))
320 {
321 /*
322 * Cached connection accessed a non-co-located placement in the same
323 * table or co-location group, while the caller asked for a clean
324 * connection. Open a new connection instead.
325 *
326 * We use this for situations in which we want to use a different
327 * connection for every placement, such as COPY. If we blindly returned
328 * a cached connection that already modified a different, non-co-located
329 * placement B in the same table or in a table with the same co-location
330 * ID as the current placement, then we'd no longer able to write to
331 * placement B later in the COPY.
332 */
333 chosenConnection = StartNodeUserDatabaseConnection(flags |
334 FORCE_NEW_CONNECTION,
335 nodeName, nodePort,
336 userName, NULL);
337
338 if (chosenConnection == NULL)
339 {
340 /* connection can only be NULL for optional connections */
341 Assert((flags & OPTIONAL_CONNECTION));
342
343 return NULL;
344 }
345
346 Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement));
347 }
348 }
349
350 /* remember which connection we're going to use to access the placements */
351 AssignPlacementListToConnection(placementAccessList, chosenConnection);
352
353 if (freeUserName)
354 {
355 pfree(freeUserName);
356 }
357
358 return chosenConnection;
359 }
360
361
362 /*
363 * AssignPlacementListToConnection assigns a set of shard placement accesses to a
364 * given connection, meaning that connection must be used for all (conflicting)
365 * accesses of the same shard placements to make sure reads see writes and to
366 * make sure we don't take conflicting locks.
367 */
368 void
AssignPlacementListToConnection(List * placementAccessList,MultiConnection * connection)369 AssignPlacementListToConnection(List *placementAccessList, MultiConnection *connection)
370 {
371 const char *userName = connection->user;
372
373 ShardPlacementAccess *placementAccess = NULL;
374 foreach_ptr(placementAccess, placementAccessList)
375 {
376 ShardPlacement *placement = placementAccess->placement;
377 ShardPlacementAccessType accessType = placementAccess->accessType;
378
379
380 if (placement->shardId == INVALID_SHARD_ID)
381 {
382 /*
383 * When a SELECT prunes down to 0 shard, we use a dummy placement
384 * which is only used to route the query to a worker node, but
385 * the SELECT doesn't actually access any shard placement.
386 *
387 * FIXME: this can be removed if we evaluate empty SELECTs locally.
388 */
389 continue;
390 }
391
392 ConnectionPlacementHashEntry *placementEntry = FindOrCreatePlacementEntry(
393 placement);
394 ConnectionReference *placementConnection = placementEntry->primaryConnection;
395
396 if (placementConnection->connection == connection)
397 {
398 /* using the connection that was already assigned to the placement */
399 }
400 else if (placementConnection->connection == NULL)
401 {
402 /* placement does not have a connection assigned yet */
403 placementConnection->connection = connection;
404 placementConnection->hadDDL = false;
405 placementConnection->hadDML = false;
406 placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
407 userName);
408 placementConnection->placementId = placementAccess->placement->placementId;
409
410 /* record association with connection */
411 dlist_push_tail(&connection->referencedPlacements,
412 &placementConnection->connectionNode);
413 }
414 else
415 {
416 /* using a different connection than the one assigned to the placement */
417
418 if (accessType != PLACEMENT_ACCESS_SELECT)
419 {
420 /*
421 * We previously read from the placement, but now we're writing to
422 * it (if we had written to the placement, we would have either chosen
423 * the same connection, or errored out). Update the connection reference
424 * to point to the connection used for writing. We don't need to remember
425 * the existing connection since we won't be able to reuse it for
426 * accessing the placement. However, we do register that it exists in
427 * hasSecondaryConnections.
428 */
429 placementConnection->connection = connection;
430 placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
431 userName);
432
433 Assert(!placementConnection->hadDDL);
434 Assert(!placementConnection->hadDML);
435
436 /* record association with connection */
437 dlist_push_tail(&connection->referencedPlacements,
438 &placementConnection->connectionNode);
439 }
440
441 /*
442 * There are now multiple connections that read from the placement
443 * and DDL commands are forbidden.
444 */
445 placementEntry->hasSecondaryConnections = true;
446
447 if (placementEntry->colocatedEntry != NULL)
448 {
449 /* we also remember this for co-located placements */
450 placementEntry->colocatedEntry->hasSecondaryConnections = true;
451 }
452 }
453
454 /*
455 * Remember that we used the current connection for writes.
456 */
457 if (accessType == PLACEMENT_ACCESS_DDL)
458 {
459 placementConnection->hadDDL = true;
460 }
461
462 if (accessType == PLACEMENT_ACCESS_DML)
463 {
464 placementConnection->hadDML = true;
465 }
466
467 /* record the relation access */
468 Oid relationId = RelationIdForShard(placement->shardId);
469 RecordRelationAccessIfNonDistTable(relationId, accessType);
470 }
471 }
472
473
474 /*
475 * GetConnectionIfPlacementAccessedInXact returns the connection over which
476 * the placement has been access in the transaction. If not found, returns
477 * NULL.
478 */
479 MultiConnection *
GetConnectionIfPlacementAccessedInXact(int flags,List * placementAccessList,const char * userName)480 GetConnectionIfPlacementAccessedInXact(int flags, List *placementAccessList,
481 const char *userName)
482 {
483 char *freeUserName = NULL;
484
485 if (userName == NULL)
486 {
487 userName = freeUserName = CurrentUserName();
488 }
489
490 MultiConnection *connection = FindPlacementListConnection(flags, placementAccessList,
491 userName);
492
493 if (freeUserName != NULL)
494 {
495 pfree(freeUserName);
496 }
497
498 return connection;
499 }
500
501
/*
 * FindPlacementListConnection determines whether there is a connection that must
 * be used to perform the given placement accesses.
 *
 * If a placement was only read in this transaction, then the same connection must
 * be used for DDL to prevent self-deadlock. If a placement was modified in this
 * transaction, then the same connection must be used for all subsequent accesses
 * to ensure read-your-writes consistency and prevent self-deadlock. If those
 * conditions cannot be met, because a connection is in use or the placements in
 * the placement access list were modified over multiple connections, then this
 * function throws an error.
 *
 * Parameters:
 *   flags               - connection flags; within this function they are only
 *                         consulted by CanUseExistingConnection (for
 *                         FORCE_NEW_CONNECTION)
 *   placementAccessList - list of ShardPlacementAccess to perform
 *   userName            - user the accesses will be performed as
 *
 * The function returns the connection that needs to be used, if such a connection
 * exists, and NULL otherwise. On NULL the caller has to obtain a connection
 * itself (StartPlacementListConnection uses the node->connection pool).
 *
 * Note that FindOrCreatePlacementEntry is called for each placement, so hash
 * entries are created for placements not yet seen in this transaction.
 */
static MultiConnection *
FindPlacementListConnection(int flags, List *placementAccessList, const char *userName)
{
	bool foundModifyingConnection = false;
	MultiConnection *chosenConnection = NULL;

	/*
	 * Go through all placement accesses to find a suitable connection.
	 *
	 * If none of the placements have been accessed in this transaction, connection
	 * remains NULL.
	 *
	 * If one or more of the placements have been modified in this transaction, then
	 * use the connection that performed the write. If placements have been written
	 * over multiple connections or the connection is not available, error out.
	 *
	 * If placements have only been read in this transaction, then use the last
	 * suitable connection found for a placement in the placementAccessList.
	 */
	ShardPlacementAccess *placementAccess = NULL;
	foreach_ptr(placementAccess, placementAccessList)
	{
		ShardPlacement *placement = placementAccess->placement;
		ShardPlacementAccessType accessType = placementAccess->accessType;


		if (placement->shardId == INVALID_SHARD_ID)
		{
			/*
			 * When a SELECT prunes down to 0 shard, we use a dummy placement.
			 * In that case, we can fall back to the default connection.
			 *
			 * FIXME: this can be removed if we evaluate empty SELECTs locally.
			 */
			continue;
		}

		ConnectionPlacementHashEntry *placementEntry = FindOrCreatePlacementEntry(
			placement);
		ColocatedPlacementsHashEntry *colocatedEntry = placementEntry->colocatedEntry;
		ConnectionReference *placementConnection = placementEntry->primaryConnection;

		/*
		 * note: the Asserts below are primarily for clarifying the conditions.
		 * The branches are evaluated in order; the first matching one wins.
		 */

		if (placementConnection->connection == NULL)
		{
			/* no connection has been chosen for the placement */
		}
		else if (accessType == PLACEMENT_ACCESS_DDL &&
				 placementEntry->hasSecondaryConnections)
		{
			/*
			 * If a placement has been read over multiple connections (typically as
			 * a result of a reference table join) then a DDL command on the placement
			 * would create a self-deadlock.
			 */

			Assert(placementConnection != NULL);

			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot perform DDL on placement " UINT64_FORMAT
							", which has been read over multiple connections",
							placement->placementId)));
		}
		else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL &&
				 colocatedEntry->hasSecondaryConnections)
		{
			/*
			 * If a placement has been read over multiple (uncommitted) connections
			 * then a DDL command on a co-located placement may create a self-deadlock
			 * if there exist some relationship between the co-located placements
			 * (e.g. foreign key, partitioning).
			 */

			Assert(placementConnection != NULL);

			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot perform DDL on placement " UINT64_FORMAT
							" since a co-located placement has been read over multiple connections",
							placement->placementId)));
		}
		else if (foundModifyingConnection)
		{
			/*
			 * We already found a connection that performed writes on one of the
			 * placements and must use it. A placement that was only read can be
			 * accessed over that connection as well.
			 */

			if ((placementConnection->hadDDL || placementConnection->hadDML) &&
				placementConnection->connection != chosenConnection)
			{
				/*
				 * The current placement may have been modified over a different
				 * connection. Neither connection is guaranteed to see all uncommitted
				 * writes and therefore we cannot proceed.
				 */
				ereport(ERROR,
						(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
						 errmsg("cannot perform query with placements that were "
								"modified over multiple connections")));
			}
		}
		else if (accessType == PLACEMENT_ACCESS_SELECT &&
				 placementEntry->hasSecondaryConnections &&
				 !placementConnection->hadDDL && !placementConnection->hadDML)
		{
			/*
			 * Two separate connections have already selected from this placement
			 * and it was not modified. There is no benefit to using this connection.
			 */
		}
		else if (CanUseExistingConnection(flags, userName, placementConnection))
		{
			/*
			 * There is an existing connection for the placement and we can use it.
			 */

			Assert(placementConnection != NULL);
			chosenConnection = placementConnection->connection;

			if (placementConnection->hadDDL || placementConnection->hadDML)
			{
				/* this connection performed writes, we must use it */
				foundModifyingConnection = true;
			}
		}
		else if (placementConnection->hadDDL || placementConnection->hadDML)
		{
			/*
			 * The placement was modified over a connection we are not allowed
			 * to reuse (see CanUseExistingConnection); any other connection
			 * would miss those writes.
			 */
			if (strcmp(placementConnection->userName, userName) != 0)
			{
				ereport(ERROR,
						(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
						 errmsg("cannot perform query on placements that were "
								"modified in this transaction by a different "
								"user")));
			}
			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot perform query, because modifications were "
							"made over a connection that cannot be used at "
							"this time. This is most likely a Citus bug so "
							"please report it"
							)));
		}
	}

	return chosenConnection;
}
667
668
669 /*
670 * FindOrCreatePlacementEntry finds a placement entry in either the
671 * placement->connection hash or the co-located placements->connection hash,
672 * or adds a new entry if the placement has not yet been accessed in the
673 * current transaction.
674 */
675 static ConnectionPlacementHashEntry *
FindOrCreatePlacementEntry(ShardPlacement * placement)676 FindOrCreatePlacementEntry(ShardPlacement *placement)
677 {
678 ConnectionPlacementHashKey connKey;
679 bool found = false;
680
681 connKey.placementId = placement->placementId;
682
683 ConnectionPlacementHashEntry *placementEntry = hash_search(ConnectionPlacementHash,
684 &connKey, HASH_ENTER,
685 &found);
686 if (!found)
687 {
688 /* no connection has been chosen for this placement */
689 placementEntry->failed = false;
690 placementEntry->primaryConnection = NULL;
691 placementEntry->hasSecondaryConnections = false;
692 placementEntry->colocatedEntry = NULL;
693
694 if (placement->partitionMethod == DISTRIBUTE_BY_HASH ||
695 placement->partitionMethod == DISTRIBUTE_BY_NONE)
696 {
697 ColocatedPlacementsHashKey coloKey;
698
699 coloKey.nodeId = placement->nodeId;
700 coloKey.colocationGroupId = placement->colocationGroupId;
701 coloKey.representativeValue = placement->representativeValue;
702
703 /* look for a connection assigned to co-located placements */
704 ColocatedPlacementsHashEntry *colocatedEntry = hash_search(
705 ColocatedPlacementsHash, &coloKey, HASH_ENTER,
706 &found);
707 if (!found)
708 {
709 void *conRef = MemoryContextAllocZero(TopTransactionContext,
710 sizeof(ConnectionReference));
711
712 ConnectionReference *connectionReference = (ConnectionReference *) conRef;
713
714 /*
715 * Store the co-location group information such that we can later
716 * determine whether a connection accessed different placements
717 * of the same co-location group.
718 */
719 connectionReference->colocationGroupId = placement->colocationGroupId;
720 connectionReference->representativeValue = placement->representativeValue;
721
722 /*
723 * Create a connection reference that can be used for the entire
724 * set of co-located placements.
725 */
726 colocatedEntry->primaryConnection = connectionReference;
727
728 colocatedEntry->hasSecondaryConnections = false;
729 }
730
731 /*
732 * Assign the connection reference for the set of co-located placements
733 * to the current placement.
734 */
735 placementEntry->primaryConnection = colocatedEntry->primaryConnection;
736 placementEntry->colocatedEntry = colocatedEntry;
737 }
738 else
739 {
740 void *conRef = MemoryContextAllocZero(TopTransactionContext,
741 sizeof(ConnectionReference));
742
743 placementEntry->primaryConnection = (ConnectionReference *) conRef;
744 }
745 }
746
747 /* record association with shard, for invalidation */
748 AssociatePlacementWithShard(placementEntry, placement);
749
750 return placementEntry;
751 }
752
753
754 /*
755 * CanUseExistingConnection is a helper function for CheckExistingConnections()
756 * that checks whether an existing connection can be reused.
757 */
758 static bool
CanUseExistingConnection(uint32 flags,const char * userName,ConnectionReference * connectionReference)759 CanUseExistingConnection(uint32 flags, const char *userName,
760 ConnectionReference *connectionReference)
761 {
762 MultiConnection *connection = connectionReference->connection;
763
764 if (!connection)
765 {
766 /* if already closed connection obviously not usable */
767 return false;
768 }
769 else if (connection->claimedExclusively)
770 {
771 /* already used */
772 return false;
773 }
774 else if (flags & FORCE_NEW_CONNECTION)
775 {
776 /* no connection reuse desired */
777 return false;
778 }
779 else if (strcmp(connectionReference->userName, userName) != 0)
780 {
781 /* connection for different user, check for conflict */
782 return false;
783 }
784 else
785 {
786 return true;
787 }
788 }
789
790
791 /*
792 * ConnectionAccessedDifferentPlacement returns true if the connection accessed another
793 * placement in the same colocation group with a different representative value,
794 * meaning it's not strictly colocated.
795 */
796 static bool
ConnectionAccessedDifferentPlacement(MultiConnection * connection,ShardPlacement * placement)797 ConnectionAccessedDifferentPlacement(MultiConnection *connection,
798 ShardPlacement *placement)
799 {
800 dlist_iter placementIter;
801
802 dlist_foreach(placementIter, &connection->referencedPlacements)
803 {
804 ConnectionReference *connectionReference =
805 dlist_container(ConnectionReference, connectionNode, placementIter.cur);
806
807 /* handle append and range distributed tables */
808 if (placement->partitionMethod != DISTRIBUTE_BY_HASH &&
809 placement->placementId != connectionReference->placementId)
810 {
811 return true;
812 }
813
814 /* handle hash distributed tables */
815 if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
816 placement->colocationGroupId == connectionReference->colocationGroupId &&
817 placement->representativeValue != connectionReference->representativeValue)
818 {
819 /* non-co-located placements from the same co-location group */
820 return true;
821 }
822 }
823
824 return false;
825 }
826
827
828 /*
829 * ConnectionModifiedPlacement returns true if any DML or DDL is executed over
830 * the connection on any placement/table.
831 */
832 bool
ConnectionModifiedPlacement(MultiConnection * connection)833 ConnectionModifiedPlacement(MultiConnection *connection)
834 {
835 dlist_iter placementIter;
836
837 if (connection->remoteTransaction.transactionState == REMOTE_TRANS_NOT_STARTED)
838 {
839 /*
840 * When StartPlacementListConnection() is called, we set the
841 * hadDDL/hadDML even before the actual command is sent to
842 * remote nodes. And, if this function is called at that
843 * point, we should not assume that the connection has already
844 * done any modifications.
845 */
846 return false;
847 }
848
849 if (dlist_is_empty(&connection->referencedPlacements))
850 {
851 /*
852 * When referencesPlacements are empty, it means that we come here
853 * from an API that uses a node connection (e.g., not placement connection),
854 * which doesn't set placements.
855 * In that case, the command sent could be either write or read, so we assume
856 * it is write to be on the safe side.
857 */
858 return true;
859 }
860
861 dlist_foreach(placementIter, &connection->referencedPlacements)
862 {
863 ConnectionReference *connectionReference =
864 dlist_container(ConnectionReference, connectionNode, placementIter.cur);
865
866 if (connectionReference->hadDDL || connectionReference->hadDML)
867 {
868 return true;
869 }
870 }
871
872 return false;
873 }
874
875
876 /*
877 * AssociatePlacementWithShard records shard->placement relation in
878 * ConnectionShardHash.
879 *
880 * That association is later used, in CheckForFailedPlacements, to invalidate
881 * shard placements if necessary.
882 */
883 static void
AssociatePlacementWithShard(ConnectionPlacementHashEntry * placementEntry,ShardPlacement * placement)884 AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
885 ShardPlacement *placement)
886 {
887 ConnectionShardHashKey shardKey;
888 bool found = false;
889 dlist_iter placementIter;
890
891 shardKey.shardId = placement->shardId;
892 ConnectionShardHashEntry *shardEntry = hash_search(ConnectionShardHash, &shardKey,
893 HASH_ENTER, &found);
894 if (!found)
895 {
896 dlist_init(&shardEntry->placementConnections);
897 }
898
899 /*
900 * Check if placement is already associated with shard (happens if there's
901 * multiple connections for a placement). There'll usually only be few
902 * placement per shard, so the price of iterating isn't large.
903 */
904 dlist_foreach(placementIter, &shardEntry->placementConnections)
905 {
906 ConnectionPlacementHashEntry *currPlacementEntry =
907 dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
908
909 if (currPlacementEntry->key.placementId == placement->placementId)
910 {
911 return;
912 }
913 }
914
915 /* otherwise add */
916 dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode);
917 }
918
919
920 /*
921 * CloseShardPlacementAssociation handles a connection being closed before
922 * transaction end.
923 *
924 * This should only be called by connection_management.c.
925 */
926 void
CloseShardPlacementAssociation(struct MultiConnection * connection)927 CloseShardPlacementAssociation(struct MultiConnection *connection)
928 {
929 dlist_iter placementIter;
930
931 /* set connection to NULL for all references to the connection */
932 dlist_foreach(placementIter, &connection->referencedPlacements)
933 {
934 ConnectionReference *reference =
935 dlist_container(ConnectionReference, connectionNode, placementIter.cur);
936
937 reference->connection = NULL;
938
939 /*
940 * Note that we don't reset ConnectionPlacementHashEntry's
941 * primaryConnection here, that'd be more complicated than it seems
942 * worth. That means we'll error out spuriously if a DML/DDL
943 * executing connection is closed earlier in a transaction.
944 */
945 }
946 }
947
948
949 /*
950 * ResetShardPlacementAssociation resets the association of connections to
951 * shard placements at the end of a transaction.
952 *
953 * This should only be called by connection_management.c.
954 */
955 void
ResetShardPlacementAssociation(struct MultiConnection * connection)956 ResetShardPlacementAssociation(struct MultiConnection *connection)
957 {
958 dlist_init(&connection->referencedPlacements);
959 }
960
961
962 /*
963 * ResetPlacementConnectionManagement() disassociates connections from
964 * placements and shards. This will be called at the end of XACT_EVENT_COMMIT
965 * and XACT_EVENT_ABORT.
966 */
967 void
ResetPlacementConnectionManagement(void)968 ResetPlacementConnectionManagement(void)
969 {
970 /* Simply delete all entries */
971 hash_delete_all(ConnectionPlacementHash);
972 hash_delete_all(ConnectionShardHash);
973 hash_delete_all(ColocatedPlacementsHash);
974 ResetRelationAccessHash();
975
976 /*
977 * NB: memory for ConnectionReference structs and subordinate data is
978 * deleted by virtue of being allocated in TopTransactionContext.
979 */
980 }
981
982
983 /*
984 * MarkFailedShardPlacements looks through every connection in the connection shard hash
985 * and marks the placements associated with failed connections invalid.
986 *
987 * Every shard must have at least one placement connection which did not fail. If all
988 * modifying connections for a shard failed then the transaction will be aborted.
989 *
990 * This will be called just before commit, so we can abort before executing remote
991 * commits. It should also be called after modification statements, to ensure that we
992 * don't run future statements against placements which are not up to date.
993 */
994 void
MarkFailedShardPlacements()995 MarkFailedShardPlacements()
996 {
997 HASH_SEQ_STATUS status;
998 ConnectionShardHashEntry *shardEntry = NULL;
999
1000 hash_seq_init(&status, ConnectionShardHash);
1001 while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
1002 {
1003 if (!CheckShardPlacements(shardEntry))
1004 {
1005 ereport(ERROR,
1006 (errmsg("could not make changes to shard " INT64_FORMAT
1007 " on any node",
1008 shardEntry->key.shardId)));
1009 }
1010 }
1011 }
1012
1013
1014 /*
1015 * PostCommitMarkFailedShardPlacements marks placements invalid and checks whether
1016 * sufficiently many placements have failed to abort the entire coordinated
1017 * transaction.
1018 *
1019 * This will be called just after a coordinated commit so we can handle remote
1020 * transactions which failed during commit.
1021 *
1022 * When using2PC is set as least one placement must succeed per shard. If all placements
1023 * fail for a shard the entire transaction is aborted. If using2PC is not set then a only
1024 * a warning will be emitted; we cannot abort because some remote transactions might have
1025 * already been committed.
1026 */
1027 void
PostCommitMarkFailedShardPlacements(bool using2PC)1028 PostCommitMarkFailedShardPlacements(bool using2PC)
1029 {
1030 HASH_SEQ_STATUS status;
1031 ConnectionShardHashEntry *shardEntry = NULL;
1032 int successes = 0;
1033 int attempts = 0;
1034
1035 int elevel = using2PC ? ERROR : WARNING;
1036
1037 hash_seq_init(&status, ConnectionShardHash);
1038 while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
1039 {
1040 attempts++;
1041 if (CheckShardPlacements(shardEntry))
1042 {
1043 successes++;
1044 }
1045 else
1046 {
1047 /*
1048 * Only error out if we're using 2PC. If we're not using 2PC we can't error
1049 * out otherwise we can end up with a state where some shard modifications
1050 * have already committed successfully.
1051 */
1052 ereport(elevel,
1053 (errmsg("could not commit transaction for shard " INT64_FORMAT
1054 " on any active node",
1055 shardEntry->key.shardId)));
1056 }
1057 }
1058
1059 /*
1060 * If no shards could be modified at all, error out. Doesn't matter whether
1061 * we're post-commit - there's nothing to invalidate.
1062 */
1063 if (attempts > 0 && successes == 0)
1064 {
1065 ereport(ERROR, (errmsg("could not commit transaction on any active node")));
1066 }
1067 }
1068
1069
1070 /*
1071 * CheckShardPlacements is a helper function for CheckForFailedPlacements that
1072 * performs the per-shard work.
1073 */
1074 static bool
CheckShardPlacements(ConnectionShardHashEntry * shardEntry)1075 CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
1076 {
1077 int failures = 0;
1078 int successes = 0;
1079 dlist_iter placementIter;
1080
1081 dlist_foreach(placementIter, &shardEntry->placementConnections)
1082 {
1083 ConnectionPlacementHashEntry *placementEntry =
1084 dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
1085 ConnectionReference *primaryConnection = placementEntry->primaryConnection;
1086
1087 /* we only consider shards that are modified */
1088 if (primaryConnection == NULL ||
1089 !(primaryConnection->hadDDL || primaryConnection->hadDML))
1090 {
1091 continue;
1092 }
1093
1094 MultiConnection *connection = primaryConnection->connection;
1095
1096 if (!connection || connection->remoteTransaction.transactionFailed)
1097 {
1098 placementEntry->failed = true;
1099 failures++;
1100 }
1101 else
1102 {
1103 successes++;
1104 }
1105 }
1106
1107 /*
1108 * If there were any failures we want to bail on a commit in two situations:
1109 * there were no successes, or there was a failure with a reference table shard.
1110 * Ideally issues with a reference table will've errored out earlier,
1111 * but if not, we abort now to avoid an unhealthy reference table shard.
1112 */
1113 if (failures > 0 &&
1114 (successes == 0 || ReferenceTableShardId(shardEntry->key.shardId)))
1115 {
1116 return false;
1117 }
1118
1119 /* mark all failed placements invalid */
1120 dlist_foreach(placementIter, &shardEntry->placementConnections)
1121 {
1122 ConnectionPlacementHashEntry *placementEntry =
1123 dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
1124
1125 if (placementEntry->failed)
1126 {
1127 uint64 shardId = shardEntry->key.shardId;
1128 uint64 placementId = placementEntry->key.placementId;
1129 ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
1130
1131 /*
1132 * We only set shard state if it currently is SHARD_STATE_ACTIVE, which
1133 * prevents overwriting shard state if it was already set somewhere else.
1134 */
1135 if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
1136 {
1137 MarkShardPlacementInactive(shardPlacement);
1138 }
1139 }
1140 }
1141
1142 return true;
1143 }
1144
1145
1146 /*
1147 * InitPlacementConnectionManagement performs initialization of the
1148 * infrastructure in this file at server start.
1149 */
1150 void
InitPlacementConnectionManagement(void)1151 InitPlacementConnectionManagement(void)
1152 {
1153 HASHCTL info;
1154
1155 /* create (placementId) -> [ConnectionReference] hash */
1156 memset(&info, 0, sizeof(info));
1157 info.keysize = sizeof(ConnectionPlacementHashKey);
1158 info.entrysize = sizeof(ConnectionPlacementHashEntry);
1159 info.hash = tag_hash;
1160 info.hcxt = ConnectionContext;
1161 uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1162
1163 ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
1164 64, &info, hashFlags);
1165
1166 /* create (colocated placement identity) -> [ConnectionReference] hash */
1167 memset(&info, 0, sizeof(info));
1168 info.keysize = sizeof(ColocatedPlacementsHashKey);
1169 info.entrysize = sizeof(ColocatedPlacementsHashEntry);
1170 info.hash = ColocatedPlacementsHashHash;
1171 info.match = ColocatedPlacementsHashCompare;
1172 info.hcxt = ConnectionContext;
1173 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
1174
1175 ColocatedPlacementsHash = hash_create("citus connection cache (colocated placements)",
1176 64, &info, hashFlags);
1177
1178 /* create (shardId) -> [ConnectionShardHashEntry] hash */
1179 memset(&info, 0, sizeof(info));
1180 info.keysize = sizeof(ConnectionShardHashKey);
1181 info.entrysize = sizeof(ConnectionShardHashEntry);
1182 info.hash = tag_hash;
1183 info.hcxt = ConnectionContext;
1184 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1185
1186 ConnectionShardHash = hash_create("citus connection cache (shardid)",
1187 64, &info, hashFlags);
1188
1189 /* (relationId) = [relationAccessMode] hash */
1190 AllocateRelationAccessHash();
1191 }
1192
1193
1194 /*
1195 * UseConnectionPerPlacement returns whether we should use as separate connection
1196 * per placement even if another connection is idle. We mostly use this in testing
1197 * scenarios.
1198 */
1199 bool
UseConnectionPerPlacement(void)1200 UseConnectionPerPlacement(void)
1201 {
1202 return ForceMaxQueryParallelization &&
1203 MultiShardConnectionType != SEQUENTIAL_CONNECTION;
1204 }
1205
1206
1207 static uint32
ColocatedPlacementsHashHash(const void * key,Size keysize)1208 ColocatedPlacementsHashHash(const void *key, Size keysize)
1209 {
1210 ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
1211
1212 uint32 hash = hash_uint32(entry->nodeId);
1213 hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
1214 hash = hash_combine(hash, hash_uint32(entry->representativeValue));
1215
1216 return hash;
1217 }
1218
1219
1220 static int
ColocatedPlacementsHashCompare(const void * a,const void * b,Size keysize)1221 ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
1222 {
1223 ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a;
1224 ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b;
1225
1226 if (ca->nodeId != cb->nodeId ||
1227 ca->colocationGroupId != cb->colocationGroupId ||
1228 ca->representativeValue != cb->representativeValue)
1229 {
1230 return 1;
1231 }
1232 else
1233 {
1234 return 0;
1235 }
1236 }
1237