1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6
7 /*
8 * This file contains source code that was copied and/or modified from the
9 * PostgreSQL database, which is licensed under the open-source PostgreSQL
10 * License. Please see the NOTICE at the top level directory for a copy of
11 * the PostgreSQL License.
12 */
13 #include <postgres.h>
14 #include <access/xact.h>
15 #include <access/reloptions.h>
16 #include <catalog/pg_foreign_server.h>
17 #include <catalog/pg_user_mapping.h>
18 #include <commands/defrem.h>
19 #include <common/md5.h>
20 #include <foreign/foreign.h>
21 #include <libpq-events.h>
22 #include <libpq/libpq.h>
23 #include <mb/pg_wchar.h>
24 #include <miscadmin.h>
25 #include <nodes/makefuncs.h>
26 #include <port.h>
27 #include <postmaster/postmaster.h>
28 #include <utils/builtins.h>
29 #include <utils/fmgrprotos.h>
30 #include <utils/inval.h>
31 #include <utils/guc.h>
32 #include <utils/syscache.h>
33
34 #include <annotations.h>
35 #include <dist_util.h>
36 #include <errors.h>
37 #include <extension_constants.h>
38 #include <guc.h>
39 #include <telemetry/telemetry_metadata.h>
40 #include "connection.h"
41 #include "data_node.h"
42 #include "debug_point.h"
43 #include "utils.h"
44
45 /*
46 * Connection library for TimescaleDB.
47 *
48 * This library file contains convenience functionality around the libpq
49 * API. The major additional functionality offered includes:
50 *
51 * - libpq object lifecycles are tied to transactions (connections and
52 * results). This ensures that there are no memory leaks caused by libpq
53 * objects after a transaction completes.
54 * - connection configuration suitable for TimescaleDB.
55 *
56 * NOTE that it is strongly adviced that connection-related functions do not
57 * throw exceptions with, e.g., elog(ERROR). While exceptions can be caught
58 * with PG_TRY-CATCH for cleanup, it is not possible to safely continue the
59 * transaction that threw the exception as if no error occurred (see the
60 * following post if unconvinced:
61 * https://www.postgresql.org/message-id/27190.1508727890%40sss.pgh.pa.us).
62 *
63 * In some cases, we need to be able to continue a transaction even if a
64 * connection fails. One example is the removal of a data node, which must be
65 * able to proceed even if the node is no longer available to respond to a
66 * connection. Another example is performing a liveness check for node status.
67 *
68 * Therefore, it is best that defer throwing exceptions to high-level
69 * functions that know when it is appropriate.
70 */
71
72 /* for assigning cursor numbers and prepared statement numbers */
73 static unsigned int cursor_number = 0;
74 static unsigned int prep_stmt_number = 0;
75 static RemoteConnectionStats connstats = { 0 };
76
77 static int eventproc(PGEventId eventid, void *eventinfo, void *data);
78
79 TSConnectionId
remote_connection_id(const Oid server_oid,const Oid user_oid)80 remote_connection_id(const Oid server_oid, const Oid user_oid)
81 {
82 TSConnectionId id = { .server_id = server_oid, .user_id = user_oid };
83 return id;
84 }
85
86 void
remote_connection_id_set(TSConnectionId * const id,Oid const server_oid,Oid const user_oid)87 remote_connection_id_set(TSConnectionId *const id, Oid const server_oid, Oid const user_oid)
88 {
89 id->server_id = server_oid;
90 id->user_id = user_oid;
91 }
92
93 /*
94 * A simple circular list implementation for tracking libpq connection and
95 * result objects. We can't use pg_list here since it is bound to PostgreSQL's
96 * memory management system, while libpq is not.
97 */
98 typedef struct ListNode
99 {
100 struct ListNode *next;
101 struct ListNode *prev;
102 } ListNode;
103
104 #define IS_DETACHED_ENTRY(entry) ((entry)->next == NULL && (entry)->prev == NULL)
105
106 /*
107 * Detach a list node.
108 *
109 * Detaches a list node from the list, unless it is the anchor/head (which is
110 * a no-op).
111 */
112 static inline void
list_detach(ListNode * entry)113 list_detach(ListNode *entry)
114 {
115 ListNode *prev = entry->prev;
116 ListNode *next = entry->next;
117
118 next->prev = prev;
119 prev->next = next;
120 /* Clear entry fields */
121 entry->prev = NULL;
122 entry->next = NULL;
123 }
124
125 /*
126 * Insert a list node entry after the prev node.
127 */
128 static inline void
list_insert_after(ListNode * entry,ListNode * prev)129 list_insert_after(ListNode *entry, ListNode *prev)
130 {
131 ListNode *next = prev->next;
132
133 next->prev = entry;
134 entry->next = next;
135 entry->prev = prev;
136 prev->next = entry;
137 }
138
139 /*
140 * List entry that holds a PGresult object.
141 */
142 typedef struct ResultEntry
143 {
144 struct ListNode ln; /* Must be first entry */
145 TSConnection *conn; /* The connection the result was created on */
146 SubTransactionId subtxid; /* The subtransaction ID that created this result, if any. */
147 PGresult *result;
148 } ResultEntry;
149
150 typedef struct TSConnection
151 {
152 ListNode ln; /* Must be first entry */
153 PGconn *pg_conn; /* PostgreSQL connection */
154 bool closing_guard; /* Guard against calling PQfinish() directly on PGconn */
155 TSConnectionStatus status;
156 NameData node_name; /* Associated data node name */
157 char *tz_name; /* Timezone name last sent over connection */
158 bool autoclose; /* Set if this connection should automatically
159 * close at the end of the (sub-)transaction */
160 SubTransactionId subtxid; /* The subtransaction ID that created this connection, if any. */
161 int xact_depth; /* 0 => no transaction, 1 => main transaction, > 1 =>
162 * levels of subtransactions */
163 bool xact_transitioning; /* TRUE if connection is transitioning to
164 * another transaction state */
165 ListNode results; /* Head of PGresult list */
166 bool binary_copy;
167 } TSConnection;
168
169 /*
170 * List of all connections we create. Used to auto-free connections and/or
171 * PGresults at transaction end.
172 */
173 static ListNode connections = { &connections, &connections };
174
175 static bool
fill_simple_error(TSConnectionError * err,int errcode,const char * errmsg,const TSConnection * conn)176 fill_simple_error(TSConnectionError *err, int errcode, const char *errmsg, const TSConnection *conn)
177 {
178 if (NULL == err)
179 return false;
180
181 MemSet(err, 0, sizeof(*err));
182
183 err->errcode = errcode;
184 err->msg = errmsg;
185 err->host = pstrdup(PQhost(conn->pg_conn));
186 err->nodename = pstrdup(NameStr(conn->node_name));
187
188 return false;
189 }
190
191 static bool
fill_connection_error(TSConnectionError * err,int errcode,const char * errmsg,const TSConnection * conn)192 fill_connection_error(TSConnectionError *err, int errcode, const char *errmsg,
193 const TSConnection *conn)
194 {
195 if (NULL == err)
196 return false;
197
198 fill_simple_error(err, errcode, errmsg, conn);
199 err->connmsg = pstrdup(PQerrorMessage(conn->pg_conn));
200
201 return false;
202 }
203
204 static char *
get_error_field_copy(const PGresult * res,int fieldcode)205 get_error_field_copy(const PGresult *res, int fieldcode)
206 {
207 const char *msg = PQresultErrorField(res, fieldcode);
208
209 if (NULL == msg)
210 return NULL;
211 return pchomp(msg);
212 }
213
214 /*
215 * Convert libpq error severity to local error level.
216 */
217 static int
severity_to_elevel(const char * severity)218 severity_to_elevel(const char *severity)
219 {
220 /* According to https://www.postgresql.org/docs/current/libpq-exec.html,
221 * libpq only returns the severity levels listed below. */
222 static const struct
223 {
224 const char *severity;
225 int elevel;
226 } severity_levels[] = { {
227 .severity = "ERROR",
228 .elevel = ERROR,
229 },
230 {
231 .severity = "FATAL",
232 .elevel = FATAL,
233 },
234 {
235 .severity = "PANIC",
236 .elevel = PANIC,
237 },
238 {
239 .severity = "WARNING",
240 .elevel = WARNING,
241 },
242 {
243 .severity = "NOTICE",
244 .elevel = NOTICE,
245 },
246 {
247 .severity = "DEBUG",
248 .elevel = DEBUG1,
249 },
250 {
251 .severity = "INFO",
252 .elevel = INFO,
253 },
254 {
255 .severity = "LOG",
256 .elevel = LOG,
257 },
258 /* End marker */
259 {
260 .severity = NULL,
261 .elevel = 0,
262 } };
263 int i;
264
265 if (NULL == severity)
266 return 0;
267
268 i = 0;
269
270 while (NULL != severity_levels[i].severity)
271 {
272 if (strcmp(severity_levels[i].severity, severity) == 0)
273 return severity_levels[i].elevel;
274 i++;
275 }
276
277 pg_unreachable();
278
279 return ERROR;
280 }
281
282 /*
283 * Fill a connection error based on the result of a remote query.
284 */
285 static bool
fill_result_error(TSConnectionError * err,int errcode,const char * errmsg,const PGresult * res)286 fill_result_error(TSConnectionError *err, int errcode, const char *errmsg, const PGresult *res)
287 {
288 const ResultEntry *entry = PQresultInstanceData(res, eventproc);
289 const char *sqlstate;
290
291 if (NULL == err || NULL == res || NULL == entry)
292 return false;
293
294 Assert(entry->conn);
295
296 fill_simple_error(err, errcode, errmsg, entry->conn);
297 err->remote.elevel = severity_to_elevel(PQresultErrorField(res, PG_DIAG_SEVERITY_NONLOCALIZED));
298 err->remote.sqlstate = get_error_field_copy(res, PG_DIAG_SQLSTATE);
299 err->remote.msg = get_error_field_copy(res, PG_DIAG_MESSAGE_PRIMARY);
300 err->remote.detail = get_error_field_copy(res, PG_DIAG_MESSAGE_DETAIL);
301 err->remote.hint = get_error_field_copy(res, PG_DIAG_MESSAGE_HINT);
302 err->remote.context = get_error_field_copy(res, PG_DIAG_CONTEXT);
303 err->remote.stmtpos = get_error_field_copy(res, PG_DIAG_STATEMENT_POSITION);
304
305 sqlstate = err->remote.sqlstate;
306
307 if (sqlstate && strlen(sqlstate) == 5)
308 err->remote.errcode =
309 MAKE_SQLSTATE(sqlstate[0], sqlstate[1], sqlstate[2], sqlstate[3], sqlstate[4]);
310 else
311 err->remote.errcode = ERRCODE_INTERNAL_ERROR;
312
313 return false;
314 }
315
316 /*
317 * The following event handlers make sure all PGresult are freed with
318 * PQClear() when its parent connection is closed.
319 *
320 * It is still recommended to explicitly call PGclear() or
321 * remote_connection_result_close(), however, especially when PGresults are
322 * created in a tight loop (e.g., when scanning many tuples on a remote
323 * table).
324 */
325 #define EVENTPROC_FAILURE 0
326 #define EVENTPROC_SUCCESS 1
327
328 static void
remote_connection_free(TSConnection * conn)329 remote_connection_free(TSConnection *conn)
330 {
331 if (NULL != conn->tz_name)
332 free(conn->tz_name);
333
334 free(conn);
335 }
336
337 /*
338 * Invoked on PQfinish(conn). Frees all PGresult objects created on the
339 * connection, apart from those already freed with PQclear().
340 */
341 static int
handle_conn_destroy(PGEventConnDestroy * event)342 handle_conn_destroy(PGEventConnDestroy *event)
343 {
344 TSConnection *conn = PQinstanceData(event->conn, eventproc);
345 unsigned int results_count = 0;
346 ListNode *curr;
347
348 Assert(NULL != conn);
349 Assert(conn->closing_guard);
350
351 curr = conn->results.next;
352
353 while (curr != &conn->results)
354 {
355 ResultEntry *entry = (ResultEntry *) curr;
356 PGresult *result = entry->result;
357
358 curr = curr->next;
359 PQclear(result);
360 /* No need to free curr here since PQclear will invoke
361 * handle_result_destroy() which will free it */
362 results_count++;
363 }
364
365 conn->pg_conn = NULL;
366 list_detach(&conn->ln);
367
368 if (results_count > 0)
369 elog(DEBUG3, "cleared %u result objects on connection %p", results_count, conn);
370
371 connstats.connections_closed++;
372
373 if (!conn->closing_guard)
374 {
375 ereport(WARNING,
376 (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("invalid closing of connection")));
377 remote_connection_free(conn);
378 }
379
380 return EVENTPROC_SUCCESS;
381 }
382
383 /*
384 * Invoked on PQgetResult(conn). Adds the PGresult to the list in the parent
385 * TSConnection.
386 */
387 static int
handle_result_create(PGEventResultCreate * event)388 handle_result_create(PGEventResultCreate *event)
389 {
390 TSConnection *conn = PQinstanceData(event->conn, eventproc);
391 ResultEntry *entry;
392
393 Assert(NULL != conn);
394
395 /* We malloc this (instead of palloc) since bound PGresult, which also
396 * lives outside PostgreSQL's memory management. */
397 entry = malloc(sizeof(ResultEntry));
398
399 if (NULL == entry)
400 return EVENTPROC_FAILURE;
401
402 MemSet(entry, 0, sizeof(ResultEntry));
403 entry->ln.next = entry->ln.prev = NULL;
404 entry->conn = conn;
405 entry->result = event->result;
406 entry->subtxid = GetCurrentSubTransactionId();
407
408 /* Add entry as new head and set instance data */
409 list_insert_after(&entry->ln, &conn->results);
410 PQresultSetInstanceData(event->result, eventproc, entry);
411
412 elog(DEBUG3,
413 "created result %p on connection %p subtxid %u",
414 event->result,
415 conn,
416 entry->subtxid);
417
418 connstats.results_created++;
419
420 return EVENTPROC_SUCCESS;
421 }
422
423 /*
424 * Invoked on PQclear(result). Removes the PGresult from the list in the
425 * parent TSConnection.
426 */
427 static int
handle_result_destroy(PGEventResultDestroy * event)428 handle_result_destroy(PGEventResultDestroy *event)
429 {
430 ResultEntry *entry = PQresultInstanceData(event->result, eventproc);
431
432 Assert(NULL != entry);
433
434 /* Detach entry */
435 list_detach(&entry->ln);
436
437 elog(DEBUG3, "destroyed result %p for subtxnid %u", entry->result, entry->subtxid);
438
439 free(entry);
440
441 connstats.results_cleared++;
442
443 return EVENTPROC_SUCCESS;
444 }
445
446 /*
447 * Main event handler invoked when events happen on a PGconn.
448 *
449 * According to the libpq API, the function should return a non-zero value if
450 * it succeeds and zero if it fails. We use EVENTPROC_SUCCESS and
451 * EVENTPROC_FAILURE in place of these two options.
452 */
453 static int
eventproc(PGEventId eventid,void * eventinfo,void * data)454 eventproc(PGEventId eventid, void *eventinfo, void *data)
455 {
456 int res = EVENTPROC_SUCCESS;
457
458 switch (eventid)
459 {
460 case PGEVT_CONNDESTROY:
461 res = handle_conn_destroy((PGEventConnDestroy *) eventinfo);
462 break;
463 case PGEVT_RESULTCREATE:
464 res = handle_result_create((PGEventResultCreate *) eventinfo);
465 break;
466 case PGEVT_RESULTDESTROY:
467 res = handle_result_destroy((PGEventResultDestroy *) eventinfo);
468 break;
469 default:
470 /* Not of interest, so return success */
471 break;
472 }
473
474 return res;
475 }
476
477 static PQconninfoOption *
get_libpq_options()478 get_libpq_options()
479 {
480 /* make static to fetch once per backend */
481 static PQconninfoOption *libpq_options = NULL;
482
483 if (libpq_options == NULL)
484 {
485 /* Note that the options array is Malloc'ed */
486 libpq_options = PQconndefaults();
487 }
488
489 if (libpq_options == NULL)
490 {
491 /* probably OOM */
492 elog(ERROR, "could not get default libpq options");
493 }
494
495 return libpq_options;
496 }
497
498 static void
unset_libpq_envvar(void)499 unset_libpq_envvar(void)
500 {
501 PQconninfoOption *lopt;
502 PQconninfoOption *options = PQconndefaults();
503
504 Assert(options != NULL);
505
506 /* Explicitly unset all libpq environment variables.
507 *
508 * By default libpq uses environment variables as a fallback
509 * to specify connection options, potentially they could be in
510 * a conflict with PostgreSQL variables and introduce
511 * security risks.
512 */
513 for (lopt = options; lopt->keyword; lopt++)
514 {
515 if (lopt->envvar)
516 unsetenv(lopt->envvar);
517 }
518
519 PQconninfoFree(options);
520 }
521
522 static bool
is_libpq_option(const char * keyword,char ** display_option)523 is_libpq_option(const char *keyword, char **display_option)
524 {
525 PQconninfoOption *lopt;
526
527 for (lopt = get_libpq_options(); lopt->keyword; lopt++)
528 {
529 if (strcmp(lopt->keyword, keyword) == 0)
530 {
531 if (display_option != NULL)
532 *display_option = lopt->dispchar;
533 return true;
534 }
535 }
536 return false;
537 }
538
539 ConnOptionType
remote_connection_option_type(const char * keyword)540 remote_connection_option_type(const char *keyword)
541 {
542 char *display_option;
543
544 if (!is_libpq_option(keyword, &display_option))
545 return CONN_OPTION_TYPE_NONE;
546
547 /* Hide debug options, as well as settings we override internally. */
548 if (strchr(display_option, 'D') || strcmp(keyword, "fallback_application_name") == 0 ||
549 strcmp(keyword, "client_encoding") == 0)
550 return CONN_OPTION_TYPE_NONE;
551
552 /*
553 * "user" and any secret options are allowed only on user mappings.
554 * Everything else is a data node option.
555 */
556 if (strchr(display_option, '*') || strcmp(keyword, "user") == 0)
557 return CONN_OPTION_TYPE_USER;
558
559 return CONN_OPTION_TYPE_NODE;
560 }
561
562 bool
remote_connection_valid_user_option(const char * keyword)563 remote_connection_valid_user_option(const char *keyword)
564 {
565 return remote_connection_option_type(keyword) == CONN_OPTION_TYPE_USER;
566 }
567
568 bool
remote_connection_valid_node_option(const char * keyword)569 remote_connection_valid_node_option(const char *keyword)
570 {
571 return remote_connection_option_type(keyword) == CONN_OPTION_TYPE_NODE;
572 }
573
574 static int
extract_connection_options(List * defelems,const char ** keywords,const char ** values,const char ** user)575 extract_connection_options(List *defelems, const char **keywords, const char **values,
576 const char **user)
577 {
578 ListCell *lc;
579 int option_pos = 0;
580
581 Assert(keywords != NULL);
582 Assert(values != NULL);
583 Assert(user != NULL);
584
585 *user = NULL;
586 foreach (lc, defelems)
587 {
588 DefElem *d = (DefElem *) lfirst(lc);
589
590 if (is_libpq_option(d->defname, NULL))
591 {
592 keywords[option_pos] = d->defname;
593 values[option_pos] = defGetString(d);
594 if (strcmp(d->defname, "user") == 0)
595 {
596 Assert(*user == NULL);
597 *user = values[option_pos];
598 }
599 option_pos++;
600 }
601 }
602
603 return option_pos;
604 }
605
606 /*
607 * Internal connection configure.
608 *
609 * This function will send internal configuration settings if they have
610 * changed. It is used to pass on configuration settings before executing a
611 * command requested by module users.
612 *
613 * ATTENTION! This function should *not* use
614 * `remote_connection_exec_ok_command` since this function is called
615 * indirectly whenever a remote command is executed, which would lead to
616 * infinite recursion. Stick to `PQ*` functions.
617 *
618 * Returns true if the current configuration is OK (no change) or was
619 * successfully applied, otherwise false.
620 */
621 bool
remote_connection_configure_if_changed(TSConnection * conn)622 remote_connection_configure_if_changed(TSConnection *conn)
623 {
624 const char *local_tz_name = pg_get_timezone_name(session_timezone);
625 bool success = true;
626
627 /*
628 * We need to enforce the same timezone setting across nodes. Otherwise,
629 * we might get the wrong result when we push down things like
630 * date_trunc(text, timestamptz). To safely do that, we also need the
631 * timezone databases to be the same on all data nodes.
632 *
633 * We save away the timezone name so that we know what we last sent over
634 * the connection. If the time zone changed since last time we sent a
635 * command, we will send a SET TIMEZONE command with the new timezone
636 * first.
637 */
638 if (conn->tz_name == NULL ||
639 (local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0))
640 {
641 char *set_timezone_cmd = psprintf("SET TIMEZONE = '%s'", local_tz_name);
642 PGresult *result = PQexec(conn->pg_conn, set_timezone_cmd);
643
644 success = PQresultStatus(result) == PGRES_COMMAND_OK;
645 PQclear(result);
646 pfree(set_timezone_cmd);
647 free(conn->tz_name);
648 conn->tz_name = strdup(local_tz_name);
649 }
650
651 return success;
652 }
653
654 /*
655 * Default options/commands to set on every new connection.
656 *
657 * Timezone is indirectly set with the first command executed.
658 */
659 static const char *default_connection_options[] = {
660 /*
661 * Force the search path to contain only pg_catalog, which will force
662 * functions to output fully qualified identifier names (i.e., they will
663 * include the schema).
664 */
665 "SET search_path = pg_catalog",
666 /*
667 * Set values needed to ensure unambiguous data output from remote. (This
668 * logic should match what pg_dump does. See also set_transmission_modes
669 * in fdw.c.)
670 */
671 "SET datestyle = ISO",
672 "SET intervalstyle = postgres",
673 "SET extra_float_digits = 3",
674 NULL,
675 };
676
677 /*
678 * Issue SET commands to make sure remote session is configured properly.
679 *
680 * We do this just once at connection, assuming nothing will change the
681 * values later. Since we'll never send volatile function calls to the
682 * remote, there shouldn't be any way to break this assumption from our end.
683 * It's possible to think of ways to break it at the remote end, eg making a
684 * foreign table point to a view that includes a set_config call --- but once
685 * you admit the possibility of a malicious view definition, there are any
686 * number of ways to break things.
687 */
688 bool
remote_connection_configure(TSConnection * conn)689 remote_connection_configure(TSConnection *conn)
690 {
691 const char *cmd;
692 StringInfoData sql;
693 PGresult *result;
694 bool success = true;
695 int i = 0;
696
697 initStringInfo(&sql);
698
699 while ((cmd = default_connection_options[i]) != NULL)
700 {
701 appendStringInfo(&sql, "%s;", cmd);
702 i++;
703 }
704
705 result = PQexec(conn->pg_conn, sql.data);
706 success = PQresultStatus(result) == PGRES_COMMAND_OK;
707 PQclear(result);
708
709 return success;
710 }
711
712 static TSConnection *
remote_connection_create(PGconn * pg_conn,bool processing,const char * node_name)713 remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name)
714 {
715 TSConnection *conn = malloc(sizeof(TSConnection));
716 int ret;
717
718 if (NULL == conn)
719 return NULL;
720
721 MemSet(conn, 0, sizeof(TSConnection));
722
723 /* Must register the event procedure before attaching any instance data */
724 ret = PQregisterEventProc(pg_conn, eventproc, "remote connection", conn);
725
726 if (ret == 0)
727 {
728 free(conn);
729 return NULL;
730 }
731
732 ret = PQsetInstanceData(pg_conn, eventproc, conn);
733 Assert(ret != 0);
734
735 conn->ln.next = conn->ln.prev = NULL;
736 conn->pg_conn = pg_conn;
737 conn->closing_guard = false;
738 conn->status = processing ? CONN_PROCESSING : CONN_IDLE;
739 namestrcpy(&conn->node_name, node_name);
740 conn->tz_name = NULL;
741 conn->autoclose = true;
742 conn->subtxid = GetCurrentSubTransactionId();
743 conn->xact_depth = 0;
744 conn->xact_transitioning = false;
745 /* Initialize results head */
746 conn->results.next = &conn->results;
747 conn->results.prev = &conn->results;
748 conn->binary_copy = false;
749 list_insert_after(&conn->ln, &connections);
750
751 elog(DEBUG3, "created connection %p", conn);
752 connstats.connections_created++;
753
754 return conn;
755 }
756
757 /*
758 * Set the auto-close behavior.
759 *
760 * If set, the connection will be closed at the end of the (sub-)transaction
761 * it was created on.
762 *
763 * The default value is on (true).
764 *
765 * Returns the previous setting.
766 */
767 bool
remote_connection_set_autoclose(TSConnection * conn,bool autoclose)768 remote_connection_set_autoclose(TSConnection *conn, bool autoclose)
769 {
770 bool old = conn->autoclose;
771
772 conn->autoclose = autoclose;
773 return old;
774 }
775
776 int
remote_connection_xact_depth_get(const TSConnection * conn)777 remote_connection_xact_depth_get(const TSConnection *conn)
778 {
779 Assert(conn->xact_depth >= 0);
780 return conn->xact_depth;
781 }
782
783 int
remote_connection_xact_depth_inc(TSConnection * conn)784 remote_connection_xact_depth_inc(TSConnection *conn)
785 {
786 Assert(conn->xact_depth >= 0);
787 return ++conn->xact_depth;
788 }
789
790 int
remote_connection_xact_depth_dec(TSConnection * conn)791 remote_connection_xact_depth_dec(TSConnection *conn)
792 {
793 Assert(conn->xact_depth > 0);
794 return --conn->xact_depth;
795 }
796
797 void
remote_connection_xact_transition_begin(TSConnection * conn)798 remote_connection_xact_transition_begin(TSConnection *conn)
799 {
800 Assert(!conn->xact_transitioning);
801 conn->xact_transitioning = true;
802 }
803
804 void
remote_connection_xact_transition_end(TSConnection * conn)805 remote_connection_xact_transition_end(TSConnection *conn)
806 {
807 Assert(conn->xact_transitioning);
808 conn->xact_transitioning = false;
809 }
810
811 bool
remote_connection_xact_is_transitioning(const TSConnection * conn)812 remote_connection_xact_is_transitioning(const TSConnection *conn)
813 {
814 return conn->xact_transitioning;
815 }
816
817 PGconn *
remote_connection_get_pg_conn(const TSConnection * conn)818 remote_connection_get_pg_conn(const TSConnection *conn)
819 {
820 Assert(conn != NULL);
821 return conn->pg_conn;
822 }
823
824 bool
remote_connection_is_processing(const TSConnection * conn)825 remote_connection_is_processing(const TSConnection *conn)
826 {
827 Assert(conn != NULL);
828 return conn->status != CONN_IDLE;
829 }
830
831 void
remote_connection_set_status(TSConnection * conn,TSConnectionStatus status)832 remote_connection_set_status(TSConnection *conn, TSConnectionStatus status)
833 {
834 Assert(conn != NULL);
835 conn->status = status;
836 }
837
838 TSConnectionStatus
remote_connection_get_status(const TSConnection * conn)839 remote_connection_get_status(const TSConnection *conn)
840 {
841 return conn->status;
842 }
843
844 const char *
remote_connection_node_name(const TSConnection * conn)845 remote_connection_node_name(const TSConnection *conn)
846 {
847 return NameStr(conn->node_name);
848 }
849
850 void
remote_connection_get_error(const TSConnection * conn,TSConnectionError * err)851 remote_connection_get_error(const TSConnection *conn, TSConnectionError *err)
852 {
853 fill_connection_error(err, ERRCODE_CONNECTION_FAILURE, "", conn);
854 }
855
856 void
remote_connection_get_result_error(const PGresult * res,TSConnectionError * err)857 remote_connection_get_result_error(const PGresult *res, TSConnectionError *err)
858 {
859 fill_result_error(err, ERRCODE_CONNECTION_EXCEPTION, "", res);
860 }
861
862 /*
863 * Execute a remote command.
864 *
865 * Like PQexec, which this functions uses internally, the PGresult returned
866 * describes only the last command executed in a multi-command string.
867 */
868 PGresult *
remote_connection_exec(TSConnection * conn,const char * cmd)869 remote_connection_exec(TSConnection *conn, const char *cmd)
870 {
871 if (!remote_connection_configure_if_changed(conn))
872 {
873 PGresult *res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR);
874 PQfireResultCreateEvents(conn->pg_conn, res);
875 return res;
876 }
877
878 return PQexec(conn->pg_conn, cmd);
879 }
880
881 /*
882 * Must be a macro since va_start() must be called in the function that takes
883 * a variable number of arguments.
884 */
885 #define stringinfo_va(fmt, sql) \
886 do \
887 { \
888 initStringInfo((sql)); \
889 for (;;) \
890 { \
891 va_list args; \
892 int needed; \
893 va_start(args, fmt); \
894 needed = appendStringInfoVA((sql), fmt, args); \
895 va_end(args); \
896 if (needed == 0) \
897 break; \
898 /* Increase the buffer size and try again. */ \
899 enlargeStringInfo((sql), needed); \
900 } \
901 } while (0);
902
903 /*
904 * Execute a remote command.
905 *
906 * Like remote_connection_exec but takes a variable number of arguments.
907 */
908 PGresult *
remote_connection_execf(TSConnection * conn,const char * fmt,...)909 remote_connection_execf(TSConnection *conn, const char *fmt, ...)
910 {
911 PGresult *res;
912 StringInfoData sql;
913
914 stringinfo_va(fmt, &sql);
915 res = remote_connection_exec(conn, sql.data);
916 pfree(sql.data);
917
918 return res;
919 }
920
921 PGresult *
remote_connection_queryf_ok(TSConnection * conn,const char * fmt,...)922 remote_connection_queryf_ok(TSConnection *conn, const char *fmt, ...)
923 {
924 StringInfoData sql;
925 PGresult *res;
926
927 stringinfo_va(fmt, &sql);
928 res = remote_result_query_ok(remote_connection_exec(conn, sql.data));
929 pfree(sql.data);
930 return res;
931 }
932
933 PGresult *
remote_connection_query_ok(TSConnection * conn,const char * query)934 remote_connection_query_ok(TSConnection *conn, const char *query)
935 {
936 return remote_result_query_ok(remote_connection_exec(conn, query));
937 }
938
939 void
remote_connection_cmd_ok(TSConnection * conn,const char * cmd)940 remote_connection_cmd_ok(TSConnection *conn, const char *cmd)
941 {
942 remote_result_cmd_ok(remote_connection_exec(conn, cmd));
943 }
944
945 void
remote_connection_cmdf_ok(TSConnection * conn,const char * fmt,...)946 remote_connection_cmdf_ok(TSConnection *conn, const char *fmt, ...)
947 {
948 StringInfoData sql;
949
950 stringinfo_va(fmt, &sql);
951 remote_result_cmd_ok(remote_connection_exec(conn, sql.data));
952 pfree(sql.data);
953 }
954
955 static PGresult *
remote_result_ok(PGresult * res,ExecStatusType expected)956 remote_result_ok(PGresult *res, ExecStatusType expected)
957 {
958 if (PQresultStatus(res) != expected)
959 remote_result_elog(res, ERROR);
960
961 return res;
962 }
963
964 void
remote_result_cmd_ok(PGresult * res)965 remote_result_cmd_ok(PGresult *res)
966 {
967 PQclear(remote_result_ok(res, PGRES_COMMAND_OK));
968 }
969
970 PGresult *
remote_result_query_ok(PGresult * res)971 remote_result_query_ok(PGresult *res)
972 {
973 return remote_result_ok(res, PGRES_TUPLES_OK);
974 }
975
976 /**
977 * Validate extension version.
978 */
979 void
remote_validate_extension_version(TSConnection * conn,const char * data_node_version)980 remote_validate_extension_version(TSConnection *conn, const char *data_node_version)
981 {
982 bool old_version;
983
984 if (!dist_util_is_compatible_version(data_node_version, TIMESCALEDB_VERSION, &old_version))
985 ereport(ERROR,
986 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
987 errmsg("remote PostgreSQL instance has an incompatible timescaledb extension "
988 "version"),
989 errdetail_internal("Access node version: %s, remote version: %s.",
990 TIMESCALEDB_VERSION_MOD,
991 data_node_version)));
992 if (old_version)
993 ereport(WARNING,
994 (errmsg("remote PostgreSQL instance has an outdated timescaledb extension version"),
995 errdetail_internal("Access node version: %s, remote version: %s.",
996 TIMESCALEDB_VERSION_MOD,
997 data_node_version)));
998 }
999
1000 /*
1001 * Check timescaledb extension version on a data node.
1002 *
1003 * Compare remote connection extension version with the one installed
1004 * locally on the access node.
1005 *
1006 * Return false if extension is not found, true otherwise.
1007 */
1008 bool
remote_connection_check_extension(TSConnection * conn)1009 remote_connection_check_extension(TSConnection *conn)
1010 {
1011 PGresult *res;
1012
1013 res = remote_connection_execf(conn,
1014 "SELECT extversion FROM pg_extension WHERE extname = %s",
1015 quote_literal_cstr(EXTENSION_NAME));
1016
1017 /* Just to capture any bugs in the SELECT above */
1018 Assert(PQnfields(res) == 1);
1019
1020 switch (PQntuples(res))
1021 {
1022 case 0: /* extension does not exists */
1023 PQclear(res);
1024 return false;
1025
1026 case 1:
1027 break;
1028
1029 default: /* something strange happend */
1030 ereport(WARNING,
1031 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
1032 errmsg("more than one TimescaleDB extension loaded")));
1033 break;
1034 }
1035
1036 /* validate extension version on data node and make sure that it is
1037 * compatible */
1038 remote_validate_extension_version(conn, PQgetvalue(res, 0, 0));
1039
1040 PQclear(res);
1041 return true;
1042 }
1043
1044 /*
1045 * Configure remote connection using current instance UUID.
1046 *
1047 * This allows remote side to reason about whether this connection has been
1048 * originated by access node.
1049 *
1050 * Returns true on success and false on error, in which case the optional
1051 * errmsg parameter can be used to retrieve an error message.
1052 */
1053 static bool
remote_connection_set_peer_dist_id(TSConnection * conn)1054 remote_connection_set_peer_dist_id(TSConnection *conn)
1055 {
1056 Datum id_string = DirectFunctionCall1(uuid_out, ts_telemetry_metadata_get_uuid());
1057 PGresult *res;
1058 bool success = true;
1059
1060 res = remote_connection_execf(conn,
1061 "SELECT * FROM _timescaledb_internal.set_peer_dist_id('%s')",
1062 DatumGetCString(id_string));
1063 success = PQresultStatus(res) == PGRES_TUPLES_OK;
1064 PQclear(res);
1065
1066 return success;
1067 }
1068
1069 /* fallback_application_name, client_encoding, end marker */
1070 #define REMOTE_CONNECTION_SESSION_OPTIONS_N 3
1071
1072 /* passfile */
1073 #define REMOTE_CONNECTION_PASSWORD_OPTIONS_N 1
1074
1075 /* sslmode, sslrootcert, sslcert, sslkey */
1076 #define REMOTE_CONNECTION_SSL_OPTIONS_N 4
1077
1078 #define REMOTE_CONNECTION_OPTIONS_TOTAL_N \
1079 (REMOTE_CONNECTION_SESSION_OPTIONS_N + REMOTE_CONNECTION_PASSWORD_OPTIONS_N + \
1080 REMOTE_CONNECTION_SSL_OPTIONS_N)
1081
1082 /* default password file basename */
1083 #define DEFAULT_PASSFILE_NAME "passfile"
1084
1085 static void
set_password_options(const char ** keywords,const char ** values,int * option_start)1086 set_password_options(const char **keywords, const char **values, int *option_start)
1087 {
1088 int option_pos = *option_start;
1089
1090 /* Set user specified password file path using timescaledb.passfile or
1091 * use default path assuming that the file is stored in the
1092 * data directory */
1093 keywords[option_pos] = "passfile";
1094 if (ts_guc_passfile)
1095 values[option_pos] = ts_guc_passfile;
1096 else
1097 values[option_pos] = psprintf("%s/" DEFAULT_PASSFILE_NAME, DataDir);
1098 option_pos++;
1099
1100 *option_start = option_pos;
1101 }
1102
1103 typedef enum PathKind
1104 {
1105 PATH_KIND_CRT,
1106 PATH_KIND_KEY
1107 } PathKind;
1108
1109 /* Path description for human consumption */
1110 static const char *path_kind_text[PATH_KIND_KEY + 1] = {
1111 [PATH_KIND_CRT] = "certificate",
1112 [PATH_KIND_KEY] = "private key",
1113 };
1114
1115 /* Path extension string for file system */
1116 static const char *path_kind_ext[PATH_KIND_KEY + 1] = {
1117 [PATH_KIND_CRT] = "crt",
1118 [PATH_KIND_KEY] = "key",
1119 };
1120
1121 /*
1122 * Helper function to report error.
1123 *
1124 * This is needed to avoid code coverage reporting low coverage for error
1125 * cases in `make_user_path` that cannot be reached in normal situations.
1126 */
1127 static void
report_path_error(PathKind path_kind,const char * user_name)1128 report_path_error(PathKind path_kind, const char *user_name)
1129 {
1130 elog(ERROR,
1131 "cannot write %s for user \"%s\": path too long",
1132 path_kind_text[path_kind],
1133 user_name);
1134 }
1135
1136 /*
1137 * Make a user path with the given extension and user name in a portable and
1138 * safe manner.
1139 *
1140 * We use MD5 to compute a filename for the user name, which allows all forms
1141 * of user names. It is not necessary for the function to be cryptographically
1142 * secure, only to have a low risk of collisions, and MD5 is fast and with a
1143 * low risk of collisions.
1144 *
1145 * Will return the resulting path, or abort with an error.
1146 */
1147 static StringInfo
make_user_path(const char * user_name,PathKind path_kind)1148 make_user_path(const char *user_name, PathKind path_kind)
1149 {
1150 char ret_path[MAXPGPATH];
1151 char hexsum[33];
1152 StringInfo result;
1153
1154 pg_md5_hash(user_name, strlen(user_name), hexsum);
1155
1156 if (strlcpy(ret_path, ts_guc_ssl_dir ? ts_guc_ssl_dir : DataDir, MAXPGPATH) > MAXPGPATH)
1157 report_path_error(path_kind, user_name);
1158 canonicalize_path(ret_path);
1159
1160 if (!ts_guc_ssl_dir)
1161 {
1162 join_path_components(ret_path, ret_path, EXTENSION_NAME);
1163 join_path_components(ret_path, ret_path, "certs");
1164 }
1165
1166 join_path_components(ret_path, ret_path, hexsum);
1167
1168 result = makeStringInfo();
1169 appendStringInfo(result, "%s.%s", ret_path, path_kind_ext[path_kind]);
1170 return result;
1171 }
1172
1173 static void
set_ssl_options(const char * user_name,const char ** keywords,const char ** values,int * option_start)1174 set_ssl_options(const char *user_name, const char **keywords, const char **values,
1175 int *option_start)
1176 {
1177 int option_pos = *option_start;
1178 const char *ssl_enabled;
1179 const char *ssl_ca_file;
1180
1181 ssl_enabled = GetConfigOption("ssl", true, false);
1182
1183 if (!ssl_enabled || strcmp(ssl_enabled, "on") != 0)
1184 return;
1185
1186 /* If SSL is enabled on AN then we assume it is also should be used for DN
1187 * connections as well, otherwise we need to introduce some other way to
1188 * control it */
1189 keywords[option_pos] = "sslmode";
1190 values[option_pos] = "require";
1191 option_pos++;
1192
1193 ssl_ca_file = GetConfigOption("ssl_ca_file", true, false);
1194
1195 /* Use ssl_ca_file as the root certificate when verifying the
1196 * data node we connect to */
1197 if (ssl_ca_file)
1198 {
1199 keywords[option_pos] = "sslrootcert";
1200 values[option_pos] = ssl_ca_file;
1201 option_pos++;
1202 }
1203
1204 /* Search for the user certificate in the user subdirectory of either
1205 * timescaledb.ssl_dir or data directory. The user subdirectory is
1206 * currently hardcoded. */
1207
1208 keywords[option_pos] = "sslcert";
1209 values[option_pos] = make_user_path(user_name, PATH_KIND_CRT)->data;
1210 option_pos++;
1211
1212 keywords[option_pos] = "sslkey";
1213 values[option_pos] = make_user_path(user_name, PATH_KIND_KEY)->data;
1214 option_pos++;
1215
1216 *option_start = option_pos;
1217 }
1218
1219 /*
1220 * Finish the connection and, optionally, save the connection error.
1221 */
1222 static void
finish_connection(PGconn * conn,char ** errmsg)1223 finish_connection(PGconn *conn, char **errmsg)
1224 {
1225 if (NULL != errmsg)
1226 {
1227 if (NULL == conn)
1228 *errmsg = "invalid connection";
1229 else
1230 *errmsg = pchomp(PQerrorMessage(conn));
1231 }
1232
1233 PQfinish(conn);
1234 }
1235
1236 /*
1237 * Take options belonging to a foreign server and add additional default and
1238 * other user/ssl related options as appropriate
1239 */
1240 static void
setup_full_connection_options(List * connection_options,const char *** all_keywords,const char *** all_values)1241 setup_full_connection_options(List *connection_options, const char ***all_keywords,
1242 const char ***all_values)
1243 {
1244 const char *user_name = NULL;
1245 const char **keywords;
1246 const char **values;
1247 int option_count;
1248 int option_pos;
1249
1250 /*
1251 * Construct connection params from generic options of ForeignServer
1252 * and user. (Some of them might not be libpq options, in
1253 * which case we'll just waste a few array slots.) Add 3 extra slots
1254 * for fallback_application_name, client_encoding, end marker.
1255 * One additional slot to set passfile and 4 slots for ssl options.
1256 */
1257 option_count = list_length(connection_options) + REMOTE_CONNECTION_OPTIONS_TOTAL_N;
1258 keywords = (const char **) palloc(option_count * sizeof(char *));
1259 values = (const char **) palloc(option_count * sizeof(char *));
1260
1261 option_pos = extract_connection_options(connection_options, keywords, values, &user_name);
1262
1263 if (NULL == user_name)
1264 user_name = GetUserNameFromId(GetUserId(), false);
1265
1266 /* Use the extension name as fallback_application_name. */
1267 keywords[option_pos] = "fallback_application_name";
1268 values[option_pos] = EXTENSION_NAME;
1269 option_pos++;
1270
1271 /* Set client_encoding so that libpq can convert encoding properly. */
1272 keywords[option_pos] = "client_encoding";
1273 values[option_pos] = GetDatabaseEncodingName();
1274 option_pos++;
1275
1276 /* Set passfile options */
1277 set_password_options(keywords, values, &option_pos);
1278
1279 /* Set client specific SSL connection options */
1280 set_ssl_options(user_name, keywords, values, &option_pos);
1281
1282 /* Set end marker */
1283 keywords[option_pos] = values[option_pos] = NULL;
1284 Assert(option_pos <= option_count);
1285
1286 *all_keywords = keywords;
1287 *all_values = values;
1288 }
1289
1290 /*
1291 * This will only open a connection to a specific node, but not do anything
1292 * else. In particular, it will not perform any validation nor configure the
1293 * connection since it cannot know that it connects to a data node database or
1294 * not. For that, please use the `remote_connection_open_with_options`
1295 * function.
1296 */
1297 TSConnection *
remote_connection_open_with_options_nothrow(const char * node_name,List * connection_options,char ** errmsg)1298 remote_connection_open_with_options_nothrow(const char *node_name, List *connection_options,
1299 char **errmsg)
1300 {
1301 PGconn *volatile pg_conn = NULL;
1302 TSConnection *ts_conn;
1303 const char **keywords;
1304 const char **values;
1305
1306 if (NULL != errmsg)
1307 *errmsg = NULL;
1308
1309 setup_full_connection_options(connection_options, &keywords, &values);
1310
1311 pg_conn = PQconnectdbParams(keywords, values, 0 /* Do not expand dbname param */);
1312
1313 /* Cast to (char **) to silence warning with MSVC compiler */
1314 pfree((char **) keywords);
1315 pfree((char **) values);
1316
1317 if (NULL == pg_conn)
1318 return NULL;
1319
1320 if (PQstatus(pg_conn) != CONNECTION_OK)
1321 {
1322 finish_connection(pg_conn, errmsg);
1323 return NULL;
1324 }
1325
1326 ts_conn = remote_connection_create(pg_conn, false, node_name);
1327
1328 if (NULL == ts_conn)
1329 finish_connection(pg_conn, errmsg);
1330
1331 return ts_conn;
1332 }
1333
1334 /*
1335 * Opens a connection.
1336 *
1337 * Raw connections are not part of the transaction and do not have transactions
1338 * auto-started. They must be explicitly closed by
1339 * remote_connection_close. Note that connections are allocated using malloc
1340 * and so if you do not call remote_connection_close, you'll have a memory
1341 * leak. Note that the connection cache handles all of this for you so use
1342 * that if you can.
1343 */
1344 TSConnection *
remote_connection_open_with_options(const char * node_name,List * connection_options,bool set_dist_id)1345 remote_connection_open_with_options(const char *node_name, List *connection_options,
1346 bool set_dist_id)
1347 {
1348 char *err = NULL;
1349 TSConnection *conn =
1350 remote_connection_open_with_options_nothrow(node_name, connection_options, &err);
1351
1352 if (NULL == conn)
1353 ereport(ERROR,
1354 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1355 errmsg("could not connect to \"%s\"", node_name),
1356 err == NULL ? 0 : errdetail_internal("%s", err)));
1357
1358 /*
1359 * Use PG_TRY block to ensure closing connection on error.
1360 */
1361 PG_TRY();
1362 {
1363 Assert(NULL != conn->pg_conn);
1364
1365 if (PQstatus(conn->pg_conn) != CONNECTION_OK)
1366 ereport(ERROR,
1367 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1368 errmsg("could not connect to \"%s\"", node_name),
1369 errdetail_internal("%s", pchomp(PQerrorMessage(conn->pg_conn)))));
1370
1371 /* Prepare new session for use */
1372 if (!remote_connection_configure(conn))
1373 ereport(ERROR,
1374 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1375 errmsg("could not configure remote connection to \"%s\"", node_name),
1376 errdetail_internal("%s", PQerrorMessage(conn->pg_conn))));
1377
1378 /* Check a data node extension version and show a warning
1379 * message if it differs */
1380 remote_connection_check_extension(conn);
1381
1382 if (set_dist_id)
1383 {
1384 /* Inform remote node about instance UUID */
1385 if (!remote_connection_set_peer_dist_id(conn))
1386 ereport(ERROR,
1387 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1388 errmsg("could not set distributed ID for \"%s\"", node_name),
1389 errdetail_internal("%s", PQerrorMessage(conn->pg_conn))));
1390 }
1391 }
1392 PG_CATCH();
1393 {
1394 /* Release PGconn data structure if we managed to create one */
1395 remote_connection_close(conn);
1396 PG_RE_THROW();
1397 }
1398 PG_END_TRY();
1399
1400 return conn;
1401 }
1402
1403 /*
1404 * Based on PG's GetUserMapping, but this version does not fail when a user
1405 * mapping is not found.
1406 */
1407 static UserMapping *
get_user_mapping(Oid userid,Oid serverid)1408 get_user_mapping(Oid userid, Oid serverid)
1409 {
1410 Datum datum;
1411 HeapTuple tp;
1412 bool isnull;
1413 UserMapping *um;
1414
1415 tp = SearchSysCache2(USERMAPPINGUSERSERVER,
1416 ObjectIdGetDatum(userid),
1417 ObjectIdGetDatum(serverid));
1418
1419 if (!HeapTupleIsValid(tp))
1420 {
1421 /* Not found for the specific user -- try PUBLIC */
1422 tp = SearchSysCache2(USERMAPPINGUSERSERVER,
1423 ObjectIdGetDatum(InvalidOid),
1424 ObjectIdGetDatum(serverid));
1425 }
1426
1427 if (!HeapTupleIsValid(tp))
1428 return NULL;
1429
1430 um = (UserMapping *) palloc(sizeof(UserMapping));
1431 um->umid = ((Form_pg_user_mapping) GETSTRUCT(tp))->oid;
1432 um->userid = userid;
1433 um->serverid = serverid;
1434
1435 /* Extract the umoptions */
1436 datum = SysCacheGetAttr(USERMAPPINGUSERSERVER, tp, Anum_pg_user_mapping_umoptions, &isnull);
1437 if (isnull)
1438 um->options = NIL;
1439 else
1440 um->options = untransformRelOptions(datum);
1441
1442 ReleaseSysCache(tp);
1443
1444 return um;
1445 }
1446
1447 static bool
options_contain(List * options,const char * key)1448 options_contain(List *options, const char *key)
1449 {
1450 ListCell *lc;
1451
1452 foreach (lc, options)
1453 {
1454 DefElem *d = (DefElem *) lfirst(lc);
1455
1456 if (strcmp(d->defname, key) == 0)
1457 return true;
1458 }
1459
1460 return false;
1461 }
1462
1463 /*
1464 * Add user info (username and optionally password) to the connection
1465 * options).
1466 */
1467 static List *
add_userinfo_to_server_options(ForeignServer * server,Oid user_id)1468 add_userinfo_to_server_options(ForeignServer *server, Oid user_id)
1469 {
1470 const UserMapping *um = get_user_mapping(user_id, server->serverid);
1471 List *options = list_copy(server->options);
1472
1473 /* If a user mapping exists, then use the "user" and "password" options
1474 * from the user mapping (we assume that these options exist, or the
1475 * connection will later fail). Otherwise, just add the "user" and rely on
1476 * other authentication mechanisms. */
1477 if (NULL != um)
1478 options = list_concat(options, um->options);
1479
1480 if (!options_contain(options, "user"))
1481 {
1482 char *user_name = GetUserNameFromId(user_id, false);
1483 options = lappend(options, makeDefElem("user", (Node *) makeString(user_name), -1));
1484 }
1485
1486 return options;
1487 }
1488
1489 /*
1490 * Append the given string to the buffer, with suitable quoting for passing
1491 * the string as a value in a keyword/value pair in a libpq connection string.
1492 *
1493 * The implementation is based on libpq appendConnStrVal().
1494 */
1495 static void
remote_connection_append_connstr_value(StringInfo buf,const char * str)1496 remote_connection_append_connstr_value(StringInfo buf, const char *str)
1497 {
1498 const char *s;
1499 bool needquotes;
1500
1501 /*
1502 * If the string is one or more plain ASCII characters, no need to quote
1503 * it. This is quite conservative, but better safe than sorry.
1504 */
1505 needquotes = true;
1506 for (s = str; *s; s++)
1507 {
1508 if (!((*s >= 'a' && *s <= 'z') || (*s >= 'A' && *s <= 'Z') || (*s >= '0' && *s <= '9') ||
1509 *s == '_' || *s == '.'))
1510 {
1511 needquotes = true;
1512 break;
1513 }
1514 needquotes = false;
1515 }
1516
1517 if (needquotes)
1518 {
1519 appendStringInfoChar(buf, '\'');
1520 while (*str)
1521 {
1522 /* ' and \ must be escaped by to \' and \\ */
1523 if (*str == '\'' || *str == '\\')
1524 appendStringInfoChar(buf, '\\');
1525
1526 appendStringInfoChar(buf, *str);
1527 str++;
1528 }
1529 appendStringInfoChar(buf, '\'');
1530 }
1531 else
1532 appendStringInfoString(buf, str);
1533 }
1534
1535 char *
remote_connection_get_connstr(const char * node_name)1536 remote_connection_get_connstr(const char *node_name)
1537 {
1538 ForeignServer *server;
1539 List *connection_options;
1540 const char **keywords;
1541 const char **values;
1542 StringInfoData connstr;
1543 StringInfoData connstr_escape;
1544 int i;
1545
1546 server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false);
1547 connection_options = add_userinfo_to_server_options(server, GetUserId());
1548 setup_full_connection_options(connection_options, &keywords, &values);
1549
1550 /* Cycle through the options and create the connection string */
1551 initStringInfo(&connstr);
1552 i = 0;
1553 while (keywords[i] != NULL)
1554 {
1555 appendStringInfo(&connstr, " %s=", keywords[i]);
1556 remote_connection_append_connstr_value(&connstr, values[i]);
1557 i++;
1558 }
1559 Assert(keywords[i] == NULL && values[i] == NULL);
1560
1561 initStringInfo(&connstr_escape);
1562 enlargeStringInfo(&connstr_escape, connstr.len * 2 + 1);
1563 connstr_escape.len += PQescapeString(connstr_escape.data, connstr.data, connstr.len);
1564
1565 /* Cast to (char **) to silence warning with MSVC compiler */
1566 pfree((char **) keywords);
1567 pfree((char **) values);
1568 pfree(connstr.data);
1569
1570 return connstr_escape.data;
1571 }
1572
1573 TSConnection *
remote_connection_open_by_id(TSConnectionId id)1574 remote_connection_open_by_id(TSConnectionId id)
1575 {
1576 ForeignServer *server = GetForeignServer(id.server_id);
1577 List *connection_options = add_userinfo_to_server_options(server, id.user_id);
1578
1579 return remote_connection_open_with_options(server->servername, connection_options, true);
1580 }
1581
1582 TSConnection *
remote_connection_open(Oid server_id,Oid user_id)1583 remote_connection_open(Oid server_id, Oid user_id)
1584 {
1585 TSConnectionId id = remote_connection_id(server_id, user_id);
1586
1587 return remote_connection_open_by_id(id);
1588 }
1589
1590 /*
1591 * Open a connection without throwing and error.
1592 *
1593 * Returns the connection pointer on success. On failure NULL is returned and
1594 * the errmsg (if given) is used to return an error message.
1595 */
1596 TSConnection *
remote_connection_open_nothrow(Oid server_id,Oid user_id,char ** errmsg)1597 remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg)
1598 {
1599 ForeignServer *server = GetForeignServer(server_id);
1600 Oid fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
1601 List *connection_options;
1602 TSConnection *conn;
1603
1604 if (server->fdwid != fdwid)
1605 {
1606 elog(WARNING, "invalid node type for \"%s\"", server->servername);
1607 return NULL;
1608 }
1609
1610 connection_options = add_userinfo_to_server_options(server, user_id);
1611 conn =
1612 remote_connection_open_with_options_nothrow(server->servername, connection_options, errmsg);
1613
1614 if (NULL == conn)
1615 {
1616 if (NULL != errmsg && NULL == *errmsg)
1617 *errmsg = "internal connection error";
1618 return NULL;
1619 }
1620
1621 if (PQstatus(conn->pg_conn) != CONNECTION_OK || !remote_connection_set_peer_dist_id(conn))
1622 {
1623 if (NULL != errmsg)
1624 *errmsg = pchomp(PQerrorMessage(conn->pg_conn));
1625 remote_connection_close(conn);
1626 return NULL;
1627 }
1628
1629 return conn;
1630 }
1631
1632 #define PING_QUERY "SELECT 1"
1633
1634 bool
remote_connection_ping(const char * node_name)1635 remote_connection_ping(const char *node_name)
1636 {
1637 Oid server_id = get_foreign_server_oid(node_name, false);
1638 TSConnection *conn = remote_connection_open_nothrow(server_id, GetUserId(), NULL);
1639 bool success = false;
1640
1641 if (NULL == conn)
1642 return false;
1643
1644 if (PQstatus(conn->pg_conn) == CONNECTION_OK)
1645 {
1646 if (1 == PQsendQuery(conn->pg_conn, PING_QUERY))
1647 {
1648 PGresult *res = PQgetResult(conn->pg_conn);
1649
1650 success = (PQresultStatus(res) == PGRES_TUPLES_OK);
1651 PQclear(res);
1652 }
1653 }
1654
1655 remote_connection_close(conn);
1656
1657 return success;
1658 }
1659
1660 void
remote_connection_close(TSConnection * conn)1661 remote_connection_close(TSConnection *conn)
1662 {
1663 Assert(conn != NULL);
1664
1665 conn->closing_guard = true;
1666
1667 if (NULL != conn->pg_conn)
1668 PQfinish(conn->pg_conn);
1669
1670 /* Assert that PQfinish detached this connection from the global list of
1671 * connections */
1672 Assert(IS_DETACHED_ENTRY(&conn->ln));
1673
1674 remote_connection_free(conn);
1675 }
1676
1677 /*
1678 * Assign a "unique" number for a cursor.
1679 *
1680 * TODO should this be moved into the session?
1681 *
1682 * These really only need to be unique per connection within a transaction.
1683 * For the moment we ignore the per-connection point and assign them across
1684 * all connections in the transaction, but we ask for the connection to be
1685 * supplied in case we want to refine that.
1686 *
1687 * Note that even if wraparound happens in a very long transaction, actual
1688 * collisions are highly improbable; just be sure to use %u not %d to print.
1689 */
1690 unsigned int
remote_connection_get_cursor_number()1691 remote_connection_get_cursor_number()
1692 {
1693 return ++cursor_number;
1694 }
1695
1696 void
remote_connection_reset_cursor_number()1697 remote_connection_reset_cursor_number()
1698 {
1699 cursor_number = 0;
1700 }
1701
1702 /*
1703 * Assign a "unique" number for a prepared statement.
1704 *
1705 * This works much like remote_connection_get_cursor_number, except that we never reset the counter
1706 * within a session. That's because we can't be 100% sure we've gotten rid
1707 * of all prepared statements on all connections, and it's not really worth
1708 * increasing the risk of prepared-statement name collisions by resetting.
1709 */
1710 unsigned int
remote_connection_get_prep_stmt_number()1711 remote_connection_get_prep_stmt_number()
1712 {
1713 return ++prep_stmt_number;
1714 }
1715
1716 #define MAX_CONN_WAIT_TIMEOUT_MS 60000
1717
1718 /*
1719 * Drain a connection of all data coming in and discard the results. Return
1720 * CONN_OK if all data is drained before the deadline expires.
1721 *
1722 * This is mainly used in abort processing. This result being returned
1723 * might be for a query that is being interrupted by transaction abort, or it might
1724 * be a query that was initiated as part of transaction abort to get the remote
1725 * side back to the appropriate state.
1726 *
1727 * It's not a huge problem if we throw an ERROR here, but if we get into error
1728 * recursion trouble, we'll end up slamming the connection shut, which will
1729 * necessitate failing the entire toplevel transaction even if subtransactions
1730 * were used. Try to use WARNING where we can.
1731 *
1732 * end_time is the time at which we should give up and assume the remote
1733 * side is dead.
1734 */
1735 TSConnectionResult
remote_connection_drain(TSConnection * conn,TimestampTz endtime,PGresult ** result)1736 remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **result)
1737 {
1738 volatile TSConnectionResult connresult = CONN_OK;
1739 PGresult *volatile last_res = NULL;
1740 PGconn *pg_conn = remote_connection_get_pg_conn(conn);
1741
1742 /* In what follows, do not leak any PGresults on an error. */
1743 PG_TRY();
1744 {
1745 for (;;)
1746 {
1747 PGresult *res;
1748
1749 while (PQisBusy(pg_conn))
1750 {
1751 int wc;
1752 TimestampTz now = GetCurrentTimestamp();
1753 long remaining_secs;
1754 int remaining_usecs;
1755 long cur_timeout_ms;
1756
1757 /* If timeout has expired, give up, else get sleep time. */
1758 if (now >= endtime)
1759 {
1760 connresult = CONN_TIMEOUT;
1761 goto exit;
1762 }
1763
1764 TimestampDifference(now, endtime, &remaining_secs, &remaining_usecs);
1765
1766 /* To protect against clock skew, limit sleep to one minute. */
1767 cur_timeout_ms =
1768 Min(MAX_CONN_WAIT_TIMEOUT_MS, remaining_secs * USECS_PER_SEC + remaining_usecs);
1769
1770 /* Sleep until there's something to do */
1771 wc = WaitLatchOrSocket(MyLatch,
1772 WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH |
1773 WL_TIMEOUT,
1774 PQsocket(pg_conn),
1775 cur_timeout_ms,
1776 PG_WAIT_EXTENSION);
1777 ResetLatch(MyLatch);
1778
1779 CHECK_FOR_INTERRUPTS();
1780
1781 /* Data available in socket? */
1782 if ((wc & WL_SOCKET_READABLE) && (0 == PQconsumeInput(pg_conn)))
1783 {
1784 connresult = CONN_DISCONNECT;
1785 goto exit;
1786 }
1787 }
1788
1789 res = PQgetResult(pg_conn);
1790
1791 if (res == NULL)
1792 {
1793 /* query is complete */
1794 conn->status = CONN_IDLE;
1795 connresult = CONN_OK;
1796 break;
1797 }
1798
1799 PQclear(last_res);
1800 last_res = res;
1801 }
1802 exit:;
1803 }
1804 PG_CATCH();
1805 {
1806 PQclear(last_res);
1807 PG_RE_THROW();
1808 }
1809 PG_END_TRY();
1810
1811 switch (connresult)
1812 {
1813 case CONN_OK:
1814 if (last_res == NULL)
1815 connresult = CONN_NO_RESPONSE;
1816 else if (result != NULL)
1817 *result = last_res;
1818 else
1819 PQclear(last_res);
1820 break;
1821 case CONN_TIMEOUT:
1822 case CONN_DISCONNECT:
1823 PQclear(last_res);
1824 break;
1825 case CONN_NO_RESPONSE:
1826 Assert(last_res == NULL);
1827 break;
1828 }
1829
1830 return connresult;
1831 }
1832
1833 /*
1834 * Cancel the currently-in-progress query and ignore the result. Returns true if we successfully
1835 * cancel the query and discard any pending result, and false if not.
1836 */
1837 bool
remote_connection_cancel_query(TSConnection * conn)1838 remote_connection_cancel_query(TSConnection *conn)
1839 {
1840 PGcancel *cancel;
1841 char errbuf[256];
1842 TimestampTz endtime;
1843 TSConnectionError err;
1844 bool success;
1845
1846 if (!conn)
1847 return true;
1848
1849 memset(&err, 0, sizeof(TSConnectionError));
1850
1851 /*
1852 * Catch exceptions so that we can ensure the status is IDLE after the
1853 * cancel operation even in case of errors being thrown. Note that we
1854 * cannot set the status before we drain, since the drain function needs
1855 * to know the status (e.g., if the connection is in COPY_IN mode).
1856 */
1857 PG_TRY();
1858 {
1859 if (conn->status == CONN_COPY_IN && !remote_connection_end_copy(conn, &err))
1860 remote_connection_error_elog(&err, WARNING);
1861
1862 /*
1863 * If it takes too long to cancel the query and discard the result, assume
1864 * the connection is dead.
1865 */
1866 endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1867
1868 /*
1869 * Issue cancel request. Unfortunately, there's no good way to limit the
1870 * amount of time that we might block inside PQcancel().
1871 */
1872 if ((cancel = PQgetCancel(conn->pg_conn)))
1873 {
1874 if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1875 {
1876 ereport(WARNING,
1877 (errcode(ERRCODE_CONNECTION_FAILURE),
1878 errmsg("could not send cancel request: %s", errbuf)));
1879 PQfreeCancel(cancel);
1880 conn->status = CONN_IDLE;
1881 return false;
1882 }
1883 PQfreeCancel(cancel);
1884 }
1885
1886 switch (remote_connection_drain(conn, endtime, NULL))
1887 {
1888 case CONN_OK:
1889 /* Successfully, drained */
1890 case CONN_NO_RESPONSE:
1891 /* No response, likely beceause there was nothing to cancel */
1892 success = true;
1893 break;
1894 default:
1895 success = false;
1896 break;
1897 }
1898 }
1899 PG_CATCH();
1900 {
1901 conn->status = CONN_IDLE;
1902 PG_RE_THROW();
1903 }
1904 PG_END_TRY();
1905
1906 conn->status = CONN_IDLE;
1907
1908 return success;
1909 }
1910
1911 void
remote_result_close(PGresult * res)1912 remote_result_close(PGresult *res)
1913 {
1914 PQclear(res);
1915 }
1916
1917 /*
1918 * Cleanup connections and results at the end of a (sub-)transaction.
1919 *
1920 * This function is called at the end of transactions and sub-transactions to
1921 * auto-cleanup connections and result objects.
1922 */
1923 static void
remote_connections_cleanup(SubTransactionId subtxid,bool isabort)1924 remote_connections_cleanup(SubTransactionId subtxid, bool isabort)
1925 {
1926 ListNode *curr = connections.next;
1927 unsigned int num_connections = 0;
1928 unsigned int num_results = 0;
1929
1930 while (curr != &connections)
1931 {
1932 TSConnection *conn = (TSConnection *) curr;
1933
1934 /* Move to next connection since closing the current one might
1935 * otherwise make the curr pointer invalid. */
1936 curr = curr->next;
1937
1938 if (conn->autoclose && (subtxid == InvalidSubTransactionId || subtxid == conn->subtxid))
1939 {
1940 /* Closes the connection and frees all its PGresult objects */
1941 remote_connection_close(conn);
1942 num_connections++;
1943 }
1944 else
1945 {
1946 /* We're not closing the connection, but we should clean up any
1947 * lingering results */
1948 ListNode *curr_result = conn->results.next;
1949
1950 while (curr_result != &conn->results)
1951 {
1952 ResultEntry *entry = (ResultEntry *) curr_result;
1953
1954 curr_result = curr_result->next;
1955
1956 if (subtxid == InvalidSubTransactionId || subtxid == entry->subtxid)
1957 {
1958 PQclear(entry->result);
1959 num_results++;
1960 }
1961 }
1962 }
1963 }
1964
1965 if (subtxid == InvalidSubTransactionId)
1966 elog(DEBUG3,
1967 "cleaned up %u connections and %u results at %s of transaction",
1968 num_connections,
1969 num_results,
1970 isabort ? "abort" : "commit");
1971 else
1972 elog(DEBUG3,
1973 "cleaned up %u connections and %u results at %s of sub-transaction %u",
1974 num_connections,
1975 num_results,
1976 isabort ? "abort" : "commit",
1977 subtxid);
1978 }
1979
1980 static void
remote_connection_xact_end(XactEvent event,void * unused_arg)1981 remote_connection_xact_end(XactEvent event, void *unused_arg)
1982 {
1983 /*
1984 * We are deep down in CommitTransaction code path. We do not want our
1985 * emit_log_hook_callback to interfere since it uses its own transaction
1986 */
1987 emit_log_hook_type prev_emit_log_hook = emit_log_hook;
1988 emit_log_hook = NULL;
1989
1990 switch (event)
1991 {
1992 case XACT_EVENT_ABORT:
1993 case XACT_EVENT_PARALLEL_ABORT:
1994 /*
1995 * We expect that the waitpoint will be retried and then we
1996 * will return due to the process receiving a SIGTERM if
1997 * the advisory lock is exclusively held by a user call
1998 */
1999 DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
2000 remote_connections_cleanup(InvalidSubTransactionId, true);
2001 break;
2002 case XACT_EVENT_COMMIT:
2003 case XACT_EVENT_PARALLEL_COMMIT:
2004 /* Same retry behavior as above */
2005 DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
2006 remote_connections_cleanup(InvalidSubTransactionId, false);
2007 break;
2008 case XACT_EVENT_PREPARE:
2009 /*
2010 * We expect that the waitpoint will be retried and then we
2011 * will return with a warning on crossing the retry count if
2012 * the advisory lock is exclusively held by a user call
2013 */
2014 DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
2015 break;
2016 default:
2017 /* other events are too early to use DEBUG_WAITPOINT.. */
2018 break;
2019 }
2020
2021 /* re-enable the emit_log_hook */
2022 emit_log_hook = prev_emit_log_hook;
2023 }
2024
2025 static void
remote_connection_subxact_end(SubXactEvent event,SubTransactionId subtxid,SubTransactionId parent_subtxid,void * unused_arg)2026 remote_connection_subxact_end(SubXactEvent event, SubTransactionId subtxid,
2027 SubTransactionId parent_subtxid, void *unused_arg)
2028 {
2029 /*
2030 * We are deep down in CommitTransaction code path. We do not want our
2031 * emit_log_hook_callback to interfere since it uses its own transaction
2032 */
2033 emit_log_hook_type prev_emit_log_hook = emit_log_hook;
2034 emit_log_hook = NULL;
2035
2036 switch (event)
2037 {
2038 case SUBXACT_EVENT_ABORT_SUB:
2039 remote_connections_cleanup(subtxid, true);
2040 break;
2041 case SUBXACT_EVENT_COMMIT_SUB:
2042 remote_connections_cleanup(subtxid, false);
2043 break;
2044 default:
2045 break;
2046 }
2047
2048 /* re-enable the emit_log_hook */
2049 emit_log_hook = prev_emit_log_hook;
2050 }
2051
2052 bool
remote_connection_set_single_row_mode(TSConnection * conn)2053 remote_connection_set_single_row_mode(TSConnection *conn)
2054 {
2055 return PQsetSingleRowMode(conn->pg_conn);
2056 }
2057
2058 static bool
send_binary_copy_header(const TSConnection * conn,TSConnectionError * err)2059 send_binary_copy_header(const TSConnection *conn, TSConnectionError *err)
2060 {
2061 /* File header for binary format */
2062 static const char file_header[] = {
2063 'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0', /* Signature */
2064 0, 0, 0, 0, /* 4 bytes flags */
2065 0, 0, 0, 0 /* 4 bytes header extension length (unused) */
2066 };
2067
2068 int res = PQputCopyData(conn->pg_conn, file_header, sizeof(file_header));
2069
2070 if (res != 1)
2071 return fill_connection_error(err,
2072 ERRCODE_CONNECTION_FAILURE,
2073 "could not set binary COPY mode",
2074 conn);
2075 return true;
2076 }
2077
2078 bool
remote_connection_begin_copy(TSConnection * conn,const char * copycmd,bool binary,TSConnectionError * err)2079 remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binary,
2080 TSConnectionError *err)
2081 {
2082 PGconn *pg_conn = remote_connection_get_pg_conn(conn);
2083 PGresult *volatile res = NULL;
2084
2085 if (PQisnonblocking(pg_conn))
2086 return fill_simple_error(err,
2087 ERRCODE_FEATURE_NOT_SUPPORTED,
2088 "distributed copy doesn't support non-blocking connections",
2089 conn);
2090
2091 if (conn->status != CONN_IDLE)
2092 return fill_simple_error(err,
2093 ERRCODE_INTERNAL_ERROR,
2094 "connection not IDLE when beginning COPY",
2095 conn);
2096
2097 res = PQexec(pg_conn, copycmd);
2098
2099 if (PQresultStatus(res) != PGRES_COPY_IN)
2100 {
2101 fill_result_error(err,
2102 ERRCODE_CONNECTION_FAILURE,
2103 "unable to start remote COPY on data node",
2104 res);
2105 PQclear(res);
2106 return false;
2107 }
2108
2109 PQclear(res);
2110
2111 if (binary && !send_binary_copy_header(conn, err))
2112 goto err_end_copy;
2113
2114 conn->binary_copy = binary;
2115 conn->status = CONN_COPY_IN;
2116
2117 return true;
2118 err_end_copy:
2119 PQputCopyEnd(pg_conn, err->msg);
2120
2121 return false;
2122 }
2123
2124 bool
remote_connection_put_copy_data(TSConnection * conn,const char * buffer,size_t len,TSConnectionError * err)2125 remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len,
2126 TSConnectionError *err)
2127 {
2128 int res;
2129
2130 res = PQputCopyData(remote_connection_get_pg_conn(conn), buffer, len);
2131
2132 if (res != 1)
2133 return fill_connection_error(err,
2134 ERRCODE_CONNECTION_EXCEPTION,
2135 "could not send COPY data",
2136 conn);
2137
2138 return true;
2139 }
2140
2141 static bool
send_end_binary_copy_data(const TSConnection * conn,TSConnectionError * err)2142 send_end_binary_copy_data(const TSConnection *conn, TSConnectionError *err)
2143 {
2144 const uint16 buf = pg_hton16((uint16) -1);
2145
2146 if (PQputCopyData(conn->pg_conn, (char *) &buf, sizeof(buf)) != 1)
2147 return fill_simple_error(err, ERRCODE_INTERNAL_ERROR, "could not end binary COPY", conn);
2148
2149 return true;
2150 }
2151
2152 bool
remote_connection_end_copy(TSConnection * conn,TSConnectionError * err)2153 remote_connection_end_copy(TSConnection *conn, TSConnectionError *err)
2154 {
2155 PGresult *res;
2156 bool success;
2157
2158 if (conn->status != CONN_COPY_IN)
2159 return fill_simple_error(err,
2160 ERRCODE_INTERNAL_ERROR,
2161 "connection not in COPY_IN state when ending COPY",
2162 conn);
2163
2164 if (conn->binary_copy && !send_end_binary_copy_data(conn, err))
2165 return false;
2166
2167 if (PQputCopyEnd(conn->pg_conn, NULL) != 1)
2168 return fill_simple_error(err,
2169 ERRCODE_CONNECTION_EXCEPTION,
2170 "could not end remote COPY",
2171 conn);
2172
2173 success = true;
2174 conn->status = CONN_PROCESSING;
2175
2176 while ((res = PQgetResult(conn->pg_conn)))
2177 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2178 success = fill_result_error(err,
2179 ERRCODE_CONNECTION_EXCEPTION,
2180 "invalid result when ending remote COPY",
2181 res);
2182
2183 Assert(res == NULL);
2184 conn->status = CONN_IDLE;
2185
2186 return success;
2187 }
2188
2189 #ifdef TS_DEBUG
2190 /*
2191 * Reset the current connection stats.
2192 */
2193 void
remote_connection_stats_reset(void)2194 remote_connection_stats_reset(void)
2195 {
2196 MemSet(&connstats, 0, sizeof(RemoteConnectionStats));
2197 }
2198
2199 /*
2200 * Get the current connection stats.
2201 */
2202 RemoteConnectionStats *
remote_connection_stats_get(void)2203 remote_connection_stats_get(void)
2204 {
2205 return &connstats;
2206 }
2207 #endif
2208
2209 void
_remote_connection_init(void)2210 _remote_connection_init(void)
2211 {
2212 RegisterXactCallback(remote_connection_xact_end, NULL);
2213 RegisterSubXactCallback(remote_connection_subxact_end, NULL);
2214
2215 unset_libpq_envvar();
2216 }
2217
2218 void
_remote_connection_fini(void)2219 _remote_connection_fini(void)
2220 {
2221 UnregisterXactCallback(remote_connection_xact_end, NULL);
2222 UnregisterSubXactCallback(remote_connection_subxact_end, NULL);
2223 }
2224