1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 
7 /*
8  * This file contains source code that was copied and/or modified from the
9  * PostgreSQL database, which is licensed under the open-source PostgreSQL
10  * License. Please see the NOTICE at the top level directory for a copy of
11  * the PostgreSQL License.
12  */
13 #include <postgres.h>
14 #include <access/xact.h>
15 #include <access/reloptions.h>
16 #include <catalog/pg_foreign_server.h>
17 #include <catalog/pg_user_mapping.h>
18 #include <commands/defrem.h>
19 #include <common/md5.h>
20 #include <foreign/foreign.h>
21 #include <libpq-events.h>
22 #include <libpq/libpq.h>
23 #include <mb/pg_wchar.h>
24 #include <miscadmin.h>
25 #include <nodes/makefuncs.h>
26 #include <port.h>
27 #include <postmaster/postmaster.h>
28 #include <utils/builtins.h>
29 #include <utils/fmgrprotos.h>
30 #include <utils/inval.h>
31 #include <utils/guc.h>
32 #include <utils/syscache.h>
33 
34 #include <annotations.h>
35 #include <dist_util.h>
36 #include <errors.h>
37 #include <extension_constants.h>
38 #include <guc.h>
39 #include <telemetry/telemetry_metadata.h>
40 #include "connection.h"
41 #include "data_node.h"
42 #include "debug_point.h"
43 #include "utils.h"
44 
45 /*
46  * Connection library for TimescaleDB.
47  *
48  * This library file contains convenience functionality around the libpq
49  * API. The major additional functionality offered includes:
50  *
51  * - libpq object lifecycles are tied to transactions (connections and
52  *   results). This ensures that there are no memory leaks caused by libpq
53  *   objects after a transaction completes.
54  * - connection configuration suitable for TimescaleDB.
55  *
 * NOTE that it is strongly advised that connection-related functions do not
57  * throw exceptions with, e.g., elog(ERROR). While exceptions can be caught
58  * with PG_TRY-CATCH for cleanup, it is not possible to safely continue the
59  * transaction that threw the exception as if no error occurred (see the
60  * following post if unconvinced:
61  * https://www.postgresql.org/message-id/27190.1508727890%40sss.pgh.pa.us).
62  *
63  * In some cases, we need to be able to continue a transaction even if a
64  * connection fails. One example is the removal of a data node, which must be
65  * able to proceed even if the node is no longer available to respond to a
66  * connection. Another example is performing a liveness check for node status.
67  *
 * Therefore, it is best to defer throwing exceptions to high-level
 * functions that know when it is appropriate.
70  */
71 
72 /* for assigning cursor numbers and prepared statement numbers */
73 static unsigned int cursor_number = 0;
74 static unsigned int prep_stmt_number = 0;
75 static RemoteConnectionStats connstats = { 0 };
76 
77 static int eventproc(PGEventId eventid, void *eventinfo, void *data);
78 
79 TSConnectionId
remote_connection_id(const Oid server_oid,const Oid user_oid)80 remote_connection_id(const Oid server_oid, const Oid user_oid)
81 {
82 	TSConnectionId id = { .server_id = server_oid, .user_id = user_oid };
83 	return id;
84 }
85 
86 void
remote_connection_id_set(TSConnectionId * const id,Oid const server_oid,Oid const user_oid)87 remote_connection_id_set(TSConnectionId *const id, Oid const server_oid, Oid const user_oid)
88 {
89 	id->server_id = server_oid;
90 	id->user_id = user_oid;
91 }
92 
/*
 * Minimal circular, doubly-linked list used for tracking libpq connection
 * and result objects. pg_list cannot be used here since it is bound to
 * PostgreSQL's memory management system, while libpq is not.
 *
 * A list is represented by an anchor node that links to itself when the list
 * is empty.
 */
typedef struct ListNode
{
	struct ListNode *next;
	struct ListNode *prev;
} ListNode;

/* True when the entry has neither a successor nor a predecessor */
#define IS_DETACHED_ENTRY(entry) ((entry)->next == NULL && (entry)->prev == NULL)

/*
 * Unlink a node from the list it is part of.
 *
 * The node's neighbours are linked to each other and the node's own pointers
 * are reset to NULL, so IS_DETACHED_ENTRY() holds for it afterwards. Both
 * neighbours are dereferenced, so the node must be linked when this is
 * called.
 */
static inline void
list_detach(ListNode *entry)
{
	ListNode *const before = entry->prev;
	ListNode *const after = entry->next;

	before->next = after;
	after->prev = before;

	/* Mark the entry as detached */
	entry->next = NULL;
	entry->prev = NULL;
}

/*
 * Link a node into a list directly after the given node.
 *
 * Passing the list anchor as "prev" makes the entry the new first element.
 */
static inline void
list_insert_after(ListNode *entry, ListNode *prev)
{
	ListNode *const successor = prev->next;

	entry->prev = prev;
	entry->next = successor;
	successor->prev = entry;
	prev->next = entry;
}
138 
139 /*
140  * List entry that holds a PGresult object.
141  */
142 typedef struct ResultEntry
143 {
144 	struct ListNode ln;		  /* Must be first entry */
145 	TSConnection *conn;		  /* The connection the result was created on */
146 	SubTransactionId subtxid; /* The subtransaction ID that created this result, if any. */
147 	PGresult *result;
148 } ResultEntry;
149 
150 typedef struct TSConnection
151 {
152 	ListNode ln;		/* Must be first entry */
153 	PGconn *pg_conn;	/* PostgreSQL connection */
154 	bool closing_guard; /* Guard against calling PQfinish() directly on PGconn */
155 	TSConnectionStatus status;
156 	NameData node_name;		  /* Associated data node name */
157 	char *tz_name;			  /* Timezone name last sent over connection */
158 	bool autoclose;			  /* Set if this connection should automatically
159 							   * close at the end of the (sub-)transaction */
160 	SubTransactionId subtxid; /* The subtransaction ID that created this connection, if any. */
161 	int xact_depth;			  /* 0 => no transaction, 1 => main transaction, > 1 =>
162 							   * levels of subtransactions */
163 	bool xact_transitioning;  /* TRUE if connection is transitioning to
164 							   * another transaction state */
165 	ListNode results;		  /* Head of PGresult list */
166 	bool binary_copy;
167 } TSConnection;
168 
169 /*
170  * List of all connections we create. Used to auto-free connections and/or
171  * PGresults at transaction end.
172  */
173 static ListNode connections = { &connections, &connections };
174 
175 static bool
fill_simple_error(TSConnectionError * err,int errcode,const char * errmsg,const TSConnection * conn)176 fill_simple_error(TSConnectionError *err, int errcode, const char *errmsg, const TSConnection *conn)
177 {
178 	if (NULL == err)
179 		return false;
180 
181 	MemSet(err, 0, sizeof(*err));
182 
183 	err->errcode = errcode;
184 	err->msg = errmsg;
185 	err->host = pstrdup(PQhost(conn->pg_conn));
186 	err->nodename = pstrdup(NameStr(conn->node_name));
187 
188 	return false;
189 }
190 
191 static bool
fill_connection_error(TSConnectionError * err,int errcode,const char * errmsg,const TSConnection * conn)192 fill_connection_error(TSConnectionError *err, int errcode, const char *errmsg,
193 					  const TSConnection *conn)
194 {
195 	if (NULL == err)
196 		return false;
197 
198 	fill_simple_error(err, errcode, errmsg, conn);
199 	err->connmsg = pstrdup(PQerrorMessage(conn->pg_conn));
200 
201 	return false;
202 }
203 
204 static char *
get_error_field_copy(const PGresult * res,int fieldcode)205 get_error_field_copy(const PGresult *res, int fieldcode)
206 {
207 	const char *msg = PQresultErrorField(res, fieldcode);
208 
209 	if (NULL == msg)
210 		return NULL;
211 	return pchomp(msg);
212 }
213 
214 /*
215  * Convert libpq error severity to local error level.
216  */
217 static int
severity_to_elevel(const char * severity)218 severity_to_elevel(const char *severity)
219 {
220 	/* According to https://www.postgresql.org/docs/current/libpq-exec.html,
221 	 * libpq only returns the severity levels listed below. */
222 	static const struct
223 	{
224 		const char *severity;
225 		int elevel;
226 	} severity_levels[] = { {
227 								.severity = "ERROR",
228 								.elevel = ERROR,
229 							},
230 							{
231 								.severity = "FATAL",
232 								.elevel = FATAL,
233 							},
234 							{
235 								.severity = "PANIC",
236 								.elevel = PANIC,
237 							},
238 							{
239 								.severity = "WARNING",
240 								.elevel = WARNING,
241 							},
242 							{
243 								.severity = "NOTICE",
244 								.elevel = NOTICE,
245 							},
246 							{
247 								.severity = "DEBUG",
248 								.elevel = DEBUG1,
249 							},
250 							{
251 								.severity = "INFO",
252 								.elevel = INFO,
253 							},
254 							{
255 								.severity = "LOG",
256 								.elevel = LOG,
257 							},
258 							/* End marker */
259 							{
260 								.severity = NULL,
261 								.elevel = 0,
262 							} };
263 	int i;
264 
265 	if (NULL == severity)
266 		return 0;
267 
268 	i = 0;
269 
270 	while (NULL != severity_levels[i].severity)
271 	{
272 		if (strcmp(severity_levels[i].severity, severity) == 0)
273 			return severity_levels[i].elevel;
274 		i++;
275 	}
276 
277 	pg_unreachable();
278 
279 	return ERROR;
280 }
281 
282 /*
283  * Fill a connection error based on the result of a remote query.
284  */
285 static bool
fill_result_error(TSConnectionError * err,int errcode,const char * errmsg,const PGresult * res)286 fill_result_error(TSConnectionError *err, int errcode, const char *errmsg, const PGresult *res)
287 {
288 	const ResultEntry *entry = PQresultInstanceData(res, eventproc);
289 	const char *sqlstate;
290 
291 	if (NULL == err || NULL == res || NULL == entry)
292 		return false;
293 
294 	Assert(entry->conn);
295 
296 	fill_simple_error(err, errcode, errmsg, entry->conn);
297 	err->remote.elevel = severity_to_elevel(PQresultErrorField(res, PG_DIAG_SEVERITY_NONLOCALIZED));
298 	err->remote.sqlstate = get_error_field_copy(res, PG_DIAG_SQLSTATE);
299 	err->remote.msg = get_error_field_copy(res, PG_DIAG_MESSAGE_PRIMARY);
300 	err->remote.detail = get_error_field_copy(res, PG_DIAG_MESSAGE_DETAIL);
301 	err->remote.hint = get_error_field_copy(res, PG_DIAG_MESSAGE_HINT);
302 	err->remote.context = get_error_field_copy(res, PG_DIAG_CONTEXT);
303 	err->remote.stmtpos = get_error_field_copy(res, PG_DIAG_STATEMENT_POSITION);
304 
305 	sqlstate = err->remote.sqlstate;
306 
307 	if (sqlstate && strlen(sqlstate) == 5)
308 		err->remote.errcode =
309 			MAKE_SQLSTATE(sqlstate[0], sqlstate[1], sqlstate[2], sqlstate[3], sqlstate[4]);
310 	else
311 		err->remote.errcode = ERRCODE_INTERNAL_ERROR;
312 
313 	return false;
314 }
315 
/*
 * The following event handlers make sure all PGresult objects are freed with
 * PQclear() when their parent connection is closed.
 *
 * It is still recommended to explicitly call PQclear() or
 * remote_connection_result_close(), however, especially when PGresults are
 * created in a tight loop (e.g., when scanning many tuples on a remote
 * table).
 *
 * Return values of the event handlers: zero signals failure and non-zero
 * signals success.
 */
