1 /*-------------------------------------------------------------------------
2 *
3 * connection_management.c
4 * Central management of connections and their life-cycle
5 *
6 * Copyright (c) Citus Data, Inc.
7 *
8 *-------------------------------------------------------------------------
9 */
10
11 #include "postgres.h"
12 #include "pgstat.h"
13
14 #include "libpq-fe.h"
15
16 #include "miscadmin.h"
17
18 #include "safe_lib.h"
19
20 #include "access/hash.h"
21 #include "commands/dbcommands.h"
22 #include "distributed/connection_management.h"
23 #include "distributed/errormessage.h"
24 #include "distributed/error_codes.h"
25 #include "distributed/listutils.h"
26 #include "distributed/log_utils.h"
27 #include "distributed/memutils.h"
28 #include "distributed/metadata_cache.h"
29 #include "distributed/hash_helpers.h"
30 #include "distributed/placement_connection.h"
31 #include "distributed/run_from_same_connection.h"
32 #include "distributed/shared_connection_stats.h"
33 #include "distributed/cancel_utils.h"
34 #include "distributed/remote_commands.h"
35 #include "distributed/time_constants.h"
36 #include "distributed/version_compat.h"
37 #include "distributed/worker_log_messages.h"
38 #include "mb/pg_wchar.h"
39 #include "portability/instr_time.h"
40 #include "storage/ipc.h"
41 #include "utils/hsearch.h"
42 #include "utils/memutils.h"
43
44
/*
 * Timeout (in milliseconds) applied while establishing remote connections;
 * consulted by FinishConnectionListEstablishment().
 */
int NodeConnectionTimeout = 30000;

/*
 * Maximum number of connections kept cached per worker across transactions;
 * presumably enforced by AfterXactHostConnectionHandling /
 * ShouldShutdownConnection — TODO confirm (their bodies are outside this chunk).
 */
int MaxCachedConnectionsPerWorker = 1;

/* maximum lifetime of a cached connection, in milliseconds */
int MaxCachedConnectionLifetime = 10 * MS_PER_MINUTE;

/* (host, port, user, database) -> list-of-connections hash */
HTAB *ConnectionHash = NULL;

/* (host, port, user, database) -> cached connection parameters hash */
HTAB *ConnParamsHash = NULL;

/* memory context holding all connection/transaction tracking allocations */
MemoryContext ConnectionContext = NULL;
53
54 static uint32 ConnectionHashHash(const void *key, Size keysize);
55 static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
56 static void StartConnectionEstablishment(MultiConnection *connectionn,
57 ConnectionHashKey *key);
58 static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
59 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
60 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
61 static bool ShouldShutdownConnection(MultiConnection *connection, const int
62 cachedConnectionCount);
63 static void ResetConnection(MultiConnection *connection);
64 static bool RemoteTransactionIdle(MultiConnection *connection);
65 static int EventSetSizeForConnectionList(List *connections);
66
67
/* types for async connection management */
enum MultiConnectionPhase
{
	MULTI_CONNECTION_PHASE_CONNECTING,
	MULTI_CONNECTION_PHASE_CONNECTED,
	MULTI_CONNECTION_PHASE_ERROR,
};

/*
 * MultiConnectionPollState tracks the progress of asynchronously
 * establishing a single connection; see MultiConnectionStatePoll().
 */
typedef struct MultiConnectionPollState
{
	enum MultiConnectionPhase phase;    /* current establishment phase */
	MultiConnection *connection;        /* the connection being established */
	PostgresPollingStatusType pollmode; /* last result of PQconnectPoll() */
} MultiConnectionPollState;


