1 /*-------------------------------------------------------------------------
2  *
3  * connection_management.h
4  *   Central management of connections and their life-cycle
5  *
6  * Copyright (c) Citus Data, Inc.
7  *
8  *-------------------------------------------------------------------------
9  */
10 
11 #ifndef CONNECTION_MANAGMENT_H
12 #define CONNECTION_MANAGMENT_H
13 
14 #include "postgres.h"
15 
16 #include "distributed/transaction_management.h"
17 #include "distributed/remote_transaction.h"
18 #include "lib/ilist.h"
19 #include "portability/instr_time.h"
20 #include "utils/guc.h"
21 #include "utils/hsearch.h"
22 #include "utils/timestamp.h"
23 
/*
 * Maximum textual length of a node hostname, including the terminating NUL
 * byte. Sizes the hostname buffers in MultiConnection and ConnectionHashKey.
 */
#define MAX_NODE_LENGTH 255 /* includes 0 byte */

/*
 * Size of the error buffer passed to libpq calls that take one; the
 * PostgreSQL documentation recommends 256 bytes.
 */
#define ERROR_BUFFER_SIZE 256

/* application_name used for connections that Citus opens internally */
#define CITUS_APPLICATION_NAME "citus"

/*
 * Forward declarations, so that including this header does not force the
 * (large) libpq and memory-context headers on every includer.
 */
struct pg_conn; /* target of the PGconn typedef */
struct MemoryContextData;
36 
/*
 * Bit flags that steer how a connection is obtained or established.
 *
 * Each value is a distinct power of two so that several flags can be combined
 * with bitwise OR; presumably the combination is passed as the uint32 `flags`
 * argument of GetNodeConnection() and its siblings.
 */
enum MultiConnectionMode
{
	/* always establish a brand-new connection */
	FORCE_NEW_CONNECTION = 0x01,

	/* NOTE(review): semantics not documented here; presumably "used for DDL" */
	FOR_DDL = 0x02,

	/* NOTE(review): semantics not documented here; presumably "used for DML" */
	FOR_DML = 0x04,

	/*
	 * COPY must avoid connections that already touched non-co-located
	 * placements: reuse a connection that accessed no other placement if one
	 * exists, otherwise open a new, clean connection.
	 */
	REQUIRE_CLEAN_CONNECTION = 0x08,

	/* NOTE(review): semantics not documented here; confirm in the .c file */
	OUTSIDE_TRANSACTION = 0x10,

	/*
	 * Marks a connection as optional, e.g. the second or later connection
	 * per node requested by the adaptive executor for a multi-shard command.
	 * The connection manager is free to refuse such a connection.
	 */
	OPTIONAL_CONNECTION = 0x20,

	/*
	 * Under connection throttling, allow establishment to be suspended until
	 * a connection slot to the remote host becomes available.
	 */
	WAIT_FOR_CONNECTION = 0x40
};
73 
74 
/*
 * Tracks the initialization progress of the underlying libpq pg_conn
 * struct that a MultiConnection wraps.
 */
typedef enum MultiConnectionState
{
	MULTI_CONNECTION_INITIAL = 0,
	MULTI_CONNECTION_CONNECTING = 1,
	MULTI_CONNECTION_CONNECTED = 2,
	MULTI_CONNECTION_FAILED = 3,
	MULTI_CONNECTION_LOST = 4,
	MULTI_CONNECTION_TIMED_OUT = 5
} MultiConnectionState;
88 
89 
/*
 * Tracks how far the MultiConnection struct itself (as opposed to its
 * underlying pg_conn) has been initialized. Cleanup code consults this to
 * decide which teardown actions apply to a connection.
 */
typedef enum MultiConnectionStructInitializationState
{
	POOL_STATE_NOT_INITIALIZED = 0,
	POOL_STATE_COUNTER_INCREMENTED = 1,
	POOL_STATE_INITIALIZED = 2
} MultiConnectionStructInitializationState;
102 
103 
/*
 * Typedef for the MultiConnectionMode flags enum. It is declared here,
 * separately from the enum definition, because combining the two causes
 * uncrustify to format the enum badly.
 */
typedef enum MultiConnectionMode MultiConnectionMode;
106 
/*
 * A connection to a remote node together with Citus-side bookkeeping: the
 * connection target, the libpq handle, its establishment state, timing, its
 * membership in the connection hash and in-progress transaction lists, and
 * the associated remote transaction.
 */
typedef struct MultiConnection
{
	/*
	 * Connection target details, useful for error messages and such.
	 * hostname is NUL-terminated; MAX_NODE_LENGTH includes the terminator.
	 */
	char hostname[MAX_NODE_LENGTH];
	int32 port;
	char user[NAMEDATALEN];
	char database[NAMEDATALEN];

	/* underlying libpq connection (PGconn) */
	struct pg_conn *pgConn;

	/* connection id */
	uint64 connectionId;

	/* initialization state of the underlying pg_conn */
	MultiConnectionState connectionState;

	/* signal that the connection is ready for read/write */
	bool ioReady;

	/*
	 * Whether to wait for read and/or write.
	 * NOTE(review): presumably a mask of wait-event bits such as
	 * WL_SOCKET_READABLE / WL_SOCKET_WRITEABLE; confirm.
	 */
	int waitFlags;

	/* force the connection to be closed at the end of the transaction */
	bool forceCloseAtTransactionEnd;

	/*
	 * Is the connection currently in use and must not be used by anything
	 * else. Presumably set by ClaimConnectionExclusively() and cleared by
	 * UnclaimConnection(); confirm.
	 */
	bool claimedExclusively;

	/* start/end of connection establishment, for timeouts and executor stats */
	instr_time connectionEstablishmentStart;
	instr_time connectionEstablishmentEnd;

	/* membership in the list of connections in ConnectionHashEntry */
	dlist_node connectionNode;

	/* information about the associated remote transaction */
	RemoteTransaction remoteTransaction;

	/* membership in list of in-progress transactions */
	dlist_node transactionNode;

	/* list of all placements referenced by this connection */
	dlist_head referencedPlacements;

	/* number of bytes sent to PQputCopyData() since last flush */
	uint64 copyBytesWrittenSinceLastFlush;

	/*
	 * How far this struct (not pgConn) has been initialized; consulted when
	 * cleaning the connection up. The field name's spelling is kept as-is
	 * because other code references it.
	 */
	MultiConnectionStructInitializationState initilizationState;
} MultiConnection;
157 
158 
/*
 * Central connection management hash, mapping (host, port, user, database) to
 * a list of connections.
 *
 * This hash keeps track of which connections are open to which node. Besides
 * allowing connection reuse, that information is used, for example, to close
 * connections after the end of a transaction.
 */