#define EVENTPROC_FAILURE 0
#define EVENTPROC_SUCCESS 1
327 
328 static void
remote_connection_free(TSConnection * conn)329 remote_connection_free(TSConnection *conn)
330 {
331 	if (NULL != conn->tz_name)
332 		free(conn->tz_name);
333 
334 	free(conn);
335 }
336 
337 /*
338  * Invoked on PQfinish(conn). Frees all PGresult objects created on the
339  * connection, apart from those already freed with PQclear().
340  */
341 static int
handle_conn_destroy(PGEventConnDestroy * event)342 handle_conn_destroy(PGEventConnDestroy *event)
343 {
344 	TSConnection *conn = PQinstanceData(event->conn, eventproc);
345 	unsigned int results_count = 0;
346 	ListNode *curr;
347 
348 	Assert(NULL != conn);
349 	Assert(conn->closing_guard);
350 
351 	curr = conn->results.next;
352 
353 	while (curr != &conn->results)
354 	{
355 		ResultEntry *entry = (ResultEntry *) curr;
356 		PGresult *result = entry->result;
357 
358 		curr = curr->next;
359 		PQclear(result);
360 		/* No need to free curr here since PQclear will invoke
361 		 * handle_result_destroy() which will free it */
362 		results_count++;
363 	}
364 
365 	conn->pg_conn = NULL;
366 	list_detach(&conn->ln);
367 
368 	if (results_count > 0)
369 		elog(DEBUG3, "cleared %u result objects on connection %p", results_count, conn);
370 
371 	connstats.connections_closed++;
372 
373 	if (!conn->closing_guard)
374 	{
375 		ereport(WARNING,
376 				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("invalid closing of connection")));
377 		remote_connection_free(conn);
378 	}
379 
380 	return EVENTPROC_SUCCESS;
381 }
382 
383 /*
384  * Invoked on PQgetResult(conn). Adds the PGresult to the list in the parent
385  * TSConnection.
386  */
387 static int
handle_result_create(PGEventResultCreate * event)388 handle_result_create(PGEventResultCreate *event)
389 {
390 	TSConnection *conn = PQinstanceData(event->conn, eventproc);
391 	ResultEntry *entry;
392 
393 	Assert(NULL != conn);
394 
395 	/* We malloc this (instead of palloc) since bound PGresult, which also
396 	 * lives outside PostgreSQL's memory management. */
397 	entry = malloc(sizeof(ResultEntry));
398 
399 	if (NULL == entry)
400 		return EVENTPROC_FAILURE;
401 
402 	MemSet(entry, 0, sizeof(ResultEntry));
403 	entry->ln.next = entry->ln.prev = NULL;
404 	entry->conn = conn;
405 	entry->result = event->result;
406 	entry->subtxid = GetCurrentSubTransactionId();
407 
408 	/* Add entry as new head and set instance data */
409 	list_insert_after(&entry->ln, &conn->results);
410 	PQresultSetInstanceData(event->result, eventproc, entry);
411 
412 	elog(DEBUG3,
413 		 "created result %p on connection %p subtxid %u",
414 		 event->result,
415 		 conn,
416 		 entry->subtxid);
417 
418 	connstats.results_created++;
419 
420 	return EVENTPROC_SUCCESS;
421 }
422 
423 /*
424  * Invoked on PQclear(result). Removes the PGresult from the list in the
425  * parent TSConnection.
426  */
427 static int
handle_result_destroy(PGEventResultDestroy * event)428 handle_result_destroy(PGEventResultDestroy *event)
429 {
430 	ResultEntry *entry = PQresultInstanceData(event->result, eventproc);
431 
432 	Assert(NULL != entry);
433 
434 	/* Detach entry */
435 	list_detach(&entry->ln);
436 
437 	elog(DEBUG3, "destroyed result %p for subtxnid %u", entry->result, entry->subtxid);
438 
439 	free(entry);
440 
441 	connstats.results_cleared++;
442 
443 	return EVENTPROC_SUCCESS;
444 }
445 
446 /*
447  * Main event handler invoked when events happen on a PGconn.
448  *
449  * According to the libpq API, the function should return a non-zero value if
450  * it succeeds and zero if it fails. We use EVENTPROC_SUCCESS and
451  * EVENTPROC_FAILURE in place of these two options.
452  */
453 static int
eventproc(PGEventId eventid,void * eventinfo,void * data)454 eventproc(PGEventId eventid, void *eventinfo, void *data)
455 {
456 	int res = EVENTPROC_SUCCESS;
457 
458 	switch (eventid)
459 	{
460 		case PGEVT_CONNDESTROY:
461 			res = handle_conn_destroy((PGEventConnDestroy *) eventinfo);
462 			break;
463 		case PGEVT_RESULTCREATE:
464 			res = handle_result_create((PGEventResultCreate *) eventinfo);
465 			break;
466 		case PGEVT_RESULTDESTROY:
467 			res = handle_result_destroy((PGEventResultDestroy *) eventinfo);
468 			break;
469 		default:
470 			/* Not of interest, so return success */
471 			break;
472 	}
473 
474 	return res;
475 }
476 
477 static PQconninfoOption *
get_libpq_options()478 get_libpq_options()
479 {
480 	/* make static to fetch once per backend */
481 	static PQconninfoOption *libpq_options = NULL;
482 
483 	if (libpq_options == NULL)
484 	{
485 		/* Note that the options array is Malloc'ed */
486 		libpq_options = PQconndefaults();
487 	}
488 
489 	if (libpq_options == NULL)
490 	{
491 		/* probably OOM */
492 		elog(ERROR, "could not get default libpq options");
493 	}
494 
495 	return libpq_options;
496 }
497 
498 static void
unset_libpq_envvar(void)499 unset_libpq_envvar(void)
500 {
501 	PQconninfoOption *lopt;
502 	PQconninfoOption *options = PQconndefaults();
503 
504 	Assert(options != NULL);
505 
506 	/* Explicitly unset all libpq environment variables.
507 	 *
508 	 * By default libpq uses environment variables as a fallback
509 	 * to specify connection options, potentially they could be in
510 	 * a conflict with PostgreSQL variables and introduce
511 	 * security risks.
512 	 */
513 	for (lopt = options; lopt->keyword; lopt++)
514 	{
515 		if (lopt->envvar)
516 			unsetenv(lopt->envvar);
517 	}
518 
519 	PQconninfoFree(options);
520 }
521 
522 static bool
is_libpq_option(const char * keyword,char ** display_option)523 is_libpq_option(const char *keyword, char **display_option)
524 {
525 	PQconninfoOption *lopt;
526 
527 	for (lopt = get_libpq_options(); lopt->keyword; lopt++)
528 	{
529 		if (strcmp(lopt->keyword, keyword) == 0)
530 		{
531 			if (display_option != NULL)
532 				*display_option = lopt->dispchar;
533 			return true;
534 		}
535 	}
536 	return false;
537 }
538 
539 ConnOptionType
remote_connection_option_type(const char * keyword)540 remote_connection_option_type(const char *keyword)
541 {
542 	char *display_option;
543 
544 	if (!is_libpq_option(keyword, &display_option))
545 		return CONN_OPTION_TYPE_NONE;
546 
547 	/* Hide debug options, as well as settings we override internally. */
548 	if (strchr(display_option, 'D') || strcmp(keyword, "fallback_application_name") == 0 ||
549 		strcmp(keyword, "client_encoding") == 0)
550 		return CONN_OPTION_TYPE_NONE;
551 
552 	/*
553 	 * "user" and any secret options are allowed only on user mappings.
554 	 * Everything else is a data node option.
555 	 */
556 	if (strchr(display_option, '*') || strcmp(keyword, "user") == 0)
557 		return CONN_OPTION_TYPE_USER;
558 
559 	return CONN_OPTION_TYPE_NODE;
560 }
561 
562 bool
remote_connection_valid_user_option(const char * keyword)563 remote_connection_valid_user_option(const char *keyword)
564 {
565 	return remote_connection_option_type(keyword) == CONN_OPTION_TYPE_USER;
566 }
567 
568 bool
remote_connection_valid_node_option(const char * keyword)569 remote_connection_valid_node_option(const char *keyword)
570 {
571 	return remote_connection_option_type(keyword) == CONN_OPTION_TYPE_NODE;
572 }
573 
574 static int
extract_connection_options(List * defelems,const char ** keywords,const char ** values,const char ** user)575 extract_connection_options(List *defelems, const char **keywords, const char **values,
576 						   const char **user)
577 {
578 	ListCell *lc;
579 	int option_pos = 0;
580 
581 	Assert(keywords != NULL);
582 	Assert(values != NULL);
583 	Assert(user != NULL);
584 
585 	*user = NULL;
586 	foreach (lc, defelems)
587 	{
588 		DefElem *d = (DefElem *) lfirst(lc);
589 
590 		if (is_libpq_option(d->defname, NULL))
591 		{
592 			keywords[option_pos] = d->defname;
593 			values[option_pos] = defGetString(d);
594 			if (strcmp(d->defname, "user") == 0)
595 			{
596 				Assert(*user == NULL);
597 				*user = values[option_pos];
598 			}
599 			option_pos++;
600 		}
601 	}
602 
603 	return option_pos;
604 }
605 
606 /*
607  * Internal connection configure.
608  *
609  * This function will send internal configuration settings if they have
610  * changed. It is used to pass on configuration settings before executing a
611  * command requested by module users.
612  *
613  * ATTENTION! This function should *not* use
614  * `remote_connection_exec_ok_command` since this function is called
615  * indirectly whenever a remote command is executed, which would lead to
616  * infinite recursion. Stick to `PQ*` functions.
617  *
618  * Returns true if the current configuration is OK (no change) or was
619  * successfully applied, otherwise false.
620  */
621 bool
remote_connection_configure_if_changed(TSConnection * conn)622 remote_connection_configure_if_changed(TSConnection *conn)
623 {
624 	const char *local_tz_name = pg_get_timezone_name(session_timezone);
625 	bool success = true;
626 
627 	/*
628 	 * We need to enforce the same timezone setting across nodes. Otherwise,
629 	 * we might get the wrong result when we push down things like
630 	 * date_trunc(text, timestamptz). To safely do that, we also need the
631 	 * timezone databases to be the same on all data nodes.
632 	 *
633 	 * We save away the timezone name so that we know what we last sent over
634 	 * the connection. If the time zone changed since last time we sent a
635 	 * command, we will send a SET TIMEZONE command with the new timezone
636 	 * first.
637 	 */
638 	if (conn->tz_name == NULL ||
639 		(local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0))
640 	{
641 		char *set_timezone_cmd = psprintf("SET TIMEZONE = '%s'", local_tz_name);
642 		PGresult *result = PQexec(conn->pg_conn, set_timezone_cmd);
643 
644 		success = PQresultStatus(result) == PGRES_COMMAND_OK;
645 		PQclear(result);
646 		pfree(set_timezone_cmd);
647 		free(conn->tz_name);
648 		conn->tz_name = strdup(local_tz_name);
649 	}
650 
651 	return success;
652 }
653 
/*
 * Default options/commands to set on every new connection. The array is
 * terminated by a NULL entry.
 *
 * Timezone is not part of this list since it is indirectly set with the
 * first command executed.
 */
