1 /* 2 * This file and its contents are licensed under the Timescale License. 3 * Please see the included NOTICE for copyright information and 4 * LICENSE-TIMESCALE for a copy of the license. 5 */ 6 #ifndef TIMESCALEDB_TSL_REMOTE_CONNECTION_H 7 #define TIMESCALEDB_TSL_REMOTE_CONNECTION_H 8 9 #include <postgres.h> 10 #include <foreign/foreign.h> 11 #include <libpq-fe.h> 12 13 #include "async.h" 14 15 typedef struct TSConnection TSConnection; 16 17 /* Associated with a connection foreign server and user id */ 18 typedef struct TSConnectionId 19 { 20 Oid server_id; 21 Oid user_id; 22 } TSConnectionId; 23 24 typedef enum ConnOptionType 25 { 26 CONN_OPTION_TYPE_NONE, 27 CONN_OPTION_TYPE_USER, 28 CONN_OPTION_TYPE_NODE, 29 } ConnOptionType; 30 31 typedef struct TSConnectionError 32 { 33 /* Local error information */ 34 int errcode; 35 const char *msg; 36 const char *host; 37 const char *nodename; 38 const char *connmsg; 39 /* Remote error information, if available */ 40 struct 41 { 42 int elevel; 43 int errcode; 44 const char *sqlstate; 45 const char *msg; 46 const char *hint; 47 const char *detail; 48 const char *context; 49 const char *stmtpos; 50 const char *sqlcmd; 51 } remote; 52 } TSConnectionError; 53 54 /* Open a connection with a remote endpoint. Note that this is a raw 55 * connection that does not obey txn semantics and is allocated using 56 * malloc. Most users should use `remote_dist_txn_get_connection` or 57 * `remote_connection_cache_get_connection` instead. Must be closed with 58 * `remote_connection_close` 59 */ 60 extern TSConnection *remote_connection_open_with_options(const char *node_name, 61 List *connection_options, 62 bool set_dist_id); 63 extern TSConnection *remote_connection_open_with_options_nothrow(const char *node_name, 64 List *connection_options, 65 char **errmsg); 66 extern TSConnection *remote_connection_open_by_id(TSConnectionId id); 67 extern TSConnection *remote_connection_open(Oid server_id, Oid user_id); 68 extern TSConnection *remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg); 69 extern bool remote_connection_set_autoclose(TSConnection *conn, bool autoclose); 70 extern int remote_connection_xact_depth_get(const TSConnection *conn); 71 extern int remote_connection_xact_depth_inc(TSConnection *conn); 72 extern int remote_connection_xact_depth_dec(TSConnection *conn); 73 extern void remote_connection_xact_transition_begin(TSConnection *conn); 74 extern void remote_connection_xact_transition_end(TSConnection *conn); 75 extern bool remote_connection_xact_is_transitioning(const TSConnection *conn); 76 extern bool remote_connection_ping(const char *node_name); 77 extern void remote_connection_close(TSConnection *conn); 78 extern PGresult *remote_connection_exec(TSConnection *conn, const char *cmd); 79 extern PGresult *remote_connection_execf(TSConnection *conn, const char *fmt, ...) 80 pg_attribute_printf(2, 3); 81 extern PGresult *remote_connection_query_ok(TSConnection *conn, const char *query); 82 extern PGresult *remote_connection_queryf_ok(TSConnection *conn, const char *fmt, ...) 83 pg_attribute_printf(2, 3); 84 extern void remote_connection_cmd_ok(TSConnection *conn, const char *cmd); 85 extern void remote_connection_cmdf_ok(TSConnection *conn, const char *fmt, ...) 86 pg_attribute_printf(2, 3); 87 extern ConnOptionType remote_connection_option_type(const char *keyword); 88 extern bool remote_connection_valid_user_option(const char *keyword); 89 extern bool remote_connection_valid_node_option(const char *keyword); 90 extern unsigned int remote_connection_get_cursor_number(void); 91 extern void remote_connection_reset_cursor_number(void); 92 extern unsigned int remote_connection_get_prep_stmt_number(void); 93 extern bool remote_connection_configure(TSConnection *conn); 94 extern bool remote_connection_check_extension(TSConnection *conn); 95 extern void remote_validate_extension_version(TSConnection *conn, const char *data_node_version); 96 extern char *remote_connection_get_connstr(const char *node_name); 97 98 typedef enum TSConnectionResult 99 { 100 CONN_OK, 101 CONN_TIMEOUT, 102 CONN_DISCONNECT, 103 CONN_NO_RESPONSE, 104 } TSConnectionResult; 105 106 typedef enum TSConnectionStatus 107 { 108 CONN_IDLE, /* No command being processed */ 109 CONN_PROCESSING, /* Command/query is being processed */ 110 CONN_COPY_IN, /* Connection is in COPY_IN mode */ 111 } TSConnectionStatus; 112 113 TSConnectionResult remote_connection_drain(TSConnection *conn, TimestampTz endtime, 114 PGresult **result); 115 extern bool remote_connection_cancel_query(TSConnection *conn); 116 extern PGconn *remote_connection_get_pg_conn(const TSConnection *conn); 117 extern bool remote_connection_is_processing(const TSConnection *conn); 118 extern void remote_connection_set_status(TSConnection *conn, TSConnectionStatus status); 119 extern TSConnectionStatus remote_connection_get_status(const TSConnection *conn); 120 extern bool remote_connection_configure_if_changed(TSConnection *conn); 121 extern const char *remote_connection_node_name(const TSConnection *conn); 122 extern bool remote_connection_set_single_row_mode(TSConnection *conn); 123 124 /* Functions operating on PGresult objects */ 125 extern void remote_result_cmd_ok(PGresult *res); 126 extern PGresult *remote_result_query_ok(PGresult *res); 127 extern void remote_result_close(PGresult *res); 128 129 /* wrappers around async stuff to emulate sync communication */ 130 131 extern TSConnectionId remote_connection_id(const Oid server_oid, const Oid user_oid); 132 extern void remote_connection_id_set(TSConnectionId *const id, const Oid server_oid, 133 const Oid user_oid); 134 135 typedef struct RemoteConnectionStats 136 { 137 unsigned int connections_created; 138 unsigned int connections_closed; 139 unsigned int results_created; 140 unsigned int results_cleared; 141 } RemoteConnectionStats; 142 143 #ifdef TS_DEBUG 144 extern void remote_connection_stats_reset(void); 145 extern RemoteConnectionStats *remote_connection_stats_get(void); 146 #endif 147 148 /* 149 * Connection functions for COPY mode. 150 */ 151 extern bool remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binary, 152 TSConnectionError *err); 153 extern bool remote_connection_end_copy(TSConnection *conn, TSConnectionError *err); 154 extern bool remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len, 155 TSConnectionError *err); 156 157 /* Error handling functions for connections */ 158 extern void remote_connection_get_error(const TSConnection *conn, TSConnectionError *err); 159 extern void remote_connection_get_result_error(const PGresult *res, TSConnectionError *err); 160 161 /* 162 * The following are macros for emitting errors related to connections or 163 * remote command execution. They need to be macros to preserve the error 164 * context of where they are called (line number, statement, etc.). 165 */ 166 #define remote_connection_error_elog(err, elevel) \ 167 ereport(elevel, \ 168 ((err)->remote.errcode != 0 ? errcode((err)->remote.errcode) : \ 169 errcode((err)->errcode), \ 170 (err)->remote.msg ? \ 171 errmsg_internal("[%s]: %s", (err)->nodename, (err)->remote.msg) : \ 172 ((err)->connmsg ? errmsg_internal("[%s]: %s", (err)->nodename, (err)->connmsg) : \ 173 errmsg_internal("[%s]: %s", (err)->nodename, (err)->msg)), \ 174 (err)->remote.detail ? errdetail_internal("%s", (err)->remote.detail) : 0, \ 175 (err)->remote.hint ? errhint("%s", (err)->remote.hint) : 0, \ 176 (err)->remote.sqlcmd ? errcontext("Remote SQL command: %s", (err)->remote.sqlcmd) : \ 177 0)) 178 179 /* 180 * Report an error we got from the remote host. 181 * 182 * elevel: error level to use (typically ERROR, but might be less) 183 * res: PGresult containing the error 184 */ 185 #define remote_result_elog(pgres, elevel) \ 186 do \ 187 { \ 188 PG_TRY(); \ 189 { \ 190 TSConnectionError err; \ 191 remote_connection_get_result_error(pgres, &err); \ 192 remote_connection_error_elog(&err, elevel); \ 193 } \ 194 PG_CATCH(); \ 195 { \ 196 PQclear(pgres); \ 197 PG_RE_THROW(); \ 198 } \ 199 PG_END_TRY(); \ 200 } while (0) 201 202 #define remote_connection_elog(conn, elevel) \ 203 do \ 204 { \ 205 TSConnectionError err; \ 206 remote_connection_get_error(conn, &err); \ 207 remote_connection_error_elog(&err, elevel); \ 208 } while (0) 209 210 extern void _remote_connection_init(void); 211 extern void _remote_connection_fini(void); 212 213 #endif /* TIMESCALEDB_TSL_REMOTE_CONNECTION_H */ 214