/* helper functions for async connection management */
static bool MultiConnectionStatePoll(MultiConnectionPollState *connectionState);
static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
															int *waitCount);
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection);
static ConnParamsHashEntry * FindOrCreateConnParamsEntry(ConnectionHashKey *key);
91
92 /*
93 * Initialize per-backend connection management infrastructure.
94 */
95 void
InitializeConnectionManagement(void)96 InitializeConnectionManagement(void)
97 {
98 HASHCTL info, connParamsInfo;
99
100 /*
101 * Create a single context for connection and transaction related memory
102 * management. Doing so, instead of allocating in TopMemoryContext, makes
103 * it easier to associate used memory.
104 */
105 ConnectionContext = AllocSetContextCreateExtended(TopMemoryContext,
106 "Connection Context",
107 ALLOCSET_DEFAULT_MINSIZE,
108 ALLOCSET_DEFAULT_INITSIZE,
109 ALLOCSET_DEFAULT_MAXSIZE);
110
111 /* create (host,port,user,database) -> [connection] hash */
112 memset(&info, 0, sizeof(info));
113 info.keysize = sizeof(ConnectionHashKey);
114 info.entrysize = sizeof(ConnectionHashEntry);
115 info.hash = ConnectionHashHash;
116 info.match = ConnectionHashCompare;
117 info.hcxt = ConnectionContext;
118 uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
119
120 /* connParamsInfo is same as info, except for entrysize */
121 connParamsInfo = info;
122 connParamsInfo.entrysize = sizeof(ConnParamsHashEntry);
123
124 ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
125 64, &info, hashFlags);
126
127 ConnParamsHash = hash_create("citus connparams cache (host,port,user,database)",
128 64, &connParamsInfo, hashFlags);
129 }
130
131
132 /*
133 * InvalidateConnParamsHashEntries sets every hash entry's isValid flag to false.
134 */
135 void
InvalidateConnParamsHashEntries(void)136 InvalidateConnParamsHashEntries(void)
137 {
138 if (ConnParamsHash != NULL)
139 {
140 ConnParamsHashEntry *entry = NULL;
141 HASH_SEQ_STATUS status;
142
143 hash_seq_init(&status, ConnParamsHash);
144 while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
145 {
146 entry->isValid = false;
147 }
148 }
149 }
150
151
152 /*
153 * AfterXactConnectionHandling performs connection management activity after the end of a transaction. Both
154 * COMMIT and ABORT paths are handled here.
155 *
156 * This is called by Citus' global transaction callback.
157 */
158 void
AfterXactConnectionHandling(bool isCommit)159 AfterXactConnectionHandling(bool isCommit)
160 {
161 HASH_SEQ_STATUS status;
162 ConnectionHashEntry *entry;
163
164 hash_seq_init(&status, ConnectionHash);
165 while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
166 {
167 if (!entry->isValid)
168 {
169 /* skip invalid connection hash entries */
170 continue;
171 }
172
173 AfterXactHostConnectionHandling(entry, isCommit);
174
175 /*
176 * NB: We leave the hash entry in place, even if there's no individual
177 * connections in it anymore. There seems no benefit in deleting it,
178 * and it'll save a bit of work in the next transaction.
179 */
180 }
181 }
182
183
184 /*
185 * GetNodeConnection() establishes a connection to remote node, using default
186 * user and database.
187 *
188 * See StartNodeUserDatabaseConnection for details.
189 */
190 MultiConnection *
GetNodeConnection(uint32 flags,const char * hostname,int32 port)191 GetNodeConnection(uint32 flags, const char *hostname, int32 port)
192 {
193 return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
194 }
195
196
197 /*
198 * StartNodeConnection initiates a connection to remote node, using default
199 * user and database.
200 *
201 * See StartNodeUserDatabaseConnection for details.
202 */
203 MultiConnection *
StartNodeConnection(uint32 flags,const char * hostname,int32 port)204 StartNodeConnection(uint32 flags, const char *hostname, int32 port)
205 {
206 MultiConnection *connection = StartNodeUserDatabaseConnection(flags, hostname, port,
207 NULL, NULL);
208
209 /*
210 * connection can only be NULL for optional connections, which we don't
211 * support in this codepath.
212 */
213 Assert((flags & OPTIONAL_CONNECTION) == 0);
214 Assert(connection != NULL);
215 return connection;
216 }
217
218
219 /*
220 * GetNodeUserDatabaseConnection establishes connection to remote node.
221 *
222 * See StartNodeUserDatabaseConnection for details.
223 */
224 MultiConnection *
GetNodeUserDatabaseConnection(uint32 flags,const char * hostname,int32 port,const char * user,const char * database)225 GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
226 const char *user, const char *database)
227 {
228 MultiConnection *connection = StartNodeUserDatabaseConnection(flags, hostname, port,
229 user, database);
230
231 /*
232 * connection can only be NULL for optional connections, which we don't
233 * support in this codepath.
234 */
235 Assert((flags & OPTIONAL_CONNECTION) == 0);
236 Assert(connection != NULL);
237
238 FinishConnectionEstablishment(connection);
239
240 return connection;
241 }
242
243
/*
 * StartNodeUserDatabaseConnection() initiates a connection to a remote node.
 *
 * If user or database are NULL, the current session's defaults are used. The
 * following flags influence connection establishment behaviour:
 * - FORCE_NEW_CONNECTION - a new connection is required
 * - WAIT_FOR_CONNECTION / OPTIONAL_CONNECTION - mutually exclusive; control
 *   what happens when no shared connection slot can be reserved
 *
 * The returned connection has only been initiated, not fully
 * established. That's useful to allow parallel connection establishment. If
 * that's not desired use the Get* variant.
 *
 * Returns NULL only when OPTIONAL_CONNECTION is set and no shared connection
 * slot could be reserved.
 */