static const char *default_connection_options[] = {
	/*
	 * Restrict the search path to pg_catalog only. This forces functions to
	 * output fully qualified identifier names (i.e., names that include the
	 * schema).
	 */
	"SET search_path = pg_catalog",
	/*
	 * Settings needed to ensure unambiguous data output from remote. (This
	 * logic should match what pg_dump does. See also set_transmission_modes
	 * in fdw.c.)
	 */
	"SET datestyle = ISO",
	"SET intervalstyle = postgres",
	"SET extra_float_digits = 3",
	NULL,
};
676 
677 /*
678  * Issue SET commands to make sure remote session is configured properly.
679  *
680  * We do this just once at connection, assuming nothing will change the
681  * values later.  Since we'll never send volatile function calls to the
682  * remote, there shouldn't be any way to break this assumption from our end.
683  * It's possible to think of ways to break it at the remote end, eg making a
684  * foreign table point to a view that includes a set_config call --- but once
685  * you admit the possibility of a malicious view definition, there are any
686  * number of ways to break things.
687  */
688 bool
remote_connection_configure(TSConnection * conn)689 remote_connection_configure(TSConnection *conn)
690 {
691 	const char *cmd;
692 	StringInfoData sql;
693 	PGresult *result;
694 	bool success = true;
695 	int i = 0;
696 
697 	initStringInfo(&sql);
698 
699 	while ((cmd = default_connection_options[i]) != NULL)
700 	{
701 		appendStringInfo(&sql, "%s;", cmd);
702 		i++;
703 	}
704 
705 	result = PQexec(conn->pg_conn, sql.data);
706 	success = PQresultStatus(result) == PGRES_COMMAND_OK;
707 	PQclear(result);
708 
709 	return success;
710 }
711 
712 static TSConnection *
remote_connection_create(PGconn * pg_conn,bool processing,const char * node_name)713 remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name)
714 {
715 	TSConnection *conn = malloc(sizeof(TSConnection));
716 	int ret;
717 
718 	if (NULL == conn)
719 		return NULL;
720 
721 	MemSet(conn, 0, sizeof(TSConnection));
722 
723 	/* Must register the event procedure before attaching any instance data */
724 	ret = PQregisterEventProc(pg_conn, eventproc, "remote connection", conn);
725 
726 	if (ret == 0)
727 	{
728 		free(conn);
729 		return NULL;
730 	}
731 
732 	ret = PQsetInstanceData(pg_conn, eventproc, conn);
733 	Assert(ret != 0);
734 
735 	conn->ln.next = conn->ln.prev = NULL;
736 	conn->pg_conn = pg_conn;
737 	conn->closing_guard = false;
738 	conn->status = processing ? CONN_PROCESSING : CONN_IDLE;
739 	namestrcpy(&conn->node_name, node_name);
740 	conn->tz_name = NULL;
741 	conn->autoclose = true;
742 	conn->subtxid = GetCurrentSubTransactionId();
743 	conn->xact_depth = 0;
744 	conn->xact_transitioning = false;
745 	/* Initialize results head */
746 	conn->results.next = &conn->results;
747 	conn->results.prev = &conn->results;
748 	conn->binary_copy = false;
749 	list_insert_after(&conn->ln, &connections);
750 
751 	elog(DEBUG3, "created connection %p", conn);
752 	connstats.connections_created++;
753 
754 	return conn;
755 }
756 
757 /*
758  * Set the auto-close behavior.
759  *
760  * If set, the connection will be closed at the end of the (sub-)transaction
761  * it was created on.
762  *
763  * The default value is on (true).
764  *
765  * Returns the previous setting.
766  */
767 bool
remote_connection_set_autoclose(TSConnection * conn,bool autoclose)768 remote_connection_set_autoclose(TSConnection *conn, bool autoclose)
769 {
770 	bool old = conn->autoclose;
771 
772 	conn->autoclose = autoclose;
773 	return old;
774 }
775 
776 int
remote_connection_xact_depth_get(const TSConnection * conn)777 remote_connection_xact_depth_get(const TSConnection *conn)
778 {
779 	Assert(conn->xact_depth >= 0);
780 	return conn->xact_depth;
781 }
782 
783 int
remote_connection_xact_depth_inc(TSConnection * conn)784 remote_connection_xact_depth_inc(TSConnection *conn)
785 {
786 	Assert(conn->xact_depth >= 0);
787 	return ++conn->xact_depth;
788 }
789 
790 int
remote_connection_xact_depth_dec(TSConnection * conn)791 remote_connection_xact_depth_dec(TSConnection *conn)
792 {
793 	Assert(conn->xact_depth > 0);
794 	return --conn->xact_depth;
795 }
796 
797 void
remote_connection_xact_transition_begin(TSConnection * conn)798 remote_connection_xact_transition_begin(TSConnection *conn)
799 {
800 	Assert(!conn->xact_transitioning);
801 	conn->xact_transitioning = true;
802 }
803 
804 void
remote_connection_xact_transition_end(TSConnection * conn)805 remote_connection_xact_transition_end(TSConnection *conn)
806 {
807 	Assert(conn->xact_transitioning);
808 	conn->xact_transitioning = false;
809 }
810 
811 bool
remote_connection_xact_is_transitioning(const TSConnection * conn)812 remote_connection_xact_is_transitioning(const TSConnection *conn)
813 {
814 	return conn->xact_transitioning;
815 }
816 
817 PGconn *
remote_connection_get_pg_conn(const TSConnection * conn)818 remote_connection_get_pg_conn(const TSConnection *conn)
819 {
820 	Assert(conn != NULL);
821 	return conn->pg_conn;
822 }
823 
824 bool
remote_connection_is_processing(const TSConnection * conn)825 remote_connection_is_processing(const TSConnection *conn)
826 {
827 	Assert(conn != NULL);
828 	return conn->status != CONN_IDLE;
829 }
830 
831 void
remote_connection_set_status(TSConnection * conn,TSConnectionStatus status)832 remote_connection_set_status(TSConnection *conn, TSConnectionStatus status)
833 {
834 	Assert(conn != NULL);
835 	conn->status = status;
836 }
837 
838 TSConnectionStatus
remote_connection_get_status(const TSConnection * conn)839 remote_connection_get_status(const TSConnection *conn)
840 {
841 	return conn->status;
842 }
843 
844 const char *
remote_connection_node_name(const TSConnection * conn)845 remote_connection_node_name(const TSConnection *conn)
846 {
847 	return NameStr(conn->node_name);
848 }
849 
850 void
remote_connection_get_error(const TSConnection * conn,TSConnectionError * err)851 remote_connection_get_error(const TSConnection *conn, TSConnectionError *err)
852 {
853 	fill_connection_error(err, ERRCODE_CONNECTION_FAILURE, "", conn);
854 }
855 
856 void
remote_connection_get_result_error(const PGresult * res,TSConnectionError * err)857 remote_connection_get_result_error(const PGresult *res, TSConnectionError *err)
858 {
859 	fill_result_error(err, ERRCODE_CONNECTION_EXCEPTION, "", res);
860 }
861 
862 /*
863  * Execute a remote command.
864  *
865  * Like PQexec, which this functions uses internally, the PGresult returned
866  * describes only the last command executed in a multi-command string.
867  */
868 PGresult *
remote_connection_exec(TSConnection * conn,const char * cmd)869 remote_connection_exec(TSConnection *conn, const char *cmd)
870 {
871 	if (!remote_connection_configure_if_changed(conn))
872 	{
873 		PGresult *res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR);
874 		PQfireResultCreateEvents(conn->pg_conn, res);
875 		return res;
876 	}
877 
878 	return PQexec(conn->pg_conn, cmd);
879 }
880 
/*
 * Format a string into a StringInfo from the variable arguments of the
 * enclosing function.
 *
 * "fmt" must be the last named parameter of the enclosing function and "sql"
 * a pointer to a StringInfoData, which is initialized here. The formatting
 * is retried with an enlarged buffer for as long as appendStringInfoVA()
 * reports that more space is needed.
 *
 * Must be a macro since va_start() must be called in the function that takes
 * a variable number of arguments.
 *
 * The macro does not end with a semicolon, so that it behaves like a single
 * statement when the caller writes "stringinfo_va(fmt, &sql);".
 */
