1 /*-------------------------------------------------------------------------
2  *
3  * connection_management.c
4  *   Central management of connections and their life-cycle
5  *
6  * Copyright (c) Citus Data, Inc.
7  *
8  *-------------------------------------------------------------------------
9  */
10 
11 #include "postgres.h"
12 #include "pgstat.h"
13 
14 #include "libpq-fe.h"
15 
16 #include "miscadmin.h"
17 
18 #include "safe_lib.h"
19 
20 #include "access/hash.h"
21 #include "commands/dbcommands.h"
22 #include "distributed/connection_management.h"
23 #include "distributed/errormessage.h"
24 #include "distributed/error_codes.h"
25 #include "distributed/listutils.h"
26 #include "distributed/log_utils.h"
27 #include "distributed/memutils.h"
28 #include "distributed/metadata_cache.h"
29 #include "distributed/hash_helpers.h"
30 #include "distributed/placement_connection.h"
31 #include "distributed/run_from_same_connection.h"
32 #include "distributed/shared_connection_stats.h"
33 #include "distributed/cancel_utils.h"
34 #include "distributed/remote_commands.h"
35 #include "distributed/time_constants.h"
36 #include "distributed/version_compat.h"
37 #include "distributed/worker_log_messages.h"
38 #include "mb/pg_wchar.h"
39 #include "portability/instr_time.h"
40 #include "storage/ipc.h"
41 #include "utils/hsearch.h"
42 #include "utils/memutils.h"
43 
44 
/*
 * Upper bound on the time spent establishing connections;
 * FinishConnectionListEstablishment interprets the value as milliseconds.
 */
int NodeConnectionTimeout = 30000;

/*
 * NOTE(review): presumably the number of idle connections kept open per
 * worker after a transaction; not referenced in the visible part of this
 * file - confirm against ShouldShutdownConnection.
 */
int MaxCachedConnectionsPerWorker = 1;

/*
 * NOTE(review): presumably the maximum age of a cached connection, in
 * milliseconds (10 minutes by default) - confirm against its users.
 */
int MaxCachedConnectionLifetime = 10 * MS_PER_MINUTE;

/* (host,port,user,database) -> list of connections, see InitializeConnectionManagement */
HTAB *ConnectionHash = NULL;

/* same key as ConnectionHash, but stores ConnParamsHashEntry entries */
HTAB *ConnParamsHash = NULL;

/* memory context holding both hashes and the per-connection allocations */
MemoryContext ConnectionContext = NULL;
53 
/* forward declarations of functions that are local to this file */
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn,
										 ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
									 cachedConnectionCount);
static void ResetConnection(MultiConnection *connection);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
67 
/* types for async connection management */

/*
 * MultiConnectionPhase tracks how far the establishment of a single
 * connection has progressed.
 */
enum MultiConnectionPhase
{
	/* establishment is still in progress, keep polling */
	MULTI_CONNECTION_PHASE_CONNECTING,

	/* libpq reported the connection as established */
	MULTI_CONNECTION_PHASE_CONNECTED,

	/* libpq reported a failure while connecting */
	MULTI_CONNECTION_PHASE_ERROR,
};

/*
 * MultiConnectionPollState is the bookkeeping kept per connection while
 * FinishConnectionListEstablishment waits for it to be established.
 */
typedef struct MultiConnectionPollState
{
	/* current phase, updated by MultiConnectionStatePoll */
	enum MultiConnectionPhase phase;

	/* the connection that is being established */
	MultiConnection *connection;

	/* result of the most recent PQconnectPoll, decides what to wait for */
	PostgresPollingStatusType pollmode;
} MultiConnectionPollState;