MultiConnection *
StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
								const char *user, const char *database)
{
	ConnectionHashKey key;
	bool found;

	/* do some minimal input checks */
	if (strlen(hostname) > MAX_NODE_LENGTH)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("hostname exceeds the maximum length of %d",
							   MAX_NODE_LENGTH)));
	}

	strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);

	key.port = port;

	/* fall back to the session's current user/database when not specified */
	if (user)
	{
		strlcpy(key.user, user, NAMEDATALEN);
	}
	else
	{
		strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
	}
	if (database)
	{
		strlcpy(key.database, database, NAMEDATALEN);
	}
	else
	{
		strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
	}

	/* opening a connection implicitly enters the coordinated transaction machinery */
	if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
	{
		CurrentCoordinatedTransactionState = COORD_TRANS_IDLE;
	}

	/*
	 * Lookup relevant hash entry. We always enter. If only a cached
	 * connection is desired, and there's none, we'll simply leave the
	 * connection list empty.
	 */

	ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
	if (!found || !entry->isValid)
	{
		/*
		 * We are just building hash entry or previously it was left in an
		 * invalid state as we couldn't allocate memory for it.
		 * So initialize entry->connections list here.
		 */
		entry->isValid = false;
		entry->connections = MemoryContextAlloc(ConnectionContext,
												sizeof(dlist_head));
		dlist_init(entry->connections);

		/*
		 * If MemoryContextAlloc errors out -e.g. during an OOM-, entry->connections
		 * stays as NULL. So entry->isValid should be set to true right after we
		 * initialize entry->connections properly.
		 */
		entry->isValid = true;
	}

	/* if desired, check whether there's a usable connection */
	if (!(flags & FORCE_NEW_CONNECTION))
	{
		/* check connection cache for a connection that's not already in use */
		MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
		if (connection)
		{
			return connection;
		}
	}


	/*
	 * Either no caching desired, or no pre-established, non-claimed,
	 * connection present. Initiate connection establishment.
	 */
	MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
														 sizeof(MultiConnection));
	connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
	dlist_push_tail(entry->connections, &connection->connectionNode);

	/* these two flags, by nature, cannot be set at the same time */
	Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));

	if (flags & WAIT_FOR_CONNECTION)
	{
		WaitLoopForSharedConnection(hostname, port);
	}
	else if (flags & OPTIONAL_CONNECTION)
	{
		/*
		 * We can afford to skip establishing an optional connection. For
		 * non-optional connections, we first retry for some time. If we still
		 * cannot reserve the right to establish a connection, we prefer to
		 * error out.
		 */
		if (!TryToIncrementSharedConnectionCounter(hostname, port))
		{
			/* do not track the connection anymore */
			dlist_delete(&connection->connectionNode);
			pfree(connection);

			return NULL;
		}
	}
	else
	{
		/*
		 * The caller doesn't want the connection manager to wait
		 * until a connection slot is available on the remote node.
		 * In the end, we might fail to establish connection to the
		 * remote node as it might not have any space in
		 * max_connections for this connection establishment.
		 *
		 * Still, we keep track of the connection counter.
		 */
		IncrementSharedConnectionCounter(hostname, port);
	}


	/*
	 * We've already incremented the counter above, so we should decrement
	 * when we're done with the connection.
	 */
	connection->initilizationState = POOL_STATE_COUNTER_INCREMENTED;

	StartConnectionEstablishment(connection, &key);

	ResetShardPlacementAssociation(connection);

	/* fully initialized the connection, record it */
	connection->initilizationState = POOL_STATE_INITIALIZED;

	return connection;
}
397
398
399 /*
400 * FindAvailableConnection searches the given list of connections for one that
401 * is not claimed exclusively.
402 *
403 * If no connection is available, FindAvailableConnection returns NULL.
404 */
405 static MultiConnection *
FindAvailableConnection(dlist_head * connections,uint32 flags)406 FindAvailableConnection(dlist_head *connections, uint32 flags)
407 {
408 dlist_iter iter;
409
410 dlist_foreach(iter, connections)
411 {
412 MultiConnection *connection =
413 dlist_container(MultiConnection, connectionNode, iter.cur);
414
415 if (flags & OUTSIDE_TRANSACTION)
416 {
417 /* don't return connections that are used in transactions */
418 if (connection->remoteTransaction.transactionState !=
419 REMOTE_TRANS_NOT_STARTED)
420 {
421 continue;
422 }
423 }
424
425 /* don't return claimed connections */
426 if (connection->claimedExclusively)
427 {
428 /* connection is in use for an ongoing operation */
429 continue;
430 }
431
432 if (connection->forceCloseAtTransactionEnd)
433 {
434 /*
435 * This is a connection that should be closed, probabably because
436 * of old connection options. So we ignore it. It will
437 * automatically be closed at the end of the transaction.
438 */
439 continue;
440 }
441
442 if (connection->initilizationState != POOL_STATE_INITIALIZED)
443 {
444 /*
445 * If the connection has not been initialized, it should not be
446 * considered as available.
447 */
448 continue;
449 }
450
451 return connection;
452 }
453
454 return NULL;
455 }
456
457
458 /*
459 * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
460 * connections. This is mainly done when citus.node_conninfo changes.
461 */
462 void
CloseAllConnectionsAfterTransaction(void)463 CloseAllConnectionsAfterTransaction(void)
464 {
465 if (ConnectionHash == NULL)
466 {
467 return;
468 }
469 HASH_SEQ_STATUS status;
470 ConnectionHashEntry *entry;
471
472 hash_seq_init(&status, ConnectionHash);
473 while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
474 {
475 if (!entry->isValid)
476 {
477 /* skip invalid connection hash entries */
478 continue;
479 }
480
481 dlist_iter iter;
482
483 dlist_head *connections = entry->connections;
484 dlist_foreach(iter, connections)
485 {
486 MultiConnection *connection =
487 dlist_container(MultiConnection, connectionNode, iter.cur);
488
489 connection->forceCloseAtTransactionEnd = true;
490 }
491 }
492 }
493
494
495 /*
496 * ConnectionAvailableToNode returns a MultiConnection if the session has at least
497 * one connection established and avaliable to use to the give node. Else, returns
498 * false.
499 */
500 MultiConnection *
ConnectionAvailableToNode(char * hostName,int nodePort,const char * userName,const char * database)501 ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
502 const char *database)
503 {
504 ConnectionHashKey key;
505 bool found = false;
506
507 strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
508 key.port = nodePort;
509 strlcpy(key.user, userName, NAMEDATALEN);
510 strlcpy(key.database, database, NAMEDATALEN);
511
512 ConnectionHashEntry *entry =
513 (ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
514
515 if (!found || !entry->isValid)
516 {
517 return false;
518 }
519
520 int flags = 0;
521 MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
522
523 return connection;
524 }
525
526
527 /*
528 * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
529 * to a particular node as true such that the connections are no longer cached. This
530 * is mainly used when a worker leaves the cluster.
531 */
532 void
CloseNodeConnectionsAfterTransaction(char * nodeName,int nodePort)533 CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
534 {
535 HASH_SEQ_STATUS status;
536 ConnectionHashEntry *entry;
537
538 hash_seq_init(&status, ConnectionHash);
539 while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
540 {
541 if (!entry->isValid)
542 {
543 /* skip invalid connection hash entries */
544 continue;
545 }
546
547 dlist_iter iter;
548
549 if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
550 {
551 continue;
552 }
553
554 dlist_head *connections = entry->connections;
555 dlist_foreach(iter, connections)
556 {
557 MultiConnection *connection =
558 dlist_container(MultiConnection, connectionNode, iter.cur);
559
560 connection->forceCloseAtTransactionEnd = true;
561 }
562 }
563 }
564
565
566 /*
567 * Close a previously established connection.
568 */
569 void
CloseConnection(MultiConnection * connection)570 CloseConnection(MultiConnection *connection)
571 {
572 ConnectionHashKey key;
573 bool found;
574
575 /* close connection */
576 CitusPQFinish(connection);
577
578 strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
579 key.port = connection->port;
580 strlcpy(key.user, connection->user, NAMEDATALEN);
581 strlcpy(key.database, connection->database, NAMEDATALEN);
582
583 hash_search(ConnectionHash, &key, HASH_FIND, &found);
584
585 if (found)
586 {
587 /* unlink from list of open connections */
588 dlist_delete(&connection->connectionNode);
589
590 /* same for transaction state and shard/placement machinery */
591 CloseRemoteTransaction(connection);
592 CloseShardPlacementAssociation(connection);
593
594 /* we leave the per-host entry alive */
595 pfree(connection);
596 }
597 else
598 {
599 ereport(ERROR, (errmsg("closing untracked connection")));
600 }
601 }
602
603
604 /*
605 * ShutdownAllConnections shutdowns all the MultiConnections in the
606 * ConnectionHash.
607 *
608 * This function is intended to be called atexit() of the backend, so
609 * that the cached connections are closed properly. Calling this function
610 * at another point in the code could be dangerous, so think twice if you
611 * need to call this function.
612 */
613 void
ShutdownAllConnections(void)614 ShutdownAllConnections(void)
615 {
616 ConnectionHashEntry *entry = NULL;
617 HASH_SEQ_STATUS status;
618
619 hash_seq_init(&status, ConnectionHash);
620 while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
621 {
622 if (!entry->isValid)
623 {
624 /* skip invalid connection hash entries */
625 continue;
626 }
627
628 dlist_iter iter;
629
630 dlist_foreach(iter, entry->connections)
631 {
632 MultiConnection *connection =
633 dlist_container(MultiConnection, connectionNode, iter.cur);
634
635 ShutdownConnection(connection);
636 }
637 }
638 }
639
640
641 /*
642 * ShutdownConnection, if necessary cancels the currently running statement,
643 * and then closes the underlying libpq connection. The MultiConnection
644 * itself is left intact.
645 *
646 * NB: Cancelling a statement requires network IO, and currently is not
647 * interruptible. Unfortunately libpq does not provide a non-blocking
648 * implementation of PQcancel(), so we don't have much choice for now.
649 */
650 void
ShutdownConnection(MultiConnection * connection)651 ShutdownConnection(MultiConnection *connection)
652 {
653 /*
654 * Only cancel statement if there's currently one running, and the
655 * connection is in an OK state.
656 */
657 if (PQstatus(connection->pgConn) == CONNECTION_OK &&
658 PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
659 {
660 SendCancelationRequest(connection);
661 }
662 CitusPQFinish(connection);
663 }
664
665
666 /*
667 * MultiConnectionStatePoll executes a PQconnectPoll on the connection to progres the
668 * connection establishment. The return value of this function indicates if the
669 * MultiConnectionPollState has been changed, which could require a change to the WaitEventSet
670 */
671 static bool
MultiConnectionStatePoll(MultiConnectionPollState * connectionState)672 MultiConnectionStatePoll(MultiConnectionPollState *connectionState)
673 {
674 MultiConnection *connection = connectionState->connection;
675 ConnStatusType status = PQstatus(connection->pgConn);
676 PostgresPollingStatusType oldPollmode = connectionState->pollmode;
677
678 Assert(connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING);
679
680 if (status == CONNECTION_OK)
681 {
682 connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
683 return true;
684 }
685 else if (status == CONNECTION_BAD)
686 {
687 /* FIXME: retries? */
688 connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
689 return true;
690 }
691 else
692 {
693 connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTING;
694 }
695
696 connectionState->pollmode = PQconnectPoll(connection->pgConn);
697
698 /*
699 * FIXME: Do we want to add transparent retry support here?
700 */
701 if (connectionState->pollmode == PGRES_POLLING_FAILED)
702 {
703 connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
704 return true;
705 }
706 else if (connectionState->pollmode == PGRES_POLLING_OK)
707 {
708 connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
709 return true;
710 }
711 else
712 {
713 Assert(connectionState->pollmode == PGRES_POLLING_WRITING ||
714 connectionState->pollmode == PGRES_POLLING_READING);
715 }
716
717 return (oldPollmode != connectionState->pollmode) ? true : false;
718 }
719
720
721 /*
722 * EventSetSizeForConnectionList calculates the space needed for a WaitEventSet based on a
723 * list of connections.
724 */
725 inline static int
EventSetSizeForConnectionList(List * connections)726 EventSetSizeForConnectionList(List *connections)
727 {
728 /* we need space for 2 postgres events in the waitset on top of the connections */
729 return list_length(connections) + 2;
730 }
731
732
/*
 * WaitEventSetFromMultiConnectionStates takes a list of MultiConnectionStates and adds
 * all sockets of the connections that are still in the connecting phase to a WaitSet,
 * taking into account the maximum number of connections that could be added in total to
 * a WaitSet.
 *
 * waitCount populates the number of connections added to the WaitSet in case when a
 * non-NULL pointer is provided.
 *
 * The returned set is registered for release via EnsureReleaseResource, so it
 * is freed automatically when the enclosing memory context is reset/deleted.
 */
static WaitEventSet *
WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
{
	const int eventSetSize = EventSetSizeForConnectionList(connections);
	int numEventsAdded = 0;

	if (waitCount)
	{
		*waitCount = 0;
	}

	WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
	EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet),
						  waitEventSet);

	/*
	 * Put the wait events for the signal latch and postmaster death at the end such that
	 * event index + pendingConnectionsStartIndex = the connection index in the array.
	 */
	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	numEventsAdded += 2;

	MultiConnectionPollState *connectionState = NULL;
	foreach_ptr(connectionState, connections)
	{
		if (numEventsAdded >= eventSetSize)
		{
			/* room for events to schedule is exhausted */
			break;
		}

		if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
		{
			/* connections that are not connecting will not be added to the WaitSet */
			continue;
		}

		int sock = PQsocket(connectionState->connection->pgConn);

		/* wait for read or write readiness depending on the last poll mode */
		int eventMask = MultiConnectionStateEventMask(connectionState);

		/* the connectionState is attached as user_data for later retrieval */
		AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState);
		numEventsAdded++;

		if (waitCount)
		{
			*waitCount = *waitCount + 1;
		}
	}

	return waitEventSet;
}
795
796
797 /*
798 * MultiConnectionStateEventMask returns the eventMask use by the WaitEventSet for the
799 * for the socket associated with the connection based on the pollmode PQconnectPoll
800 * returned in its last invocation
801 */
802 static uint32
MultiConnectionStateEventMask(MultiConnectionPollState * connectionState)803 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState)
804 {
805 uint32 eventMask = 0;
806 if (connectionState->pollmode == PGRES_POLLING_READING)
807 {
808 eventMask |= WL_SOCKET_READABLE;
809 }
810 else
811 {
812 eventMask |= WL_SOCKET_WRITEABLE;
813 }
814 return eventMask;
815 }
816
817
818 /*
819 * FinishConnectionListEstablishment takes a list of MultiConnection and finishes the
820 * connections establishment asynchronously for all connections not already fully
821 * connected.
822 */
823 void
FinishConnectionListEstablishment(List * multiConnectionList)824 FinishConnectionListEstablishment(List *multiConnectionList)
825 {
826 instr_time connectionStart;
827 INSTR_TIME_SET_CURRENT(connectionStart);
828
829 List *connectionStates = NULL;
830
831 WaitEventSet *waitEventSet = NULL;
832 bool waitEventSetRebuild = true;
833 int waitCount = 0;
834
835 MultiConnection *connection = NULL;
836 foreach_ptr(connection, multiConnectionList)
837 {
838 MultiConnectionPollState *connectionState =
839 palloc0(sizeof(MultiConnectionPollState));
840
841 connectionState->connection = connection;
842
843 /*
844 * before we can build the waitset to wait for asynchronous IO we need to know the
845 * pollmode to use for the sockets. This is populated by executing one round of
846 * PQconnectPoll. This updates the MultiConnectionPollState struct with its phase and
847 * its next poll mode.
848 */
849 MultiConnectionStatePoll(connectionState);
850
851 connectionStates = lappend(connectionStates, connectionState);
852 if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING)
853 {
854 waitCount++;
855 }
856 }
857
858 /* prepare space for socket events */
859 WaitEvent *events = (WaitEvent *) palloc0(EventSetSizeForConnectionList(
860 connectionStates) *
861 sizeof(WaitEvent));
862
863 /*
864 * for high connection counts with lots of round trips we could potentially have a lot
865 * of (big) waitsets that we'd like to clean right after we have used them. To do this
866 * we switch to a temporary memory context for this loop which gets reset at the end
867 */
868 MemoryContext oldContext = MemoryContextSwitchTo(
869 AllocSetContextCreate(CurrentMemoryContext,
870 "connection establishment temporary context",
871 ALLOCSET_DEFAULT_SIZES));
872 while (waitCount > 0)
873 {
874 long timeout = MillisecondsToTimeout(connectionStart, NodeConnectionTimeout);
875
876 if (waitEventSetRebuild)
877 {
878 MemoryContextReset(CurrentMemoryContext);
879 waitEventSet = WaitEventSetFromMultiConnectionStates(connectionStates,
880 &waitCount);
881 waitEventSetRebuild = false;
882
883 if (waitCount <= 0)
884 {
885 break;
886 }
887 }
888
889 int eventCount = WaitEventSetWait(waitEventSet, timeout, events, waitCount,
890 WAIT_EVENT_CLIENT_READ);
891
892 for (int eventIndex = 0; eventIndex < eventCount; eventIndex++)
893 {
894 WaitEvent *event = &events[eventIndex];
895 MultiConnectionPollState *connectionState =
896 (MultiConnectionPollState *) event->user_data;
897
898 if (event->events & WL_POSTMASTER_DEATH)
899 {
900 ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
901 }
902
903 if (event->events & WL_LATCH_SET)
904 {
905 ResetLatch(MyLatch);
906
907 CHECK_FOR_INTERRUPTS();
908
909 if (IsHoldOffCancellationReceived())
910 {
911 /*
912 * because we can't break from 2 loops easily we need to not forget to
913 * reset the memory context
914 */
915 MemoryContextDelete(MemoryContextSwitchTo(oldContext));
916 return;
917 }
918
919 continue;
920 }
921
922 bool connectionStateChanged = MultiConnectionStatePoll(connectionState);
923 if (connectionStateChanged)
924 {
925 if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
926 {
927 /* we cannot stop waiting for connection, so rebuild the event set */
928 waitEventSetRebuild = true;
929 }
930 else
931 {
932 /* connection state changed, reset the event mask */
933 uint32 eventMask = MultiConnectionStateEventMask(connectionState);
934 ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL);
935 }
936
937 /*
938 * The state has changed to connected, update the connection's
939 * state as well.
940 */
941 if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
942 {
943 MarkConnectionConnected(connectionState->connection);
944 }
945 }
946 }
947
948 if (eventCount == 0)
949 {
950 /*
951 * timeout has occured on waitset, double check the timeout since
952 * connectionStart and if passed close all non-finished connections
953 */
954
955 if (MillisecondsPassedSince(connectionStart) >= NodeConnectionTimeout)
956 {
957 /*
958 * showing as a warning, can't be an error. In some cases queries can
959 * proceed with only some of the connections being fully established.
960 * Queries that can't will error then and there
961 */
962 ereport(WARNING, (errmsg("could not establish connection after %u ms",
963 NodeConnectionTimeout)));
964
965 /*
966 * Close all connections that have not been fully established.
967 */
968 CloseNotReadyMultiConnectionStates(connectionStates);
969
970 break;
971 }
972 }
973 }
974 MemoryContextDelete(MemoryContextSwitchTo(oldContext));
975 }
976
977
978 /*
979 * MillisecondsPassedSince returns the number of milliseconds elapsed between an
980 * instr_time & the current time.
981 */
982 double
MillisecondsPassedSince(instr_time moment)983 MillisecondsPassedSince(instr_time moment)
984 {
985 instr_time timeSinceMoment;
986 INSTR_TIME_SET_CURRENT(timeSinceMoment);
987 INSTR_TIME_SUBTRACT(timeSinceMoment, moment);
988 return INSTR_TIME_GET_MILLISEC(timeSinceMoment);
989 }
990
991
992 /*
993 * MillisecondsToTimeout returns the numer of milliseconds that still need to elapse
994 * before msAfterStart milliseconds have passed since start. The outcome can be used to
995 * pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
996 */
997 long
MillisecondsToTimeout(instr_time start,long msAfterStart)998 MillisecondsToTimeout(instr_time start, long msAfterStart)
999 {
1000 return msAfterStart - MillisecondsPassedSince(start);
1001 }
1002
1003
1004 /*
1005 * CloseNotReadyMultiConnectionStates calls CloseConnection for all MultiConnection's
1006 * tracked in the MultiConnectionPollState list passed in, only if the connection is not yet
1007 * fully established.
1008 *
1009 * This function removes the pointer to the MultiConnection data after the Connections are
1010 * closed since they should not be used anymore.
1011 */
1012 static void
CloseNotReadyMultiConnectionStates(List * connectionStates)1013 CloseNotReadyMultiConnectionStates(List *connectionStates)
1014 {
1015 MultiConnectionPollState *connectionState = NULL;
1016 foreach_ptr(connectionState, connectionStates)
1017 {
1018 MultiConnection *connection = connectionState->connection;
1019
1020 if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
1021 {
1022 continue;
1023 }
1024
1025 /* close connection, otherwise we take up resource on the other side */
1026 CitusPQFinish(connection);
1027 }
1028 }
1029
1030
1031 /*
1032 * CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection
1033 * counters.
1034 */
1035 static void
CitusPQFinish(MultiConnection * connection)1036 CitusPQFinish(MultiConnection *connection)
1037 {
1038 if (connection->pgConn != NULL)
1039 {
1040 PQfinish(connection->pgConn);
1041 connection->pgConn = NULL;
1042 }
1043
1044 /* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
1045 if (connection->initilizationState >= POOL_STATE_COUNTER_INCREMENTED)
1046 {
1047 DecrementSharedConnectionCounter(connection->hostname, connection->port);
1048 connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
1049 }
1050 }
1051
1052
/*
 * FinishConnectionEstablishment synchronously finishes connection establishment
 * of an individual connection. It is a convenience wrapper around
 * FinishConnectionListEstablishment, which also closes connections on timeout.
 */
void
FinishConnectionEstablishment(MultiConnection *connection)
{
	FinishConnectionListEstablishment(list_make1(connection));
}
1063
1064
1065 /*
1066 * ClaimConnectionExclusively signals that this connection is actively being
1067 * used. That means it'll not be, again, returned by
1068 * StartNodeUserDatabaseConnection() et al until releases with
1069 * UnclaimConnection().
1070 */
1071 void
ClaimConnectionExclusively(MultiConnection * connection)1072 ClaimConnectionExclusively(MultiConnection *connection)
1073 {
1074 Assert(!connection->claimedExclusively);
1075 connection->claimedExclusively = true;
1076 }
1077
1078
1079 /*
1080 * UnclaimConnection signals that this connection is not being used
1081 * anymore. That means it again may be returned by
1082 * StartNodeUserDatabaseConnection() et al.
1083 */
1084 void
UnclaimConnection(MultiConnection * connection)1085 UnclaimConnection(MultiConnection *connection)
1086 {
1087 connection->claimedExclusively = false;
1088 }
1089
1090
1091 static uint32
ConnectionHashHash(const void * key,Size keysize)1092 ConnectionHashHash(const void *key, Size keysize)
1093 {
1094 ConnectionHashKey *entry = (ConnectionHashKey *) key;
1095
1096 uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
1097 hash = hash_combine(hash, hash_uint32(entry->port));
1098 hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN));
1099 hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
1100
1101 return hash;
1102 }
1103
1104
1105 static int
ConnectionHashCompare(const void * a,const void * b,Size keysize)1106 ConnectionHashCompare(const void *a, const void *b, Size keysize)
1107 {
1108 ConnectionHashKey *ca = (ConnectionHashKey *) a;
1109 ConnectionHashKey *cb = (ConnectionHashKey *) b;
1110
1111 if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
1112 ca->port != cb->port ||
1113 strncmp(ca->user, cb->user, NAMEDATALEN) != 0 ||
1114 strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
1115 {
1116 return 1;
1117 }
1118 else
1119 {
1120 return 0;
1121 }
1122 }
1123
1124
1125 /*
1126 * Asynchronously establish connection to a remote node, but don't wait for
1127 * that to finish. DNS lookups etc. are performed synchronously though.
1128 */
1129 static void
StartConnectionEstablishment(MultiConnection * connection,ConnectionHashKey * key)1130 StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
1131 {
1132 static uint64 connectionId = 1;
1133
1134 ConnParamsHashEntry *entry = FindOrCreateConnParamsEntry(key);
1135
1136 strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
1137 connection->port = key->port;
1138 strlcpy(connection->database, key->database, NAMEDATALEN);
1139 strlcpy(connection->user, key->user, NAMEDATALEN);
1140
1141 connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
1142 (const char **) entry->values,
1143 false);
1144 INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentStart);
1145 connection->connectionId = connectionId++;
1146
1147 /*
1148 * To avoid issues with interrupts not getting caught all our connections
1149 * are managed in a non-blocking manner. remote_commands.c provides
1150 * wrappers emulating blocking behaviour.
1151 */
1152 PQsetnonblocking(connection->pgConn, true);
1153
1154 SetCitusNoticeReceiver(connection);
1155 }
1156
1157
1158 /*
1159 * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
1160 * conn params for active primary nodes.
1161 */
1162 void
WarmUpConnParamsHash(void)1163 WarmUpConnParamsHash(void)
1164 {
1165 List *workerNodeList = ActivePrimaryNodeList(AccessShareLock);
1166 WorkerNode *workerNode = NULL;
1167 foreach_ptr(workerNode, workerNodeList)
1168 {
1169 ConnectionHashKey key;
1170 strlcpy(key.hostname, workerNode->workerName, MAX_NODE_LENGTH);
1171 key.port = workerNode->workerPort;
1172 strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
1173 strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
1174 FindOrCreateConnParamsEntry(&key);
1175 }
1176 }
1177
1178
1179 /*
1180 * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
1181 * if it is not found, it is created.
1182 */
1183 static ConnParamsHashEntry *
FindOrCreateConnParamsEntry(ConnectionHashKey * key)1184 FindOrCreateConnParamsEntry(ConnectionHashKey *key)
1185 {
1186 bool found = false;
1187
1188 /* search our cache for precomputed connection settings */
1189 ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
1190 if (!found || !entry->isValid)
1191 {
1192 if (!found)
1193 {
1194 /*
1195 * Zero out entry, but not the key part.
1196 * Avoids leaving invalid pointers in hash table if GetConnParam throws with MemoryContextAllocZero.
1197 */
1198 memset(((char *) entry) + sizeof(ConnectionHashKey), 0,
1199 sizeof(ConnParamsHashEntry) - sizeof(ConnectionHashKey));
1200 }
1201
1202 /* avoid leaking memory in the keys and values arrays */
1203 if (found && !entry->isValid)
1204 {
1205 FreeConnParamsHashEntryFields(entry);
1206 }
1207
1208 /* if not found or not valid, compute them from GUC, runtime, etc. */
1209 GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart,
1210 ConnectionContext);
1211
1212 entry->isValid = true;
1213 }
1214
1215 return entry;
1216 }
1217
1218
1219 /*
1220 * FreeConnParamsHashEntryFields frees any dynamically allocated memory reachable
1221 * from the fields of the provided ConnParamsHashEntry. This includes all runtime
1222 * libpq keywords and values, as well as the actual arrays storing them.
1223 */
1224 static void
FreeConnParamsHashEntryFields(ConnParamsHashEntry * entry)1225 FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
1226 {
1227 /*
1228 * if there was a memory error during the initialization of ConnParamHashEntry in
1229 * GetConnParams the keywords or values might not have been initialized completely.
1230 * We check if they have been initialized before freeing them.
1231 *
1232 * We only iteratively free the lists starting at the index pointed to by
1233 * entry->runtimeParamStart as all entries before are settings that are managed
1234 * separately.
1235 */
1236
1237 if (entry->keywords != NULL)
1238 {
1239 char **keyword = &entry->keywords[entry->runtimeParamStart];
1240 while (*keyword != NULL)
1241 {
1242 pfree(*keyword);
1243 keyword++;
1244 }
1245 pfree(entry->keywords);
1246 entry->keywords = NULL;
1247 }
1248
1249 if (entry->values != NULL)
1250 {
1251 char **value = &entry->values[entry->runtimeParamStart];
1252 while (*value != NULL)
1253 {
1254 pfree(*value);
1255 value++;
1256 }
1257 pfree(entry->values);
1258 entry->values = NULL;
1259 }
1260
1261 entry->runtimeParamStart = 0;
1262 }
1263
1264
/*
 * AfterXactHostConnectionHandling walks all connections of a single hash
 * entry, shutting down those that are not needed anymore (i.e. not of session
 * lifetime) or that are in a failed state, and resetting the remaining
 * healthy ones so they can be reused by the next transaction.
 */