#define stringinfo_va(fmt, sql)                                                                    \
	do                                                                                             \
	{                                                                                              \
		initStringInfo((sql));                                                                     \
		for (;;)                                                                                   \
		{                                                                                          \
			va_list args;                                                                          \
			int needed;                                                                            \
			va_start(args, fmt);                                                                   \
			needed = appendStringInfoVA((sql), fmt, args);                                         \
			va_end(args);                                                                          \
			if (needed == 0)                                                                       \
				break;                                                                             \
			/* Increase the buffer size and try again. */                                          \
			enlargeStringInfo((sql), needed);                                                      \
		}                                                                                          \
	} while (0)
902 
903 /*
904  * Execute a remote command.
905  *
906  * Like remote_connection_exec but takes a variable number of arguments.
907  */
908 PGresult *
remote_connection_execf(TSConnection * conn,const char * fmt,...)909 remote_connection_execf(TSConnection *conn, const char *fmt, ...)
910 {
911 	PGresult *res;
912 	StringInfoData sql;
913 
914 	stringinfo_va(fmt, &sql);
915 	res = remote_connection_exec(conn, sql.data);
916 	pfree(sql.data);
917 
918 	return res;
919 }
920 
921 PGresult *
remote_connection_queryf_ok(TSConnection * conn,const char * fmt,...)922 remote_connection_queryf_ok(TSConnection *conn, const char *fmt, ...)
923 {
924 	StringInfoData sql;
925 	PGresult *res;
926 
927 	stringinfo_va(fmt, &sql);
928 	res = remote_result_query_ok(remote_connection_exec(conn, sql.data));
929 	pfree(sql.data);
930 	return res;
931 }
932 
933 PGresult *
remote_connection_query_ok(TSConnection * conn,const char * query)934 remote_connection_query_ok(TSConnection *conn, const char *query)
935 {
936 	return remote_result_query_ok(remote_connection_exec(conn, query));
937 }
938 
939 void
remote_connection_cmd_ok(TSConnection * conn,const char * cmd)940 remote_connection_cmd_ok(TSConnection *conn, const char *cmd)
941 {
942 	remote_result_cmd_ok(remote_connection_exec(conn, cmd));
943 }
944 
945 void
remote_connection_cmdf_ok(TSConnection * conn,const char * fmt,...)946 remote_connection_cmdf_ok(TSConnection *conn, const char *fmt, ...)
947 {
948 	StringInfoData sql;
949 
950 	stringinfo_va(fmt, &sql);
951 	remote_result_cmd_ok(remote_connection_exec(conn, sql.data));
952 	pfree(sql.data);
953 }
954 
955 static PGresult *
remote_result_ok(PGresult * res,ExecStatusType expected)956 remote_result_ok(PGresult *res, ExecStatusType expected)
957 {
958 	if (PQresultStatus(res) != expected)
959 		remote_result_elog(res, ERROR);
960 
961 	return res;
962 }
963 
964 void
remote_result_cmd_ok(PGresult * res)965 remote_result_cmd_ok(PGresult *res)
966 {
967 	PQclear(remote_result_ok(res, PGRES_COMMAND_OK));
968 }
969 
970 PGresult *
remote_result_query_ok(PGresult * res)971 remote_result_query_ok(PGresult *res)
972 {
973 	return remote_result_ok(res, PGRES_TUPLES_OK);
974 }
975 
976 /**
977  * Validate extension version.
978  */
979 void
remote_validate_extension_version(TSConnection * conn,const char * data_node_version)980 remote_validate_extension_version(TSConnection *conn, const char *data_node_version)
981 {
982 	bool old_version;
983 
984 	if (!dist_util_is_compatible_version(data_node_version, TIMESCALEDB_VERSION, &old_version))
985 		ereport(ERROR,
986 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
987 				 errmsg("remote PostgreSQL instance has an incompatible timescaledb extension "
988 						"version"),
989 				 errdetail_internal("Access node version: %s, remote version: %s.",
990 									TIMESCALEDB_VERSION_MOD,
991 									data_node_version)));
992 	if (old_version)
993 		ereport(WARNING,
994 				(errmsg("remote PostgreSQL instance has an outdated timescaledb extension version"),
995 				 errdetail_internal("Access node version: %s, remote version: %s.",
996 									TIMESCALEDB_VERSION_MOD,
997 									data_node_version)));
998 }
999 
1000 /*
1001  * Check timescaledb extension version on a data node.
1002  *
1003  * Compare remote connection extension version with the one installed
1004  * locally on the access node.
1005  *
1006  * Return false if extension is not found, true otherwise.
1007  */
1008 bool
remote_connection_check_extension(TSConnection * conn)1009 remote_connection_check_extension(TSConnection *conn)
1010 {
1011 	PGresult *res;
1012 
1013 	res = remote_connection_execf(conn,
1014 								  "SELECT extversion FROM pg_extension WHERE extname = %s",
1015 								  quote_literal_cstr(EXTENSION_NAME));
1016 
1017 	/* Just to capture any bugs in the SELECT above */
1018 	Assert(PQnfields(res) == 1);
1019 
1020 	switch (PQntuples(res))
1021 	{
1022 		case 0: /* extension does not exists */
1023 			PQclear(res);
1024 			return false;
1025 
1026 		case 1:
1027 			break;
1028 
1029 		default: /* something strange happend */
1030 			ereport(WARNING,
1031 					(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
1032 					 errmsg("more than one TimescaleDB extension loaded")));
1033 			break;
1034 	}
1035 
1036 	/* validate extension version on data node and make sure that it is
1037 	 * compatible */
1038 	remote_validate_extension_version(conn, PQgetvalue(res, 0, 0));
1039 
1040 	PQclear(res);
1041 	return true;
1042 }
1043 
1044 /*
1045  * Configure remote connection using current instance UUID.
1046  *
1047  * This allows remote side to reason about whether this connection has been
1048  * originated by access node.
1049  *
1050  * Returns true on success and false on error, in which case the optional
1051  * errmsg parameter can be used to retrieve an error message.
1052  */
1053 static bool
remote_connection_set_peer_dist_id(TSConnection * conn)1054 remote_connection_set_peer_dist_id(TSConnection *conn)
1055 {
1056 	Datum id_string = DirectFunctionCall1(uuid_out, ts_telemetry_metadata_get_uuid());
1057 	PGresult *res;
1058 	bool success = true;
1059 
1060 	res = remote_connection_execf(conn,
1061 								  "SELECT * FROM _timescaledb_internal.set_peer_dist_id('%s')",
1062 								  DatumGetCString(id_string));
1063 	success = PQresultStatus(res) == PGRES_TUPLES_OK;
1064 	PQclear(res);
1065 
1066 	return success;
1067 }
1068 
/*
 * Number of extra option slots reserved for session options:
 * fallback_application_name, client_encoding, end marker
 */
#define REMOTE_CONNECTION_SESSION_OPTIONS_N 3

/* Number of extra option slots reserved for password options: passfile */
#define REMOTE_CONNECTION_PASSWORD_OPTIONS_N 1

/*
 * Number of extra option slots reserved for SSL options:
 * sslmode, sslrootcert, sslcert, sslkey
 */
#define REMOTE_CONNECTION_SSL_OPTIONS_N 4

/* Sum of all the extra option slots above */
#define REMOTE_CONNECTION_OPTIONS_TOTAL_N                                                          \
	(REMOTE_CONNECTION_SESSION_OPTIONS_N + REMOTE_CONNECTION_PASSWORD_OPTIONS_N +                  \
	 REMOTE_CONNECTION_SSL_OPTIONS_N)

/* Default password file basename, used relative to the data directory */
#define DEFAULT_PASSFILE_NAME "passfile"
1084 
1085 static void
set_password_options(const char ** keywords,const char ** values,int * option_start)1086 set_password_options(const char **keywords, const char **values, int *option_start)
1087 {
1088 	int option_pos = *option_start;
1089 
1090 	/* Set user specified password file path using timescaledb.passfile or
1091 	 * use default path assuming that the file is stored in the
1092 	 * data directory */
1093 	keywords[option_pos] = "passfile";
1094 	if (ts_guc_passfile)
1095 		values[option_pos] = ts_guc_passfile;
1096 	else
1097 		values[option_pos] = psprintf("%s/" DEFAULT_PASSFILE_NAME, DataDir);
1098 	option_pos++;
1099 
1100 	*option_start = option_pos;
1101 }
1102 
/*
 * Kind of per-user SSL file a path refers to. The values are used as
 * indexes into the path_kind_text and path_kind_ext tables below.
 */
typedef enum PathKind
{
	PATH_KIND_CRT,
	PATH_KIND_KEY
} PathKind;

/* Path description for human consumption (used in error messages) */
static const char *path_kind_text[PATH_KIND_KEY + 1] = {
	[PATH_KIND_CRT] = "certificate",
	[PATH_KIND_KEY] = "private key",
};