/* helper functions for async connection management */
static bool MultiConnectionStatePoll(MultiConnectionPollState *connectionState);
static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
															int *waitCount);
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection);
static ConnParamsHashEntry * FindOrCreateConnParamsEntry(ConnectionHashKey *key);
91 
92 /*
93  * Initialize per-backend connection management infrastructure.
94  */
95 void
InitializeConnectionManagement(void)96 InitializeConnectionManagement(void)
97 {
98 	HASHCTL info, connParamsInfo;
99 
100 	/*
101 	 * Create a single context for connection and transaction related memory
102 	 * management. Doing so, instead of allocating in TopMemoryContext, makes
103 	 * it easier to associate used memory.
104 	 */
105 	ConnectionContext = AllocSetContextCreateExtended(TopMemoryContext,
106 													  "Connection Context",
107 													  ALLOCSET_DEFAULT_MINSIZE,
108 													  ALLOCSET_DEFAULT_INITSIZE,
109 													  ALLOCSET_DEFAULT_MAXSIZE);
110 
111 	/* create (host,port,user,database) -> [connection] hash */
112 	memset(&info, 0, sizeof(info));
113 	info.keysize = sizeof(ConnectionHashKey);
114 	info.entrysize = sizeof(ConnectionHashEntry);
115 	info.hash = ConnectionHashHash;
116 	info.match = ConnectionHashCompare;
117 	info.hcxt = ConnectionContext;
118 	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
119 
120 	/* connParamsInfo is same as info, except for entrysize */
121 	connParamsInfo = info;
122 	connParamsInfo.entrysize = sizeof(ConnParamsHashEntry);
123 
124 	ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
125 								 64, &info, hashFlags);
126 
127 	ConnParamsHash = hash_create("citus connparams cache (host,port,user,database)",
128 								 64, &connParamsInfo, hashFlags);
129 }
130 
131 
132 /*
133  * InvalidateConnParamsHashEntries sets every hash entry's isValid flag to false.
134  */
135 void
InvalidateConnParamsHashEntries(void)136 InvalidateConnParamsHashEntries(void)
137 {
138 	if (ConnParamsHash != NULL)
139 	{
140 		ConnParamsHashEntry *entry = NULL;
141 		HASH_SEQ_STATUS status;
142 
143 		hash_seq_init(&status, ConnParamsHash);
144 		while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
145 		{
146 			entry->isValid = false;
147 		}
148 	}
149 }
150 
151 
152 /*
153  * AfterXactConnectionHandling performs connection management activity after the end of a transaction. Both
154  * COMMIT and ABORT paths are handled here.
155  *
156  * This is called by Citus' global transaction callback.
157  */
158 void
AfterXactConnectionHandling(bool isCommit)159 AfterXactConnectionHandling(bool isCommit)
160 {
161 	HASH_SEQ_STATUS status;
162 	ConnectionHashEntry *entry;
163 
164 	hash_seq_init(&status, ConnectionHash);
165 	while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
166 	{
167 		if (!entry->isValid)
168 		{
169 			/* skip invalid connection hash entries */
170 			continue;
171 		}
172 
173 		AfterXactHostConnectionHandling(entry, isCommit);
174 
175 		/*
176 		 * NB: We leave the hash entry in place, even if there's no individual
177 		 * connections in it anymore. There seems no benefit in deleting it,
178 		 * and it'll save a bit of work in the next transaction.
179 		 */
180 	}
181 }
182 
183 
184 /*
185  * GetNodeConnection() establishes a connection to remote node, using default
186  * user and database.
187  *
188  * See StartNodeUserDatabaseConnection for details.
189  */
190 MultiConnection *
GetNodeConnection(uint32 flags,const char * hostname,int32 port)191 GetNodeConnection(uint32 flags, const char *hostname, int32 port)
192 {
193 	return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
194 }
195 
196 
197 /*
198  * StartNodeConnection initiates a connection to remote node, using default
199  * user and database.
200  *
201  * See StartNodeUserDatabaseConnection for details.
202  */
203 MultiConnection *
StartNodeConnection(uint32 flags,const char * hostname,int32 port)204 StartNodeConnection(uint32 flags, const char *hostname, int32 port)
205 {
206 	MultiConnection *connection = StartNodeUserDatabaseConnection(flags, hostname, port,
207 																  NULL, NULL);
208 
209 	/*
210 	 * connection can only be NULL for optional connections, which we don't
211 	 * support in this codepath.
212 	 */
213 	Assert((flags & OPTIONAL_CONNECTION) == 0);
214 	Assert(connection != NULL);
215 	return connection;
216 }
217 
218 
219 /*
220  * GetNodeUserDatabaseConnection establishes connection to remote node.
221  *
222  * See StartNodeUserDatabaseConnection for details.
223  */
224 MultiConnection *
GetNodeUserDatabaseConnection(uint32 flags,const char * hostname,int32 port,const char * user,const char * database)225 GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
226 							  const char *user, const char *database)
227 {
228 	MultiConnection *connection = StartNodeUserDatabaseConnection(flags, hostname, port,
229 																  user, database);
230 
231 	/*
232 	 * connection can only be NULL for optional connections, which we don't
233 	 * support in this codepath.
234 	 */
235 	Assert((flags & OPTIONAL_CONNECTION) == 0);
236 	Assert(connection != NULL);
237 
238 	FinishConnectionEstablishment(connection);
239 
240 	return connection;
241 }
242 
243 
244 /*
245  * StartNodeUserDatabaseConnection() initiates a connection to a remote node.
246  *
247  * If user or database are NULL, the current session's defaults are used. The
248  * following flags influence connection establishment behaviour:
249  * - FORCE_NEW_CONNECTION - a new connection is required
250  *
251  * The returned connection has only been initiated, not fully
252  * established. That's useful to allow parallel connection establishment. If
253  * that's not desired use the Get* variant.
254  */
255 MultiConnection *
StartNodeUserDatabaseConnection(uint32 flags,const char * hostname,int32 port,const char * user,const char * database)256 StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
257 								const char *user, const char *database)
258 {
259 	ConnectionHashKey key;
260 	bool found;
261 
262 	/* do some minimal input checks */
263 	if (strlen(hostname) > MAX_NODE_LENGTH)
264 	{
265 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
266 						errmsg("hostname exceeds the maximum length of %d",
267 							   MAX_NODE_LENGTH)));
268 	}
269 
270 	strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
271 
272 	key.port = port;
273 	if (user)
274 	{
275 		strlcpy(key.user, user, NAMEDATALEN);
276 	}
277 	else
278 	{
279 		strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
280 	}
281 	if (database)
282 	{
283 		strlcpy(key.database, database, NAMEDATALEN);
284 	}
285 	else
286 	{
287 		strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
288 	}
289 
290 	if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
291 	{
292 		CurrentCoordinatedTransactionState = COORD_TRANS_IDLE;
293 	}
294 
295 	/*
296 	 * Lookup relevant hash entry. We always enter. If only a cached
297 	 * connection is desired, and there's none, we'll simply leave the
298 	 * connection list empty.
299 	 */
300 
301 	ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
302 	if (!found || !entry->isValid)
303 	{
304 		/*
305 		 * We are just building hash entry or previously it was left in an
306 		 * invalid state as we couldn't allocate memory for it.
307 		 * So initialize entry->connections list here.
308 		 */
309 		entry->isValid = false;
310 		entry->connections = MemoryContextAlloc(ConnectionContext,
311 												sizeof(dlist_head));
312 		dlist_init(entry->connections);
313 
314 		/*
315 		 * If MemoryContextAlloc errors out -e.g. during an OOM-, entry->connections
316 		 * stays as NULL. So entry->isValid should be set to true right after we
317 		 * initialize entry->connections properly.
318 		 */
319 		entry->isValid = true;
320 	}
321 
322 	/* if desired, check whether there's a usable connection */
323 	if (!(flags & FORCE_NEW_CONNECTION))
324 	{
325 		/* check connection cache for a connection that's not already in use */
326 		MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
327 		if (connection)
328 		{
329 			return connection;
330 		}
331 	}
332 
333 
334 	/*
335 	 * Either no caching desired, or no pre-established, non-claimed,
336 	 * connection present. Initiate connection establishment.
337 	 */
338 	MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
339 														 sizeof(MultiConnection));
340 	connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
341 	dlist_push_tail(entry->connections, &connection->connectionNode);
342 
343 	/* these two flags are by nature cannot happen at the same time */
344 	Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));
345 
346 	if (flags & WAIT_FOR_CONNECTION)
347 	{
348 		WaitLoopForSharedConnection(hostname, port);
349 	}
350 	else if (flags & OPTIONAL_CONNECTION)
351 	{
352 		/*
353 		 * We can afford to skip establishing an optional connection. For
354 		 * non-optional connections, we first retry for some time. If we still
355 		 * cannot reserve the right to establish a connection, we prefer to
356 		 * error out.
357 		 */
358 		if (!TryToIncrementSharedConnectionCounter(hostname, port))
359 		{
360 			/* do not track the connection anymore */
361 			dlist_delete(&connection->connectionNode);
362 			pfree(connection);
363 
364 			return NULL;
365 		}
366 	}
367 	else
368 	{
369 		/*
370 		 * The caller doesn't want the connection manager to wait
371 		 * until a connection slot is available on the remote node.
372 		 * In the end, we might fail to establish connection to the
373 		 * remote node as it might not have any space in
374 		 * max_connections for this connection establishment.
375 		 *
376 		 * Still, we keep track of the connection counter.
377 		 */
378 		IncrementSharedConnectionCounter(hostname, port);
379 	}
380 
381 
382 	/*
383 	 * We've already incremented the counter above, so we should decrement
384 	 * when we're done with the connection.
385 	 */
386 	connection->initilizationState = POOL_STATE_COUNTER_INCREMENTED;
387 
388 	StartConnectionEstablishment(connection, &key);
389 
390 	ResetShardPlacementAssociation(connection);
391 
392 	/* fully initialized the connection, record it */
393 	connection->initilizationState = POOL_STATE_INITIALIZED;
394 
395 	return connection;
396 }
397 
398 
399 /*
400  * FindAvailableConnection searches the given list of connections for one that
401  * is not claimed exclusively.
402  *
403  * If no connection is available, FindAvailableConnection returns NULL.
404  */
405 static MultiConnection *
FindAvailableConnection(dlist_head * connections,uint32 flags)406 FindAvailableConnection(dlist_head *connections, uint32 flags)
407 {
408 	dlist_iter iter;
409 
410 	dlist_foreach(iter, connections)
411 	{
412 		MultiConnection *connection =
413 			dlist_container(MultiConnection, connectionNode, iter.cur);
414 
415 		if (flags & OUTSIDE_TRANSACTION)
416 		{
417 			/* don't return connections that are used in transactions */
418 			if (connection->remoteTransaction.transactionState !=
419 				REMOTE_TRANS_NOT_STARTED)
420 			{
421 				continue;
422 			}
423 		}
424 
425 		/* don't return claimed connections */
426 		if (connection->claimedExclusively)
427 		{
428 			/* connection is in use for an ongoing operation */
429 			continue;
430 		}
431 
432 		if (connection->forceCloseAtTransactionEnd)
433 		{
434 			/*
435 			 * This is a connection that should be closed, probabably because
436 			 * of old connection options. So we ignore it. It will
437 			 * automatically be closed at the end of the transaction.
438 			 */
439 			continue;
440 		}
441 
442 		if (connection->initilizationState != POOL_STATE_INITIALIZED)
443 		{
444 			/*
445 			 * If the connection has not been initialized, it should not be
446 			 * considered as available.
447 			 */
448 			continue;
449 		}
450 
451 		return connection;
452 	}
453 
454 	return NULL;
455 }
456 
457 
458 /*
459  * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
460  * connections. This is mainly done when citus.node_conninfo changes.
461  */
462 void
CloseAllConnectionsAfterTransaction(void)463 CloseAllConnectionsAfterTransaction(void)
464 {
465 	if (ConnectionHash == NULL)
466 	{
467 		return;
468 	}
469 	HASH_SEQ_STATUS status;
470 	ConnectionHashEntry *entry;
471 
472 	hash_seq_init(&status, ConnectionHash);
473 	while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
474 	{
475 		if (!entry->isValid)
476 		{
477 			/* skip invalid connection hash entries */
478 			continue;
479 		}
480 
481 		dlist_iter iter;
482 
483 		dlist_head *connections = entry->connections;
484 		dlist_foreach(iter, connections)
485 		{
486 			MultiConnection *connection =
487 				dlist_container(MultiConnection, connectionNode, iter.cur);
488 
489 			connection->forceCloseAtTransactionEnd = true;
490 		}
491 	}
492 }
493 
494 
495 /*
496  * ConnectionAvailableToNode returns a MultiConnection if the session has at least
497  * one connection established and avaliable to use to the give node. Else, returns
498  * false.
499  */
500 MultiConnection *
ConnectionAvailableToNode(char * hostName,int nodePort,const char * userName,const char * database)501 ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
502 						  const char *database)
503 {
504 	ConnectionHashKey key;
505 	bool found = false;
506 
507 	strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
508 	key.port = nodePort;
509 	strlcpy(key.user, userName, NAMEDATALEN);
510 	strlcpy(key.database, database, NAMEDATALEN);
511 
512 	ConnectionHashEntry *entry =
513 		(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
514 
515 	if (!found || !entry->isValid)
516 	{
517 		return false;
518 	}
519 
520 	int flags = 0;
521 	MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
522 
523 	return connection;
524 }
525 
526 
527 /*
528  * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
529  * to a particular node as true such that the connections are no longer cached. This
530  * is mainly used when a worker leaves the cluster.
531  */
532 void
CloseNodeConnectionsAfterTransaction(char * nodeName,int nodePort)533 CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
534 {
535 	HASH_SEQ_STATUS status;
536 	ConnectionHashEntry *entry;
537 
538 	hash_seq_init(&status, ConnectionHash);
539 	while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
540 	{
541 		if (!entry->isValid)
542 		{
543 			/* skip invalid connection hash entries */
544 			continue;
545 		}
546 
547 		dlist_iter iter;
548 
549 		if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
550 		{
551 			continue;
552 		}
553 
554 		dlist_head *connections = entry->connections;
555 		dlist_foreach(iter, connections)
556 		{
557 			MultiConnection *connection =
558 				dlist_container(MultiConnection, connectionNode, iter.cur);
559 
560 			connection->forceCloseAtTransactionEnd = true;
561 		}
562 	}
563 }
564 
565 
566 /*
567  * Close a previously established connection.
568  */
569 void
CloseConnection(MultiConnection * connection)570 CloseConnection(MultiConnection *connection)
571 {
572 	ConnectionHashKey key;
573 	bool found;
574 
575 	/* close connection */
576 	CitusPQFinish(connection);
577 
578 	strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
579 	key.port = connection->port;
580 	strlcpy(key.user, connection->user, NAMEDATALEN);
581 	strlcpy(key.database, connection->database, NAMEDATALEN);
582 
583 	hash_search(ConnectionHash, &key, HASH_FIND, &found);
584 
585 	if (found)
586 	{
587 		/* unlink from list of open connections */
588 		dlist_delete(&connection->connectionNode);
589 
590 		/* same for transaction state and shard/placement machinery */
591 		CloseRemoteTransaction(connection);
592 		CloseShardPlacementAssociation(connection);
593 
594 		/* we leave the per-host entry alive */
595 		pfree(connection);
596 	}
597 	else
598 	{
599 		ereport(ERROR, (errmsg("closing untracked connection")));
600 	}
601 }
602 
603 
604 /*
605  * ShutdownAllConnections shutdowns all the MultiConnections in the
606  * ConnectionHash.
607  *
608  * This function is intended to be called atexit() of the backend, so
609  * that the cached connections are closed properly. Calling this function
610  * at another point in the code could be dangerous, so think twice if you
611  * need to call this function.
612  */
613 void
ShutdownAllConnections(void)614 ShutdownAllConnections(void)
615 {
616 	ConnectionHashEntry *entry = NULL;
617 	HASH_SEQ_STATUS status;
618 
619 	hash_seq_init(&status, ConnectionHash);
620 	while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
621 	{
622 		if (!entry->isValid)
623 		{
624 			/* skip invalid connection hash entries */
625 			continue;
626 		}
627 
628 		dlist_iter iter;
629 
630 		dlist_foreach(iter, entry->connections)
631 		{
632 			MultiConnection *connection =
633 				dlist_container(MultiConnection, connectionNode, iter.cur);
634 
635 			ShutdownConnection(connection);
636 		}
637 	}
638 }
639 
640 
641 /*
642  * ShutdownConnection, if necessary cancels the currently running statement,
643  * and then closes the underlying libpq connection.  The MultiConnection
644  * itself is left intact.
645  *
646  * NB: Cancelling a statement requires network IO, and currently is not
647  * interruptible. Unfortunately libpq does not provide a non-blocking
648  * implementation of PQcancel(), so we don't have much choice for now.
649  */
650 void
ShutdownConnection(MultiConnection * connection)651 ShutdownConnection(MultiConnection *connection)
652 {
653 	/*
654 	 * Only cancel statement if there's currently one running, and the
655 	 * connection is in an OK state.
656 	 */
657 	if (PQstatus(connection->pgConn) == CONNECTION_OK &&
658 		PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
659 	{
660 		SendCancelationRequest(connection);
661 	}
662 	CitusPQFinish(connection);
663 }
664 
665 
666 /*
667  * MultiConnectionStatePoll executes a PQconnectPoll on the connection to progres the
668  * connection establishment. The return value of this function indicates if the
669  * MultiConnectionPollState has been changed, which could require a change to the WaitEventSet
670  */
671 static bool
MultiConnectionStatePoll(MultiConnectionPollState * connectionState)672 MultiConnectionStatePoll(MultiConnectionPollState *connectionState)
673 {
674 	MultiConnection *connection = connectionState->connection;
675 	ConnStatusType status = PQstatus(connection->pgConn);
676 	PostgresPollingStatusType oldPollmode = connectionState->pollmode;
677 
678 	Assert(connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING);
679 
680 	if (status == CONNECTION_OK)
681 	{
682 		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
683 		return true;
684 	}
685 	else if (status == CONNECTION_BAD)
686 	{
687 		/* FIXME: retries? */
688 		connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
689 		return true;
690 	}
691 	else
692 	{
693 		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTING;
694 	}
695 
696 	connectionState->pollmode = PQconnectPoll(connection->pgConn);
697 
698 	/*
699 	 * FIXME: Do we want to add transparent retry support here?
700 	 */
701 	if (connectionState->pollmode == PGRES_POLLING_FAILED)
702 	{
703 		connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
704 		return true;
705 	}
706 	else if (connectionState->pollmode == PGRES_POLLING_OK)
707 	{
708 		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
709 		return true;
710 	}
711 	else
712 	{
713 		Assert(connectionState->pollmode == PGRES_POLLING_WRITING ||
714 			   connectionState->pollmode == PGRES_POLLING_READING);
715 	}
716 
717 	return (oldPollmode != connectionState->pollmode) ? true : false;
718 }
719 
720 
721 /*
722  * EventSetSizeForConnectionList calculates the space needed for a WaitEventSet based on a
723  * list of connections.
724  */
725 inline static int
EventSetSizeForConnectionList(List * connections)726 EventSetSizeForConnectionList(List *connections)
727 {
728 	/* we need space for 2 postgres events in the waitset on top of the connections */
729 	return list_length(connections) + 2;
730 }
731 
732 
733 /*
734  * WaitEventSetFromMultiConnectionStates takes a list of MultiConnectionStates and adds
735  * all sockets of the connections that are still in the connecting phase to a WaitSet,
736  * taking into account the maximum number of connections that could be added in total to
737  * a WaitSet.
738  *
739  * waitCount populates the number of connections added to the WaitSet in case when a
740  * non-NULL pointer is provided.
741  */
742 static WaitEventSet *
WaitEventSetFromMultiConnectionStates(List * connections,int * waitCount)743 WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
744 {
745 	const int eventSetSize = EventSetSizeForConnectionList(connections);
746 	int numEventsAdded = 0;
747 
748 	if (waitCount)
749 	{
750 		*waitCount = 0;
751 	}
752 
753 	WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
754 	EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet),
755 						  waitEventSet);
756 
757 	/*
758 	 * Put the wait events for the signal latch and postmaster death at the end such that
759 	 * event index + pendingConnectionsStartIndex = the connection index in the array.
760 	 */
761 	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
762 	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
763 	numEventsAdded += 2;
764 
765 	MultiConnectionPollState *connectionState = NULL;
766 	foreach_ptr(connectionState, connections)
767 	{
768 		if (numEventsAdded >= eventSetSize)
769 		{
770 			/* room for events to schedule is exhausted */
771 			break;
772 		}
773 
774 		if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
775 		{
776 			/* connections that are not connecting will not be added to the WaitSet */
777 			continue;
778 		}
779 
780 		int sock = PQsocket(connectionState->connection->pgConn);
781 
782 		int eventMask = MultiConnectionStateEventMask(connectionState);
783 
784 		AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState);
785 		numEventsAdded++;
786 
787 		if (waitCount)
788 		{
789 			*waitCount = *waitCount + 1;
790 		}
791 	}
792 
793 	return waitEventSet;
794 }
795 
796 
797 /*
798  * MultiConnectionStateEventMask returns the eventMask use by the WaitEventSet for the
799  * for the socket associated with the connection based on the pollmode PQconnectPoll
800  * returned in its last invocation
801  */
802 static uint32
MultiConnectionStateEventMask(MultiConnectionPollState * connectionState)803 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState)
804 {
805 	uint32 eventMask = 0;
806 	if (connectionState->pollmode == PGRES_POLLING_READING)
807 	{
808 		eventMask |= WL_SOCKET_READABLE;
809 	}
810 	else
811 	{
812 		eventMask |= WL_SOCKET_WRITEABLE;
813 	}
814 	return eventMask;
815 }
816 
817 
818 /*
819  * FinishConnectionListEstablishment takes a list of MultiConnection and finishes the
820  * connections establishment asynchronously for all connections not already fully
821  * connected.
822  */
823 void
FinishConnectionListEstablishment(List * multiConnectionList)824 FinishConnectionListEstablishment(List *multiConnectionList)
825 {
826 	instr_time connectionStart;
827 	INSTR_TIME_SET_CURRENT(connectionStart);
828 
829 	List *connectionStates = NULL;
830 
831 	WaitEventSet *waitEventSet = NULL;
832 	bool waitEventSetRebuild = true;
833 	int waitCount = 0;
834 
835 	MultiConnection *connection = NULL;
836 	foreach_ptr(connection, multiConnectionList)
837 	{
838 		MultiConnectionPollState *connectionState =
839 			palloc0(sizeof(MultiConnectionPollState));
840 
841 		connectionState->connection = connection;
842 
843 		/*
844 		 * before we can build the waitset to wait for asynchronous IO we need to know the
845 		 * pollmode to use for the sockets. This is populated by executing one round of
846 		 * PQconnectPoll. This updates the MultiConnectionPollState struct with its phase and
847 		 * its next poll mode.
848 		 */
849 		MultiConnectionStatePoll(connectionState);
850 
851 		connectionStates = lappend(connectionStates, connectionState);
852 		if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING)
853 		{
854 			waitCount++;
855 		}
856 	}
857 
858 	/* prepare space for socket events */
859 	WaitEvent *events = (WaitEvent *) palloc0(EventSetSizeForConnectionList(
860 												  connectionStates) *
861 											  sizeof(WaitEvent));
862 
863 	/*
864 	 * for high connection counts with lots of round trips we could potentially have a lot
865 	 * of (big) waitsets that we'd like to clean right after we have used them. To do this
866 	 * we switch to a temporary memory context for this loop which gets reset at the end
867 	 */
868 	MemoryContext oldContext = MemoryContextSwitchTo(
869 		AllocSetContextCreate(CurrentMemoryContext,
870 							  "connection establishment temporary context",
871 							  ALLOCSET_DEFAULT_SIZES));
872 	while (waitCount > 0)
873 	{
874 		long timeout = MillisecondsToTimeout(connectionStart, NodeConnectionTimeout);
875 
876 		if (waitEventSetRebuild)
877 		{
878 			MemoryContextReset(CurrentMemoryContext);
879 			waitEventSet = WaitEventSetFromMultiConnectionStates(connectionStates,
880 																 &waitCount);
881 			waitEventSetRebuild = false;
882 
883 			if (waitCount <= 0)
884 			{
885 				break;
886 			}
887 		}
888 
889 		int eventCount = WaitEventSetWait(waitEventSet, timeout, events, waitCount,
890 										  WAIT_EVENT_CLIENT_READ);
891 
892 		for (int eventIndex = 0; eventIndex < eventCount; eventIndex++)
893 		{
894 			WaitEvent *event = &events[eventIndex];
895 			MultiConnectionPollState *connectionState =
896 				(MultiConnectionPollState *) event->user_data;
897 
898 			if (event->events & WL_POSTMASTER_DEATH)
899 			{
900 				ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
901 			}
902 
903 			if (event->events & WL_LATCH_SET)
904 			{
905 				ResetLatch(MyLatch);
906 
907 				CHECK_FOR_INTERRUPTS();
908 
909 				if (IsHoldOffCancellationReceived())
910 				{
911 					/*
912 					 * because we can't break from 2 loops easily we need to not forget to
913 					 * reset the memory context
914 					 */
915 					MemoryContextDelete(MemoryContextSwitchTo(oldContext));
916 					return;
917 				}
918 
919 				continue;
920 			}
921 
922 			bool connectionStateChanged = MultiConnectionStatePoll(connectionState);
923 			if (connectionStateChanged)
924 			{
925 				if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
926 				{
927 					/* we cannot stop waiting for connection, so rebuild the event set */
928 					waitEventSetRebuild = true;
929 				}
930 				else
931 				{
932 					/* connection state changed, reset the event mask */
933 					uint32 eventMask = MultiConnectionStateEventMask(connectionState);
934 					ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL);
935 				}
936 
937 				/*
938 				 * The state has changed to connected, update the connection's
939 				 * state as well.
940 				 */
941 				if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
942 				{
943 					MarkConnectionConnected(connectionState->connection);
944 				}
945 			}
946 		}
947 
948 		if (eventCount == 0)
949 		{
950 			/*
951 			 * timeout has occured on waitset, double check the timeout since
952 			 * connectionStart and if passed close all non-finished connections
953 			 */
954 
955 			if (MillisecondsPassedSince(connectionStart) >= NodeConnectionTimeout)
956 			{
957 				/*
958 				 * showing as a warning, can't be an error. In some cases queries can
959 				 * proceed with only some of the connections being fully established.
960 				 * Queries that can't will error then and there
961 				 */
962 				ereport(WARNING, (errmsg("could not establish connection after %u ms",
963 										 NodeConnectionTimeout)));
964 
965 				/*
966 				 * Close all connections that have not been fully established.
967 				 */
968 				CloseNotReadyMultiConnectionStates(connectionStates);
969 
970 				break;
971 			}
972 		}
973 	}
974 	MemoryContextDelete(MemoryContextSwitchTo(oldContext));
975 }
976 
977 
978 /*
979  * MillisecondsPassedSince returns the number of milliseconds elapsed between an
980  * instr_time & the current time.
981  */
982 double
MillisecondsPassedSince(instr_time moment)983 MillisecondsPassedSince(instr_time moment)
984 {
985 	instr_time timeSinceMoment;
986 	INSTR_TIME_SET_CURRENT(timeSinceMoment);
987 	INSTR_TIME_SUBTRACT(timeSinceMoment, moment);
988 	return INSTR_TIME_GET_MILLISEC(timeSinceMoment);
989 }
990 
991 
992 /*
993  * MillisecondsToTimeout returns the numer of milliseconds that still need to elapse
994  * before msAfterStart milliseconds have passed since start. The outcome can be used to
995  * pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
996  */
997 long
MillisecondsToTimeout(instr_time start,long msAfterStart)998 MillisecondsToTimeout(instr_time start, long msAfterStart)
999 {
1000 	return msAfterStart - MillisecondsPassedSince(start);
1001 }
1002 
1003 
1004 /*
1005  * CloseNotReadyMultiConnectionStates calls CloseConnection for all MultiConnection's
1006  * tracked in the MultiConnectionPollState list passed in, only if the connection is not yet
1007  * fully established.
1008  *
1009  * This function removes the pointer to the MultiConnection data after the Connections are
1010  * closed since they should not be used anymore.
1011  */
1012 static void
CloseNotReadyMultiConnectionStates(List * connectionStates)1013 CloseNotReadyMultiConnectionStates(List *connectionStates)
1014 {
1015 	MultiConnectionPollState *connectionState = NULL;
1016 	foreach_ptr(connectionState, connectionStates)
1017 	{
1018 		MultiConnection *connection = connectionState->connection;
1019 
1020 		if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
1021 		{
1022 			continue;
1023 		}
1024 
1025 		/* close connection, otherwise we take up resource on the other side */
1026 		CitusPQFinish(connection);
1027 	}
1028 }
1029 
1030 
1031 /*
1032  * CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection
1033  * counters.
1034  */
1035 static void
CitusPQFinish(MultiConnection * connection)1036 CitusPQFinish(MultiConnection *connection)
1037 {
1038 	if (connection->pgConn != NULL)
1039 	{
1040 		PQfinish(connection->pgConn);
1041 		connection->pgConn = NULL;
1042 	}
1043 
1044 	/* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
1045 	if (connection->initilizationState >= POOL_STATE_COUNTER_INCREMENTED)
1046 	{
1047 		DecrementSharedConnectionCounter(connection->hostname, connection->port);
1048 		connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
1049 	}
1050 }
1051 
1052 
1053 /*
1054  * Close connections on timeout in FinishConnectionListEstablishment
1055  * Synchronously finish connection establishment of an individual connection.
1056  * This function is a convenience wrapped around FinishConnectionListEstablishment.
1057  */
1058 void
FinishConnectionEstablishment(MultiConnection * connection)1059 FinishConnectionEstablishment(MultiConnection *connection)
1060 {
1061 	FinishConnectionListEstablishment(list_make1(connection));
1062 }
1063 
1064 
1065 /*
1066  * ClaimConnectionExclusively signals that this connection is actively being
1067  * used. That means it'll not be, again, returned by
1068  * StartNodeUserDatabaseConnection() et al until releases with
1069  * UnclaimConnection().
1070  */
1071 void
ClaimConnectionExclusively(MultiConnection * connection)1072 ClaimConnectionExclusively(MultiConnection *connection)
1073 {
1074 	Assert(!connection->claimedExclusively);
1075 	connection->claimedExclusively = true;
1076 }
1077 
1078 
1079 /*
1080  * UnclaimConnection signals that this connection is not being used
1081  * anymore. That means it again may be returned by
1082  * StartNodeUserDatabaseConnection() et al.
1083  */
1084 void
UnclaimConnection(MultiConnection * connection)1085 UnclaimConnection(MultiConnection *connection)
1086 {
1087 	connection->claimedExclusively = false;
1088 }
1089 
1090 
1091 static uint32
ConnectionHashHash(const void * key,Size keysize)1092 ConnectionHashHash(const void *key, Size keysize)
1093 {
1094 	ConnectionHashKey *entry = (ConnectionHashKey *) key;
1095 
1096 	uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
1097 	hash = hash_combine(hash, hash_uint32(entry->port));
1098 	hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN));
1099 	hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
1100 
1101 	return hash;
1102 }
1103 
1104 
1105 static int
ConnectionHashCompare(const void * a,const void * b,Size keysize)1106 ConnectionHashCompare(const void *a, const void *b, Size keysize)
1107 {
1108 	ConnectionHashKey *ca = (ConnectionHashKey *) a;
1109 	ConnectionHashKey *cb = (ConnectionHashKey *) b;
1110 
1111 	if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
1112 		ca->port != cb->port ||
1113 		strncmp(ca->user, cb->user, NAMEDATALEN) != 0 ||
1114 		strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
1115 	{
1116 		return 1;
1117 	}
1118 	else
1119 	{
1120 		return 0;
1121 	}
1122 }
1123 
1124 
1125 /*
1126  * Asynchronously establish connection to a remote node, but don't wait for
1127  * that to finish. DNS lookups etc. are performed synchronously though.
1128  */
1129 static void
StartConnectionEstablishment(MultiConnection * connection,ConnectionHashKey * key)1130 StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
1131 {
1132 	static uint64 connectionId = 1;
1133 
1134 	ConnParamsHashEntry *entry = FindOrCreateConnParamsEntry(key);
1135 
1136 	strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
1137 	connection->port = key->port;
1138 	strlcpy(connection->database, key->database, NAMEDATALEN);
1139 	strlcpy(connection->user, key->user, NAMEDATALEN);
1140 
1141 	connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
1142 											  (const char **) entry->values,
1143 											  false);
1144 	INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentStart);
1145 	connection->connectionId = connectionId++;
1146 
1147 	/*
1148 	 * To avoid issues with interrupts not getting caught all our connections
1149 	 * are managed in a non-blocking manner. remote_commands.c provides
1150 	 * wrappers emulating blocking behaviour.
1151 	 */
1152 	PQsetnonblocking(connection->pgConn, true);
1153 
1154 	SetCitusNoticeReceiver(connection);
1155 }
1156 
1157 
1158 /*
1159  * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
1160  * conn params for active primary nodes.
1161  */
1162 void
WarmUpConnParamsHash(void)1163 WarmUpConnParamsHash(void)
1164 {
1165 	List *workerNodeList = ActivePrimaryNodeList(AccessShareLock);
1166 	WorkerNode *workerNode = NULL;
1167 	foreach_ptr(workerNode, workerNodeList)
1168 	{
1169 		ConnectionHashKey key;
1170 		strlcpy(key.hostname, workerNode->workerName, MAX_NODE_LENGTH);
1171 		key.port = workerNode->workerPort;
1172 		strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
1173 		strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
1174 		FindOrCreateConnParamsEntry(&key);
1175 	}
1176 }
1177 
1178 
1179 /*
1180  * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
1181  * if it is not found, it is created.
1182  */
1183 static ConnParamsHashEntry *
FindOrCreateConnParamsEntry(ConnectionHashKey * key)1184 FindOrCreateConnParamsEntry(ConnectionHashKey *key)
1185 {
1186 	bool found = false;
1187 
1188 	/* search our cache for precomputed connection settings */
1189 	ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
1190 	if (!found || !entry->isValid)
1191 	{
1192 		if (!found)
1193 		{
1194 			/*
1195 			 * Zero out entry, but not the key part.
1196 			 * Avoids leaving invalid pointers in hash table if GetConnParam throws with MemoryContextAllocZero.
1197 			 */
1198 			memset(((char *) entry) + sizeof(ConnectionHashKey), 0,
1199 				   sizeof(ConnParamsHashEntry) - sizeof(ConnectionHashKey));
1200 		}
1201 
1202 		/* avoid leaking memory in the keys and values arrays */
1203 		if (found && !entry->isValid)
1204 		{
1205 			FreeConnParamsHashEntryFields(entry);
1206 		}
1207 
1208 		/* if not found or not valid, compute them from GUC, runtime, etc. */
1209 		GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart,
1210 					  ConnectionContext);
1211 
1212 		entry->isValid = true;
1213 	}
1214 
1215 	return entry;
1216 }
1217 
1218 
1219 /*
1220  * FreeConnParamsHashEntryFields frees any dynamically allocated memory reachable
1221  * from the fields of the provided ConnParamsHashEntry. This includes all runtime
1222  * libpq keywords and values, as well as the actual arrays storing them.
1223  */
1224 static void
FreeConnParamsHashEntryFields(ConnParamsHashEntry * entry)1225 FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
1226 {
1227 	/*
1228 	 * if there was a memory error during the initialization of ConnParamHashEntry in
1229 	 * GetConnParams the keywords or values might not have been initialized completely.
1230 	 * We check if they have been initialized before freeing them.
1231 	 *
1232 	 * We only iteratively free the lists starting at the index pointed to by
1233 	 * entry->runtimeParamStart as all entries before are settings that are managed
1234 	 * separately.
1235 	 */
1236 
1237 	if (entry->keywords != NULL)
1238 	{
1239 		char **keyword = &entry->keywords[entry->runtimeParamStart];
1240 		while (*keyword != NULL)
1241 		{
1242 			pfree(*keyword);
1243 			keyword++;
1244 		}
1245 		pfree(entry->keywords);
1246 		entry->keywords = NULL;
1247 	}
1248 
1249 	if (entry->values != NULL)
1250 	{
1251 		char **value = &entry->values[entry->runtimeParamStart];
1252 		while (*value != NULL)
1253 		{
1254 			pfree(*value);
1255 			value++;
1256 		}
1257 		pfree(entry->values);
1258 		entry->values = NULL;
1259 	}
1260 
1261 	entry->runtimeParamStart = 0;
1262 }
1263 
1264 
1265 /*
1266  * AfterXactHostConnectionHandling closes all remote connections if not necessary anymore (i.e. not session
1267  * lifetime), or if in a failed state.
1268  */
1269 static void
AfterXactHostConnectionHandling(ConnectionHashEntry * entry,bool isCommit)1270 AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
1271 {
1272 	if (!entry || !entry->isValid)
1273 	{
1274 		/* callers only pass valid hash entries but let's be on the safe side */
1275 		ereport(ERROR, (errmsg("connection hash entry is NULL or invalid")));
1276 	}
1277 
1278 	dlist_mutable_iter iter;
1279 	int cachedConnectionCount = 0;
1280 
1281 	dlist_foreach_modify(iter, entry->connections)
1282 	{
1283 		MultiConnection *connection =
1284 			dlist_container(MultiConnection, connectionNode, iter.cur);
1285 
1286 		/*
1287 		 * To avoid leaking connections we warn if connections are
1288 		 * still claimed exclusively. We can only do so if the transaction is
1289 		 * committed, as it's normal that code didn't have chance to clean
1290 		 * up after errors.
1291 		 */
1292 		if (isCommit && connection->claimedExclusively)
1293 		{
1294 			ereport(WARNING,
1295 					(errmsg("connection claimed exclusively at transaction commit")));
1296 		}
1297 
1298 
1299 		if (ShouldShutdownConnection(connection, cachedConnectionCount))
1300 		{
1301 			ShutdownConnection(connection);
1302 
1303 			/* unlink from list */
1304 			dlist_delete(iter.cur);
1305 
1306 			pfree(connection);
1307 		}
1308 		else
1309 		{
1310 			/*
1311 			 * reset healthy session lifespan connections.
1312 			 */
1313 			ResetConnection(connection);
1314 
1315 			cachedConnectionCount++;
1316 		}
1317 	}
1318 }
1319 
1320 
1321 /*
1322  * ShouldShutdownConnection returns true if either one of the followings is true:
1323  * - The connection is citus initiated.
1324  * - Current cached connections is already at MaxCachedConnectionsPerWorker
1325  * - Connection is forced to close at the end of transaction
1326  * - Connection is not in OK state
1327  * - A transaction is still in progress (usually because we are cancelling a distributed transaction)
1328  * - A connection reached its maximum lifetime
1329  */
1330 static bool
ShouldShutdownConnection(MultiConnection * connection,const int cachedConnectionCount)1331 ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount)
1332 {
1333 	/*
1334 	 * When we are in a backend that was created to serve an internal connection
1335 	 * from the coordinator or another worker, we disable connection caching to avoid
1336 	 * escalating the number of cached connections. We can recognize such backends
1337 	 * from their application name.
1338 	 */
1339 	return IsCitusInitiatedRemoteBackend() ||
1340 		   connection->initilizationState != POOL_STATE_INITIALIZED ||
1341 		   cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
1342 		   connection->forceCloseAtTransactionEnd ||
1343 		   PQstatus(connection->pgConn) != CONNECTION_OK ||
1344 		   !RemoteTransactionIdle(connection) ||
1345 		   (MaxCachedConnectionLifetime >= 0 &&
1346 			MillisecondsToTimeout(connection->connectionEstablishmentStart,
1347 								  MaxCachedConnectionLifetime) <= 0);
1348 }
1349 
1350 
1351 /*
1352  * IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
1353  * initiated via remote connection.
1354  */
1355 bool
IsCitusInitiatedRemoteBackend(void)1356 IsCitusInitiatedRemoteBackend(void)
1357 {
1358 	return application_name && strcmp(application_name, CITUS_APPLICATION_NAME) == 0;
1359 }
1360 
1361 
1362 /*
1363  * ResetConnection preserves the given connection for later usage by
1364  * resetting its states.
1365  */
1366 static void
ResetConnection(MultiConnection * connection)1367 ResetConnection(MultiConnection *connection)
1368 {
1369 	/* reset per-transaction state */
1370 	ResetRemoteTransaction(connection);
1371 	ResetShardPlacementAssociation(connection);
1372 
1373 	/* reset copy state */
1374 	connection->copyBytesWrittenSinceLastFlush = 0;
1375 
1376 	UnclaimConnection(connection);
1377 }
1378 
1379 
1380 /*
1381  * RemoteTransactionIdle function returns true if we manually
1382  * set flag on run_commands_on_session_level_connection_to_node to true to
1383  * force connection API keeping connection open or the status of the connection
1384  * is idle.
1385  */
1386 static bool
RemoteTransactionIdle(MultiConnection * connection)1387 RemoteTransactionIdle(MultiConnection *connection)
1388 {
1389 	/*
1390 	 * This is a very special case where we're running isolation tests on MX.
1391 	 * We don't care whether the transaction is idle or not when we're
1392 	 * running MX isolation tests. Thus, let the caller act as if the remote
1393 	 * transactions is idle.
1394 	 */
1395 	if (AllowNonIdleTransactionOnXactHandling())
1396 	{
1397 		return true;
1398 	}
1399 
1400 	return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE;
1401 }
1402 
1403 
1404 /*
1405  * MarkConnectionConnected is a helper function which sets the  connection
1406  * connectionState to MULTI_CONNECTION_CONNECTED, and also updates connection
1407  * establishment time when necessary.
1408  */
1409 void
MarkConnectionConnected(MultiConnection * connection)1410 MarkConnectionConnected(MultiConnection *connection)
1411 {
1412 	connection->connectionState = MULTI_CONNECTION_CONNECTED;
1413 
1414 	if (INSTR_TIME_IS_ZERO(connection->connectionEstablishmentEnd))
1415 	{
1416 		INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
1417 	}
1418 }
1419