1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  *		  Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
17 #include "catalog/pg_user_mapping.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "mb/pg_wchar.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postgres_fdw.h"
24 #include "storage/fd.h"
25 #include "storage/latch.h"
26 #include "utils/builtins.h"
27 #include "utils/datetime.h"
28 #include "utils/hsearch.h"
29 #include "utils/inval.h"
30 #include "utils/memutils.h"
31 #include "utils/syscache.h"
32 
/*
 * Connection cache hash table entry
 *
 * The lookup key in this hash table is the user mapping OID. We use just one
 * connection per user mapping ID, which ensures that all the scans use the
 * same snapshot during a query.  Using the user mapping OID rather than
 * the foreign server OID + user OID avoids creating multiple connections when
 * the public user mapping applies to all user OIDs.
 *
 * The "conn" pointer can be NULL if we don't currently have a live connection.
 * When we do have a connection, xact_depth tracks the current depth of
 * transactions and subtransactions open on the remote side.  We need to issue
 * commands at the same nesting depth on the remote as we're executing at
 * ourselves, so that rolling back a subtransaction will kill the right
 * queries and not the wrong ones.
 *
 * Every field after "conn" is reset by make_new_connection() each time a new
 * connection is established for the entry, so stale values from a previous
 * connection never leak into a new one.
 */
typedef Oid ConnCacheKey;

typedef struct ConnCacheEntry
{
	ConnCacheKey key;			/* hash key (must be first) */
	PGconn	   *conn;			/* connection to foreign server, or NULL */
	/* Remaining fields are invalid when conn is NULL: */
	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
								 * one level of subxact open, etc */
	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
	bool		have_error;		/* have any subxacts aborted in this xact? */
	bool		changing_xact_state;	/* xact state change in process; left
										 * true if the remote command failed */
	bool		invalidated;	/* true if reconnect is pending */
	bool		keep_connections;	/* setting value of keep_connections
									 * server option */
	Oid			serverid;		/* foreign server OID used to get server name */
	uint32		server_hashvalue;	/* syscache hash value of foreign server
									 * OID, for matching invalidations */
	uint32		mapping_hashvalue;	/* syscache hash value of user mapping
									 * OID, for matching invalidations */
	PgFdwConnState state;		/* extra per-connection state */
} ConnCacheEntry;
69 
/*
 * Connection cache (initialized on first use)
 *
 * Created lazily by GetConnection(), which also registers the transaction,
 * subtransaction and syscache-invalidation callbacks at the same time.
 */
static HTAB *ConnectionHash = NULL;

/*
 * For assigning cursor numbers and prepared statement numbers.  Cursor
 * numbers only need to be unique within a transaction (see GetCursorNumber);
 * prepared statement numbers are never reset within a session (see
 * GetPrepStmtNumber).
 */
static unsigned int cursor_number = 0;
static unsigned int prep_stmt_number = 0;

/*
 * Tracks whether any work is needed in callback functions.  Set by
 * GetConnection(); pgfdw_xact_callback() returns immediately when it is
 * false.
 */
static bool xact_got_connection = false;

/*
 * SQL-callable functions exported via the version-1 calling convention
 */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);

/* prototypes of private functions */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
static void configure_remote_session(PGconn *conn);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
								   SubTransactionId mySubid,
								   SubTransactionId parentSubid,
								   void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
									 bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
									 PGresult **result);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
110 
/*
 * Get a PGconn which can be used to execute queries on the remote PostgreSQL
 * server with the user's authorization.  A new connection is established
 * if we don't already have a suitable one, and a transaction is opened at
 * the right subtransaction nesting depth if we didn't do that already.
 *
 * will_prep_stmt must be true if caller intends to create any prepared
 * statements.  Since those don't go away automatically at transaction end
 * (not even on error), we need this flag to cue manual cleanup.
 *
 * If state is not NULL, *state receives the per-connection state associated
 * with the PGconn.
 *
 * Suppose the cached connection turns out to be broken while no remote
 * transaction is open on it (xact_depth == 0).  In that case it is closed
 * and exactly one reconnect is attempted; a failure on that second attempt
 * is thrown to the caller.  Failures to connect or to start the remote
 * (sub)transaction are raised with ereport(ERROR).
 */
PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
	bool		found;
	bool		retry = false;	/* set in PG_CATCH when we should reconnect */
	ConnCacheEntry *entry;
	ConnCacheKey key;
	/* Caller's context; PG_CATCH copies the error data into it. */
	MemoryContext ccxt = CurrentMemoryContext;

	/* First time through, initialize connection cache hashtable */
	if (ConnectionHash == NULL)
	{
		HASHCTL		ctl;

		ctl.keysize = sizeof(ConnCacheKey);
		ctl.entrysize = sizeof(ConnCacheEntry);
		ConnectionHash = hash_create("postgres_fdw connections", 8,
									 &ctl,
									 HASH_ELEM | HASH_BLOBS);

		/*
		 * Register some callback functions that manage connection cleanup.
		 * This should be done just once in each backend.
		 */
		RegisterXactCallback(pgfdw_xact_callback, NULL);
		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
		CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
									  pgfdw_inval_callback, (Datum) 0);
		CacheRegisterSyscacheCallback(USERMAPPINGOID,
									  pgfdw_inval_callback, (Datum) 0);
	}

	/* Set flag that we did GetConnection during the current transaction */
	xact_got_connection = true;

	/* Create hash key for the entry.  Assume no pad bytes in key struct */
	key = user->umid;

	/*
	 * Find or create cached entry for requested connection.
	 */
	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
	if (!found)
	{
		/*
		 * We need only clear "conn" here; remaining fields will be filled
		 * later when "conn" is set.
		 */
		entry->conn = NULL;
	}

	/* Reject further use of connections which failed abort cleanup. */
	pgfdw_reject_incomplete_xact_state_change(entry);

	/*
	 * If the connection needs to be remade due to invalidation, disconnect as
	 * soon as we're out of all transactions.
	 */
	if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
	{
		elog(DEBUG3, "closing connection %p for option changes to take effect",
			 entry->conn);
		disconnect_pg_server(entry);
	}

	/*
	 * If cache entry doesn't have a connection, we have to establish a new
	 * connection.  (If connect_pg_server throws an error, the cache entry
	 * will remain in a valid empty state, ie conn == NULL.)
	 */
	if (entry->conn == NULL)
		make_new_connection(entry, user);

	/*
	 * We check the health of the cached connection here when using it.  In
	 * cases where we're out of all transactions, if a broken connection is
	 * detected, we try to reestablish a new connection later.
	 */
	PG_TRY();
	{
		/* Process a pending asynchronous request if any. */
		if (entry->state.pendingAreq)
			process_pending_request(entry->state.pendingAreq);
		/* Start a new transaction or subtransaction if needed. */
		begin_remote_xact(entry);
	}
	PG_CATCH();
	{
		/*
		 * CopyErrorData() must not copy into ErrorContext, so switch back to
		 * the caller's context first; remember the error context so that it
		 * can be restored before re-throwing.
		 */
		MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
		ErrorData  *errdata = CopyErrorData();

		/*
		 * Determine whether to try to reestablish the connection.
		 *
		 * After a broken connection is detected in libpq, any error other
		 * than connection failure (e.g., out-of-memory) can be thrown
		 * somewhere between return from libpq and the expected ereport() call
		 * in pgfdw_report_error(). In this case, since PQstatus() indicates
		 * CONNECTION_BAD, checking only PQstatus() causes the false detection
		 * of connection failure. To avoid this, we also verify that the
		 * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
		 * checking only the sqlstate can cause another false detection
		 * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
		 * for any libpq-originated error condition.
		 */
		if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
			PQstatus(entry->conn) != CONNECTION_BAD ||
			entry->xact_depth > 0)
		{
			MemoryContextSwitchTo(ecxt);
			PG_RE_THROW();
		}

		/* Clean up the error state */
		FlushErrorState();
		FreeErrorData(errdata);
		errdata = NULL;

		retry = true;
	}
	PG_END_TRY();

	/*
	 * If a broken connection is detected, disconnect it, reestablish a new
	 * connection and retry a new remote transaction. If connection failure is
	 * reported again, we give up getting a connection.
	 */
	if (retry)
	{
		Assert(entry->xact_depth == 0);

		ereport(DEBUG3,
				(errmsg_internal("could not start remote transaction on connection %p",
								 entry->conn)),
				errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));

		elog(DEBUG3, "closing connection %p to reestablish a new one",
			 entry->conn);
		disconnect_pg_server(entry);

		if (entry->conn == NULL)
			make_new_connection(entry, user);

		/* Not protected by PG_TRY: a second failure propagates to caller. */
		begin_remote_xact(entry);
	}

	/* Remember if caller will prepare statements */
	entry->have_prep_stmt |= will_prep_stmt;

	/* If caller needs access to the per-connection state, return it. */
	if (state)
		*state = &entry->state;

	return entry->conn;
}
279 
280 /*
281  * Reset all transient state fields in the cached connection entry and
282  * establish new connection to the remote server.
283  */
284 static void
make_new_connection(ConnCacheEntry * entry,UserMapping * user)285 make_new_connection(ConnCacheEntry *entry, UserMapping *user)
286 {
287 	ForeignServer *server = GetForeignServer(user->serverid);
288 	ListCell   *lc;
289 
290 	Assert(entry->conn == NULL);
291 
292 	/* Reset all transient state fields, to be sure all are clean */
293 	entry->xact_depth = 0;
294 	entry->have_prep_stmt = false;
295 	entry->have_error = false;
296 	entry->changing_xact_state = false;
297 	entry->invalidated = false;
298 	entry->serverid = server->serverid;
299 	entry->server_hashvalue =
300 		GetSysCacheHashValue1(FOREIGNSERVEROID,
301 							  ObjectIdGetDatum(server->serverid));
302 	entry->mapping_hashvalue =
303 		GetSysCacheHashValue1(USERMAPPINGOID,
304 							  ObjectIdGetDatum(user->umid));
305 	memset(&entry->state, 0, sizeof(entry->state));
306 
307 	/*
308 	 * Determine whether to keep the connection that we're about to make here
309 	 * open even after the transaction using it ends, so that the subsequent
310 	 * transactions can re-use it.
311 	 *
312 	 * It's enough to determine this only when making new connection because
313 	 * all the connections to the foreign server whose keep_connections option
314 	 * is changed will be closed and re-made later.
315 	 *
316 	 * By default, all the connections to any foreign servers are kept open.
317 	 */
318 	entry->keep_connections = true;
319 	foreach(lc, server->options)
320 	{
321 		DefElem    *def = (DefElem *) lfirst(lc);
322 
323 		if (strcmp(def->defname, "keep_connections") == 0)
324 			entry->keep_connections = defGetBoolean(def);
325 	}
326 
327 	/* Now try to make the connection */
328 	entry->conn = connect_pg_server(server, user);
329 
330 	elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
331 		 entry->conn, server->servername, user->umid, user->userid);
332 }
333 
/*
 * Connect to remote server using specified server and user mapping properties.
 *
 * Connection options come from the foreign server's and the user mapping's
 * generic options, plus fallback_application_name and client_encoding.  One
 * external file descriptor is reserved via AcquireExternalFD() for the
 * returned connection; the caller releases it (see disconnect_pg_server).
 * On any error the partially built PGconn is closed and its FD reservation
 * released before the error is re-thrown.  The new session has already been
 * configured by configure_remote_session() when this returns.
 */