/* Path extension string for file system (appended after a '.') */
static const char *path_kind_ext[PATH_KIND_KEY + 1] = {
	[PATH_KIND_CRT] = "crt",
	[PATH_KIND_KEY] = "key",
};
1120 
1121 /*
1122  * Helper function to report error.
1123  *
1124  * This is needed to avoid code coverage reporting low coverage for error
1125  * cases in `make_user_path` that cannot be reached in normal situations.
1126  */
1127 static void
report_path_error(PathKind path_kind,const char * user_name)1128 report_path_error(PathKind path_kind, const char *user_name)
1129 {
1130 	elog(ERROR,
1131 		 "cannot write %s for user \"%s\": path too long",
1132 		 path_kind_text[path_kind],
1133 		 user_name);
1134 }
1135 
1136 /*
1137  * Make a user path with the given extension and user name in a portable and
1138  * safe manner.
1139  *
1140  * We use MD5 to compute a filename for the user name, which allows all forms
1141  * of user names. It is not necessary for the function to be cryptographically
1142  * secure, only to have a low risk of collisions, and MD5 is fast and with a
1143  * low risk of collisions.
1144  *
1145  * Will return the resulting path, or abort with an error.
1146  */
1147 static StringInfo
make_user_path(const char * user_name,PathKind path_kind)1148 make_user_path(const char *user_name, PathKind path_kind)
1149 {
1150 	char ret_path[MAXPGPATH];
1151 	char hexsum[33];
1152 	StringInfo result;
1153 
1154 	pg_md5_hash(user_name, strlen(user_name), hexsum);
1155 
1156 	if (strlcpy(ret_path, ts_guc_ssl_dir ? ts_guc_ssl_dir : DataDir, MAXPGPATH) > MAXPGPATH)
1157 		report_path_error(path_kind, user_name);
1158 	canonicalize_path(ret_path);
1159 
1160 	if (!ts_guc_ssl_dir)
1161 	{
1162 		join_path_components(ret_path, ret_path, EXTENSION_NAME);
1163 		join_path_components(ret_path, ret_path, "certs");
1164 	}
1165 
1166 	join_path_components(ret_path, ret_path, hexsum);
1167 
1168 	result = makeStringInfo();
1169 	appendStringInfo(result, "%s.%s", ret_path, path_kind_ext[path_kind]);
1170 	return result;
1171 }
1172 
1173 static void
set_ssl_options(const char * user_name,const char ** keywords,const char ** values,int * option_start)1174 set_ssl_options(const char *user_name, const char **keywords, const char **values,
1175 				int *option_start)
1176 {
1177 	int option_pos = *option_start;
1178 	const char *ssl_enabled;
1179 	const char *ssl_ca_file;
1180 
1181 	ssl_enabled = GetConfigOption("ssl", true, false);
1182 
1183 	if (!ssl_enabled || strcmp(ssl_enabled, "on") != 0)
1184 		return;
1185 
1186 	/* If SSL is enabled on AN then we assume it is also should be used for DN
1187 	 * connections as well, otherwise we need to introduce some other way to
1188 	 * control it */
1189 	keywords[option_pos] = "sslmode";
1190 	values[option_pos] = "require";
1191 	option_pos++;
1192 
1193 	ssl_ca_file = GetConfigOption("ssl_ca_file", true, false);
1194 
1195 	/* Use ssl_ca_file as the root certificate when verifying the
1196 	 * data node we connect to */
1197 	if (ssl_ca_file)
1198 	{
1199 		keywords[option_pos] = "sslrootcert";
1200 		values[option_pos] = ssl_ca_file;
1201 		option_pos++;
1202 	}
1203 
1204 	/* Search for the user certificate in the user subdirectory of either
1205 	 * timescaledb.ssl_dir or data directory. The user subdirectory is
1206 	 * currently hardcoded. */
1207 
1208 	keywords[option_pos] = "sslcert";
1209 	values[option_pos] = make_user_path(user_name, PATH_KIND_CRT)->data;
1210 	option_pos++;
1211 
1212 	keywords[option_pos] = "sslkey";
1213 	values[option_pos] = make_user_path(user_name, PATH_KIND_KEY)->data;
1214 	option_pos++;
1215 
1216 	*option_start = option_pos;
1217 }
1218 
1219 /*
1220  * Finish the connection and, optionally, save the connection error.
1221  */
1222 static void
finish_connection(PGconn * conn,char ** errmsg)1223 finish_connection(PGconn *conn, char **errmsg)
1224 {
1225 	if (NULL != errmsg)
1226 	{
1227 		if (NULL == conn)
1228 			*errmsg = "invalid connection";
1229 		else
1230 			*errmsg = pchomp(PQerrorMessage(conn));
1231 	}
1232 
1233 	PQfinish(conn);
1234 }
1235 
1236 /*
1237  * Take options belonging to a foreign server and add additional default and
1238  * other user/ssl related options as appropriate
1239  */
1240 static void
setup_full_connection_options(List * connection_options,const char *** all_keywords,const char *** all_values)1241 setup_full_connection_options(List *connection_options, const char ***all_keywords,
1242 							  const char ***all_values)
1243 {
1244 	const char *user_name = NULL;
1245 	const char **keywords;
1246 	const char **values;
1247 	int option_count;
1248 	int option_pos;
1249 
1250 	/*
1251 	 * Construct connection params from generic options of ForeignServer
1252 	 * and user. (Some of them might not be libpq options, in
1253 	 * which case we'll just waste a few array slots.)  Add 3 extra slots
1254 	 * for fallback_application_name, client_encoding, end marker.
1255 	 * One additional slot to set passfile and 4 slots for ssl options.
1256 	 */
1257 	option_count = list_length(connection_options) + REMOTE_CONNECTION_OPTIONS_TOTAL_N;
1258 	keywords = (const char **) palloc(option_count * sizeof(char *));
1259 	values = (const char **) palloc(option_count * sizeof(char *));
1260 
1261 	option_pos = extract_connection_options(connection_options, keywords, values, &user_name);
1262 
1263 	if (NULL == user_name)
1264 		user_name = GetUserNameFromId(GetUserId(), false);
1265 
1266 	/* Use the extension name as fallback_application_name. */
1267 	keywords[option_pos] = "fallback_application_name";
1268 	values[option_pos] = EXTENSION_NAME;
1269 	option_pos++;
1270 
1271 	/* Set client_encoding so that libpq can convert encoding properly. */
1272 	keywords[option_pos] = "client_encoding";
1273 	values[option_pos] = GetDatabaseEncodingName();
1274 	option_pos++;
1275 
1276 	/* Set passfile options */
1277 	set_password_options(keywords, values, &option_pos);
1278 
1279 	/* Set client specific SSL connection options */
1280 	set_ssl_options(user_name, keywords, values, &option_pos);
1281 
1282 	/* Set end marker */
1283 	keywords[option_pos] = values[option_pos] = NULL;
1284 	Assert(option_pos <= option_count);
1285 
1286 	*all_keywords = keywords;
1287 	*all_values = values;
1288 }
1289 
1290 /*
1291  * This will only open a connection to a specific node, but not do anything
1292  * else. In particular, it will not perform any validation nor configure the
1293  * connection since it cannot know that it connects to a data node database or
1294  * not. For that, please use the `remote_connection_open_with_options`
1295  * function.
1296  */
1297 TSConnection *
remote_connection_open_with_options_nothrow(const char * node_name,List * connection_options,char ** errmsg)1298 remote_connection_open_with_options_nothrow(const char *node_name, List *connection_options,
1299 											char **errmsg)
1300 {
1301 	PGconn *volatile pg_conn = NULL;
1302 	TSConnection *ts_conn;
1303 	const char **keywords;
1304 	const char **values;
1305 
1306 	if (NULL != errmsg)
1307 		*errmsg = NULL;
1308 
1309 	setup_full_connection_options(connection_options, &keywords, &values);
1310 
1311 	pg_conn = PQconnectdbParams(keywords, values, 0 /* Do not expand dbname param */);
1312 
1313 	/* Cast to (char **) to silence warning with MSVC compiler */
1314 	pfree((char **) keywords);
1315 	pfree((char **) values);
1316 
1317 	if (NULL == pg_conn)
1318 		return NULL;
1319 
1320 	if (PQstatus(pg_conn) != CONNECTION_OK)
1321 	{
1322 		finish_connection(pg_conn, errmsg);
1323 		return NULL;
1324 	}
1325 
1326 	ts_conn = remote_connection_create(pg_conn, false, node_name);
1327 
1328 	if (NULL == ts_conn)
1329 		finish_connection(pg_conn, errmsg);
1330 
1331 	return ts_conn;
1332 }
1333 
1334 /*
1335  * Opens a connection.
1336  *
1337  * Raw connections are not part of the transaction and do not have transactions
1338  * auto-started. They must be explicitly closed by
1339  * remote_connection_close. Note that connections are allocated using malloc
1340  * and so if you do not call remote_connection_close, you'll have a memory
1341  * leak. Note that the connection cache handles all of this for you so use
1342  * that if you can.
1343  */
1344 TSConnection *
remote_connection_open_with_options(const char * node_name,List * connection_options,bool set_dist_id)1345 remote_connection_open_with_options(const char *node_name, List *connection_options,
1346 									bool set_dist_id)
1347 {
1348 	char *err = NULL;
1349 	TSConnection *conn =
1350 		remote_connection_open_with_options_nothrow(node_name, connection_options, &err);
1351 
1352 	if (NULL == conn)
1353 		ereport(ERROR,
1354 				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1355 				 errmsg("could not connect to \"%s\"", node_name),
1356 				 err == NULL ? 0 : errdetail_internal("%s", err)));
1357 
1358 	/*
1359 	 * Use PG_TRY block to ensure closing connection on error.
1360 	 */
1361 	PG_TRY();
1362 	{
1363 		Assert(NULL != conn->pg_conn);
1364 
1365 		if (PQstatus(conn->pg_conn) != CONNECTION_OK)
1366 			ereport(ERROR,
1367 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1368 					 errmsg("could not connect to \"%s\"", node_name),
1369 					 errdetail_internal("%s", pchomp(PQerrorMessage(conn->pg_conn)))));
1370 
1371 		/* Prepare new session for use */
1372 		if (!remote_connection_configure(conn))
1373 			ereport(ERROR,
1374 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1375 					 errmsg("could not configure remote connection to \"%s\"", node_name),
1376 					 errdetail_internal("%s", PQerrorMessage(conn->pg_conn))));
1377 
1378 		/* Check a data node extension version and show a warning
1379 		 * message if it differs */
1380 		remote_connection_check_extension(conn);
1381 
1382 		if (set_dist_id)
1383 		{
1384 			/* Inform remote node about instance UUID */
1385 			if (!remote_connection_set_peer_dist_id(conn))
1386 				ereport(ERROR,
1387 						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
1388 						 errmsg("could not set distributed ID for \"%s\"", node_name),
1389 						 errdetail_internal("%s", PQerrorMessage(conn->pg_conn))));
1390 		}
1391 	}
1392 	PG_CATCH();
1393 	{
1394 		/* Release PGconn data structure if we managed to create one */
1395 		remote_connection_close(conn);
1396 		PG_RE_THROW();
1397 	}
1398 	PG_END_TRY();
1399 
1400 	return conn;
1401 }
1402 
1403 /*
1404  * Based on PG's GetUserMapping, but this version does not fail when a user
1405  * mapping is not found.
1406  */
1407 static UserMapping *
get_user_mapping(Oid userid,Oid serverid)1408 get_user_mapping(Oid userid, Oid serverid)
1409 {
1410 	Datum datum;
1411 	HeapTuple tp;
1412 	bool isnull;
1413 	UserMapping *um;
1414 
1415 	tp = SearchSysCache2(USERMAPPINGUSERSERVER,
1416 						 ObjectIdGetDatum(userid),
1417 						 ObjectIdGetDatum(serverid));
1418 
1419 	if (!HeapTupleIsValid(tp))
1420 	{
1421 		/* Not found for the specific user -- try PUBLIC */
1422 		tp = SearchSysCache2(USERMAPPINGUSERSERVER,
1423 							 ObjectIdGetDatum(InvalidOid),
1424 							 ObjectIdGetDatum(serverid));
1425 	}
1426 
1427 	if (!HeapTupleIsValid(tp))
1428 		return NULL;
1429 
1430 	um = (UserMapping *) palloc(sizeof(UserMapping));
1431 	um->umid = ((Form_pg_user_mapping) GETSTRUCT(tp))->oid;
1432 	um->userid = userid;
1433 	um->serverid = serverid;
1434 
1435 	/* Extract the umoptions */
1436 	datum = SysCacheGetAttr(USERMAPPINGUSERSERVER, tp, Anum_pg_user_mapping_umoptions, &isnull);
1437 	if (isnull)
1438 		um->options = NIL;
1439 	else
1440 		um->options = untransformRelOptions(datum);
1441 
1442 	ReleaseSysCache(tp);
1443 
1444 	return um;
1445 }
1446 
1447 static bool
options_contain(List * options,const char * key)1448 options_contain(List *options, const char *key)
1449 {
1450 	ListCell *lc;
1451 
1452 	foreach (lc, options)
1453 	{
1454 		DefElem *d = (DefElem *) lfirst(lc);
1455 
1456 		if (strcmp(d->defname, key) == 0)
1457 			return true;
1458 	}
1459 
1460 	return false;
1461 }
1462 
1463 /*
1464  * Add user info (username and optionally password) to the connection
1465  * options).
1466  */
1467 static List *
add_userinfo_to_server_options(ForeignServer * server,Oid user_id)1468 add_userinfo_to_server_options(ForeignServer *server, Oid user_id)
1469 {
1470 	const UserMapping *um = get_user_mapping(user_id, server->serverid);
1471 	List *options = list_copy(server->options);
1472 
1473 	/* If a user mapping exists, then use the "user" and "password" options
1474 	 * from the user mapping (we assume that these options exist, or the
1475 	 * connection will later fail). Otherwise, just add the "user" and rely on
1476 	 * other authentication mechanisms. */
1477 	if (NULL != um)
1478 		options = list_concat(options, um->options);
1479 
1480 	if (!options_contain(options, "user"))
1481 	{
1482 		char *user_name = GetUserNameFromId(user_id, false);
1483 		options = lappend(options, makeDefElem("user", (Node *) makeString(user_name), -1));
1484 	}
1485 
1486 	return options;
1487 }
1488 
1489 /*
1490  * Append the given string to the buffer, with suitable quoting for passing
1491  * the string as a value in a keyword/value pair in a libpq connection string.
1492  *
1493  * The implementation is based on libpq appendConnStrVal().
1494  */
1495 static void
remote_connection_append_connstr_value(StringInfo buf,const char * str)1496 remote_connection_append_connstr_value(StringInfo buf, const char *str)
1497 {
1498 	const char *s;
1499 	bool needquotes;
1500 
1501 	/*
1502 	 * If the string is one or more plain ASCII characters, no need to quote
1503 	 * it. This is quite conservative, but better safe than sorry.
1504 	 */
1505 	needquotes = true;
1506 	for (s = str; *s; s++)
1507 	{
1508 		if (!((*s >= 'a' && *s <= 'z') || (*s >= 'A' && *s <= 'Z') || (*s >= '0' && *s <= '9') ||
1509 			  *s == '_' || *s == '.'))
1510 		{
1511 			needquotes = true;
1512 			break;
1513 		}
1514 		needquotes = false;
1515 	}
1516 
1517 	if (needquotes)
1518 	{
1519 		appendStringInfoChar(buf, '\'');
1520 		while (*str)
1521 		{
1522 			/* ' and \ must be escaped by to \' and \\ */
1523 			if (*str == '\'' || *str == '\\')
1524 				appendStringInfoChar(buf, '\\');
1525 
1526 			appendStringInfoChar(buf, *str);
1527 			str++;
1528 		}
1529 		appendStringInfoChar(buf, '\'');
1530 	}
1531 	else
1532 		appendStringInfoString(buf, str);
1533 }
1534 
1535 char *
remote_connection_get_connstr(const char * node_name)1536 remote_connection_get_connstr(const char *node_name)
1537 {
1538 	ForeignServer *server;
1539 	List *connection_options;
1540 	const char **keywords;
1541 	const char **values;
1542 	StringInfoData connstr;
1543 	StringInfoData connstr_escape;
1544 	int i;
1545 
1546 	server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false);
1547 	connection_options = add_userinfo_to_server_options(server, GetUserId());
1548 	setup_full_connection_options(connection_options, &keywords, &values);
1549 
1550 	/* Cycle through the options and create the connection string */
1551 	initStringInfo(&connstr);
1552 	i = 0;
1553 	while (keywords[i] != NULL)
1554 	{
1555 		appendStringInfo(&connstr, " %s=", keywords[i]);
1556 		remote_connection_append_connstr_value(&connstr, values[i]);
1557 		i++;
1558 	}
1559 	Assert(keywords[i] == NULL && values[i] == NULL);
1560 
1561 	initStringInfo(&connstr_escape);
1562 	enlargeStringInfo(&connstr_escape, connstr.len * 2 + 1);
1563 	connstr_escape.len += PQescapeString(connstr_escape.data, connstr.data, connstr.len);
1564 
1565 	/* Cast to (char **) to silence warning with MSVC compiler */
1566 	pfree((char **) keywords);
1567 	pfree((char **) values);
1568 	pfree(connstr.data);
1569 
1570 	return connstr_escape.data;
1571 }
1572 
1573 TSConnection *
remote_connection_open_by_id(TSConnectionId id)1574 remote_connection_open_by_id(TSConnectionId id)
1575 {
1576 	ForeignServer *server = GetForeignServer(id.server_id);
1577 	List *connection_options = add_userinfo_to_server_options(server, id.user_id);
1578 
1579 	return remote_connection_open_with_options(server->servername, connection_options, true);
1580 }
1581 
1582 TSConnection *
remote_connection_open(Oid server_id,Oid user_id)1583 remote_connection_open(Oid server_id, Oid user_id)
1584 {
1585 	TSConnectionId id = remote_connection_id(server_id, user_id);
1586 
1587 	return remote_connection_open_by_id(id);
1588 }
1589 
1590 /*
1591  * Open a connection without throwing and error.
1592  *
1593  * Returns the connection pointer on success. On failure NULL is returned and
1594  * the errmsg (if given) is used to return an error message.
1595  */
1596 TSConnection *
remote_connection_open_nothrow(Oid server_id,Oid user_id,char ** errmsg)1597 remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg)
1598 {
1599 	ForeignServer *server = GetForeignServer(server_id);
1600 	Oid fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
1601 	List *connection_options;
1602 	TSConnection *conn;
1603 
1604 	if (server->fdwid != fdwid)
1605 	{
1606 		elog(WARNING, "invalid node type for \"%s\"", server->servername);
1607 		return NULL;
1608 	}
1609 
1610 	connection_options = add_userinfo_to_server_options(server, user_id);
1611 	conn =
1612 		remote_connection_open_with_options_nothrow(server->servername, connection_options, errmsg);
1613 
1614 	if (NULL == conn)
1615 	{
1616 		if (NULL != errmsg && NULL == *errmsg)
1617 			*errmsg = "internal connection error";
1618 		return NULL;
1619 	}
1620 
1621 	if (PQstatus(conn->pg_conn) != CONNECTION_OK || !remote_connection_set_peer_dist_id(conn))
1622 	{
1623 		if (NULL != errmsg)
1624 			*errmsg = pchomp(PQerrorMessage(conn->pg_conn));
1625 		remote_connection_close(conn);
1626 		return NULL;
1627 	}
1628 
1629 	return conn;
1630 }
1631 
1632 #define PING_QUERY "SELECT 1"
1633 
1634 bool
remote_connection_ping(const char * node_name)1635 remote_connection_ping(const char *node_name)
1636 {
1637 	Oid server_id = get_foreign_server_oid(node_name, false);
1638 	TSConnection *conn = remote_connection_open_nothrow(server_id, GetUserId(), NULL);
1639 	bool success = false;
1640 
1641 	if (NULL == conn)
1642 		return false;
1643 
1644 	if (PQstatus(conn->pg_conn) == CONNECTION_OK)
1645 	{
1646 		if (1 == PQsendQuery(conn->pg_conn, PING_QUERY))
1647 		{
1648 			PGresult *res = PQgetResult(conn->pg_conn);
1649 
1650 			success = (PQresultStatus(res) == PGRES_TUPLES_OK);
1651 			PQclear(res);
1652 		}
1653 	}
1654 
1655 	remote_connection_close(conn);
1656 
1657 	return success;
1658 }
1659 
/*
 * Close a connection and free the connection object.
 *
 * The libpq connection (if any) is finished first, after which the
 * TSConnection itself is released with remote_connection_free(). The
 * connection pointer must not be NULL and must not be used afterwards.
 */