/*
 * Hash key identifying a connection target: (hostname, port, user, database).
 * The same key type is used by ConnParamsHashEntry.
 *
 * NOTE(review): all string members are fixed-size arrays. If the hash tables
 * compare keys as raw bytes (e.g. HASH_BLOBS), callers must zero the whole key
 * before filling it, so that bytes after each NUL terminator compare equal.
 * Confirm against the hash_create() calls.
 */
typedef struct ConnectionHashKey
{
	char hostname[MAX_NODE_LENGTH];
	int32 port;
	char user[NAMEDATALEN];
	char database[NAMEDATALEN];
} ConnectionHashKey;
176 
/*
 * Entry of ConnectionHash: all connections to one (host, port, user, database)
 * target. The key must remain the first member, because PostgreSQL's dynahash
 * (utils/hsearch.h) expects the key at the start of each entry.
 */
typedef struct ConnectionHashEntry
{
	ConnectionHashKey key;

	/* MultiConnections to this target, linked via MultiConnection.connectionNode */
	dlist_head *connections;

	/* connections list is valid or not */
	bool isValid;
} ConnectionHashEntry;
186 
/*
 * Entry of ConnParamsHash, caching the connection parameters for one
 * connection target. The cached fields have the same types and names as the
 * out-parameters of GetConnParams(). As with ConnectionHashEntry, the key must
 * stay the first member for dynahash.
 */
typedef struct ConnParamsHashEntry
{
	ConnectionHashKey key;

	/*
	 * Whether the cached parameters below are usable; presumably cleared by
	 * InvalidateConnParamsHashEntries(). Confirm.
	 */
	bool isValid;

	/*
	 * NOTE(review): presumably the index into keywords/values at which
	 * per-connection (runtime) parameters start; confirm.
	 */
	Index runtimeParamStart;

	/*
	 * Parallel arrays of parameter names and values. NOTE(review): presumably
	 * in the NULL-terminated format expected by libpq's PQconnectStartParams();
	 * confirm.
	 */
	char **keywords;
	char **values;
} ConnParamsHashEntry;
196 
197 
198 /* maximum duration to wait for connection */
199 extern int NodeConnectionTimeout;
200 
201 /* maximum number of connections to cache per worker per session */
202 extern int MaxCachedConnectionsPerWorker;
203 
204 /* maximum lifetime of connections in miliseconds */
205 extern int MaxCachedConnectionLifetime;
206 
207 /* parameters used for outbound connections */
208 extern char *NodeConninfo;
209 extern char *LocalHostName;
210 
211 /* the hash tables are externally accessiable */
212 extern HTAB *ConnectionHash;
213 extern HTAB *ConnParamsHash;
214 
215 /* context for all connection and transaction related memory */
216 extern struct MemoryContextData *ConnectionContext;
217 
218 
219 extern void AfterXactConnectionHandling(bool isCommit);
220 extern void InitializeConnectionManagement(void);
221 
222 extern void InitConnParams(void);
223 extern void ResetConnParams(void);
224 extern void InvalidateConnParamsHashEntries(void);
225 extern void AddConnParam(const char *keyword, const char *value);
226 extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
227 						  Index *runtimeParamStart, MemoryContext context);
228 extern const char * GetConnParam(const char *keyword);
229 extern bool CheckConninfo(const char *conninfo, const char **allowedConninfoKeywords,
230 						  Size allowedConninfoKeywordsLength, char **errmsg);
231 
232 
233 /* Low-level connection establishment APIs */
234 extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
235 										   int32 port);
236 extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
237 											 int32 port);
238 extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
239 													   int32 port, const char *user,
240 													   const char *database);
241 extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
242 														 const char *hostname,
243 														 int32 port,
244 														 const char *user,
245 														 const char *database);
246 extern void CloseAllConnectionsAfterTransaction(void);
247 extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
248 extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
249 												   const char *userName,
250 												   const char *database);
251 extern void CloseConnection(MultiConnection *connection);
252 extern void ShutdownAllConnections(void);
253 extern void ShutdownConnection(MultiConnection *connection);
254 
255 /* dealing with a connection */
256 extern void FinishConnectionListEstablishment(List *multiConnectionList);
257 extern void FinishConnectionEstablishment(MultiConnection *connection);
258 extern void ClaimConnectionExclusively(MultiConnection *connection);
259 extern void UnclaimConnection(MultiConnection *connection);
260 extern bool IsCitusInitiatedRemoteBackend(void);
261 extern void MarkConnectionConnected(MultiConnection *connection);
262 
263 /* time utilities */
264 extern double MillisecondsPassedSince(instr_time moment);
265 extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
266 
267 extern void WarmUpConnParamsHash(void);
268 #endif /* CONNECTION_MANAGMENT_H */
269