1 /*------------------------------------------------------------------------- 2 * 3 * connection_management.h 4 * Central management of connections and their life-cycle 5 * 6 * Copyright (c) Citus Data, Inc. 7 * 8 *------------------------------------------------------------------------- 9 */ 10 11 #ifndef CONNECTION_MANAGMENT_H 12 #define CONNECTION_MANAGMENT_H 13 14 #include "postgres.h" 15 16 #include "distributed/transaction_management.h" 17 #include "distributed/remote_transaction.h" 18 #include "lib/ilist.h" 19 #include "portability/instr_time.h" 20 #include "utils/guc.h" 21 #include "utils/hsearch.h" 22 #include "utils/timestamp.h" 23 24 /* maximum (textual) lengths of hostname and port */ 25 #define MAX_NODE_LENGTH 255 /* includes 0 byte */ 26 27 /* used for libpq commands that get an error buffer. Postgres docs recommend 256. */ 28 #define ERROR_BUFFER_SIZE 256 29 30 /* application name used for internal connections in Citus */ 31 #define CITUS_APPLICATION_NAME "citus" 32 33 /* forward declare, to avoid forcing large headers on everyone */ 34 struct pg_conn; /* target of the PGconn typedef */ 35 struct MemoryContextData; 36 37 /* 38 * Flags determining connection establishment behaviour. 39 */ 40 enum MultiConnectionMode 41 { 42 /* force establishment of a new connection */ 43 FORCE_NEW_CONNECTION = 1 << 0, 44 45 FOR_DDL = 1 << 1, 46 47 FOR_DML = 1 << 2, 48 49 /* 50 * During COPY we do not want to use a connection that accessed non-co-located 51 * placements. If there is a connection that did not access another placement, 52 * then use it. Otherwise open a new clean connection. 53 */ 54 REQUIRE_CLEAN_CONNECTION = 1 << 3, 55 56 OUTSIDE_TRANSACTION = 1 << 4, 57 58 /* 59 * Some connections are optional such as when adaptive executor is executing 60 * a multi-shard command and requires the second (or further) connections 61 * per node. In that case, the connection manager may decide not to allow the 62 * connection. 63 */ 64 OPTIONAL_CONNECTION = 1 << 5, 65 66 /* 67 * When this flag is passed, via connection throttling, the connection 68 * establishments may be suspended until a connection slot is available to 69 * the remote host. 70 */ 71 WAIT_FOR_CONNECTION = 1 << 6 72 }; 73 74 75 /* 76 * This state is used for keeping track of the initilization 77 * of the underlying pg_conn struct. 78 */ 79 typedef enum MultiConnectionState 80 { 81 MULTI_CONNECTION_INITIAL, 82 MULTI_CONNECTION_CONNECTING, 83 MULTI_CONNECTION_CONNECTED, 84 MULTI_CONNECTION_FAILED, 85 MULTI_CONNECTION_LOST, 86 MULTI_CONNECTION_TIMED_OUT 87 } MultiConnectionState; 88 89 90 /* 91 * This state is used for keeping track of the initilization 92 * of MultiConnection struct, not specifically the underlying 93 * pg_conn. The state is useful to determine the action during 94 * clean-up of connections. 95 */ 96 typedef enum MultiConnectionStructInitializationState 97 { 98 POOL_STATE_NOT_INITIALIZED, 99 POOL_STATE_COUNTER_INCREMENTED, 100 POOL_STATE_INITIALIZED 101 } MultiConnectionStructInitializationState; 102 103 104 /* declaring this directly above causes uncrustify to format it badly */ 105 typedef enum MultiConnectionMode MultiConnectionMode; 106 107 typedef struct MultiConnection 108 { 109 /* connection details, useful for error messages and such. */ 110 char hostname[MAX_NODE_LENGTH]; 111 int32 port; 112 char user[NAMEDATALEN]; 113 char database[NAMEDATALEN]; 114 115 /* underlying libpq connection */ 116 struct pg_conn *pgConn; 117 118 /* connection id */ 119 uint64 connectionId; 120 121 /* state of the connection */ 122 MultiConnectionState connectionState; 123 124 /* signal that the connection is ready for read/write */ 125 bool ioReady; 126 127 /* whether to wait for read/write */ 128 int waitFlags; 129 130 /* force the connection to be closed at the end of the transaction */ 131 bool forceCloseAtTransactionEnd; 132 133 /* is the connection currently in use, and shouldn't be used by anything else */ 134 bool claimedExclusively; 135 136 /* time connection establishment was started, for timeout and executor stats */ 137 instr_time connectionEstablishmentStart; 138 instr_time connectionEstablishmentEnd; 139 140 /* membership in list of list of connections in ConnectionHashEntry */ 141 dlist_node connectionNode; 142 143 /* information about the associated remote transaction */ 144 RemoteTransaction remoteTransaction; 145 146 /* membership in list of in-progress transactions */ 147 dlist_node transactionNode; 148 149 /* list of all placements referenced by this connection */ 150 dlist_head referencedPlacements; 151 152 /* number of bytes sent to PQputCopyData() since last flush */ 153 uint64 copyBytesWrittenSinceLastFlush; 154 155 MultiConnectionStructInitializationState initilizationState; 156 } MultiConnection; 157 158 159 /* 160 * Central connection management hash, mapping (host, port, user, database) to 161 * a list of connections. 162 * 163 * This hash is used to keep track of which connections are open to which 164 * node. Besides allowing connection reuse, that information is e.g. used to 165 * handle closing connections after the end of a transaction. 166 */ 167 168 /* hash key */ 169 typedef struct ConnectionHashKey 170 { 171 char hostname[MAX_NODE_LENGTH]; 172 int32 port; 173 char user[NAMEDATALEN]; 174 char database[NAMEDATALEN]; 175 } ConnectionHashKey; 176 177 /* hash entry */ 178 typedef struct ConnectionHashEntry 179 { 180 ConnectionHashKey key; 181 dlist_head *connections; 182 183 /* connections list is valid or not */ 184 bool isValid; 185 } ConnectionHashEntry; 186 187 /* hash entry for cached connection parameters */ 188 typedef struct ConnParamsHashEntry 189 { 190 ConnectionHashKey key; 191 bool isValid; 192 Index runtimeParamStart; 193 char **keywords; 194 char **values; 195 } ConnParamsHashEntry; 196 197 198 /* maximum duration to wait for connection */ 199 extern int NodeConnectionTimeout; 200 201 /* maximum number of connections to cache per worker per session */ 202 extern int MaxCachedConnectionsPerWorker; 203 204 /* maximum lifetime of connections in miliseconds */ 205 extern int MaxCachedConnectionLifetime; 206 207 /* parameters used for outbound connections */ 208 extern char *NodeConninfo; 209 extern char *LocalHostName; 210 211 /* the hash tables are externally accessiable */ 212 extern HTAB *ConnectionHash; 213 extern HTAB *ConnParamsHash; 214 215 /* context for all connection and transaction related memory */ 216 extern struct MemoryContextData *ConnectionContext; 217 218 219 extern void AfterXactConnectionHandling(bool isCommit); 220 extern void InitializeConnectionManagement(void); 221 222 extern void InitConnParams(void); 223 extern void ResetConnParams(void); 224 extern void InvalidateConnParamsHashEntries(void); 225 extern void AddConnParam(const char *keyword, const char *value); 226 extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, 227 Index *runtimeParamStart, MemoryContext context); 228 extern const char * GetConnParam(const char *keyword); 229 extern bool CheckConninfo(const char *conninfo, const char **allowedConninfoKeywords, 230 Size allowedConninfoKeywordsLength, char **errmsg); 231 232 233 /* Low-level connection establishment APIs */ 234 extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, 235 int32 port); 236 extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, 237 int32 port); 238 extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, 239 int32 port, const char *user, 240 const char *database); 241 extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, 242 const char *hostname, 243 int32 port, 244 const char *user, 245 const char *database); 246 extern void CloseAllConnectionsAfterTransaction(void); 247 extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); 248 extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort, 249 const char *userName, 250 const char *database); 251 extern void CloseConnection(MultiConnection *connection); 252 extern void ShutdownAllConnections(void); 253 extern void ShutdownConnection(MultiConnection *connection); 254 255 /* dealing with a connection */ 256 extern void FinishConnectionListEstablishment(List *multiConnectionList); 257 extern void FinishConnectionEstablishment(MultiConnection *connection); 258 extern void ClaimConnectionExclusively(MultiConnection *connection); 259 extern void UnclaimConnection(MultiConnection *connection); 260 extern bool IsCitusInitiatedRemoteBackend(void); 261 extern void MarkConnectionConnected(MultiConnection *connection); 262 263 /* time utilities */ 264 extern double MillisecondsPassedSince(instr_time moment); 265 extern long MillisecondsToTimeout(instr_time start, long msAfterStart); 266 267 extern void WarmUpConnParamsHash(void); 268 #endif /* CONNECTION_MANAGMENT_H */ 269