void
remote_connection_close(TSConnection *conn)
{
	Assert(conn != NULL);

	/* Flag that a close is in progress before finishing the libpq
	 * connection. NOTE(review): presumably read by the libpq event handler
	 * invoked from PQfinish -- confirm against the handler. */
	conn->closing_guard = true;

	if (NULL != conn->pg_conn)
		PQfinish(conn->pg_conn);

	/* Assert that PQfinish detached this connection from the global list of
	 * connections */
	Assert(IS_DETACHED_ENTRY(&conn->ln));

	remote_connection_free(conn);
}
1676 
1677 /*
1678  * Assign a "unique" number for a cursor.
1679  *
1680  * TODO should this be moved into the session?
1681  *
1682  * These really only need to be unique per connection within a transaction.
1683  * For the moment we ignore the per-connection point and assign them across
1684  * all connections in the transaction, but we ask for the connection to be
1685  * supplied in case we want to refine that.
1686  *
1687  * Note that even if wraparound happens in a very long transaction, actual
1688  * collisions are highly improbable; just be sure to use %u not %d to print.
1689  */
1690 unsigned int
remote_connection_get_cursor_number()1691 remote_connection_get_cursor_number()
1692 {
1693 	return ++cursor_number;
1694 }
1695 
1696 void
remote_connection_reset_cursor_number()1697 remote_connection_reset_cursor_number()
1698 {
1699 	cursor_number = 0;
1700 }
1701 
1702 /*
1703  * Assign a "unique" number for a prepared statement.
1704  *
1705  * This works much like remote_connection_get_cursor_number, except that we never reset the counter
1706  * within a session.  That's because we can't be 100% sure we've gotten rid
1707  * of all prepared statements on all connections, and it's not really worth
1708  * increasing the risk of prepared-statement name collisions by resetting.
1709  */
1710 unsigned int
remote_connection_get_prep_stmt_number()1711 remote_connection_get_prep_stmt_number()
1712 {
1713 	return ++prep_stmt_number;
1714 }
1715 
1716 #define MAX_CONN_WAIT_TIMEOUT_MS 60000
1717 
1718 /*
1719  * Drain a connection of all data coming in and discard the results. Return
1720  * CONN_OK if all data is drained before the deadline expires.
1721  *
1722  * This is mainly used in abort processing. This result being returned
1723  * might be for a query that is being interrupted by transaction abort, or it might
1724  * be a query that was initiated as part of transaction abort to get the remote
1725  * side back to the appropriate state.
1726  *
1727  * It's not a huge problem if we throw an ERROR here, but if we get into error
1728  * recursion trouble, we'll end up slamming the connection shut, which will
1729  * necessitate failing the entire toplevel transaction even if subtransactions
1730  * were used.  Try to use WARNING where we can.
1731  *
1732  * end_time is the time at which we should give up and assume the remote
1733  * side is dead.
1734  */
1735 TSConnectionResult
remote_connection_drain(TSConnection * conn,TimestampTz endtime,PGresult ** result)1736 remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **result)
1737 {
1738 	volatile TSConnectionResult connresult = CONN_OK;
1739 	PGresult *volatile last_res = NULL;
1740 	PGconn *pg_conn = remote_connection_get_pg_conn(conn);
1741 
1742 	/* In what follows, do not leak any PGresults on an error. */
1743 	PG_TRY();
1744 	{
1745 		for (;;)
1746 		{
1747 			PGresult *res;
1748 
1749 			while (PQisBusy(pg_conn))
1750 			{
1751 				int wc;
1752 				TimestampTz now = GetCurrentTimestamp();
1753 				long remaining_secs;
1754 				int remaining_usecs;
1755 				long cur_timeout_ms;
1756 
1757 				/* If timeout has expired, give up, else get sleep time. */
1758 				if (now >= endtime)
1759 				{
1760 					connresult = CONN_TIMEOUT;
1761 					goto exit;
1762 				}
1763 
1764 				TimestampDifference(now, endtime, &remaining_secs, &remaining_usecs);
1765 
1766 				/* To protect against clock skew, limit sleep to one minute. */
1767 				cur_timeout_ms =
1768 					Min(MAX_CONN_WAIT_TIMEOUT_MS, remaining_secs * USECS_PER_SEC + remaining_usecs);
1769 
1770 				/* Sleep until there's something to do */
1771 				wc = WaitLatchOrSocket(MyLatch,
1772 									   WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH |
1773 										   WL_TIMEOUT,
1774 									   PQsocket(pg_conn),
1775 									   cur_timeout_ms,
1776 									   PG_WAIT_EXTENSION);
1777 				ResetLatch(MyLatch);
1778 
1779 				CHECK_FOR_INTERRUPTS();
1780 
1781 				/* Data available in socket? */
1782 				if ((wc & WL_SOCKET_READABLE) && (0 == PQconsumeInput(pg_conn)))
1783 				{
1784 					connresult = CONN_DISCONNECT;
1785 					goto exit;
1786 				}
1787 			}
1788 
1789 			res = PQgetResult(pg_conn);
1790 
1791 			if (res == NULL)
1792 			{
1793 				/* query is complete */
1794 				conn->status = CONN_IDLE;
1795 				connresult = CONN_OK;
1796 				break;
1797 			}
1798 
1799 			PQclear(last_res);
1800 			last_res = res;
1801 		}
1802 	exit:;
1803 	}
1804 	PG_CATCH();
1805 	{
1806 		PQclear(last_res);
1807 		PG_RE_THROW();
1808 	}
1809 	PG_END_TRY();
1810 
1811 	switch (connresult)
1812 	{
1813 		case CONN_OK:
1814 			if (last_res == NULL)
1815 				connresult = CONN_NO_RESPONSE;
1816 			else if (result != NULL)
1817 				*result = last_res;
1818 			else
1819 				PQclear(last_res);
1820 			break;
1821 		case CONN_TIMEOUT:
1822 		case CONN_DISCONNECT:
1823 			PQclear(last_res);
1824 			break;
1825 		case CONN_NO_RESPONSE:
1826 			Assert(last_res == NULL);
1827 			break;
1828 	}
1829 
1830 	return connresult;
1831 }
1832 
1833 /*
1834  * Cancel the currently-in-progress query and ignore the result.  Returns true if we successfully
1835  * cancel the query and discard any pending result, and false if not.
1836  */
1837 bool
remote_connection_cancel_query(TSConnection * conn)1838 remote_connection_cancel_query(TSConnection *conn)
1839 {
1840 	PGcancel *cancel;
1841 	char errbuf[256];
1842 	TimestampTz endtime;
1843 	TSConnectionError err;
1844 	bool success;
1845 
1846 	if (!conn)
1847 		return true;
1848 
1849 	memset(&err, 0, sizeof(TSConnectionError));
1850 
1851 	/*
1852 	 * Catch exceptions so that we can ensure the status is IDLE after the
1853 	 * cancel operation even in case of errors being thrown. Note that we
1854 	 * cannot set the status before we drain, since the drain function needs
1855 	 * to know the status (e.g., if the connection is in COPY_IN mode).
1856 	 */
1857 	PG_TRY();
1858 	{
1859 		if (conn->status == CONN_COPY_IN && !remote_connection_end_copy(conn, &err))
1860 			remote_connection_error_elog(&err, WARNING);
1861 
1862 		/*
1863 		 * If it takes too long to cancel the query and discard the result, assume
1864 		 * the connection is dead.
1865 		 */
1866 		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1867 
1868 		/*
1869 		 * Issue cancel request.  Unfortunately, there's no good way to limit the
1870 		 * amount of time that we might block inside PQcancel().
1871 		 */
1872 		if ((cancel = PQgetCancel(conn->pg_conn)))
1873 		{
1874 			if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1875 			{
1876 				ereport(WARNING,
1877 						(errcode(ERRCODE_CONNECTION_FAILURE),
1878 						 errmsg("could not send cancel request: %s", errbuf)));
1879 				PQfreeCancel(cancel);
1880 				conn->status = CONN_IDLE;
1881 				return false;
1882 			}
1883 			PQfreeCancel(cancel);
1884 		}
1885 
1886 		switch (remote_connection_drain(conn, endtime, NULL))
1887 		{
1888 			case CONN_OK:
1889 				/* Successfully, drained */
1890 			case CONN_NO_RESPONSE:
1891 				/* No response, likely beceause there was nothing to cancel */
1892 				success = true;
1893 				break;
1894 			default:
1895 				success = false;
1896 				break;
1897 		}
1898 	}
1899 	PG_CATCH();
1900 	{
1901 		conn->status = CONN_IDLE;
1902 		PG_RE_THROW();
1903 	}
1904 	PG_END_TRY();
1905 
1906 	conn->status = CONN_IDLE;
1907 
1908 	return success;
1909 }
1910 
/*
 * Close a result object by clearing it with PQclear(). The result must not
 * be used afterwards.
 */