static void
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{
	if (!entry || !entry->isValid)
	{
		/* callers only pass valid hash entries but let's be on the safe side */
		ereport(ERROR, (errmsg("connection hash entry is NULL or invalid")));
	}

	dlist_mutable_iter iter;
	int cachedConnectionCount = 0;

	/* mutable iteration: connections may be unlinked and freed while walking */
	dlist_foreach_modify(iter, entry->connections)
	{
		MultiConnection *connection =
			dlist_container(MultiConnection, connectionNode, iter.cur);

		/*
		 * To avoid leaking connections we warn if connections are
		 * still claimed exclusively. We can only do so if the transaction is
		 * committed, as it's normal that code didn't have chance to clean
		 * up after errors.
		 */
		if (isCommit && connection->claimedExclusively)
		{
			ereport(WARNING,
					(errmsg("connection claimed exclusively at transaction commit")));
		}


		if (ShouldShutdownConnection(connection, cachedConnectionCount))
		{
			ShutdownConnection(connection);

			/* unlink from list before freeing; iterator tolerates deletion */
			dlist_delete(iter.cur);

			pfree(connection);
		}
		else
		{
			/*
			 * reset healthy session lifespan connections so they can be
			 * cached and reused; they count towards the cached-connection cap.
			 */
			ResetConnection(connection);

			cachedConnectionCount++;
		}
	}
}
1319
1320
1321 /*
1322 * ShouldShutdownConnection returns true if either one of the followings is true:
1323 * - The connection is citus initiated.
1324 * - Current cached connections is already at MaxCachedConnectionsPerWorker
1325 * - Connection is forced to close at the end of transaction
1326 * - Connection is not in OK state
1327 * - A transaction is still in progress (usually because we are cancelling a distributed transaction)
1328 * - A connection reached its maximum lifetime
1329 */
1330 static bool
ShouldShutdownConnection(MultiConnection * connection,const int cachedConnectionCount)1331 ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount)
1332 {
1333 /*
1334 * When we are in a backend that was created to serve an internal connection
1335 * from the coordinator or another worker, we disable connection caching to avoid
1336 * escalating the number of cached connections. We can recognize such backends
1337 * from their application name.
1338 */
1339 return IsCitusInitiatedRemoteBackend() ||
1340 connection->initilizationState != POOL_STATE_INITIALIZED ||
1341 cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
1342 connection->forceCloseAtTransactionEnd ||
1343 PQstatus(connection->pgConn) != CONNECTION_OK ||
1344 !RemoteTransactionIdle(connection) ||
1345 (MaxCachedConnectionLifetime >= 0 &&
1346 MillisecondsToTimeout(connection->connectionEstablishmentStart,
1347 MaxCachedConnectionLifetime) <= 0);
1348 }
1349
1350
1351 /*
1352 * IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
1353 * initiated via remote connection.
1354 */
1355 bool
IsCitusInitiatedRemoteBackend(void)1356 IsCitusInitiatedRemoteBackend(void)
1357 {
1358 return application_name && strcmp(application_name, CITUS_APPLICATION_NAME) == 0;
1359 }
1360
1361
1362 /*
1363 * ResetConnection preserves the given connection for later usage by
1364 * resetting its states.
1365 */
1366 static void
ResetConnection(MultiConnection * connection)1367 ResetConnection(MultiConnection *connection)
1368 {
1369 /* reset per-transaction state */
1370 ResetRemoteTransaction(connection);
1371 ResetShardPlacementAssociation(connection);
1372
1373 /* reset copy state */
1374 connection->copyBytesWrittenSinceLastFlush = 0;
1375
1376 UnclaimConnection(connection);
1377 }
1378
1379
1380 /*
1381 * RemoteTransactionIdle function returns true if we manually
1382 * set flag on run_commands_on_session_level_connection_to_node to true to
1383 * force connection API keeping connection open or the status of the connection
1384 * is idle.
1385 */
1386 static bool
RemoteTransactionIdle(MultiConnection * connection)1387 RemoteTransactionIdle(MultiConnection *connection)
1388 {
1389 /*
1390 * This is a very special case where we're running isolation tests on MX.
1391 * We don't care whether the transaction is idle or not when we're
1392 * running MX isolation tests. Thus, let the caller act as if the remote
1393 * transactions is idle.
1394 */
1395 if (AllowNonIdleTransactionOnXactHandling())
1396 {
1397 return true;
1398 }
1399
1400 return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE;
1401 }
1402
1403
1404 /*
1405 * MarkConnectionConnected is a helper function which sets the connection
1406 * connectionState to MULTI_CONNECTION_CONNECTED, and also updates connection
1407 * establishment time when necessary.
1408 */
1409 void
MarkConnectionConnected(MultiConnection * connection)1410 MarkConnectionConnected(MultiConnection *connection)
1411 {
1412 connection->connectionState = MULTI_CONNECTION_CONNECTED;
1413
1414 if (INSTR_TIME_IS_ZERO(connection->connectionEstablishmentEnd))
1415 {
1416 INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
1417 }
1418 }
1419