1 /*-------------------------------------------------------------------------
2 *
3 * connection.c
4 * Connection management functions for postgres_fdw
5 *
6 * Portions Copyright (c) 2012-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/connection.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "access/htup_details.h"
16 #include "access/xact.h"
17 #include "catalog/pg_user_mapping.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "mb/pg_wchar.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postgres_fdw.h"
24 #include "storage/fd.h"
25 #include "storage/latch.h"
26 #include "utils/builtins.h"
27 #include "utils/datetime.h"
28 #include "utils/hsearch.h"
29 #include "utils/inval.h"
30 #include "utils/memutils.h"
31 #include "utils/syscache.h"
32
33 /*
34 * Connection cache hash table entry
35 *
36 * The lookup key in this hash table is the user mapping OID. We use just one
37 * connection per user mapping ID, which ensures that all the scans use the
38 * same snapshot during a query. Using the user mapping OID rather than
39 * the foreign server OID + user OID avoids creating multiple connections when
40 * the public user mapping applies to all user OIDs.
41 *
42 * The "conn" pointer can be NULL if we don't currently have a live connection.
43 * When we do have a connection, xact_depth tracks the current depth of
44 * transactions and subtransactions open on the remote side. We need to issue
45 * commands at the same nesting depth on the remote as we're executing at
46 * ourselves, so that rolling back a subtransaction will kill the right
47 * queries and not the wrong ones.
48 */
49 typedef Oid ConnCacheKey;
50
51 typedef struct ConnCacheEntry
52 {
53 ConnCacheKey key; /* hash key (must be first) */
54 PGconn *conn; /* connection to foreign server, or NULL */
55 /* Remaining fields are invalid when conn is NULL: */
56 int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
57 * one level of subxact open, etc */
58 bool have_prep_stmt; /* have we prepared any stmts in this xact? */
59 bool have_error; /* have any subxacts aborted in this xact? */
60 bool changing_xact_state; /* xact state change in process */
61 bool invalidated; /* true if reconnect is pending */
62 bool keep_connections; /* setting value of keep_connections
63 * server option */
64 Oid serverid; /* foreign server OID used to get server name */
65 uint32 server_hashvalue; /* hash value of foreign server OID */
66 uint32 mapping_hashvalue; /* hash value of user mapping OID */
67 PgFdwConnState state; /* extra per-connection state */
68 } ConnCacheEntry;
69
/*
 * Connection cache, keyed by ConnCacheKey.  It stays NULL until the first
 * GetConnection() call, which creates the hash table and registers the
 * transaction and invalidation callbacks.
 */
static HTAB *ConnectionHash = NULL;

/*
 * Counters used for assigning cursor numbers and prepared statement numbers;
 * see GetCursorNumber() and GetPrepStmtNumber().
 */
static unsigned int cursor_number = 0;
static unsigned int prep_stmt_number = 0;

/*
 * Tracks whether any work is needed in callback functions.  GetConnection()
 * sets this, and pgfdw_xact_callback() exits immediately when it is false.
 */
static bool xact_got_connection = false;

/*
 * SQL-callable functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);

/* Prototypes of functions private to this file */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
static void configure_remote_session(PGconn *conn);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
								   SubTransactionId mySubid,
								   SubTransactionId parentSubid,
								   void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
									 bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
									 PGresult **result);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