void
remote_result_close(PGresult *res)
{
	PQclear(res);
}
1916 
1917 /*
1918  * Cleanup connections and results at the end of a (sub-)transaction.
1919  *
1920  * This function is called at the end of transactions and sub-transactions to
1921  * auto-cleanup connections and result objects.
1922  */
1923 static void
remote_connections_cleanup(SubTransactionId subtxid,bool isabort)1924 remote_connections_cleanup(SubTransactionId subtxid, bool isabort)
1925 {
1926 	ListNode *curr = connections.next;
1927 	unsigned int num_connections = 0;
1928 	unsigned int num_results = 0;
1929 
1930 	while (curr != &connections)
1931 	{
1932 		TSConnection *conn = (TSConnection *) curr;
1933 
1934 		/* Move to next connection since closing the current one might
1935 		 * otherwise make the curr pointer invalid. */
1936 		curr = curr->next;
1937 
1938 		if (conn->autoclose && (subtxid == InvalidSubTransactionId || subtxid == conn->subtxid))
1939 		{
1940 			/* Closes the connection and frees all its PGresult objects */
1941 			remote_connection_close(conn);
1942 			num_connections++;
1943 		}
1944 		else
1945 		{
1946 			/* We're not closing the connection, but we should clean up any
1947 			 * lingering results */
1948 			ListNode *curr_result = conn->results.next;
1949 
1950 			while (curr_result != &conn->results)
1951 			{
1952 				ResultEntry *entry = (ResultEntry *) curr_result;
1953 
1954 				curr_result = curr_result->next;
1955 
1956 				if (subtxid == InvalidSubTransactionId || subtxid == entry->subtxid)
1957 				{
1958 					PQclear(entry->result);
1959 					num_results++;
1960 				}
1961 			}
1962 		}
1963 	}
1964 
1965 	if (subtxid == InvalidSubTransactionId)
1966 		elog(DEBUG3,
1967 			 "cleaned up %u connections and %u results at %s of transaction",
1968 			 num_connections,
1969 			 num_results,
1970 			 isabort ? "abort" : "commit");
1971 	else
1972 		elog(DEBUG3,
1973 			 "cleaned up %u connections and %u results at %s of sub-transaction %u",
1974 			 num_connections,
1975 			 num_results,
1976 			 isabort ? "abort" : "commit",
1977 			 subtxid);
1978 }
1979 
1980 static void
remote_connection_xact_end(XactEvent event,void * unused_arg)1981 remote_connection_xact_end(XactEvent event, void *unused_arg)
1982 {
1983 	/*
1984 	 * We are deep down in CommitTransaction code path. We do not want our
1985 	 * emit_log_hook_callback to interfere since it uses its own transaction
1986 	 */
1987 	emit_log_hook_type prev_emit_log_hook = emit_log_hook;
1988 	emit_log_hook = NULL;
1989 
1990 	switch (event)
1991 	{
1992 		case XACT_EVENT_ABORT:
1993 		case XACT_EVENT_PARALLEL_ABORT:
1994 			/*
1995 			 * We expect that the waitpoint will be retried and then we
1996 			 * will return due to the process receiving a SIGTERM if
1997 			 * the advisory lock is exclusively held by a user call
1998 			 */
1999 			DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
2000 			remote_connections_cleanup(InvalidSubTransactionId, true);
2001 			break;
2002 		case XACT_EVENT_COMMIT:
2003 		case XACT_EVENT_PARALLEL_COMMIT:
2004 			/* Same retry behavior as above */
2005 			DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
2006 			remote_connections_cleanup(InvalidSubTransactionId, false);
2007 			break;
2008 		case XACT_EVENT_PREPARE:
2009 			/*
2010 			 * We expect that the waitpoint will be retried and then we
2011 			 * will return with a warning on crossing the retry count if
2012 			 * the advisory lock is exclusively held by a user call
2013 			 */
2014 			DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
2015 			break;
2016 		default:
2017 			/* other events are too early to use DEBUG_WAITPOINT.. */
2018 			break;
2019 	}
2020 
2021 	/* re-enable the emit_log_hook */
2022 	emit_log_hook = prev_emit_log_hook;
2023 }
2024 
2025 static void
remote_connection_subxact_end(SubXactEvent event,SubTransactionId subtxid,SubTransactionId parent_subtxid,void * unused_arg)2026 remote_connection_subxact_end(SubXactEvent event, SubTransactionId subtxid,
2027 							  SubTransactionId parent_subtxid, void *unused_arg)
2028 {
2029 	/*
2030 	 * We are deep down in CommitTransaction code path. We do not want our
2031 	 * emit_log_hook_callback to interfere since it uses its own transaction
2032 	 */
2033 	emit_log_hook_type prev_emit_log_hook = emit_log_hook;
2034 	emit_log_hook = NULL;
2035 
2036 	switch (event)
2037 	{
2038 		case SUBXACT_EVENT_ABORT_SUB:
2039 			remote_connections_cleanup(subtxid, true);
2040 			break;
2041 		case SUBXACT_EVENT_COMMIT_SUB:
2042 			remote_connections_cleanup(subtxid, false);
2043 			break;
2044 		default:
2045 			break;
2046 	}
2047 
2048 	/* re-enable the emit_log_hook */
2049 	emit_log_hook = prev_emit_log_hook;
2050 }
2051 
2052 bool
remote_connection_set_single_row_mode(TSConnection * conn)2053 remote_connection_set_single_row_mode(TSConnection *conn)
2054 {
2055 	return PQsetSingleRowMode(conn->pg_conn);
2056 }
2057 
2058 static bool
send_binary_copy_header(const TSConnection * conn,TSConnectionError * err)2059 send_binary_copy_header(const TSConnection *conn, TSConnectionError *err)
2060 {
2061 	/* File header for binary format */
2062 	static const char file_header[] = {
2063 		'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0', /* Signature */
2064 		0,   0,   0,   0,											  /* 4 bytes flags */
2065 		0,   0,   0,   0 /* 4 bytes header extension length (unused) */
2066 	};
2067 
2068 	int res = PQputCopyData(conn->pg_conn, file_header, sizeof(file_header));
2069 
2070 	if (res != 1)
2071 		return fill_connection_error(err,
2072 									 ERRCODE_CONNECTION_FAILURE,
2073 									 "could not set binary COPY mode",
2074 									 conn);
2075 	return true;
2076 }
2077 
2078 bool
remote_connection_begin_copy(TSConnection * conn,const char * copycmd,bool binary,TSConnectionError * err)2079 remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binary,
2080 							 TSConnectionError *err)
2081 {
2082 	PGconn *pg_conn = remote_connection_get_pg_conn(conn);
2083 	PGresult *volatile res = NULL;
2084 
2085 	if (PQisnonblocking(pg_conn))
2086 		return fill_simple_error(err,
2087 								 ERRCODE_FEATURE_NOT_SUPPORTED,
2088 								 "distributed copy doesn't support non-blocking connections",
2089 								 conn);
2090 
2091 	if (conn->status != CONN_IDLE)
2092 		return fill_simple_error(err,
2093 								 ERRCODE_INTERNAL_ERROR,
2094 								 "connection not IDLE when beginning COPY",
2095 								 conn);
2096 
2097 	res = PQexec(pg_conn, copycmd);
2098 
2099 	if (PQresultStatus(res) != PGRES_COPY_IN)
2100 	{
2101 		fill_result_error(err,
2102 						  ERRCODE_CONNECTION_FAILURE,
2103 						  "unable to start remote COPY on data node",
2104 						  res);
2105 		PQclear(res);
2106 		return false;
2107 	}
2108 
2109 	PQclear(res);
2110 
2111 	if (binary && !send_binary_copy_header(conn, err))
2112 		goto err_end_copy;
2113 
2114 	conn->binary_copy = binary;
2115 	conn->status = CONN_COPY_IN;
2116 
2117 	return true;
2118 err_end_copy:
2119 	PQputCopyEnd(pg_conn, err->msg);
2120 
2121 	return false;
2122 }
2123 
2124 bool
remote_connection_put_copy_data(TSConnection * conn,const char * buffer,size_t len,TSConnectionError * err)2125 remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len,
2126 								TSConnectionError *err)
2127 {
2128 	int res;
2129 
2130 	res = PQputCopyData(remote_connection_get_pg_conn(conn), buffer, len);
2131 
2132 	if (res != 1)
2133 		return fill_connection_error(err,
2134 									 ERRCODE_CONNECTION_EXCEPTION,
2135 									 "could not send COPY data",
2136 									 conn);
2137 
2138 	return true;
2139 }
2140 
2141 static bool
send_end_binary_copy_data(const TSConnection * conn,TSConnectionError * err)2142 send_end_binary_copy_data(const TSConnection *conn, TSConnectionError *err)
2143 {
2144 	const uint16 buf = pg_hton16((uint16) -1);
2145 
2146 	if (PQputCopyData(conn->pg_conn, (char *) &buf, sizeof(buf)) != 1)
2147 		return fill_simple_error(err, ERRCODE_INTERNAL_ERROR, "could not end binary COPY", conn);
2148 
2149 	return true;
2150 }
2151 
2152 bool
remote_connection_end_copy(TSConnection * conn,TSConnectionError * err)2153 remote_connection_end_copy(TSConnection *conn, TSConnectionError *err)
2154 {
2155 	PGresult *res;
2156 	bool success;
2157 
2158 	if (conn->status != CONN_COPY_IN)
2159 		return fill_simple_error(err,
2160 								 ERRCODE_INTERNAL_ERROR,
2161 								 "connection not in COPY_IN state when ending COPY",
2162 								 conn);
2163 
2164 	if (conn->binary_copy && !send_end_binary_copy_data(conn, err))
2165 		return false;
2166 
2167 	if (PQputCopyEnd(conn->pg_conn, NULL) != 1)
2168 		return fill_simple_error(err,
2169 								 ERRCODE_CONNECTION_EXCEPTION,
2170 								 "could not end remote COPY",
2171 								 conn);
2172 
2173 	success = true;
2174 	conn->status = CONN_PROCESSING;
2175 
2176 	while ((res = PQgetResult(conn->pg_conn)))
2177 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
2178 			success = fill_result_error(err,
2179 										ERRCODE_CONNECTION_EXCEPTION,
2180 										"invalid result when ending remote COPY",
2181 										res);
2182 
2183 	Assert(res == NULL);
2184 	conn->status = CONN_IDLE;
2185 
2186 	return success;
2187 }
2188 
2189 #ifdef TS_DEBUG
2190 /*
2191  * Reset the current connection stats.
2192  */
2193 void
remote_connection_stats_reset(void)2194 remote_connection_stats_reset(void)
2195 {
2196 	MemSet(&connstats, 0, sizeof(RemoteConnectionStats));
2197 }
2198 
2199 /*
2200  * Get the current connection stats.
2201  */
2202 RemoteConnectionStats *
remote_connection_stats_get(void)2203 remote_connection_stats_get(void)
2204 {
2205 	return &connstats;
2206 }
2207 #endif
2208 
2209 void
_remote_connection_init(void)2210 _remote_connection_init(void)
2211 {
2212 	RegisterXactCallback(remote_connection_xact_end, NULL);
2213 	RegisterSubXactCallback(remote_connection_subxact_end, NULL);
2214 
2215 	unset_libpq_envvar();
2216 }
2217 
2218 void
_remote_connection_fini(void)2219 _remote_connection_fini(void)
2220 {
2221 	UnregisterXactCallback(remote_connection_xact_end, NULL);
2222 	UnregisterSubXactCallback(remote_connection_subxact_end, NULL);
2223 }
2224