static PGconn *
connect_pg_server(ForeignServer *server, UserMapping *user)
{
	/* volatile: assigned inside PG_TRY and read in PG_CATCH after longjmp */
	PGconn	   *volatile conn = NULL;

	/*
	 * Use PG_TRY block to ensure closing connection on error.
	 */
	PG_TRY();
	{
		const char **keywords;
		const char **values;
		int			n;

		/*
		 * Construct connection params from generic options of ForeignServer
		 * and UserMapping.  (Some of them might not be libpq options, in
		 * which case we'll just waste a few array slots.)  Add 3 extra slots
		 * for fallback_application_name, client_encoding, end marker.
		 */
		n = list_length(server->options) + list_length(user->options) + 3;
		keywords = (const char **) palloc(n * sizeof(char *));
		values = (const char **) palloc(n * sizeof(char *));

		n = 0;
		n += ExtractConnectionOptions(server->options,
									  keywords + n, values + n);
		n += ExtractConnectionOptions(user->options,
									  keywords + n, values + n);

		/* Use "postgres_fdw" as fallback_application_name. */
		keywords[n] = "fallback_application_name";
		values[n] = "postgres_fdw";
		n++;

		/* Set client_encoding so that libpq can convert encoding properly. */
		keywords[n] = "client_encoding";
		values[n] = GetDatabaseEncodingName();
		n++;

		/* NULL-terminate both arrays, as PQconnectdbParams() expects. */
		keywords[n] = values[n] = NULL;

		/* verify the set of connection parameters */
		check_conn_params(keywords, values, user);

		/*
		 * We must obey fd.c's limit on non-virtual file descriptors.  Assume
		 * that a PGconn represents one long-lived FD.  (Doing this here also
		 * ensures that VFDs are closed if needed to make room.)
		 */
		if (!AcquireExternalFD())
		{
#ifndef WIN32					/* can't write #if within ereport() macro */
			ereport(ERROR,
					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
					 errmsg("could not connect to server \"%s\"",
							server->servername),
					 errdetail("There are too many open files on the local server."),
					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
#else
			ereport(ERROR,
					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
					 errmsg("could not connect to server \"%s\"",
							server->servername),
					 errdetail("There are too many open files on the local server."),
					 errhint("Raise the server's max_files_per_process setting.")));
#endif
		}

		/* OK to make connection */
		conn = PQconnectdbParams(keywords, values, false);

		/*
		 * PG_CATCH only releases the FD when conn is non-NULL, so release it
		 * here for the out-of-memory case where libpq returned no PGconn.
		 */
		if (!conn)
			ReleaseExternalFD();	/* because the PG_CATCH block won't */

		if (!conn || PQstatus(conn) != CONNECTION_OK)
			ereport(ERROR,
					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
					 errmsg("could not connect to server \"%s\"",
							server->servername),
					 errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));

		/*
		 * Check that non-superuser has used password to establish connection;
		 * otherwise, he's piggybacking on the postgres server's user
		 * identity. See also dblink_security_check() in contrib/dblink and
		 * check_conn_params.
		 */
		if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
			!PQconnectionUsedPassword(conn))
			ereport(ERROR,
					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
					 errmsg("password is required"),
					 errdetail("Non-superuser cannot connect if the server does not request a password."),
					 errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));

		/* Prepare new session for use */
		configure_remote_session(conn);

		pfree(keywords);
		pfree(values);
	}
	PG_CATCH();
	{
		/* Release PGconn data structure if we managed to create one */
		if (conn)
		{
			PQfinish(conn);
			ReleaseExternalFD();
		}
		PG_RE_THROW();
	}
	PG_END_TRY();

	return conn;
}
453 
454 /*
455  * Disconnect any open connection for a connection cache entry.
456  */
457 static void
disconnect_pg_server(ConnCacheEntry * entry)458 disconnect_pg_server(ConnCacheEntry *entry)
459 {
460 	if (entry->conn != NULL)
461 	{
462 		PQfinish(entry->conn);
463 		entry->conn = NULL;
464 		ReleaseExternalFD();
465 	}
466 }
467 
468 /*
469  * Return true if the password_required is defined and false for this user
470  * mapping, otherwise false. The mapping has been pre-validated.
471  */
472 static bool
UserMappingPasswordRequired(UserMapping * user)473 UserMappingPasswordRequired(UserMapping *user)
474 {
475 	ListCell   *cell;
476 
477 	foreach(cell, user->options)
478 	{
479 		DefElem    *def = (DefElem *) lfirst(cell);
480 
481 		if (strcmp(def->defname, "password_required") == 0)
482 			return defGetBoolean(def);
483 	}
484 
485 	return true;
486 }
487 
488 /*
489  * For non-superusers, insist that the connstr specify a password.  This
490  * prevents a password from being picked up from .pgpass, a service file, the
491  * environment, etc.  We don't want the postgres user's passwords,
492  * certificates, etc to be accessible to non-superusers.  (See also
493  * dblink_connstr_check in contrib/dblink.)
494  */
495 static void
check_conn_params(const char ** keywords,const char ** values,UserMapping * user)496 check_conn_params(const char **keywords, const char **values, UserMapping *user)
497 {
498 	int			i;
499 
500 	/* no check required if superuser */
501 	if (superuser_arg(user->userid))
502 		return;
503 
504 	/* ok if params contain a non-empty password */
505 	for (i = 0; keywords[i] != NULL; i++)
506 	{
507 		if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
508 			return;
509 	}
510 
511 	/* ok if the superuser explicitly said so at user mapping creation time */
512 	if (!UserMappingPasswordRequired(user))
513 		return;
514 
515 	ereport(ERROR,
516 			(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
517 			 errmsg("password is required"),
518 			 errdetail("Non-superusers must provide a password in the user mapping.")));
519 }
520 
521 /*
522  * Issue SET commands to make sure remote session is configured properly.
523  *
524  * We do this just once at connection, assuming nothing will change the
525  * values later.  Since we'll never send volatile function calls to the
526  * remote, there shouldn't be any way to break this assumption from our end.
527  * It's possible to think of ways to break it at the remote end, eg making
528  * a foreign table point to a view that includes a set_config call ---
529  * but once you admit the possibility of a malicious view definition,
530  * there are any number of ways to break things.
531  */
532 static void
configure_remote_session(PGconn * conn)533 configure_remote_session(PGconn *conn)
534 {
535 	int			remoteversion = PQserverVersion(conn);
536 
537 	/* Force the search path to contain only pg_catalog (see deparse.c) */
538 	do_sql_command(conn, "SET search_path = pg_catalog");
539 
540 	/*
541 	 * Set remote timezone; this is basically just cosmetic, since all
542 	 * transmitted and returned timestamptzs should specify a zone explicitly
543 	 * anyway.  However it makes the regression test outputs more predictable.
544 	 *
545 	 * We don't risk setting remote zone equal to ours, since the remote
546 	 * server might use a different timezone database.  Instead, use UTC
547 	 * (quoted, because very old servers are picky about case).
548 	 */
549 	do_sql_command(conn, "SET timezone = 'UTC'");
550 
551 	/*
552 	 * Set values needed to ensure unambiguous data output from remote.  (This
553 	 * logic should match what pg_dump does.  See also set_transmission_modes
554 	 * in postgres_fdw.c.)
555 	 */
556 	do_sql_command(conn, "SET datestyle = ISO");
557 	if (remoteversion >= 80400)
558 		do_sql_command(conn, "SET intervalstyle = postgres");
559 	if (remoteversion >= 90000)
560 		do_sql_command(conn, "SET extra_float_digits = 3");
561 	else
562 		do_sql_command(conn, "SET extra_float_digits = 2");
563 }
564 
565 /*
566  * Convenience subroutine to issue a non-data-returning SQL command to remote
567  */
568 void
do_sql_command(PGconn * conn,const char * sql)569 do_sql_command(PGconn *conn, const char *sql)
570 {
571 	PGresult   *res;
572 
573 	if (!PQsendQuery(conn, sql))
574 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
575 	res = pgfdw_get_result(conn, sql);
576 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
577 		pgfdw_report_error(ERROR, res, conn, true, sql);
578 	PQclear(res);
579 }
580 
581 /*
582  * Start remote transaction or subtransaction, if needed.
583  *
584  * Note that we always use at least REPEATABLE READ in the remote session.
585  * This is so that, if a query initiates multiple scans of the same or
586  * different foreign tables, we will get snapshot-consistent results from
587  * those scans.  A disadvantage is that we can't provide sane emulation of
588  * READ COMMITTED behavior --- it would be nice if we had some other way to
589  * control which remote queries share a snapshot.
590  */
591 static void
begin_remote_xact(ConnCacheEntry * entry)592 begin_remote_xact(ConnCacheEntry *entry)
593 {
594 	int			curlevel = GetCurrentTransactionNestLevel();
595 
596 	/* Start main transaction if we haven't yet */
597 	if (entry->xact_depth <= 0)
598 	{
599 		const char *sql;
600 
601 		elog(DEBUG3, "starting remote transaction on connection %p",
602 			 entry->conn);
603 
604 		if (IsolationIsSerializable())
605 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
606 		else
607 			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
608 		entry->changing_xact_state = true;
609 		do_sql_command(entry->conn, sql);
610 		entry->xact_depth = 1;
611 		entry->changing_xact_state = false;
612 	}
613 
614 	/*
615 	 * If we're in a subtransaction, stack up savepoints to match our level.
616 	 * This ensures we can rollback just the desired effects when a
617 	 * subtransaction aborts.
618 	 */
619 	while (entry->xact_depth < curlevel)
620 	{
621 		char		sql[64];
622 
623 		snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
624 		entry->changing_xact_state = true;
625 		do_sql_command(entry->conn, sql);
626 		entry->xact_depth++;
627 		entry->changing_xact_state = false;
628 	}
629 }
630 
631 /*
632  * Release connection reference count created by calling GetConnection.
633  */
634 void
ReleaseConnection(PGconn * conn)635 ReleaseConnection(PGconn *conn)
636 {
637 	/*
638 	 * Currently, we don't actually track connection references because all
639 	 * cleanup is managed on a transaction or subtransaction basis instead. So
640 	 * there's nothing to do here.
641 	 */
642 }
643 
644 /*
645  * Assign a "unique" number for a cursor.
646  *
647  * These really only need to be unique per connection within a transaction.
648  * For the moment we ignore the per-connection point and assign them across
649  * all connections in the transaction, but we ask for the connection to be
650  * supplied in case we want to refine that.
651  *
652  * Note that even if wraparound happens in a very long transaction, actual
653  * collisions are highly improbable; just be sure to use %u not %d to print.
654  */
655 unsigned int
GetCursorNumber(PGconn * conn)656 GetCursorNumber(PGconn *conn)
657 {
658 	return ++cursor_number;
659 }
660 
661 /*
662  * Assign a "unique" number for a prepared statement.
663  *
664  * This works much like GetCursorNumber, except that we never reset the counter
665  * within a session.  That's because we can't be 100% sure we've gotten rid
666  * of all prepared statements on all connections, and it's not really worth
667  * increasing the risk of prepared-statement name collisions by resetting.
668  */
669 unsigned int
GetPrepStmtNumber(PGconn * conn)670 GetPrepStmtNumber(PGconn *conn)
671 {
672 	return ++prep_stmt_number;
673 }
674 
675 /*
676  * Submit a query and wait for the result.
677  *
678  * This function is interruptible by signals.
679  *
680  * Caller is responsible for the error handling on the result.
681  */
682 PGresult *
pgfdw_exec_query(PGconn * conn,const char * query,PgFdwConnState * state)683 pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
684 {
685 	/* First, process a pending asynchronous request, if any. */
686 	if (state && state->pendingAreq)
687 		process_pending_request(state->pendingAreq);
688 
689 	/*
690 	 * Submit a query.  Since we don't use non-blocking mode, this also can
691 	 * block.  But its risk is relatively small, so we ignore that for now.
692 	 */
693 	if (!PQsendQuery(conn, query))
694 		pgfdw_report_error(ERROR, NULL, conn, false, query);
695 
696 	/* Wait for the result. */
697 	return pgfdw_get_result(conn, query);
698 }
699 
/*
 * Wait for the result from a prior asynchronous execution function call.
 *
 * This function offers quick responsiveness by checking for any interruptions.
 *
 * This function emulates PQexec()'s behavior of returning the last result
 * when there are many.  It returns NULL if the command produced no result
 * at all.  Earlier results are cleared; the returned one must be cleared by
 * the caller.
 *
 * A failure to read from the socket is raised with ERROR via
 * pgfdw_report_error().  On any error, the result collected so far is
 * cleared before the error propagates.
 *
 * Caller is responsible for the error handling on the result.
 */
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
	/* volatile: assigned inside PG_TRY and read in PG_CATCH after longjmp */
	PGresult   *volatile last_res = NULL;

	/* In what follows, do not leak any PGresults on an error. */
	PG_TRY();
	{
		for (;;)
		{
			PGresult   *res;

			while (PQisBusy(conn))
			{
				int			wc;

				/* Sleep until there's something to do */
				wc = WaitLatchOrSocket(MyLatch,
									   WL_LATCH_SET | WL_SOCKET_READABLE |
									   WL_EXIT_ON_PM_DEATH,
									   PQsocket(conn),
									   -1L, PG_WAIT_EXTENSION);
				ResetLatch(MyLatch);

				CHECK_FOR_INTERRUPTS();

				/* Data available in socket? */
				if (wc & WL_SOCKET_READABLE)
				{
					if (!PQconsumeInput(conn))
						pgfdw_report_error(ERROR, NULL, conn, false, query);
				}
			}

			res = PQgetResult(conn);
			if (res == NULL)
				break;			/* query is complete */

			/* Keep only the most recent result, PQexec()-style. */
			PQclear(last_res);
			last_res = res;
		}
	}
	PG_CATCH();
	{
		PQclear(last_res);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return last_res;
}
761 
762 /*
763  * Report an error we got from the remote server.
764  *
765  * elevel: error level to use (typically ERROR, but might be less)
766  * res: PGresult containing the error
767  * conn: connection we did the query on
768  * clear: if true, PQclear the result (otherwise caller will handle it)
769  * sql: NULL, or text of remote command we tried to execute
770  *
771  * Note: callers that choose not to throw ERROR for a remote error are
772  * responsible for making sure that the associated ConnCacheEntry gets
773  * marked with have_error = true.
774  */
775 void
pgfdw_report_error(int elevel,PGresult * res,PGconn * conn,bool clear,const char * sql)776 pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
777 				   bool clear, const char *sql)
778 {
779 	/* If requested, PGresult must be released before leaving this function. */
780 	PG_TRY();
781 	{
782 		char	   *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
783 		char	   *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
784 		char	   *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
785 		char	   *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
786 		char	   *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
787 		int			sqlstate;
788 
789 		if (diag_sqlstate)
790 			sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
791 									 diag_sqlstate[1],
792 									 diag_sqlstate[2],
793 									 diag_sqlstate[3],
794 									 diag_sqlstate[4]);
795 		else
796 			sqlstate = ERRCODE_CONNECTION_FAILURE;
797 
798 		/*
799 		 * If we don't get a message from the PGresult, try the PGconn.  This
800 		 * is needed because for connection-level failures, PQexec may just
801 		 * return NULL, not a PGresult at all.
802 		 */
803 		if (message_primary == NULL)
804 			message_primary = pchomp(PQerrorMessage(conn));
805 
806 		ereport(elevel,
807 				(errcode(sqlstate),
808 				 message_primary ? errmsg_internal("%s", message_primary) :
809 				 errmsg("could not obtain message string for remote error"),
810 				 message_detail ? errdetail_internal("%s", message_detail) : 0,
811 				 message_hint ? errhint("%s", message_hint) : 0,
812 				 message_context ? errcontext("%s", message_context) : 0,
813 				 sql ? errcontext("remote SQL command: %s", sql) : 0));
814 	}
815 	PG_FINALLY();
816 	{
817 		if (clear)
818 			PQclear(res);
819 	}
820 	PG_END_TRY();
821 }
822 
823 /*
824  * pgfdw_xact_callback --- cleanup at main-transaction end.
825  *
826  * This runs just late enough that it must not enter user-defined code
827  * locally.  (Entering such code on the remote side is fine.  Its remote
828  * COMMIT TRANSACTION may run deferred triggers.)
829  */
static void
pgfdw_xact_callback(XactEvent event, void *arg)
{
	HASH_SEQ_STATUS scan;
	ConnCacheEntry *entry;

	/* Quick exit if no connections were touched in this transaction. */
	if (!xact_got_connection)
		return;

	/*
	 * Scan all connection cache entries to find open remote transactions, and
	 * close them.  Whatever the event, each live entry is then reset to "not
	 * in a transaction" and possibly discarded at the bottom of the loop.
	 */
	hash_seq_init(&scan, ConnectionHash);
	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
	{
		/* Result of the best-effort DEALLOCATE ALL issued at pre-commit */
		PGresult   *res;

		/* Ignore cache entry if no open connection right now */
		if (entry->conn == NULL)
			continue;

		/* If it has an open remote transaction, try to close it */
		if (entry->xact_depth > 0)
		{
			/* Set to true if any step of the abort cleanup below fails */
			bool		abort_cleanup_failure = false;

			elog(DEBUG3, "closing remote transaction on connection %p",
				 entry->conn);

			switch (event)
			{
				case XACT_EVENT_PARALLEL_PRE_COMMIT:
				case XACT_EVENT_PRE_COMMIT:

					/*
					 * If abort cleanup previously failed for this connection,
					 * we can't issue any more commands against it.
					 */
					pgfdw_reject_incomplete_xact_state_change(entry);

					/*
					 * Commit all remote transactions during pre-commit.  The
					 * changing_xact_state flag brackets the command: if
					 * do_sql_command() throws, the flag is left set, so the
					 * abort event that follows treats the connection as
					 * unsalvageable instead of issuing more commands on it.
					 */
					entry->changing_xact_state = true;
					do_sql_command(entry->conn, "COMMIT TRANSACTION");
					entry->changing_xact_state = false;

					/*
					 * If there were any errors in subtransactions, and we
					 * made prepared statements, do a DEALLOCATE ALL to make
					 * sure we get rid of all prepared statements. This is
					 * annoying and not terribly bulletproof, but it's
					 * probably not worth trying harder.
					 *
					 * DEALLOCATE ALL only exists in 8.3 and later, so this
					 * constrains how old a server postgres_fdw can
					 * communicate with.  We intentionally ignore errors in
					 * the DEALLOCATE, so that we can hobble along to some
					 * extent with older servers (leaking prepared statements
					 * as we go; but we don't really support update operations
					 * pre-8.3 anyway).
					 */
					if (entry->have_prep_stmt && entry->have_error)
					{
						res = PQexec(entry->conn, "DEALLOCATE ALL");
						PQclear(res);
					}
					entry->have_prep_stmt = false;
					entry->have_error = false;
					break;
				case XACT_EVENT_PRE_PREPARE:

					/*
					 * We disallow any remote transactions, since it's not
					 * very reasonable to hold them open until the prepared
					 * transaction is committed.  For the moment, throw error
					 * unconditionally; later we might allow read-only cases.
					 * Note that the error will cause us to come right back
					 * here with event == XACT_EVENT_ABORT, so we'll clean up
					 * the connection state at that point.
					 */
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
					break;
				case XACT_EVENT_PARALLEL_COMMIT:
				case XACT_EVENT_COMMIT:
				case XACT_EVENT_PREPARE:
					/*
					 * Pre-commit should have closed the open transaction (it
					 * resets xact_depth below), so reaching here with
					 * xact_depth > 0 is an internal error.
					 */
					elog(ERROR, "missed cleaning up connection during pre-commit");
					break;
				case XACT_EVENT_PARALLEL_ABORT:
				case XACT_EVENT_ABORT:

					/*
					 * Don't try to clean up the connection if we're already
					 * in error recursion trouble.
					 */
					if (in_error_recursion_trouble())
						entry->changing_xact_state = true;

					/*
					 * If connection is already unsalvageable, don't touch it
					 * further.  It will be discarded below.
					 */
					if (entry->changing_xact_state)
						break;

					/*
					 * Mark this connection as in the process of changing
					 * transaction state, so that an error thrown during the
					 * cleanup below leaves it flagged as unsalvageable.
					 */
					entry->changing_xact_state = true;

					/*
					 * Assume we might have lost track of prepared statements.
					 * (Because this is set unconditionally, the DEALLOCATE ALL
					 * check below effectively depends only on have_prep_stmt.)
					 */
					entry->have_error = true;

					/*
					 * If a command has been submitted to the remote server by
					 * using an asynchronous execution function, the command
					 * might not have yet completed.  Check to see if a
					 * command is still being processed by the remote server,
					 * and if so, request cancellation of the command.
					 */
					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
						!pgfdw_cancel_query(entry->conn))
					{
						/* Unable to cancel running query. */
						abort_cleanup_failure = true;
					}
					else if (!pgfdw_exec_cleanup_query(entry->conn,
													   "ABORT TRANSACTION",
													   false))
					{
						/* Unable to abort remote transaction. */
						abort_cleanup_failure = true;
					}
					else if (entry->have_prep_stmt && entry->have_error &&
							 !pgfdw_exec_cleanup_query(entry->conn,
													   "DEALLOCATE ALL",
													   true))
					{
						/* Trouble clearing prepared statements. */
						abort_cleanup_failure = true;
					}
					else
					{
						entry->have_prep_stmt = false;
						entry->have_error = false;
						/* Also reset per-connection state */
						memset(&entry->state, 0, sizeof(entry->state));
					}

					/*
					 * Disarm changing_xact_state if it all worked; otherwise
					 * leave it set so the connection is discarded below.
					 */
					entry->changing_xact_state = abort_cleanup_failure;
					break;
			}
		}

		/* Reset state to show we're out of a transaction */
		entry->xact_depth = 0;

		/*
		 * Discard the connection if it is not in a good idle state, if abort
		 * cleanup left it in an unknown state, if it has been marked as
		 * invalid, or if the keep_connections option of its server is
		 * disabled.  The next GetConnection will open a new connection.
		 */
		if (PQstatus(entry->conn) != CONNECTION_OK ||
			PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
			entry->changing_xact_state ||
			entry->invalidated ||
			!entry->keep_connections)
		{
			elog(DEBUG3, "discarding connection %p", entry->conn);
			disconnect_pg_server(entry);
		}
	}

	/*
	 * Regardless of the event type, we can now mark ourselves as out of the
	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
	 */
	xact_got_connection = false;

	/* Also reset cursor numbering for next transaction */
	cursor_number = 0;
}
1019 
1020 /*
1021  * pgfdw_subxact_callback --- cleanup at subtransaction end.
1022  */
1023 static void
pgfdw_subxact_callback(SubXactEvent event,SubTransactionId mySubid,SubTransactionId parentSubid,void * arg)1024 pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1025 					   SubTransactionId parentSubid, void *arg)
1026 {
1027 	HASH_SEQ_STATUS scan;
1028 	ConnCacheEntry *entry;
1029 	int			curlevel;
1030 
1031 	/* Nothing to do at subxact start, nor after commit. */
1032 	if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1033 		  event == SUBXACT_EVENT_ABORT_SUB))
1034 		return;
1035 
1036 	/* Quick exit if no connections were touched in this transaction. */
1037 	if (!xact_got_connection)
1038 		return;
1039 
1040 	/*
1041 	 * Scan all connection cache entries to find open remote subtransactions
1042 	 * of the current level, and close them.
1043 	 */
1044 	curlevel = GetCurrentTransactionNestLevel();
1045 	hash_seq_init(&scan, ConnectionHash);
1046 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1047 	{
1048 		char		sql[100];
1049 
1050 		/*
1051 		 * We only care about connections with open remote subtransactions of
1052 		 * the current level.
1053 		 */
1054 		if (entry->conn == NULL || entry->xact_depth < curlevel)
1055 			continue;
1056 
1057 		if (entry->xact_depth > curlevel)
1058 			elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1059 				 entry->xact_depth);
1060 
1061 		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1062 		{
1063 			/*
1064 			 * If abort cleanup previously failed for this connection, we
1065 			 * can't issue any more commands against it.
1066 			 */
1067 			pgfdw_reject_incomplete_xact_state_change(entry);
1068 
1069 			/* Commit all remote subtransactions during pre-commit */
1070 			snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1071 			entry->changing_xact_state = true;
1072 			do_sql_command(entry->conn, sql);
1073 			entry->changing_xact_state = false;
1074 		}
1075 		else if (in_error_recursion_trouble())
1076 		{
1077 			/*
1078 			 * Don't try to clean up the connection if we're already in error
1079 			 * recursion trouble.
1080 			 */
1081 			entry->changing_xact_state = true;
1082 		}
1083 		else if (!entry->changing_xact_state)
1084 		{
1085 			bool		abort_cleanup_failure = false;
1086 
1087 			/* Remember that abort cleanup is in progress. */
1088 			entry->changing_xact_state = true;
1089 
1090 			/* Assume we might have lost track of prepared statements */
1091 			entry->have_error = true;
1092 
1093 			/*
1094 			 * If a command has been submitted to the remote server by using
1095 			 * an asynchronous execution function, the command might not have
1096 			 * yet completed.  Check to see if a command is still being
1097 			 * processed by the remote server, and if so, request cancellation
1098 			 * of the command.
1099 			 */
1100 			if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1101 				!pgfdw_cancel_query(entry->conn))
1102 				abort_cleanup_failure = true;
1103 			else
1104 			{
1105 				/* Rollback all remote subtransactions during abort */
1106 				snprintf(sql, sizeof(sql),
1107 						 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1108 						 curlevel, curlevel);
1109 				if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1110 					abort_cleanup_failure = true;
1111 			}
1112 
1113 			/* Disarm changing_xact_state if it all worked. */
1114 			entry->changing_xact_state = abort_cleanup_failure;
1115 		}
1116 
1117 		/* OK, we're outta that level of subtransaction */
1118 		entry->xact_depth--;
1119 	}
1120 }
1121 
1122 /*
1123  * Connection invalidation callback function
1124  *
1125  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1126  * close connections depending on that entry immediately if current transaction
1127  * has not used those connections yet. Otherwise, mark those connections as
1128  * invalid and then make pgfdw_xact_callback() close them at the end of current
1129  * transaction, since they cannot be closed in the midst of the transaction
1130  * using them. Closed connections will be remade at the next opportunity if
1131  * necessary.
1132  *
1133  * Although most cache invalidation callbacks blow away all the related stuff
1134  * regardless of the given hashvalue, connections are expensive enough that
1135  * it's worth trying to avoid that.
1136  *
1137  * NB: We could avoid unnecessary disconnection more strictly by examining
1138  * individual option values, but it seems too much effort for the gain.
1139  */
1140 static void
pgfdw_inval_callback(Datum arg,int cacheid,uint32 hashvalue)1141 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1142 {
1143 	HASH_SEQ_STATUS scan;
1144 	ConnCacheEntry *entry;
1145 
1146 	Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1147 
1148 	/* ConnectionHash must exist already, if we're registered */
1149 	hash_seq_init(&scan, ConnectionHash);
1150 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1151 	{
1152 		/* Ignore invalid entries */
1153 		if (entry->conn == NULL)
1154 			continue;
1155 
1156 		/* hashvalue == 0 means a cache reset, must clear all state */
1157 		if (hashvalue == 0 ||
1158 			(cacheid == FOREIGNSERVEROID &&
1159 			 entry->server_hashvalue == hashvalue) ||
1160 			(cacheid == USERMAPPINGOID &&
1161 			 entry->mapping_hashvalue == hashvalue))
1162 		{
1163 			/*
1164 			 * Close the connection immediately if it's not used yet in this
1165 			 * transaction. Otherwise mark it as invalid so that
1166 			 * pgfdw_xact_callback() can close it at the end of this
1167 			 * transaction.
1168 			 */
1169 			if (entry->xact_depth == 0)
1170 			{
1171 				elog(DEBUG3, "discarding connection %p", entry->conn);
1172 				disconnect_pg_server(entry);
1173 			}
1174 			else
1175 				entry->invalidated = true;
1176 		}
1177 	}
1178 }
1179 
1180 /*
1181  * Raise an error if the given connection cache entry is marked as being
1182  * in the middle of an xact state change.  This should be called at which no
1183  * such change is expected to be in progress; if one is found to be in
1184  * progress, it means that we aborted in the middle of a previous state change
1185  * and now don't know what the remote transaction state actually is.
1186  * Such connections can't safely be further used.  Re-establishing the
1187  * connection would change the snapshot and roll back any writes already
1188  * performed, so that's not an option, either. Thus, we must abort.
1189  */
1190 static void
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry * entry)1191 pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1192 {
1193 	ForeignServer *server;
1194 
1195 	/* nothing to do for inactive entries and entries of sane state */
1196 	if (entry->conn == NULL || !entry->changing_xact_state)
1197 		return;
1198 
1199 	/* make sure this entry is inactive */
1200 	disconnect_pg_server(entry);
1201 
1202 	/* find server name to be shown in the message below */
1203 	server = GetForeignServer(entry->serverid);
1204 
1205 	ereport(ERROR,
1206 			(errcode(ERRCODE_CONNECTION_EXCEPTION),
1207 			 errmsg("connection to server \"%s\" was lost",
1208 					server->servername)));
1209 }
1210 
1211 /*
1212  * Cancel the currently-in-progress query (whose query text we do not have)
1213  * and ignore the result.  Returns true if we successfully cancel the query
1214  * and discard any pending result, and false if not.
1215  *
1216  * It's not a huge problem if we throw an ERROR here, but if we get into error
1217  * recursion trouble, we'll end up slamming the connection shut, which will
1218  * necessitate failing the entire toplevel transaction even if subtransactions
1219  * were used.  Try to use WARNING where we can.
1220  *
1221  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1222  * query text from the pendingAreq saved in the per-connection state, then
1223  * report the query using it.
1224  */
1225 static bool
pgfdw_cancel_query(PGconn * conn)1226 pgfdw_cancel_query(PGconn *conn)
1227 {
1228 	PGcancel   *cancel;
1229 	char		errbuf[256];
1230 	PGresult   *result = NULL;
1231 	TimestampTz endtime;
1232 
1233 	/*
1234 	 * If it takes too long to cancel the query and discard the result, assume
1235 	 * the connection is dead.
1236 	 */
1237 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1238 
1239 	/*
1240 	 * Issue cancel request.  Unfortunately, there's no good way to limit the
1241 	 * amount of time that we might block inside PQgetCancel().
1242 	 */
1243 	if ((cancel = PQgetCancel(conn)))
1244 	{
1245 		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1246 		{
1247 			ereport(WARNING,
1248 					(errcode(ERRCODE_CONNECTION_FAILURE),
1249 					 errmsg("could not send cancel request: %s",
1250 							errbuf)));
1251 			PQfreeCancel(cancel);
1252 			return false;
1253 		}
1254 		PQfreeCancel(cancel);
1255 	}
1256 
1257 	/* Get and discard the result of the query. */
1258 	if (pgfdw_get_cleanup_result(conn, endtime, &result))
1259 		return false;
1260 	PQclear(result);
1261 
1262 	return true;
1263 }
1264 
1265 /*
1266  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1267  * result.  If the query is executed without error, the return value is true.
1268  * If the query is executed successfully but returns an error, the return
1269  * value is true if and only if ignore_errors is set.  If the query can't be
1270  * sent or times out, the return value is false.
1271  *
1272  * It's not a huge problem if we throw an ERROR here, but if we get into error
1273  * recursion trouble, we'll end up slamming the connection shut, which will
1274  * necessitate failing the entire toplevel transaction even if subtransactions
1275  * were used.  Try to use WARNING where we can.
1276  */
1277 static bool
pgfdw_exec_cleanup_query(PGconn * conn,const char * query,bool ignore_errors)1278 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1279 {
1280 	PGresult   *result = NULL;
1281 	TimestampTz endtime;
1282 
1283 	/*
1284 	 * If it takes too long to execute a cleanup query, assume the connection
1285 	 * is dead.  It's fairly likely that this is why we aborted in the first
1286 	 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1287 	 * be too long.
1288 	 */
1289 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1290 
1291 	/*
1292 	 * Submit a query.  Since we don't use non-blocking mode, this also can
1293 	 * block.  But its risk is relatively small, so we ignore that for now.
1294 	 */
1295 	if (!PQsendQuery(conn, query))
1296 	{
1297 		pgfdw_report_error(WARNING, NULL, conn, false, query);
1298 		return false;
1299 	}
1300 
1301 	/* Get the result of the query. */
1302 	if (pgfdw_get_cleanup_result(conn, endtime, &result))
1303 		return false;
1304 
1305 	/* Issue a warning if not successful. */
1306 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
1307 	{
1308 		pgfdw_report_error(WARNING, result, conn, true, query);
1309 		return ignore_errors;
1310 	}
1311 	PQclear(result);
1312 
1313 	return true;
1314 }
1315 
1316 /*
1317  * Get, during abort cleanup, the result of a query that is in progress.  This
1318  * might be a query that is being interrupted by transaction abort, or it might
1319  * be a query that was initiated as part of transaction abort to get the remote
1320  * side back to the appropriate state.
1321  *
1322  * endtime is the time at which we should give up and assume the remote
1323  * side is dead.  Returns true if the timeout expired, otherwise false.
1324  * Sets *result except in case of a timeout.
1325  */
1326 static bool
pgfdw_get_cleanup_result(PGconn * conn,TimestampTz endtime,PGresult ** result)1327 pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
1328 {
1329 	volatile bool timed_out = false;
1330 	PGresult   *volatile last_res = NULL;
1331 
1332 	/* In what follows, do not leak any PGresults on an error. */
1333 	PG_TRY();
1334 	{
1335 		for (;;)
1336 		{
1337 			PGresult   *res;
1338 
1339 			while (PQisBusy(conn))
1340 			{
1341 				int			wc;
1342 				TimestampTz now = GetCurrentTimestamp();
1343 				long		cur_timeout;
1344 
1345 				/* If timeout has expired, give up, else get sleep time. */
1346 				cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1347 				if (cur_timeout <= 0)
1348 				{
1349 					timed_out = true;
1350 					goto exit;
1351 				}
1352 
1353 				/* Sleep until there's something to do */
1354 				wc = WaitLatchOrSocket(MyLatch,
1355 									   WL_LATCH_SET | WL_SOCKET_READABLE |
1356 									   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1357 									   PQsocket(conn),
1358 									   cur_timeout, PG_WAIT_EXTENSION);
1359 				ResetLatch(MyLatch);
1360 
1361 				CHECK_FOR_INTERRUPTS();
1362 
1363 				/* Data available in socket? */
1364 				if (wc & WL_SOCKET_READABLE)
1365 				{
1366 					if (!PQconsumeInput(conn))
1367 					{
1368 						/* connection trouble; treat the same as a timeout */
1369 						timed_out = true;
1370 						goto exit;
1371 					}
1372 				}
1373 			}
1374 
1375 			res = PQgetResult(conn);
1376 			if (res == NULL)
1377 				break;			/* query is complete */
1378 
1379 			PQclear(last_res);
1380 			last_res = res;
1381 		}
1382 exit:	;
1383 	}
1384 	PG_CATCH();
1385 	{
1386 		PQclear(last_res);
1387 		PG_RE_THROW();
1388 	}
1389 	PG_END_TRY();
1390 
1391 	if (timed_out)
1392 		PQclear(last_res);
1393 	else
1394 		*result = last_res;
1395 	return timed_out;
1396 }
1397 
1398 /*
1399  * List active foreign server connections.
1400  *
1401  * This function takes no input parameter and returns setof record made of
1402  * following values:
1403  * - server_name - server name of active connection. In case the foreign server
1404  *   is dropped but still the connection is active, then the server name will
1405  *   be NULL in output.
1406  * - valid - true/false representing whether the connection is valid or not.
1407  * 	 Note that the connections can get invalidated in pgfdw_inval_callback.
1408  *
1409  * No records are returned when there are no cached connections at all.
1410  */
1411 Datum
postgres_fdw_get_connections(PG_FUNCTION_ARGS)1412 postgres_fdw_get_connections(PG_FUNCTION_ARGS)
1413 {
1414 #define POSTGRES_FDW_GET_CONNECTIONS_COLS	2
1415 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1416 	TupleDesc	tupdesc;
1417 	Tuplestorestate *tupstore;
1418 	MemoryContext per_query_ctx;
1419 	MemoryContext oldcontext;
1420 	HASH_SEQ_STATUS scan;
1421 	ConnCacheEntry *entry;
1422 
1423 	/* check to see if caller supports us returning a tuplestore */
1424 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1425 		ereport(ERROR,
1426 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1427 				 errmsg("set-valued function called in context that cannot accept a set")));
1428 	if (!(rsinfo->allowedModes & SFRM_Materialize))
1429 		ereport(ERROR,
1430 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1431 				 errmsg("materialize mode required, but it is not allowed in this context")));
1432 
1433 	/* Build a tuple descriptor for our result type */
1434 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1435 		elog(ERROR, "return type must be a row type");
1436 
1437 	/* Build tuplestore to hold the result rows */
1438 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1439 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
1440 
1441 	tupstore = tuplestore_begin_heap(true, false, work_mem);
1442 	rsinfo->returnMode = SFRM_Materialize;
1443 	rsinfo->setResult = tupstore;
1444 	rsinfo->setDesc = tupdesc;
1445 
1446 	MemoryContextSwitchTo(oldcontext);
1447 
1448 	/* If cache doesn't exist, we return no records */
1449 	if (!ConnectionHash)
1450 	{
1451 		/* clean up and return the tuplestore */
1452 		tuplestore_donestoring(tupstore);
1453 
1454 		PG_RETURN_VOID();
1455 	}
1456 
1457 	hash_seq_init(&scan, ConnectionHash);
1458 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1459 	{
1460 		ForeignServer *server;
1461 		Datum		values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1462 		bool		nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1463 
1464 		/* We only look for open remote connections */
1465 		if (!entry->conn)
1466 			continue;
1467 
1468 		server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1469 
1470 		MemSet(values, 0, sizeof(values));
1471 		MemSet(nulls, 0, sizeof(nulls));
1472 
1473 		/*
1474 		 * The foreign server may have been dropped in current explicit
1475 		 * transaction. It is not possible to drop the server from another
1476 		 * session when the connection associated with it is in use in the
1477 		 * current transaction, if tried so, the drop query in another session
1478 		 * blocks until the current transaction finishes.
1479 		 *
1480 		 * Even though the server is dropped in the current transaction, the
1481 		 * cache can still have associated active connection entry, say we
1482 		 * call such connections dangling. Since we can not fetch the server
1483 		 * name from system catalogs for dangling connections, instead we show
1484 		 * NULL value for server name in output.
1485 		 *
1486 		 * We could have done better by storing the server name in the cache
1487 		 * entry instead of server oid so that it could be used in the output.
1488 		 * But the server name in each cache entry requires 64 bytes of
1489 		 * memory, which is huge, when there are many cached connections and
1490 		 * the use case i.e. dropping the foreign server within the explicit
1491 		 * current transaction seems rare. So, we chose to show NULL value for
1492 		 * server name in output.
1493 		 *
1494 		 * Such dangling connections get closed either in next use or at the
1495 		 * end of current explicit transaction in pgfdw_xact_callback.
1496 		 */
1497 		if (!server)
1498 		{
1499 			/*
1500 			 * If the server has been dropped in the current explicit
1501 			 * transaction, then this entry would have been invalidated in
1502 			 * pgfdw_inval_callback at the end of drop server command. Note
1503 			 * that this connection would not have been closed in
1504 			 * pgfdw_inval_callback because it is still being used in the
1505 			 * current explicit transaction. So, assert that here.
1506 			 */
1507 			Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1508 
1509 			/* Show null, if no server name was found */
1510 			nulls[0] = true;
1511 		}
1512 		else
1513 			values[0] = CStringGetTextDatum(server->servername);
1514 
1515 		values[1] = BoolGetDatum(!entry->invalidated);
1516 
1517 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1518 	}
1519 
1520 	/* clean up and return the tuplestore */
1521 	tuplestore_donestoring(tupstore);
1522 
1523 	PG_RETURN_VOID();
1524 }
1525 
1526 /*
1527  * Disconnect the specified cached connections.
1528  *
1529  * This function discards the open connections that are established by
1530  * postgres_fdw from the local session to the foreign server with
1531  * the given name. Note that there can be multiple connections to
1532  * the given server using different user mappings. If the connections
1533  * are used in the current local transaction, they are not disconnected
1534  * and warning messages are reported. This function returns true
1535  * if it disconnects at least one connection, otherwise false. If no
1536  * foreign server with the given name is found, an error is reported.
1537  */
1538 Datum
postgres_fdw_disconnect(PG_FUNCTION_ARGS)1539 postgres_fdw_disconnect(PG_FUNCTION_ARGS)
1540 {
1541 	ForeignServer *server;
1542 	char	   *servername;
1543 
1544 	servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1545 	server = GetForeignServerByName(servername, false);
1546 
1547 	PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
1548 }
1549 
1550 /*
1551  * Disconnect all the cached connections.
1552  *
1553  * This function discards all the open connections that are established by
1554  * postgres_fdw from the local session to the foreign servers.
1555  * If the connections are used in the current local transaction, they are
1556  * not disconnected and warning messages are reported. This function
1557  * returns true if it disconnects at least one connection, otherwise false.
1558  */
1559 Datum
postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)1560 postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
1561 {
1562 	PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
1563 }
1564 
1565 /*
1566  * Workhorse to disconnect cached connections.
1567  *
1568  * This function scans all the connection cache entries and disconnects
1569  * the open connections whose foreign server OID matches with
1570  * the specified one. If InvalidOid is specified, it disconnects all
1571  * the cached connections.
1572  *
1573  * This function emits a warning for each connection that's used in
1574  * the current transaction and doesn't close it. It returns true if
1575  * it disconnects at least one connection, otherwise false.
1576  *
1577  * Note that this function disconnects even the connections that are
1578  * established by other users in the same local session using different
1579  * user mappings. This leads even non-superuser to be able to close
1580  * the connections established by superusers in the same local session.
1581  *
1582  * XXX As of now we don't see any security risk doing this. But we should
1583  * set some restrictions on that, for example, prevent non-superuser
1584  * from closing the connections established by superusers even
1585  * in the same session?
1586  */
1587 static bool
disconnect_cached_connections(Oid serverid)1588 disconnect_cached_connections(Oid serverid)
1589 {
1590 	HASH_SEQ_STATUS scan;
1591 	ConnCacheEntry *entry;
1592 	bool		all = !OidIsValid(serverid);
1593 	bool		result = false;
1594 
1595 	/*
1596 	 * Connection cache hashtable has not been initialized yet in this
1597 	 * session, so return false.
1598 	 */
1599 	if (!ConnectionHash)
1600 		return false;
1601 
1602 	hash_seq_init(&scan, ConnectionHash);
1603 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1604 	{
1605 		/* Ignore cache entry if no open connection right now. */
1606 		if (!entry->conn)
1607 			continue;
1608 
1609 		if (all || entry->serverid == serverid)
1610 		{
1611 			/*
1612 			 * Emit a warning because the connection to close is used in the
1613 			 * current transaction and cannot be disconnected right now.
1614 			 */
1615 			if (entry->xact_depth > 0)
1616 			{
1617 				ForeignServer *server;
1618 
1619 				server = GetForeignServerExtended(entry->serverid,
1620 												  FSV_MISSING_OK);
1621 
1622 				if (!server)
1623 				{
1624 					/*
1625 					 * If the foreign server was dropped while its connection
1626 					 * was used in the current transaction, the connection
1627 					 * must have been marked as invalid by
1628 					 * pgfdw_inval_callback at the end of DROP SERVER command.
1629 					 */
1630 					Assert(entry->invalidated);
1631 
1632 					ereport(WARNING,
1633 							(errmsg("cannot close dropped server connection because it is still in use")));
1634 				}
1635 				else
1636 					ereport(WARNING,
1637 							(errmsg("cannot close connection for server \"%s\" because it is still in use",
1638 									server->servername)));
1639 			}
1640 			else
1641 			{
1642 				elog(DEBUG3, "discarding connection %p", entry->conn);
1643 				disconnect_pg_server(entry);
1644 				result = true;
1645 			}
1646 		}
1647 	}
1648 
1649 	return result;
1650 }
1651