110
111 /*
112 * Get a PGconn which can be used to execute queries on the remote PostgreSQL
113 * server with the user's authorization. A new connection is established
114 * if we don't already have a suitable one, and a transaction is opened at
115 * the right subtransaction nesting depth if we didn't do that already.
116 *
117 * will_prep_stmt must be true if caller intends to create any prepared
118 * statements. Since those don't go away automatically at transaction end
119 * (not even on error), we need this flag to cue manual cleanup.
120 *
121 * If state is not NULL, *state receives the per-connection state associated
122 * with the PGconn.
123 */
124 PGconn *
GetConnection(UserMapping * user,bool will_prep_stmt,PgFdwConnState ** state)125 GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
126 {
127 bool found;
128 bool retry = false;
129 ConnCacheEntry *entry;
130 ConnCacheKey key;
131 MemoryContext ccxt = CurrentMemoryContext;
132
133 /* First time through, initialize connection cache hashtable */
134 if (ConnectionHash == NULL)
135 {
136 HASHCTL ctl;
137
138 ctl.keysize = sizeof(ConnCacheKey);
139 ctl.entrysize = sizeof(ConnCacheEntry);
140 ConnectionHash = hash_create("postgres_fdw connections", 8,
141 &ctl,
142 HASH_ELEM | HASH_BLOBS);
143
144 /*
145 * Register some callback functions that manage connection cleanup.
146 * This should be done just once in each backend.
147 */
148 RegisterXactCallback(pgfdw_xact_callback, NULL);
149 RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
150 CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
151 pgfdw_inval_callback, (Datum) 0);
152 CacheRegisterSyscacheCallback(USERMAPPINGOID,
153 pgfdw_inval_callback, (Datum) 0);
154 }
155
156 /* Set flag that we did GetConnection during the current transaction */
157 xact_got_connection = true;
158
159 /* Create hash key for the entry. Assume no pad bytes in key struct */
160 key = user->umid;
161
162 /*
163 * Find or create cached entry for requested connection.
164 */
165 entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
166 if (!found)
167 {
168 /*
169 * We need only clear "conn" here; remaining fields will be filled
170 * later when "conn" is set.
171 */
172 entry->conn = NULL;
173 }
174
175 /* Reject further use of connections which failed abort cleanup. */
176 pgfdw_reject_incomplete_xact_state_change(entry);
177
178 /*
179 * If the connection needs to be remade due to invalidation, disconnect as
180 * soon as we're out of all transactions.
181 */
182 if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
183 {
184 elog(DEBUG3, "closing connection %p for option changes to take effect",
185 entry->conn);
186 disconnect_pg_server(entry);
187 }
188
189 /*
190 * If cache entry doesn't have a connection, we have to establish a new
191 * connection. (If connect_pg_server throws an error, the cache entry
192 * will remain in a valid empty state, ie conn == NULL.)
193 */
194 if (entry->conn == NULL)
195 make_new_connection(entry, user);
196
197 /*
198 * We check the health of the cached connection here when using it. In
199 * cases where we're out of all transactions, if a broken connection is
200 * detected, we try to reestablish a new connection later.
201 */
202 PG_TRY();
203 {
204 /* Process a pending asynchronous request if any. */
205 if (entry->state.pendingAreq)
206 process_pending_request(entry->state.pendingAreq);
207 /* Start a new transaction or subtransaction if needed. */
208 begin_remote_xact(entry);
209 }
210 PG_CATCH();
211 {
212 MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
213 ErrorData *errdata = CopyErrorData();
214
215 /*
216 * Determine whether to try to reestablish the connection.
217 *
218 * After a broken connection is detected in libpq, any error other
219 * than connection failure (e.g., out-of-memory) can be thrown
220 * somewhere between return from libpq and the expected ereport() call
221 * in pgfdw_report_error(). In this case, since PQstatus() indicates
222 * CONNECTION_BAD, checking only PQstatus() causes the false detection
223 * of connection failure. To avoid this, we also verify that the
224 * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
225 * checking only the sqlstate can cause another false detection
226 * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
227 * for any libpq-originated error condition.
228 */
229 if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
230 PQstatus(entry->conn) != CONNECTION_BAD ||
231 entry->xact_depth > 0)
232 {
233 MemoryContextSwitchTo(ecxt);
234 PG_RE_THROW();
235 }
236
237 /* Clean up the error state */
238 FlushErrorState();
239 FreeErrorData(errdata);
240 errdata = NULL;
241
242 retry = true;
243 }
244 PG_END_TRY();
245
246 /*
247 * If a broken connection is detected, disconnect it, reestablish a new
248 * connection and retry a new remote transaction. If connection failure is
249 * reported again, we give up getting a connection.
250 */
251 if (retry)
252 {
253 Assert(entry->xact_depth == 0);
254
255 ereport(DEBUG3,
256 (errmsg_internal("could not start remote transaction on connection %p",
257 entry->conn)),
258 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
259
260 elog(DEBUG3, "closing connection %p to reestablish a new one",
261 entry->conn);
262 disconnect_pg_server(entry);
263
264 if (entry->conn == NULL)
265 make_new_connection(entry, user);
266
267 begin_remote_xact(entry);
268 }
269
270 /* Remember if caller will prepare statements */
271 entry->have_prep_stmt |= will_prep_stmt;
272
273 /* If caller needs access to the per-connection state, return it. */
274 if (state)
275 *state = &entry->state;
276
277 return entry->conn;
278 }
279
280 /*
281 * Reset all transient state fields in the cached connection entry and
282 * establish new connection to the remote server.
283 */
284 static void
make_new_connection(ConnCacheEntry * entry,UserMapping * user)285 make_new_connection(ConnCacheEntry *entry, UserMapping *user)
286 {
287 ForeignServer *server = GetForeignServer(user->serverid);
288 ListCell *lc;
289
290 Assert(entry->conn == NULL);
291
292 /* Reset all transient state fields, to be sure all are clean */
293 entry->xact_depth = 0;
294 entry->have_prep_stmt = false;
295 entry->have_error = false;
296 entry->changing_xact_state = false;
297 entry->invalidated = false;
298 entry->serverid = server->serverid;
299 entry->server_hashvalue =
300 GetSysCacheHashValue1(FOREIGNSERVEROID,
301 ObjectIdGetDatum(server->serverid));
302 entry->mapping_hashvalue =
303 GetSysCacheHashValue1(USERMAPPINGOID,
304 ObjectIdGetDatum(user->umid));
305 memset(&entry->state, 0, sizeof(entry->state));
306
307 /*
308 * Determine whether to keep the connection that we're about to make here
309 * open even after the transaction using it ends, so that the subsequent
310 * transactions can re-use it.
311 *
312 * It's enough to determine this only when making new connection because
313 * all the connections to the foreign server whose keep_connections option
314 * is changed will be closed and re-made later.
315 *
316 * By default, all the connections to any foreign servers are kept open.
317 */
318 entry->keep_connections = true;
319 foreach(lc, server->options)
320 {
321 DefElem *def = (DefElem *) lfirst(lc);
322
323 if (strcmp(def->defname, "keep_connections") == 0)
324 entry->keep_connections = defGetBoolean(def);
325 }
326
327 /* Now try to make the connection */
328 entry->conn = connect_pg_server(server, user);
329
330 elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
331 entry->conn, server->servername, user->umid, user->userid);
332 }
333
334 /*
335 * Connect to remote server using specified server and user mapping properties.
336 */
337 static PGconn *
connect_pg_server(ForeignServer * server,UserMapping * user)338 connect_pg_server(ForeignServer *server, UserMapping *user)
339 {
340 PGconn *volatile conn = NULL;
341
342 /*
343 * Use PG_TRY block to ensure closing connection on error.
344 */
345 PG_TRY();
346 {
347 const char **keywords;
348 const char **values;
349 int n;
350
351 /*
352 * Construct connection params from generic options of ForeignServer
353 * and UserMapping. (Some of them might not be libpq options, in
354 * which case we'll just waste a few array slots.) Add 3 extra slots
355 * for fallback_application_name, client_encoding, end marker.
356 */
357 n = list_length(server->options) + list_length(user->options) + 3;
358 keywords = (const char **) palloc(n * sizeof(char *));
359 values = (const char **) palloc(n * sizeof(char *));
360
361 n = 0;
362 n += ExtractConnectionOptions(server->options,
363 keywords + n, values + n);
364 n += ExtractConnectionOptions(user->options,
365 keywords + n, values + n);
366
367 /* Use "postgres_fdw" as fallback_application_name. */
368 keywords[n] = "fallback_application_name";
369 values[n] = "postgres_fdw";
370 n++;
371
372 /* Set client_encoding so that libpq can convert encoding properly. */
373 keywords[n] = "client_encoding";
374 values[n] = GetDatabaseEncodingName();
375 n++;
376
377 keywords[n] = values[n] = NULL;
378
379 /* verify the set of connection parameters */
380 check_conn_params(keywords, values, user);
381
382 /*
383 * We must obey fd.c's limit on non-virtual file descriptors. Assume
384 * that a PGconn represents one long-lived FD. (Doing this here also
385 * ensures that VFDs are closed if needed to make room.)
386 */
387 if (!AcquireExternalFD())
388 {
389 #ifndef WIN32 /* can't write #if within ereport() macro */
390 ereport(ERROR,
391 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
392 errmsg("could not connect to server \"%s\"",
393 server->servername),
394 errdetail("There are too many open files on the local server."),
395 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
396 #else
397 ereport(ERROR,
398 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
399 errmsg("could not connect to server \"%s\"",
400 server->servername),
401 errdetail("There are too many open files on the local server."),
402 errhint("Raise the server's max_files_per_process setting.")));
403 #endif
404 }
405
406 /* OK to make connection */
407 conn = PQconnectdbParams(keywords, values, false);
408
409 if (!conn)
410 ReleaseExternalFD(); /* because the PG_CATCH block won't */
411
412 if (!conn || PQstatus(conn) != CONNECTION_OK)
413 ereport(ERROR,
414 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
415 errmsg("could not connect to server \"%s\"",
416 server->servername),
417 errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
418
419 /*
420 * Check that non-superuser has used password to establish connection;
421 * otherwise, he's piggybacking on the postgres server's user
422 * identity. See also dblink_security_check() in contrib/dblink and
423 * check_conn_params.
424 */
425 if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
426 !PQconnectionUsedPassword(conn))
427 ereport(ERROR,
428 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
429 errmsg("password is required"),
430 errdetail("Non-superuser cannot connect if the server does not request a password."),
431 errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
432
433 /* Prepare new session for use */
434 configure_remote_session(conn);
435
436 pfree(keywords);
437 pfree(values);
438 }
439 PG_CATCH();
440 {
441 /* Release PGconn data structure if we managed to create one */
442 if (conn)
443 {
444 PQfinish(conn);
445 ReleaseExternalFD();
446 }
447 PG_RE_THROW();
448 }
449 PG_END_TRY();
450
451 return conn;
452 }
453
454 /*
455 * Disconnect any open connection for a connection cache entry.
456 */
457 static void
disconnect_pg_server(ConnCacheEntry * entry)458 disconnect_pg_server(ConnCacheEntry *entry)
459 {
460 if (entry->conn != NULL)
461 {
462 PQfinish(entry->conn);
463 entry->conn = NULL;
464 ReleaseExternalFD();
465 }
466 }
467
468 /*
469 * Return true if the password_required is defined and false for this user
470 * mapping, otherwise false. The mapping has been pre-validated.
471 */
472 static bool
UserMappingPasswordRequired(UserMapping * user)473 UserMappingPasswordRequired(UserMapping *user)
474 {
475 ListCell *cell;
476
477 foreach(cell, user->options)
478 {
479 DefElem *def = (DefElem *) lfirst(cell);
480
481 if (strcmp(def->defname, "password_required") == 0)
482 return defGetBoolean(def);
483 }
484
485 return true;
486 }
487
488 /*
489 * For non-superusers, insist that the connstr specify a password. This
490 * prevents a password from being picked up from .pgpass, a service file, the
491 * environment, etc. We don't want the postgres user's passwords,
492 * certificates, etc to be accessible to non-superusers. (See also
493 * dblink_connstr_check in contrib/dblink.)
494 */
495 static void
check_conn_params(const char ** keywords,const char ** values,UserMapping * user)496 check_conn_params(const char **keywords, const char **values, UserMapping *user)
497 {
498 int i;
499
500 /* no check required if superuser */
501 if (superuser_arg(user->userid))
502 return;
503
504 /* ok if params contain a non-empty password */
505 for (i = 0; keywords[i] != NULL; i++)
506 {
507 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
508 return;
509 }
510
511 /* ok if the superuser explicitly said so at user mapping creation time */
512 if (!UserMappingPasswordRequired(user))
513 return;
514
515 ereport(ERROR,
516 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
517 errmsg("password is required"),
518 errdetail("Non-superusers must provide a password in the user mapping.")));
519 }
520
521 /*
522 * Issue SET commands to make sure remote session is configured properly.
523 *
524 * We do this just once at connection, assuming nothing will change the
525 * values later. Since we'll never send volatile function calls to the
526 * remote, there shouldn't be any way to break this assumption from our end.
527 * It's possible to think of ways to break it at the remote end, eg making
528 * a foreign table point to a view that includes a set_config call ---
529 * but once you admit the possibility of a malicious view definition,
530 * there are any number of ways to break things.
531 */
532 static void
configure_remote_session(PGconn * conn)533 configure_remote_session(PGconn *conn)
534 {
535 int remoteversion = PQserverVersion(conn);
536
537 /* Force the search path to contain only pg_catalog (see deparse.c) */
538 do_sql_command(conn, "SET search_path = pg_catalog");
539
540 /*
541 * Set remote timezone; this is basically just cosmetic, since all
542 * transmitted and returned timestamptzs should specify a zone explicitly
543 * anyway. However it makes the regression test outputs more predictable.
544 *
545 * We don't risk setting remote zone equal to ours, since the remote
546 * server might use a different timezone database. Instead, use UTC
547 * (quoted, because very old servers are picky about case).
548 */
549 do_sql_command(conn, "SET timezone = 'UTC'");
550
551 /*
552 * Set values needed to ensure unambiguous data output from remote. (This
553 * logic should match what pg_dump does. See also set_transmission_modes
554 * in postgres_fdw.c.)
555 */
556 do_sql_command(conn, "SET datestyle = ISO");
557 if (remoteversion >= 80400)
558 do_sql_command(conn, "SET intervalstyle = postgres");
559 if (remoteversion >= 90000)
560 do_sql_command(conn, "SET extra_float_digits = 3");
561 else
562 do_sql_command(conn, "SET extra_float_digits = 2");
563 }
564
565 /*
566 * Convenience subroutine to issue a non-data-returning SQL command to remote
567 */
568 void
do_sql_command(PGconn * conn,const char * sql)569 do_sql_command(PGconn *conn, const char *sql)
570 {
571 PGresult *res;
572
573 if (!PQsendQuery(conn, sql))
574 pgfdw_report_error(ERROR, NULL, conn, false, sql);
575 res = pgfdw_get_result(conn, sql);
576 if (PQresultStatus(res) != PGRES_COMMAND_OK)
577 pgfdw_report_error(ERROR, res, conn, true, sql);
578 PQclear(res);
579 }
580
581 /*
582 * Start remote transaction or subtransaction, if needed.
583 *
584 * Note that we always use at least REPEATABLE READ in the remote session.
585 * This is so that, if a query initiates multiple scans of the same or
586 * different foreign tables, we will get snapshot-consistent results from
587 * those scans. A disadvantage is that we can't provide sane emulation of
588 * READ COMMITTED behavior --- it would be nice if we had some other way to
589 * control which remote queries share a snapshot.
590 */
591 static void
begin_remote_xact(ConnCacheEntry * entry)592 begin_remote_xact(ConnCacheEntry *entry)
593 {
594 int curlevel = GetCurrentTransactionNestLevel();
595
596 /* Start main transaction if we haven't yet */
597 if (entry->xact_depth <= 0)
598 {
599 const char *sql;
600
601 elog(DEBUG3, "starting remote transaction on connection %p",
602 entry->conn);
603
604 if (IsolationIsSerializable())
605 sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
606 else
607 sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
608 entry->changing_xact_state = true;
609 do_sql_command(entry->conn, sql);
610 entry->xact_depth = 1;
611 entry->changing_xact_state = false;
612 }
613
614 /*
615 * If we're in a subtransaction, stack up savepoints to match our level.
616 * This ensures we can rollback just the desired effects when a
617 * subtransaction aborts.
618 */
619 while (entry->xact_depth < curlevel)
620 {
621 char sql[64];
622
623 snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
624 entry->changing_xact_state = true;
625 do_sql_command(entry->conn, sql);
626 entry->xact_depth++;
627 entry->changing_xact_state = false;
628 }
629 }
630
631 /*
632 * Release connection reference count created by calling GetConnection.
633 */
634 void
ReleaseConnection(PGconn * conn)635 ReleaseConnection(PGconn *conn)
636 {
637 /*
638 * Currently, we don't actually track connection references because all
639 * cleanup is managed on a transaction or subtransaction basis instead. So
640 * there's nothing to do here.
641 */
642 }
643
644 /*
645 * Assign a "unique" number for a cursor.
646 *
647 * These really only need to be unique per connection within a transaction.
648 * For the moment we ignore the per-connection point and assign them across
649 * all connections in the transaction, but we ask for the connection to be
650 * supplied in case we want to refine that.
651 *
652 * Note that even if wraparound happens in a very long transaction, actual
653 * collisions are highly improbable; just be sure to use %u not %d to print.
654 */
655 unsigned int
GetCursorNumber(PGconn * conn)656 GetCursorNumber(PGconn *conn)
657 {
658 return ++cursor_number;
659 }
660
661 /*
662 * Assign a "unique" number for a prepared statement.
663 *
664 * This works much like GetCursorNumber, except that we never reset the counter
665 * within a session. That's because we can't be 100% sure we've gotten rid
666 * of all prepared statements on all connections, and it's not really worth
667 * increasing the risk of prepared-statement name collisions by resetting.
668 */
669 unsigned int
GetPrepStmtNumber(PGconn * conn)670 GetPrepStmtNumber(PGconn *conn)
671 {
672 return ++prep_stmt_number;
673 }
674
675 /*
676 * Submit a query and wait for the result.
677 *
678 * This function is interruptible by signals.
679 *
680 * Caller is responsible for the error handling on the result.
681 */
682 PGresult *
pgfdw_exec_query(PGconn * conn,const char * query,PgFdwConnState * state)683 pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
684 {
685 /* First, process a pending asynchronous request, if any. */
686 if (state && state->pendingAreq)
687 process_pending_request(state->pendingAreq);
688
689 /*
690 * Submit a query. Since we don't use non-blocking mode, this also can
691 * block. But its risk is relatively small, so we ignore that for now.
692 */
693 if (!PQsendQuery(conn, query))
694 pgfdw_report_error(ERROR, NULL, conn, false, query);
695
696 /* Wait for the result. */
697 return pgfdw_get_result(conn, query);
698 }
699
700 /*
701 * Wait for the result from a prior asynchronous execution function call.
702 *
703 * This function offers quick responsiveness by checking for any interruptions.
704 *
705 * This function emulates PQexec()'s behavior of returning the last result
706 * when there are many.
707 *
708 * Caller is responsible for the error handling on the result.
709 */
710 PGresult *
pgfdw_get_result(PGconn * conn,const char * query)711 pgfdw_get_result(PGconn *conn, const char *query)
712 {
713 PGresult *volatile last_res = NULL;
714
715 /* In what follows, do not leak any PGresults on an error. */
716 PG_TRY();
717 {
718 for (;;)
719 {
720 PGresult *res;
721
722 while (PQisBusy(conn))
723 {
724 int wc;
725
726 /* Sleep until there's something to do */
727 wc = WaitLatchOrSocket(MyLatch,
728 WL_LATCH_SET | WL_SOCKET_READABLE |
729 WL_EXIT_ON_PM_DEATH,
730 PQsocket(conn),
731 -1L, PG_WAIT_EXTENSION);
732 ResetLatch(MyLatch);
733
734 CHECK_FOR_INTERRUPTS();
735
736 /* Data available in socket? */
737 if (wc & WL_SOCKET_READABLE)
738 {
739 if (!PQconsumeInput(conn))
740 pgfdw_report_error(ERROR, NULL, conn, false, query);
741 }
742 }
743
744 res = PQgetResult(conn);
745 if (res == NULL)
746 break; /* query is complete */
747
748 PQclear(last_res);
749 last_res = res;
750 }
751 }
752 PG_CATCH();
753 {
754 PQclear(last_res);
755 PG_RE_THROW();
756 }
757 PG_END_TRY();
758
759 return last_res;
760 }
761
762 /*
763 * Report an error we got from the remote server.
764 *
765 * elevel: error level to use (typically ERROR, but might be less)
766 * res: PGresult containing the error
767 * conn: connection we did the query on
768 * clear: if true, PQclear the result (otherwise caller will handle it)
769 * sql: NULL, or text of remote command we tried to execute
770 *
771 * Note: callers that choose not to throw ERROR for a remote error are
772 * responsible for making sure that the associated ConnCacheEntry gets
773 * marked with have_error = true.
774 */
775 void
pgfdw_report_error(int elevel,PGresult * res,PGconn * conn,bool clear,const char * sql)776 pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
777 bool clear, const char *sql)
778 {
779 /* If requested, PGresult must be released before leaving this function. */
780 PG_TRY();
781 {
782 char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
783 char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
784 char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
785 char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
786 char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
787 int sqlstate;
788
789 if (diag_sqlstate)
790 sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
791 diag_sqlstate[1],
792 diag_sqlstate[2],
793 diag_sqlstate[3],
794 diag_sqlstate[4]);
795 else
796 sqlstate = ERRCODE_CONNECTION_FAILURE;
797
798 /*
799 * If we don't get a message from the PGresult, try the PGconn. This
800 * is needed because for connection-level failures, PQexec may just
801 * return NULL, not a PGresult at all.
802 */
803 if (message_primary == NULL)
804 message_primary = pchomp(PQerrorMessage(conn));
805
806 ereport(elevel,
807 (errcode(sqlstate),
808 message_primary ? errmsg_internal("%s", message_primary) :
809 errmsg("could not obtain message string for remote error"),
810 message_detail ? errdetail_internal("%s", message_detail) : 0,
811 message_hint ? errhint("%s", message_hint) : 0,
812 message_context ? errcontext("%s", message_context) : 0,
813 sql ? errcontext("remote SQL command: %s", sql) : 0));
814 }
815 PG_FINALLY();
816 {
817 if (clear)
818 PQclear(res);
819 }
820 PG_END_TRY();
821 }
822
823 /*
824 * pgfdw_xact_callback --- cleanup at main-transaction end.
825 *
826 * This runs just late enough that it must not enter user-defined code
827 * locally. (Entering such code on the remote side is fine. Its remote
828 * COMMIT TRANSACTION may run deferred triggers.)
829 */
830 static void
pgfdw_xact_callback(XactEvent event,void * arg)831 pgfdw_xact_callback(XactEvent event, void *arg)
832 {
833 HASH_SEQ_STATUS scan;
834 ConnCacheEntry *entry;
835
836 /* Quick exit if no connections were touched in this transaction. */
837 if (!xact_got_connection)
838 return;
839
840 /*
841 * Scan all connection cache entries to find open remote transactions, and
842 * close them.
843 */
844 hash_seq_init(&scan, ConnectionHash);
845 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
846 {
847 PGresult *res;
848
849 /* Ignore cache entry if no open connection right now */
850 if (entry->conn == NULL)
851 continue;
852
853 /* If it has an open remote transaction, try to close it */
854 if (entry->xact_depth > 0)
855 {
856 bool abort_cleanup_failure = false;
857
858 elog(DEBUG3, "closing remote transaction on connection %p",
859 entry->conn);
860
861 switch (event)
862 {
863 case XACT_EVENT_PARALLEL_PRE_COMMIT:
864 case XACT_EVENT_PRE_COMMIT:
865
866 /*
867 * If abort cleanup previously failed for this connection,
868 * we can't issue any more commands against it.
869 */
870 pgfdw_reject_incomplete_xact_state_change(entry);
871
872 /* Commit all remote transactions during pre-commit */
873 entry->changing_xact_state = true;
874 do_sql_command(entry->conn, "COMMIT TRANSACTION");
875 entry->changing_xact_state = false;
876
877 /*
878 * If there were any errors in subtransactions, and we
879 * made prepared statements, do a DEALLOCATE ALL to make
880 * sure we get rid of all prepared statements. This is
881 * annoying and not terribly bulletproof, but it's
882 * probably not worth trying harder.
883 *
884 * DEALLOCATE ALL only exists in 8.3 and later, so this
885 * constrains how old a server postgres_fdw can
886 * communicate with. We intentionally ignore errors in
887 * the DEALLOCATE, so that we can hobble along to some
888 * extent with older servers (leaking prepared statements
889 * as we go; but we don't really support update operations
890 * pre-8.3 anyway).
891 */
892 if (entry->have_prep_stmt && entry->have_error)
893 {
894 res = PQexec(entry->conn, "DEALLOCATE ALL");
895 PQclear(res);
896 }
897 entry->have_prep_stmt = false;
898 entry->have_error = false;
899 break;
900 case XACT_EVENT_PRE_PREPARE:
901
902 /*
903 * We disallow any remote transactions, since it's not
904 * very reasonable to hold them open until the prepared
905 * transaction is committed. For the moment, throw error
906 * unconditionally; later we might allow read-only cases.
907 * Note that the error will cause us to come right back
908 * here with event == XACT_EVENT_ABORT, so we'll clean up
909 * the connection state at that point.
910 */
911 ereport(ERROR,
912 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
913 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
914 break;
915 case XACT_EVENT_PARALLEL_COMMIT:
916 case XACT_EVENT_COMMIT:
917 case XACT_EVENT_PREPARE:
918 /* Pre-commit should have closed the open transaction */
919 elog(ERROR, "missed cleaning up connection during pre-commit");
920 break;
921 case XACT_EVENT_PARALLEL_ABORT:
922 case XACT_EVENT_ABORT:
923
924 /*
925 * Don't try to clean up the connection if we're already
926 * in error recursion trouble.
927 */
928 if (in_error_recursion_trouble())
929 entry->changing_xact_state = true;
930
931 /*
932 * If connection is already unsalvageable, don't touch it
933 * further.
934 */
935 if (entry->changing_xact_state)
936 break;
937
938 /*
939 * Mark this connection as in the process of changing
940 * transaction state.
941 */
942 entry->changing_xact_state = true;
943
944 /* Assume we might have lost track of prepared statements */
945 entry->have_error = true;
946
947 /*
948 * If a command has been submitted to the remote server by
949 * using an asynchronous execution function, the command
950 * might not have yet completed. Check to see if a
951 * command is still being processed by the remote server,
952 * and if so, request cancellation of the command.
953 */
954 if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
955 !pgfdw_cancel_query(entry->conn))
956 {
957 /* Unable to cancel running query. */
958 abort_cleanup_failure = true;
959 }
960 else if (!pgfdw_exec_cleanup_query(entry->conn,
961 "ABORT TRANSACTION",
962 false))
963 {
964 /* Unable to abort remote transaction. */
965 abort_cleanup_failure = true;
966 }
967 else if (entry->have_prep_stmt && entry->have_error &&
968 !pgfdw_exec_cleanup_query(entry->conn,
969 "DEALLOCATE ALL",
970 true))
971 {
972 /* Trouble clearing prepared statements. */
973 abort_cleanup_failure = true;
974 }
975 else
976 {
977 entry->have_prep_stmt = false;
978 entry->have_error = false;
979 /* Also reset per-connection state */
980 memset(&entry->state, 0, sizeof(entry->state));
981 }
982
983 /* Disarm changing_xact_state if it all worked. */
984 entry->changing_xact_state = abort_cleanup_failure;
985 break;
986 }
987 }
988
989 /* Reset state to show we're out of a transaction */
990 entry->xact_depth = 0;
991
992 /*
993 * If the connection isn't in a good idle state, it is marked as
994 * invalid or keep_connections option of its server is disabled, then
995 * discard it to recover. Next GetConnection will open a new
996 * connection.
997 */
998 if (PQstatus(entry->conn) != CONNECTION_OK ||
999 PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1000 entry->changing_xact_state ||
1001 entry->invalidated ||
1002 !entry->keep_connections)
1003 {
1004 elog(DEBUG3, "discarding connection %p", entry->conn);
1005 disconnect_pg_server(entry);
1006 }
1007 }
1008
1009 /*
1010 * Regardless of the event type, we can now mark ourselves as out of the
1011 * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1012 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1013 */
1014 xact_got_connection = false;
1015
1016 /* Also reset cursor numbering for next transaction */
1017 cursor_number = 0;
1018 }
1019
1020 /*
1021 * pgfdw_subxact_callback --- cleanup at subtransaction end.
1022 */
1023 static void
pgfdw_subxact_callback(SubXactEvent event,SubTransactionId mySubid,SubTransactionId parentSubid,void * arg)1024 pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1025 SubTransactionId parentSubid, void *arg)
1026 {
1027 HASH_SEQ_STATUS scan;
1028 ConnCacheEntry *entry;
1029 int curlevel;
1030
1031 /* Nothing to do at subxact start, nor after commit. */
1032 if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1033 event == SUBXACT_EVENT_ABORT_SUB))
1034 return;
1035
1036 /* Quick exit if no connections were touched in this transaction. */
1037 if (!xact_got_connection)
1038 return;
1039
1040 /*
1041 * Scan all connection cache entries to find open remote subtransactions
1042 * of the current level, and close them.
1043 */
1044 curlevel = GetCurrentTransactionNestLevel();
1045 hash_seq_init(&scan, ConnectionHash);
1046 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1047 {
1048 char sql[100];
1049
1050 /*
1051 * We only care about connections with open remote subtransactions of
1052 * the current level.
1053 */
1054 if (entry->conn == NULL || entry->xact_depth < curlevel)
1055 continue;
1056
1057 if (entry->xact_depth > curlevel)
1058 elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1059 entry->xact_depth);
1060
1061 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1062 {
1063 /*
1064 * If abort cleanup previously failed for this connection, we
1065 * can't issue any more commands against it.
1066 */
1067 pgfdw_reject_incomplete_xact_state_change(entry);
1068
1069 /* Commit all remote subtransactions during pre-commit */
1070 snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1071 entry->changing_xact_state = true;
1072 do_sql_command(entry->conn, sql);
1073 entry->changing_xact_state = false;
1074 }
1075 else if (in_error_recursion_trouble())
1076 {
1077 /*
1078 * Don't try to clean up the connection if we're already in error
1079 * recursion trouble.
1080 */
1081 entry->changing_xact_state = true;
1082 }
1083 else if (!entry->changing_xact_state)
1084 {
1085 bool abort_cleanup_failure = false;
1086
1087 /* Remember that abort cleanup is in progress. */
1088 entry->changing_xact_state = true;
1089
1090 /* Assume we might have lost track of prepared statements */
1091 entry->have_error = true;
1092
1093 /*
1094 * If a command has been submitted to the remote server by using
1095 * an asynchronous execution function, the command might not have
1096 * yet completed. Check to see if a command is still being
1097 * processed by the remote server, and if so, request cancellation
1098 * of the command.
1099 */
1100 if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1101 !pgfdw_cancel_query(entry->conn))
1102 abort_cleanup_failure = true;
1103 else
1104 {
1105 /* Rollback all remote subtransactions during abort */
1106 snprintf(sql, sizeof(sql),
1107 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1108 curlevel, curlevel);
1109 if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1110 abort_cleanup_failure = true;
1111 }
1112
1113 /* Disarm changing_xact_state if it all worked. */
1114 entry->changing_xact_state = abort_cleanup_failure;
1115 }
1116
1117 /* OK, we're outta that level of subtransaction */
1118 entry->xact_depth--;
1119 }
1120 }
1121
1122 /*
1123 * Connection invalidation callback function
1124 *
1125 * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1126 * close connections depending on that entry immediately if current transaction
1127 * has not used those connections yet. Otherwise, mark those connections as
1128 * invalid and then make pgfdw_xact_callback() close them at the end of current
1129 * transaction, since they cannot be closed in the midst of the transaction
1130 * using them. Closed connections will be remade at the next opportunity if
1131 * necessary.
1132 *
1133 * Although most cache invalidation callbacks blow away all the related stuff
1134 * regardless of the given hashvalue, connections are expensive enough that
1135 * it's worth trying to avoid that.
1136 *
1137 * NB: We could avoid unnecessary disconnection more strictly by examining
1138 * individual option values, but it seems too much effort for the gain.
1139 */
1140 static void
pgfdw_inval_callback(Datum arg,int cacheid,uint32 hashvalue)1141 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1142 {
1143 HASH_SEQ_STATUS scan;
1144 ConnCacheEntry *entry;
1145
1146 Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1147
1148 /* ConnectionHash must exist already, if we're registered */
1149 hash_seq_init(&scan, ConnectionHash);
1150 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1151 {
1152 /* Ignore invalid entries */
1153 if (entry->conn == NULL)
1154 continue;
1155
1156 /* hashvalue == 0 means a cache reset, must clear all state */
1157 if (hashvalue == 0 ||
1158 (cacheid == FOREIGNSERVEROID &&
1159 entry->server_hashvalue == hashvalue) ||
1160 (cacheid == USERMAPPINGOID &&
1161 entry->mapping_hashvalue == hashvalue))
1162 {
1163 /*
1164 * Close the connection immediately if it's not used yet in this
1165 * transaction. Otherwise mark it as invalid so that
1166 * pgfdw_xact_callback() can close it at the end of this
1167 * transaction.
1168 */
1169 if (entry->xact_depth == 0)
1170 {
1171 elog(DEBUG3, "discarding connection %p", entry->conn);
1172 disconnect_pg_server(entry);
1173 }
1174 else
1175 entry->invalidated = true;
1176 }
1177 }
1178 }
1179
1180 /*
1181 * Raise an error if the given connection cache entry is marked as being
1182 * in the middle of an xact state change. This should be called at which no
1183 * such change is expected to be in progress; if one is found to be in
1184 * progress, it means that we aborted in the middle of a previous state change
1185 * and now don't know what the remote transaction state actually is.
1186 * Such connections can't safely be further used. Re-establishing the
1187 * connection would change the snapshot and roll back any writes already
1188 * performed, so that's not an option, either. Thus, we must abort.
1189 */
1190 static void
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry * entry)1191 pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1192 {
1193 ForeignServer *server;
1194
1195 /* nothing to do for inactive entries and entries of sane state */
1196 if (entry->conn == NULL || !entry->changing_xact_state)
1197 return;
1198
1199 /* make sure this entry is inactive */
1200 disconnect_pg_server(entry);
1201
1202 /* find server name to be shown in the message below */
1203 server = GetForeignServer(entry->serverid);
1204
1205 ereport(ERROR,
1206 (errcode(ERRCODE_CONNECTION_EXCEPTION),
1207 errmsg("connection to server \"%s\" was lost",
1208 server->servername)));
1209 }
1210
1211 /*
1212 * Cancel the currently-in-progress query (whose query text we do not have)
1213 * and ignore the result. Returns true if we successfully cancel the query
1214 * and discard any pending result, and false if not.
1215 *
1216 * It's not a huge problem if we throw an ERROR here, but if we get into error
1217 * recursion trouble, we'll end up slamming the connection shut, which will
1218 * necessitate failing the entire toplevel transaction even if subtransactions
1219 * were used. Try to use WARNING where we can.
1220 *
1221 * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1222 * query text from the pendingAreq saved in the per-connection state, then
1223 * report the query using it.
1224 */
1225 static bool
pgfdw_cancel_query(PGconn * conn)1226 pgfdw_cancel_query(PGconn *conn)
1227 {
1228 PGcancel *cancel;
1229 char errbuf[256];
1230 PGresult *result = NULL;
1231 TimestampTz endtime;
1232
1233 /*
1234 * If it takes too long to cancel the query and discard the result, assume
1235 * the connection is dead.
1236 */
1237 endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1238
1239 /*
1240 * Issue cancel request. Unfortunately, there's no good way to limit the
1241 * amount of time that we might block inside PQgetCancel().
1242 */
1243 if ((cancel = PQgetCancel(conn)))
1244 {
1245 if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1246 {
1247 ereport(WARNING,
1248 (errcode(ERRCODE_CONNECTION_FAILURE),
1249 errmsg("could not send cancel request: %s",
1250 errbuf)));
1251 PQfreeCancel(cancel);
1252 return false;
1253 }
1254 PQfreeCancel(cancel);
1255 }
1256
1257 /* Get and discard the result of the query. */
1258 if (pgfdw_get_cleanup_result(conn, endtime, &result))
1259 return false;
1260 PQclear(result);
1261
1262 return true;
1263 }
1264
1265 /*
1266 * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1267 * result. If the query is executed without error, the return value is true.
1268 * If the query is executed successfully but returns an error, the return
1269 * value is true if and only if ignore_errors is set. If the query can't be
1270 * sent or times out, the return value is false.
1271 *
1272 * It's not a huge problem if we throw an ERROR here, but if we get into error
1273 * recursion trouble, we'll end up slamming the connection shut, which will
1274 * necessitate failing the entire toplevel transaction even if subtransactions
1275 * were used. Try to use WARNING where we can.
1276 */
1277 static bool
pgfdw_exec_cleanup_query(PGconn * conn,const char * query,bool ignore_errors)1278 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1279 {
1280 PGresult *result = NULL;
1281 TimestampTz endtime;
1282
1283 /*
1284 * If it takes too long to execute a cleanup query, assume the connection
1285 * is dead. It's fairly likely that this is why we aborted in the first
1286 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1287 * be too long.
1288 */
1289 endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1290
1291 /*
1292 * Submit a query. Since we don't use non-blocking mode, this also can
1293 * block. But its risk is relatively small, so we ignore that for now.
1294 */
1295 if (!PQsendQuery(conn, query))
1296 {
1297 pgfdw_report_error(WARNING, NULL, conn, false, query);
1298 return false;
1299 }
1300
1301 /* Get the result of the query. */
1302 if (pgfdw_get_cleanup_result(conn, endtime, &result))
1303 return false;
1304
1305 /* Issue a warning if not successful. */
1306 if (PQresultStatus(result) != PGRES_COMMAND_OK)
1307 {
1308 pgfdw_report_error(WARNING, result, conn, true, query);
1309 return ignore_errors;
1310 }
1311 PQclear(result);
1312
1313 return true;
1314 }
1315
1316 /*
1317 * Get, during abort cleanup, the result of a query that is in progress. This
1318 * might be a query that is being interrupted by transaction abort, or it might
1319 * be a query that was initiated as part of transaction abort to get the remote
1320 * side back to the appropriate state.
1321 *
1322 * endtime is the time at which we should give up and assume the remote
1323 * side is dead. Returns true if the timeout expired, otherwise false.
1324 * Sets *result except in case of a timeout.
1325 */
1326 static bool
pgfdw_get_cleanup_result(PGconn * conn,TimestampTz endtime,PGresult ** result)1327 pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
1328 {
1329 volatile bool timed_out = false;
1330 PGresult *volatile last_res = NULL;
1331
1332 /* In what follows, do not leak any PGresults on an error. */
1333 PG_TRY();
1334 {
1335 for (;;)
1336 {
1337 PGresult *res;
1338
1339 while (PQisBusy(conn))
1340 {
1341 int wc;
1342 TimestampTz now = GetCurrentTimestamp();
1343 long cur_timeout;
1344
1345 /* If timeout has expired, give up, else get sleep time. */
1346 cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1347 if (cur_timeout <= 0)
1348 {
1349 timed_out = true;
1350 goto exit;
1351 }
1352
1353 /* Sleep until there's something to do */
1354 wc = WaitLatchOrSocket(MyLatch,
1355 WL_LATCH_SET | WL_SOCKET_READABLE |
1356 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1357 PQsocket(conn),
1358 cur_timeout, PG_WAIT_EXTENSION);
1359 ResetLatch(MyLatch);
1360
1361 CHECK_FOR_INTERRUPTS();
1362
1363 /* Data available in socket? */
1364 if (wc & WL_SOCKET_READABLE)
1365 {
1366 if (!PQconsumeInput(conn))
1367 {
1368 /* connection trouble; treat the same as a timeout */
1369 timed_out = true;
1370 goto exit;
1371 }
1372 }
1373 }
1374
1375 res = PQgetResult(conn);
1376 if (res == NULL)
1377 break; /* query is complete */
1378
1379 PQclear(last_res);
1380 last_res = res;
1381 }
1382 exit: ;
1383 }
1384 PG_CATCH();
1385 {
1386 PQclear(last_res);
1387 PG_RE_THROW();
1388 }
1389 PG_END_TRY();
1390
1391 if (timed_out)
1392 PQclear(last_res);
1393 else
1394 *result = last_res;
1395 return timed_out;
1396 }
1397
1398 /*
1399 * List active foreign server connections.
1400 *
1401 * This function takes no input parameter and returns setof record made of
1402 * following values:
1403 * - server_name - server name of active connection. In case the foreign server
1404 * is dropped but still the connection is active, then the server name will
1405 * be NULL in output.
1406 * - valid - true/false representing whether the connection is valid or not.
1407 * Note that the connections can get invalidated in pgfdw_inval_callback.
1408 *
1409 * No records are returned when there are no cached connections at all.
1410 */
1411 Datum
postgres_fdw_get_connections(PG_FUNCTION_ARGS)1412 postgres_fdw_get_connections(PG_FUNCTION_ARGS)
1413 {
1414 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1415 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1416 TupleDesc tupdesc;
1417 Tuplestorestate *tupstore;
1418 MemoryContext per_query_ctx;
1419 MemoryContext oldcontext;
1420 HASH_SEQ_STATUS scan;
1421 ConnCacheEntry *entry;
1422
1423 /* check to see if caller supports us returning a tuplestore */
1424 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1425 ereport(ERROR,
1426 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1427 errmsg("set-valued function called in context that cannot accept a set")));
1428 if (!(rsinfo->allowedModes & SFRM_Materialize))
1429 ereport(ERROR,
1430 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1431 errmsg("materialize mode required, but it is not allowed in this context")));
1432
1433 /* Build a tuple descriptor for our result type */
1434 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1435 elog(ERROR, "return type must be a row type");
1436
1437 /* Build tuplestore to hold the result rows */
1438 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1439 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1440
1441 tupstore = tuplestore_begin_heap(true, false, work_mem);
1442 rsinfo->returnMode = SFRM_Materialize;
1443 rsinfo->setResult = tupstore;
1444 rsinfo->setDesc = tupdesc;
1445
1446 MemoryContextSwitchTo(oldcontext);
1447
1448 /* If cache doesn't exist, we return no records */
1449 if (!ConnectionHash)
1450 {
1451 /* clean up and return the tuplestore */
1452 tuplestore_donestoring(tupstore);
1453
1454 PG_RETURN_VOID();
1455 }
1456
1457 hash_seq_init(&scan, ConnectionHash);
1458 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1459 {
1460 ForeignServer *server;
1461 Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1462 bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1463
1464 /* We only look for open remote connections */
1465 if (!entry->conn)
1466 continue;
1467
1468 server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1469
1470 MemSet(values, 0, sizeof(values));
1471 MemSet(nulls, 0, sizeof(nulls));
1472
1473 /*
1474 * The foreign server may have been dropped in current explicit
1475 * transaction. It is not possible to drop the server from another
1476 * session when the connection associated with it is in use in the
1477 * current transaction, if tried so, the drop query in another session
1478 * blocks until the current transaction finishes.
1479 *
1480 * Even though the server is dropped in the current transaction, the
1481 * cache can still have associated active connection entry, say we
1482 * call such connections dangling. Since we can not fetch the server
1483 * name from system catalogs for dangling connections, instead we show
1484 * NULL value for server name in output.
1485 *
1486 * We could have done better by storing the server name in the cache
1487 * entry instead of server oid so that it could be used in the output.
1488 * But the server name in each cache entry requires 64 bytes of
1489 * memory, which is huge, when there are many cached connections and
1490 * the use case i.e. dropping the foreign server within the explicit
1491 * current transaction seems rare. So, we chose to show NULL value for
1492 * server name in output.
1493 *
1494 * Such dangling connections get closed either in next use or at the
1495 * end of current explicit transaction in pgfdw_xact_callback.
1496 */
1497 if (!server)
1498 {
1499 /*
1500 * If the server has been dropped in the current explicit
1501 * transaction, then this entry would have been invalidated in
1502 * pgfdw_inval_callback at the end of drop server command. Note
1503 * that this connection would not have been closed in
1504 * pgfdw_inval_callback because it is still being used in the
1505 * current explicit transaction. So, assert that here.
1506 */
1507 Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1508
1509 /* Show null, if no server name was found */
1510 nulls[0] = true;
1511 }
1512 else
1513 values[0] = CStringGetTextDatum(server->servername);
1514
1515 values[1] = BoolGetDatum(!entry->invalidated);
1516
1517 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1518 }
1519
1520 /* clean up and return the tuplestore */
1521 tuplestore_donestoring(tupstore);
1522
1523 PG_RETURN_VOID();
1524 }
1525
1526 /*
1527 * Disconnect the specified cached connections.
1528 *
1529 * This function discards the open connections that are established by
1530 * postgres_fdw from the local session to the foreign server with
1531 * the given name. Note that there can be multiple connections to
1532 * the given server using different user mappings. If the connections
1533 * are used in the current local transaction, they are not disconnected
1534 * and warning messages are reported. This function returns true
1535 * if it disconnects at least one connection, otherwise false. If no
1536 * foreign server with the given name is found, an error is reported.
1537 */
1538 Datum
postgres_fdw_disconnect(PG_FUNCTION_ARGS)1539 postgres_fdw_disconnect(PG_FUNCTION_ARGS)
1540 {
1541 ForeignServer *server;
1542 char *servername;
1543
1544 servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1545 server = GetForeignServerByName(servername, false);
1546
1547 PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
1548 }
1549
1550 /*
1551 * Disconnect all the cached connections.
1552 *
1553 * This function discards all the open connections that are established by
1554 * postgres_fdw from the local session to the foreign servers.
1555 * If the connections are used in the current local transaction, they are
1556 * not disconnected and warning messages are reported. This function
1557 * returns true if it disconnects at least one connection, otherwise false.
1558 */
1559 Datum
postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)1560 postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
1561 {
1562 PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
1563 }
1564
1565 /*
1566 * Workhorse to disconnect cached connections.
1567 *
1568 * This function scans all the connection cache entries and disconnects
1569 * the open connections whose foreign server OID matches with
1570 * the specified one. If InvalidOid is specified, it disconnects all
1571 * the cached connections.
1572 *
1573 * This function emits a warning for each connection that's used in
1574 * the current transaction and doesn't close it. It returns true if
1575 * it disconnects at least one connection, otherwise false.
1576 *
1577 * Note that this function disconnects even the connections that are
1578 * established by other users in the same local session using different
1579 * user mappings. This leads even non-superuser to be able to close
1580 * the connections established by superusers in the same local session.
1581 *
1582 * XXX As of now we don't see any security risk doing this. But we should
1583 * set some restrictions on that, for example, prevent non-superuser
1584 * from closing the connections established by superusers even
1585 * in the same session?
1586 */
1587 static bool
disconnect_cached_connections(Oid serverid)1588 disconnect_cached_connections(Oid serverid)
1589 {
1590 HASH_SEQ_STATUS scan;
1591 ConnCacheEntry *entry;
1592 bool all = !OidIsValid(serverid);
1593 bool result = false;
1594
1595 /*
1596 * Connection cache hashtable has not been initialized yet in this
1597 * session, so return false.
1598 */
1599 if (!ConnectionHash)
1600 return false;
1601
1602 hash_seq_init(&scan, ConnectionHash);
1603 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1604 {
1605 /* Ignore cache entry if no open connection right now. */
1606 if (!entry->conn)
1607 continue;
1608
1609 if (all || entry->serverid == serverid)
1610 {
1611 /*
1612 * Emit a warning because the connection to close is used in the
1613 * current transaction and cannot be disconnected right now.
1614 */
1615 if (entry->xact_depth > 0)
1616 {
1617 ForeignServer *server;
1618
1619 server = GetForeignServerExtended(entry->serverid,
1620 FSV_MISSING_OK);
1621
1622 if (!server)
1623 {
1624 /*
1625 * If the foreign server was dropped while its connection
1626 * was used in the current transaction, the connection
1627 * must have been marked as invalid by
1628 * pgfdw_inval_callback at the end of DROP SERVER command.
1629 */
1630 Assert(entry->invalidated);
1631
1632 ereport(WARNING,
1633 (errmsg("cannot close dropped server connection because it is still in use")));
1634 }
1635 else
1636 ereport(WARNING,
1637 (errmsg("cannot close connection for server \"%s\" because it is still in use",
1638 server->servername)));
1639 }
1640 else
1641 {
1642 elog(DEBUG3, "discarding connection %p", entry->conn);
1643 disconnect_pg_server(entry);
1644 result = true;
1645 }
1646 }
1647 }
1648
1649 return result;
1650 }
1651