1 /*
2 * dblink.c
3 *
4 * Functions returning results from a remote database
5 *
6 * Joe Conway <mail@joeconway.com>
7 * And contributors:
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 *
11 * contrib/dblink/dblink.c
12 * Copyright (c) 2001-2020, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
14 *
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
19 *
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 *
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 *
32 */
33 #include "postgres.h"
34
35 #include <limits.h>
36
37 #include "access/htup_details.h"
38 #include "access/relation.h"
39 #include "access/reloptions.h"
40 #include "access/table.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/pg_foreign_data_wrapper.h"
44 #include "catalog/pg_foreign_server.h"
45 #include "catalog/pg_type.h"
46 #include "catalog/pg_user_mapping.h"
47 #include "executor/spi.h"
48 #include "foreign/foreign.h"
49 #include "funcapi.h"
50 #include "lib/stringinfo.h"
51 #include "libpq-fe.h"
52 #include "mb/pg_wchar.h"
53 #include "miscadmin.h"
54 #include "parser/scansup.h"
55 #include "utils/acl.h"
56 #include "utils/builtins.h"
57 #include "utils/fmgroids.h"
58 #include "utils/guc.h"
59 #include "utils/lsyscache.h"
60 #include "utils/memutils.h"
61 #include "utils/rel.h"
62 #include "utils/varlena.h"
63
64 PG_MODULE_MAGIC;
65
66 typedef struct remoteConn
67 {
68 PGconn *conn; /* Hold the remote connection */
69 int openCursorCount; /* The number of open cursors */
70 bool newXactForCursor; /* Opened a transaction for a cursor */
71 } remoteConn;
72
73 typedef struct storeInfo
74 {
75 FunctionCallInfo fcinfo;
76 Tuplestorestate *tuplestore;
77 AttInMetadata *attinmeta;
78 MemoryContext tmpcontext;
79 char **cstrs;
80 /* temp storage for results to avoid leaks on exception */
81 PGresult *last_res;
82 PGresult *cur_res;
83 } storeInfo;
84
85 /*
86 * Internal declarations
87 */
88 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
89 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
90 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
91 PGresult *res);
92 static void materializeQueryResult(FunctionCallInfo fcinfo,
93 PGconn *conn,
94 const char *conname,
95 const char *sql,
96 bool fail);
97 static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
98 static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
99 static remoteConn *getConnectionByName(const char *name);
100 static HTAB *createConnHash(void);
101 static void createNewConnection(const char *name, remoteConn *rconn);
102 static void deleteConnection(const char *name);
103 static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
104 static char **get_text_array_contents(ArrayType *array, int *numitems);
105 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
106 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
107 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
108 static char *quote_ident_cstr(char *rawstr);
109 static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
110 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
111 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
112 static char *generate_relation_name(Relation rel);
113 static void dblink_connstr_check(const char *connstr);
114 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
115 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
116 bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
117 static char *get_connect_string(const char *servername);
118 static char *escape_param_str(const char *from);
119 static void validate_pkattnums(Relation rel,
120 int2vector *pkattnums_arg, int32 pknumatts_arg,
121 int **pkattnums, int *pknumatts);
122 static bool is_valid_dblink_option(const PQconninfoOption *options,
123 const char *option, Oid context);
124 static int applyRemoteGucs(PGconn *conn);
125 static void restoreLocalGucs(int nestlevel);
126
127 /* Global */
128 static remoteConn *pconn = NULL;
129 static HTAB *remoteConnHash = NULL;
130
131 /*
132 * Following is list that holds multiple remote connections.
133 * Calling convention of each dblink function changes to accept
134 * connection name as the first parameter. The connection list is
135 * much like ecpg e.g. a mapping between a name and a PGconn object.
136 */
137
138 typedef struct remoteConnHashEnt
139 {
140 char name[NAMEDATALEN];
141 remoteConn *rconn;
142 } remoteConnHashEnt;
143
144 /* initial number of connection hashes */
145 #define NUMCONN 16
146
147 static char *
xpstrdup(const char * in)148 xpstrdup(const char *in)
149 {
150 if (in == NULL)
151 return NULL;
152 return pstrdup(in);
153 }
154
155 static void
pg_attribute_noreturn()156 pg_attribute_noreturn()
157 dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
158 {
159 char *msg = pchomp(PQerrorMessage(conn));
160
161 if (res)
162 PQclear(res);
163 elog(ERROR, "%s: %s", p2, msg);
164 }
165
166 static void
pg_attribute_noreturn()167 pg_attribute_noreturn()
168 dblink_conn_not_avail(const char *conname)
169 {
170 if (conname)
171 ereport(ERROR,
172 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
173 errmsg("connection \"%s\" not available", conname)));
174 else
175 ereport(ERROR,
176 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
177 errmsg("connection not available")));
178 }
179
180 static void
dblink_get_conn(char * conname_or_str,PGconn * volatile * conn_p,char ** conname_p,volatile bool * freeconn_p)181 dblink_get_conn(char *conname_or_str,
182 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
183 {
184 remoteConn *rconn = getConnectionByName(conname_or_str);
185 PGconn *conn;
186 char *conname;
187 bool freeconn;
188
189 if (rconn)
190 {
191 conn = rconn->conn;
192 conname = conname_or_str;
193 freeconn = false;
194 }
195 else
196 {
197 const char *connstr;
198
199 connstr = get_connect_string(conname_or_str);
200 if (connstr == NULL)
201 connstr = conname_or_str;
202 dblink_connstr_check(connstr);
203
204 /*
205 * We must obey fd.c's limit on non-virtual file descriptors. Assume
206 * that a PGconn represents one long-lived FD. (Doing this here also
207 * ensures that VFDs are closed if needed to make room.)
208 */
209 if (!AcquireExternalFD())
210 {
211 #ifndef WIN32 /* can't write #if within ereport() macro */
212 ereport(ERROR,
213 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
214 errmsg("could not establish connection"),
215 errdetail("There are too many open files on the local server."),
216 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
217 #else
218 ereport(ERROR,
219 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
220 errmsg("could not establish connection"),
221 errdetail("There are too many open files on the local server."),
222 errhint("Raise the server's max_files_per_process setting.")));
223 #endif
224 }
225
226 /* OK to make connection */
227 conn = PQconnectdb(connstr);
228
229 if (PQstatus(conn) == CONNECTION_BAD)
230 {
231 char *msg = pchomp(PQerrorMessage(conn));
232
233 PQfinish(conn);
234 ReleaseExternalFD();
235 ereport(ERROR,
236 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
237 errmsg("could not establish connection"),
238 errdetail_internal("%s", msg)));
239 }
240 dblink_security_check(conn, rconn);
241 if (PQclientEncoding(conn) != GetDatabaseEncoding())
242 PQsetClientEncoding(conn, GetDatabaseEncodingName());
243 freeconn = true;
244 conname = NULL;
245 }
246
247 *conn_p = conn;
248 *conname_p = conname;
249 *freeconn_p = freeconn;
250 }
251
252 static PGconn *
dblink_get_named_conn(const char * conname)253 dblink_get_named_conn(const char *conname)
254 {
255 remoteConn *rconn = getConnectionByName(conname);
256
257 if (rconn)
258 return rconn->conn;
259
260 dblink_conn_not_avail(conname);
261 return NULL; /* keep compiler quiet */
262 }
263
264 static void
dblink_init(void)265 dblink_init(void)
266 {
267 if (!pconn)
268 {
269 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
270 pconn->conn = NULL;
271 pconn->openCursorCount = 0;
272 pconn->newXactForCursor = false;
273 }
274 }
275
276 /*
277 * Create a persistent connection to another database
278 */
279 PG_FUNCTION_INFO_V1(dblink_connect);
280 Datum
dblink_connect(PG_FUNCTION_ARGS)281 dblink_connect(PG_FUNCTION_ARGS)
282 {
283 char *conname_or_str = NULL;
284 char *connstr = NULL;
285 char *connname = NULL;
286 char *msg;
287 PGconn *conn = NULL;
288 remoteConn *rconn = NULL;
289
290 dblink_init();
291
292 if (PG_NARGS() == 2)
293 {
294 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
295 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
296 }
297 else if (PG_NARGS() == 1)
298 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
299
300 if (connname)
301 {
302 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
303 sizeof(remoteConn));
304 rconn->conn = NULL;
305 rconn->openCursorCount = 0;
306 rconn->newXactForCursor = false;
307 }
308
309 /* first check for valid foreign data server */
310 connstr = get_connect_string(conname_or_str);
311 if (connstr == NULL)
312 connstr = conname_or_str;
313
314 /* check password in connection string if not superuser */
315 dblink_connstr_check(connstr);
316
317 /*
318 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
319 * a PGconn represents one long-lived FD. (Doing this here also ensures
320 * that VFDs are closed if needed to make room.)
321 */
322 if (!AcquireExternalFD())
323 {
324 #ifndef WIN32 /* can't write #if within ereport() macro */
325 ereport(ERROR,
326 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
327 errmsg("could not establish connection"),
328 errdetail("There are too many open files on the local server."),
329 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
330 #else
331 ereport(ERROR,
332 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
333 errmsg("could not establish connection"),
334 errdetail("There are too many open files on the local server."),
335 errhint("Raise the server's max_files_per_process setting.")));
336 #endif
337 }
338
339 /* OK to make connection */
340 conn = PQconnectdb(connstr);
341
342 if (PQstatus(conn) == CONNECTION_BAD)
343 {
344 msg = pchomp(PQerrorMessage(conn));
345 PQfinish(conn);
346 ReleaseExternalFD();
347 if (rconn)
348 pfree(rconn);
349
350 ereport(ERROR,
351 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
352 errmsg("could not establish connection"),
353 errdetail_internal("%s", msg)));
354 }
355
356 /* check password actually used if not superuser */
357 dblink_security_check(conn, rconn);
358
359 /* attempt to set client encoding to match server encoding, if needed */
360 if (PQclientEncoding(conn) != GetDatabaseEncoding())
361 PQsetClientEncoding(conn, GetDatabaseEncodingName());
362
363 if (connname)
364 {
365 rconn->conn = conn;
366 createNewConnection(connname, rconn);
367 }
368 else
369 {
370 if (pconn->conn)
371 {
372 PQfinish(pconn->conn);
373 ReleaseExternalFD();
374 }
375 pconn->conn = conn;
376 }
377
378 PG_RETURN_TEXT_P(cstring_to_text("OK"));
379 }
380
381 /*
382 * Clear a persistent connection to another database
383 */
384 PG_FUNCTION_INFO_V1(dblink_disconnect);
385 Datum
dblink_disconnect(PG_FUNCTION_ARGS)386 dblink_disconnect(PG_FUNCTION_ARGS)
387 {
388 char *conname = NULL;
389 remoteConn *rconn = NULL;
390 PGconn *conn = NULL;
391
392 dblink_init();
393
394 if (PG_NARGS() == 1)
395 {
396 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
397 rconn = getConnectionByName(conname);
398 if (rconn)
399 conn = rconn->conn;
400 }
401 else
402 conn = pconn->conn;
403
404 if (!conn)
405 dblink_conn_not_avail(conname);
406
407 PQfinish(conn);
408 ReleaseExternalFD();
409 if (rconn)
410 {
411 deleteConnection(conname);
412 pfree(rconn);
413 }
414 else
415 pconn->conn = NULL;
416
417 PG_RETURN_TEXT_P(cstring_to_text("OK"));
418 }
419
420 /*
421 * opens a cursor using a persistent connection
422 */
423 PG_FUNCTION_INFO_V1(dblink_open);
424 Datum
dblink_open(PG_FUNCTION_ARGS)425 dblink_open(PG_FUNCTION_ARGS)
426 {
427 PGresult *res = NULL;
428 PGconn *conn;
429 char *curname = NULL;
430 char *sql = NULL;
431 char *conname = NULL;
432 StringInfoData buf;
433 remoteConn *rconn = NULL;
434 bool fail = true; /* default to backward compatible behavior */
435
436 dblink_init();
437 initStringInfo(&buf);
438
439 if (PG_NARGS() == 2)
440 {
441 /* text,text */
442 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
443 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
444 rconn = pconn;
445 }
446 else if (PG_NARGS() == 3)
447 {
448 /* might be text,text,text or text,text,bool */
449 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
450 {
451 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
452 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
453 fail = PG_GETARG_BOOL(2);
454 rconn = pconn;
455 }
456 else
457 {
458 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
459 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
460 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
461 rconn = getConnectionByName(conname);
462 }
463 }
464 else if (PG_NARGS() == 4)
465 {
466 /* text,text,text,bool */
467 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
468 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
469 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
470 fail = PG_GETARG_BOOL(3);
471 rconn = getConnectionByName(conname);
472 }
473
474 if (!rconn || !rconn->conn)
475 dblink_conn_not_avail(conname);
476
477 conn = rconn->conn;
478
479 /* If we are not in a transaction, start one */
480 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
481 {
482 res = PQexec(conn, "BEGIN");
483 if (PQresultStatus(res) != PGRES_COMMAND_OK)
484 dblink_res_internalerror(conn, res, "begin error");
485 PQclear(res);
486 rconn->newXactForCursor = true;
487
488 /*
489 * Since transaction state was IDLE, we force cursor count to
490 * initially be 0. This is needed as a previous ABORT might have wiped
491 * out our transaction without maintaining the cursor count for us.
492 */
493 rconn->openCursorCount = 0;
494 }
495
496 /* if we started a transaction, increment cursor count */
497 if (rconn->newXactForCursor)
498 (rconn->openCursorCount)++;
499
500 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
501 res = PQexec(conn, buf.data);
502 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
503 {
504 dblink_res_error(conn, conname, res, fail,
505 "while opening cursor \"%s\"", curname);
506 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
507 }
508
509 PQclear(res);
510 PG_RETURN_TEXT_P(cstring_to_text("OK"));
511 }
512
513 /*
514 * closes a cursor
515 */
516 PG_FUNCTION_INFO_V1(dblink_close);
517 Datum
dblink_close(PG_FUNCTION_ARGS)518 dblink_close(PG_FUNCTION_ARGS)
519 {
520 PGconn *conn;
521 PGresult *res = NULL;
522 char *curname = NULL;
523 char *conname = NULL;
524 StringInfoData buf;
525 remoteConn *rconn = NULL;
526 bool fail = true; /* default to backward compatible behavior */
527
528 dblink_init();
529 initStringInfo(&buf);
530
531 if (PG_NARGS() == 1)
532 {
533 /* text */
534 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
535 rconn = pconn;
536 }
537 else if (PG_NARGS() == 2)
538 {
539 /* might be text,text or text,bool */
540 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
541 {
542 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
543 fail = PG_GETARG_BOOL(1);
544 rconn = pconn;
545 }
546 else
547 {
548 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
549 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
550 rconn = getConnectionByName(conname);
551 }
552 }
553 if (PG_NARGS() == 3)
554 {
555 /* text,text,bool */
556 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
557 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
558 fail = PG_GETARG_BOOL(2);
559 rconn = getConnectionByName(conname);
560 }
561
562 if (!rconn || !rconn->conn)
563 dblink_conn_not_avail(conname);
564
565 conn = rconn->conn;
566
567 appendStringInfo(&buf, "CLOSE %s", curname);
568
569 /* close the cursor */
570 res = PQexec(conn, buf.data);
571 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
572 {
573 dblink_res_error(conn, conname, res, fail,
574 "while closing cursor \"%s\"", curname);
575 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
576 }
577
578 PQclear(res);
579
580 /* if we started a transaction, decrement cursor count */
581 if (rconn->newXactForCursor)
582 {
583 (rconn->openCursorCount)--;
584
585 /* if count is zero, commit the transaction */
586 if (rconn->openCursorCount == 0)
587 {
588 rconn->newXactForCursor = false;
589
590 res = PQexec(conn, "COMMIT");
591 if (PQresultStatus(res) != PGRES_COMMAND_OK)
592 dblink_res_internalerror(conn, res, "commit error");
593 PQclear(res);
594 }
595 }
596
597 PG_RETURN_TEXT_P(cstring_to_text("OK"));
598 }
599
600 /*
601 * Fetch results from an open cursor
602 */
603 PG_FUNCTION_INFO_V1(dblink_fetch);
604 Datum
dblink_fetch(PG_FUNCTION_ARGS)605 dblink_fetch(PG_FUNCTION_ARGS)
606 {
607 PGresult *res = NULL;
608 char *conname = NULL;
609 remoteConn *rconn = NULL;
610 PGconn *conn = NULL;
611 StringInfoData buf;
612 char *curname = NULL;
613 int howmany = 0;
614 bool fail = true; /* default to backward compatible */
615
616 prepTuplestoreResult(fcinfo);
617
618 dblink_init();
619
620 if (PG_NARGS() == 4)
621 {
622 /* text,text,int,bool */
623 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
624 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
625 howmany = PG_GETARG_INT32(2);
626 fail = PG_GETARG_BOOL(3);
627
628 rconn = getConnectionByName(conname);
629 if (rconn)
630 conn = rconn->conn;
631 }
632 else if (PG_NARGS() == 3)
633 {
634 /* text,text,int or text,int,bool */
635 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
636 {
637 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
638 howmany = PG_GETARG_INT32(1);
639 fail = PG_GETARG_BOOL(2);
640 conn = pconn->conn;
641 }
642 else
643 {
644 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
645 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
646 howmany = PG_GETARG_INT32(2);
647
648 rconn = getConnectionByName(conname);
649 if (rconn)
650 conn = rconn->conn;
651 }
652 }
653 else if (PG_NARGS() == 2)
654 {
655 /* text,int */
656 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
657 howmany = PG_GETARG_INT32(1);
658 conn = pconn->conn;
659 }
660
661 if (!conn)
662 dblink_conn_not_avail(conname);
663
664 initStringInfo(&buf);
665 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
666
667 /*
668 * Try to execute the query. Note that since libpq uses malloc, the
669 * PGresult will be long-lived even though we are still in a short-lived
670 * memory context.
671 */
672 res = PQexec(conn, buf.data);
673 if (!res ||
674 (PQresultStatus(res) != PGRES_COMMAND_OK &&
675 PQresultStatus(res) != PGRES_TUPLES_OK))
676 {
677 dblink_res_error(conn, conname, res, fail,
678 "while fetching from cursor \"%s\"", curname);
679 return (Datum) 0;
680 }
681 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
682 {
683 /* cursor does not exist - closed already or bad name */
684 PQclear(res);
685 ereport(ERROR,
686 (errcode(ERRCODE_INVALID_CURSOR_NAME),
687 errmsg("cursor \"%s\" does not exist", curname)));
688 }
689
690 materializeResult(fcinfo, conn, res);
691 return (Datum) 0;
692 }
693
694 /*
695 * Note: this is the new preferred version of dblink
696 */
697 PG_FUNCTION_INFO_V1(dblink_record);
698 Datum
dblink_record(PG_FUNCTION_ARGS)699 dblink_record(PG_FUNCTION_ARGS)
700 {
701 return dblink_record_internal(fcinfo, false);
702 }
703
704 PG_FUNCTION_INFO_V1(dblink_send_query);
705 Datum
dblink_send_query(PG_FUNCTION_ARGS)706 dblink_send_query(PG_FUNCTION_ARGS)
707 {
708 PGconn *conn;
709 char *sql;
710 int retval;
711
712 if (PG_NARGS() == 2)
713 {
714 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
715 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
716 }
717 else
718 /* shouldn't happen */
719 elog(ERROR, "wrong number of arguments");
720
721 /* async query send */
722 retval = PQsendQuery(conn, sql);
723 if (retval != 1)
724 elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
725
726 PG_RETURN_INT32(retval);
727 }
728
729 PG_FUNCTION_INFO_V1(dblink_get_result);
730 Datum
dblink_get_result(PG_FUNCTION_ARGS)731 dblink_get_result(PG_FUNCTION_ARGS)
732 {
733 return dblink_record_internal(fcinfo, true);
734 }
735
736 static Datum
dblink_record_internal(FunctionCallInfo fcinfo,bool is_async)737 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
738 {
739 PGconn *volatile conn = NULL;
740 volatile bool freeconn = false;
741
742 prepTuplestoreResult(fcinfo);
743
744 dblink_init();
745
746 PG_TRY();
747 {
748 char *sql = NULL;
749 char *conname = NULL;
750 bool fail = true; /* default to backward compatible */
751
752 if (!is_async)
753 {
754 if (PG_NARGS() == 3)
755 {
756 /* text,text,bool */
757 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
758 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
759 fail = PG_GETARG_BOOL(2);
760 dblink_get_conn(conname, &conn, &conname, &freeconn);
761 }
762 else if (PG_NARGS() == 2)
763 {
764 /* text,text or text,bool */
765 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
766 {
767 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
768 fail = PG_GETARG_BOOL(1);
769 conn = pconn->conn;
770 }
771 else
772 {
773 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
774 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
775 dblink_get_conn(conname, &conn, &conname, &freeconn);
776 }
777 }
778 else if (PG_NARGS() == 1)
779 {
780 /* text */
781 conn = pconn->conn;
782 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
783 }
784 else
785 /* shouldn't happen */
786 elog(ERROR, "wrong number of arguments");
787 }
788 else /* is_async */
789 {
790 /* get async result */
791 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
792
793 if (PG_NARGS() == 2)
794 {
795 /* text,bool */
796 fail = PG_GETARG_BOOL(1);
797 conn = dblink_get_named_conn(conname);
798 }
799 else if (PG_NARGS() == 1)
800 {
801 /* text */
802 conn = dblink_get_named_conn(conname);
803 }
804 else
805 /* shouldn't happen */
806 elog(ERROR, "wrong number of arguments");
807 }
808
809 if (!conn)
810 dblink_conn_not_avail(conname);
811
812 if (!is_async)
813 {
814 /* synchronous query, use efficient tuple collection method */
815 materializeQueryResult(fcinfo, conn, conname, sql, fail);
816 }
817 else
818 {
819 /* async result retrieval, do it the old way */
820 PGresult *res = PQgetResult(conn);
821
822 /* NULL means we're all done with the async results */
823 if (res)
824 {
825 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
826 PQresultStatus(res) != PGRES_TUPLES_OK)
827 {
828 dblink_res_error(conn, conname, res, fail,
829 "while executing query");
830 /* if fail isn't set, we'll return an empty query result */
831 }
832 else
833 {
834 materializeResult(fcinfo, conn, res);
835 }
836 }
837 }
838 }
839 PG_FINALLY();
840 {
841 /* if needed, close the connection to the database */
842 if (freeconn)
843 {
844 PQfinish(conn);
845 ReleaseExternalFD();
846 }
847 }
848 PG_END_TRY();
849
850 return (Datum) 0;
851 }
852
853 /*
854 * Verify function caller can handle a tuplestore result, and set up for that.
855 *
856 * Note: if the caller returns without actually creating a tuplestore, the
857 * executor will treat the function result as an empty set.
858 */
859 static void
prepTuplestoreResult(FunctionCallInfo fcinfo)860 prepTuplestoreResult(FunctionCallInfo fcinfo)
861 {
862 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
863
864 /* check to see if query supports us returning a tuplestore */
865 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
866 ereport(ERROR,
867 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
868 errmsg("set-valued function called in context that cannot accept a set")));
869 if (!(rsinfo->allowedModes & SFRM_Materialize))
870 ereport(ERROR,
871 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 errmsg("materialize mode required, but it is not allowed in this context")));
873
874 /* let the executor know we're sending back a tuplestore */
875 rsinfo->returnMode = SFRM_Materialize;
876
877 /* caller must fill these to return a non-empty result */
878 rsinfo->setResult = NULL;
879 rsinfo->setDesc = NULL;
880 }
881
882 /*
883 * Copy the contents of the PGresult into a tuplestore to be returned
884 * as the result of the current function.
885 * The PGresult will be released in this function.
886 */
887 static void
materializeResult(FunctionCallInfo fcinfo,PGconn * conn,PGresult * res)888 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
889 {
890 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
891
892 /* prepTuplestoreResult must have been called previously */
893 Assert(rsinfo->returnMode == SFRM_Materialize);
894
895 PG_TRY();
896 {
897 TupleDesc tupdesc;
898 bool is_sql_cmd;
899 int ntuples;
900 int nfields;
901
902 if (PQresultStatus(res) == PGRES_COMMAND_OK)
903 {
904 is_sql_cmd = true;
905
906 /*
907 * need a tuple descriptor representing one TEXT column to return
908 * the command status string as our result tuple
909 */
910 tupdesc = CreateTemplateTupleDesc(1);
911 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
912 TEXTOID, -1, 0);
913 ntuples = 1;
914 nfields = 1;
915 }
916 else
917 {
918 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
919
920 is_sql_cmd = false;
921
922 /* get a tuple descriptor for our result type */
923 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
924 {
925 case TYPEFUNC_COMPOSITE:
926 /* success */
927 break;
928 case TYPEFUNC_RECORD:
929 /* failed to determine actual type of RECORD */
930 ereport(ERROR,
931 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
932 errmsg("function returning record called in context "
933 "that cannot accept type record")));
934 break;
935 default:
936 /* result type isn't composite */
937 elog(ERROR, "return type must be a row type");
938 break;
939 }
940
941 /* make sure we have a persistent copy of the tupdesc */
942 tupdesc = CreateTupleDescCopy(tupdesc);
943 ntuples = PQntuples(res);
944 nfields = PQnfields(res);
945 }
946
947 /*
948 * check result and tuple descriptor have the same number of columns
949 */
950 if (nfields != tupdesc->natts)
951 ereport(ERROR,
952 (errcode(ERRCODE_DATATYPE_MISMATCH),
953 errmsg("remote query result rowtype does not match "
954 "the specified FROM clause rowtype")));
955
956 if (ntuples > 0)
957 {
958 AttInMetadata *attinmeta;
959 int nestlevel = -1;
960 Tuplestorestate *tupstore;
961 MemoryContext oldcontext;
962 int row;
963 char **values;
964
965 attinmeta = TupleDescGetAttInMetadata(tupdesc);
966
967 /* Set GUCs to ensure we read GUC-sensitive data types correctly */
968 if (!is_sql_cmd)
969 nestlevel = applyRemoteGucs(conn);
970
971 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
972 tupstore = tuplestore_begin_heap(true, false, work_mem);
973 rsinfo->setResult = tupstore;
974 rsinfo->setDesc = tupdesc;
975 MemoryContextSwitchTo(oldcontext);
976
977 values = (char **) palloc(nfields * sizeof(char *));
978
979 /* put all tuples into the tuplestore */
980 for (row = 0; row < ntuples; row++)
981 {
982 HeapTuple tuple;
983
984 if (!is_sql_cmd)
985 {
986 int i;
987
988 for (i = 0; i < nfields; i++)
989 {
990 if (PQgetisnull(res, row, i))
991 values[i] = NULL;
992 else
993 values[i] = PQgetvalue(res, row, i);
994 }
995 }
996 else
997 {
998 values[0] = PQcmdStatus(res);
999 }
1000
1001 /* build the tuple and put it into the tuplestore. */
1002 tuple = BuildTupleFromCStrings(attinmeta, values);
1003 tuplestore_puttuple(tupstore, tuple);
1004 }
1005
1006 /* clean up GUC settings, if we changed any */
1007 restoreLocalGucs(nestlevel);
1008
1009 /* clean up and return the tuplestore */
1010 tuplestore_donestoring(tupstore);
1011 }
1012 }
1013 PG_FINALLY();
1014 {
1015 /* be sure to release the libpq result */
1016 PQclear(res);
1017 }
1018 PG_END_TRY();
1019 }
1020
1021 /*
1022 * Execute the given SQL command and store its results into a tuplestore
1023 * to be returned as the result of the current function.
1024 *
1025 * This is equivalent to PQexec followed by materializeResult, but we make
1026 * use of libpq's single-row mode to avoid accumulating the whole result
1027 * inside libpq before it gets transferred to the tuplestore.
1028 */
1029 static void
materializeQueryResult(FunctionCallInfo fcinfo,PGconn * conn,const char * conname,const char * sql,bool fail)1030 materializeQueryResult(FunctionCallInfo fcinfo,
1031 PGconn *conn,
1032 const char *conname,
1033 const char *sql,
1034 bool fail)
1035 {
1036 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1037 PGresult *volatile res = NULL;
1038 volatile storeInfo sinfo = {0};
1039
1040 /* prepTuplestoreResult must have been called previously */
1041 Assert(rsinfo->returnMode == SFRM_Materialize);
1042
1043 sinfo.fcinfo = fcinfo;
1044
1045 PG_TRY();
1046 {
1047 /* Create short-lived memory context for data conversions */
1048 sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1049 "dblink temporary context",
1050 ALLOCSET_DEFAULT_SIZES);
1051
1052 /* execute query, collecting any tuples into the tuplestore */
1053 res = storeQueryResult(&sinfo, conn, sql);
1054
1055 if (!res ||
1056 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1057 PQresultStatus(res) != PGRES_TUPLES_OK))
1058 {
1059 /*
1060 * dblink_res_error will clear the passed PGresult, so we need
1061 * this ugly dance to avoid doing so twice during error exit
1062 */
1063 PGresult *res1 = res;
1064
1065 res = NULL;
1066 dblink_res_error(conn, conname, res1, fail,
1067 "while executing query");
1068 /* if fail isn't set, we'll return an empty query result */
1069 }
1070 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1071 {
1072 /*
1073 * storeRow didn't get called, so we need to convert the command
1074 * status string to a tuple manually
1075 */
1076 TupleDesc tupdesc;
1077 AttInMetadata *attinmeta;
1078 Tuplestorestate *tupstore;
1079 HeapTuple tuple;
1080 char *values[1];
1081 MemoryContext oldcontext;
1082
1083 /*
1084 * need a tuple descriptor representing one TEXT column to return
1085 * the command status string as our result tuple
1086 */
1087 tupdesc = CreateTemplateTupleDesc(1);
1088 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1089 TEXTOID, -1, 0);
1090 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1091
1092 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1093 tupstore = tuplestore_begin_heap(true, false, work_mem);
1094 rsinfo->setResult = tupstore;
1095 rsinfo->setDesc = tupdesc;
1096 MemoryContextSwitchTo(oldcontext);
1097
1098 values[0] = PQcmdStatus(res);
1099
1100 /* build the tuple and put it into the tuplestore. */
1101 tuple = BuildTupleFromCStrings(attinmeta, values);
1102 tuplestore_puttuple(tupstore, tuple);
1103
1104 PQclear(res);
1105 res = NULL;
1106 }
1107 else
1108 {
1109 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1110 /* storeRow should have created a tuplestore */
1111 Assert(rsinfo->setResult != NULL);
1112
1113 PQclear(res);
1114 res = NULL;
1115 }
1116
1117 /* clean up data conversion short-lived memory context */
1118 if (sinfo.tmpcontext != NULL)
1119 MemoryContextDelete(sinfo.tmpcontext);
1120 sinfo.tmpcontext = NULL;
1121
1122 PQclear(sinfo.last_res);
1123 sinfo.last_res = NULL;
1124 PQclear(sinfo.cur_res);
1125 sinfo.cur_res = NULL;
1126 }
1127 PG_CATCH();
1128 {
1129 /* be sure to release any libpq result we collected */
1130 PQclear(res);
1131 PQclear(sinfo.last_res);
1132 PQclear(sinfo.cur_res);
1133 /* and clear out any pending data in libpq */
1134 while ((res = PQgetResult(conn)) != NULL)
1135 PQclear(res);
1136 PG_RE_THROW();
1137 }
1138 PG_END_TRY();
1139 }
1140
1141 /*
1142 * Execute query, and send any result rows to sinfo->tuplestore.
1143 */
1144 static PGresult *
storeQueryResult(volatile storeInfo * sinfo,PGconn * conn,const char * sql)1145 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1146 {
1147 bool first = true;
1148 int nestlevel = -1;
1149 PGresult *res;
1150
1151 if (!PQsendQuery(conn, sql))
1152 elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1153
1154 if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1155 elog(ERROR, "failed to set single-row mode for dblink query");
1156
1157 for (;;)
1158 {
1159 CHECK_FOR_INTERRUPTS();
1160
1161 sinfo->cur_res = PQgetResult(conn);
1162 if (!sinfo->cur_res)
1163 break;
1164
1165 if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1166 {
1167 /* got one row from possibly-bigger resultset */
1168
1169 /*
1170 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1171 * We shouldn't do this until we have a row in hand, to ensure
1172 * libpq has seen any earlier ParameterStatus protocol messages.
1173 */
1174 if (first && nestlevel < 0)
1175 nestlevel = applyRemoteGucs(conn);
1176
1177 storeRow(sinfo, sinfo->cur_res, first);
1178
1179 PQclear(sinfo->cur_res);
1180 sinfo->cur_res = NULL;
1181 first = false;
1182 }
1183 else
1184 {
1185 /* if empty resultset, fill tuplestore header */
1186 if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1187 storeRow(sinfo, sinfo->cur_res, first);
1188
1189 /* store completed result at last_res */
1190 PQclear(sinfo->last_res);
1191 sinfo->last_res = sinfo->cur_res;
1192 sinfo->cur_res = NULL;
1193 first = true;
1194 }
1195 }
1196
1197 /* clean up GUC settings, if we changed any */
1198 restoreLocalGucs(nestlevel);
1199
1200 /* return last_res */
1201 res = sinfo->last_res;
1202 sinfo->last_res = NULL;
1203 return res;
1204 }
1205
1206 /*
1207 * Send single row to sinfo->tuplestore.
1208 *
1209 * If "first" is true, create the tuplestore using PGresult's metadata
1210 * (in this case the PGresult might contain either zero or one row).
1211 */
1212 static void
storeRow(volatile storeInfo * sinfo,PGresult * res,bool first)1213 storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1214 {
1215 int nfields = PQnfields(res);
1216 HeapTuple tuple;
1217 int i;
1218 MemoryContext oldcontext;
1219
1220 if (first)
1221 {
1222 /* Prepare for new result set */
1223 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1224 TupleDesc tupdesc;
1225
1226 /*
1227 * It's possible to get more than one result set if the query string
1228 * contained multiple SQL commands. In that case, we follow PQexec's
1229 * traditional behavior of throwing away all but the last result.
1230 */
1231 if (sinfo->tuplestore)
1232 tuplestore_end(sinfo->tuplestore);
1233 sinfo->tuplestore = NULL;
1234
1235 /* get a tuple descriptor for our result type */
1236 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1237 {
1238 case TYPEFUNC_COMPOSITE:
1239 /* success */
1240 break;
1241 case TYPEFUNC_RECORD:
1242 /* failed to determine actual type of RECORD */
1243 ereport(ERROR,
1244 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1245 errmsg("function returning record called in context "
1246 "that cannot accept type record")));
1247 break;
1248 default:
1249 /* result type isn't composite */
1250 elog(ERROR, "return type must be a row type");
1251 break;
1252 }
1253
1254 /* make sure we have a persistent copy of the tupdesc */
1255 tupdesc = CreateTupleDescCopy(tupdesc);
1256
1257 /* check result and tuple descriptor have the same number of columns */
1258 if (nfields != tupdesc->natts)
1259 ereport(ERROR,
1260 (errcode(ERRCODE_DATATYPE_MISMATCH),
1261 errmsg("remote query result rowtype does not match "
1262 "the specified FROM clause rowtype")));
1263
1264 /* Prepare attinmeta for later data conversions */
1265 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1266
1267 /* Create a new, empty tuplestore */
1268 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1269 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1270 rsinfo->setResult = sinfo->tuplestore;
1271 rsinfo->setDesc = tupdesc;
1272 MemoryContextSwitchTo(oldcontext);
1273
1274 /* Done if empty resultset */
1275 if (PQntuples(res) == 0)
1276 return;
1277
1278 /*
1279 * Set up sufficiently-wide string pointers array; this won't change
1280 * in size so it's easy to preallocate.
1281 */
1282 if (sinfo->cstrs)
1283 pfree(sinfo->cstrs);
1284 sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
1285 }
1286
1287 /* Should have a single-row result if we get here */
1288 Assert(PQntuples(res) == 1);
1289
1290 /*
1291 * Do the following work in a temp context that we reset after each tuple.
1292 * This cleans up not only the data we have direct access to, but any
1293 * cruft the I/O functions might leak.
1294 */
1295 oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1296
1297 /*
1298 * Fill cstrs with null-terminated strings of column values.
1299 */
1300 for (i = 0; i < nfields; i++)
1301 {
1302 if (PQgetisnull(res, 0, i))
1303 sinfo->cstrs[i] = NULL;
1304 else
1305 sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1306 }
1307
1308 /* Convert row to a tuple, and add it to the tuplestore */
1309 tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1310
1311 tuplestore_puttuple(sinfo->tuplestore, tuple);
1312
1313 /* Clean up */
1314 MemoryContextSwitchTo(oldcontext);
1315 MemoryContextReset(sinfo->tmpcontext);
1316 }
1317
1318 /*
1319 * List all open dblink connections by name.
1320 * Returns an array of all connection names.
1321 * Takes no params
1322 */
1323 PG_FUNCTION_INFO_V1(dblink_get_connections);
1324 Datum
dblink_get_connections(PG_FUNCTION_ARGS)1325 dblink_get_connections(PG_FUNCTION_ARGS)
1326 {
1327 HASH_SEQ_STATUS status;
1328 remoteConnHashEnt *hentry;
1329 ArrayBuildState *astate = NULL;
1330
1331 if (remoteConnHash)
1332 {
1333 hash_seq_init(&status, remoteConnHash);
1334 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1335 {
1336 /* stash away current value */
1337 astate = accumArrayResult(astate,
1338 CStringGetTextDatum(hentry->name),
1339 false, TEXTOID, CurrentMemoryContext);
1340 }
1341 }
1342
1343 if (astate)
1344 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1345 CurrentMemoryContext));
1346 else
1347 PG_RETURN_NULL();
1348 }
1349
1350 /*
1351 * Checks if a given remote connection is busy
1352 *
1353 * Returns 1 if the connection is busy, 0 otherwise
1354 * Params:
1355 * text connection_name - name of the connection to check
1356 *
1357 */
1358 PG_FUNCTION_INFO_V1(dblink_is_busy);
1359 Datum
dblink_is_busy(PG_FUNCTION_ARGS)1360 dblink_is_busy(PG_FUNCTION_ARGS)
1361 {
1362 PGconn *conn;
1363
1364 dblink_init();
1365 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1366
1367 PQconsumeInput(conn);
1368 PG_RETURN_INT32(PQisBusy(conn));
1369 }
1370
1371 /*
1372 * Cancels a running request on a connection
1373 *
1374 * Returns text:
1375 * "OK" if the cancel request has been sent correctly,
1376 * an error message otherwise
1377 *
1378 * Params:
1379 * text connection_name - name of the connection to check
1380 *
1381 */
1382 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1383 Datum
dblink_cancel_query(PG_FUNCTION_ARGS)1384 dblink_cancel_query(PG_FUNCTION_ARGS)
1385 {
1386 int res;
1387 PGconn *conn;
1388 PGcancel *cancel;
1389 char errbuf[256];
1390
1391 dblink_init();
1392 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1393 cancel = PQgetCancel(conn);
1394
1395 res = PQcancel(cancel, errbuf, 256);
1396 PQfreeCancel(cancel);
1397
1398 if (res == 1)
1399 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1400 else
1401 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1402 }
1403
1404
1405 /*
1406 * Get error message from a connection
1407 *
1408 * Returns text:
1409 * "OK" if no error, an error message otherwise
1410 *
1411 * Params:
1412 * text connection_name - name of the connection to check
1413 *
1414 */
1415 PG_FUNCTION_INFO_V1(dblink_error_message);
1416 Datum
dblink_error_message(PG_FUNCTION_ARGS)1417 dblink_error_message(PG_FUNCTION_ARGS)
1418 {
1419 char *msg;
1420 PGconn *conn;
1421
1422 dblink_init();
1423 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1424
1425 msg = PQerrorMessage(conn);
1426 if (msg == NULL || msg[0] == '\0')
1427 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1428 else
1429 PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1430 }
1431
1432 /*
1433 * Execute an SQL non-SELECT command
1434 */
1435 PG_FUNCTION_INFO_V1(dblink_exec);
1436 Datum
dblink_exec(PG_FUNCTION_ARGS)1437 dblink_exec(PG_FUNCTION_ARGS)
1438 {
1439 text *volatile sql_cmd_status = NULL;
1440 PGconn *volatile conn = NULL;
1441 volatile bool freeconn = false;
1442
1443 dblink_init();
1444
1445 PG_TRY();
1446 {
1447 PGresult *res = NULL;
1448 char *sql = NULL;
1449 char *conname = NULL;
1450 bool fail = true; /* default to backward compatible behavior */
1451
1452 if (PG_NARGS() == 3)
1453 {
1454 /* must be text,text,bool */
1455 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1456 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1457 fail = PG_GETARG_BOOL(2);
1458 dblink_get_conn(conname, &conn, &conname, &freeconn);
1459 }
1460 else if (PG_NARGS() == 2)
1461 {
1462 /* might be text,text or text,bool */
1463 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1464 {
1465 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1466 fail = PG_GETARG_BOOL(1);
1467 conn = pconn->conn;
1468 }
1469 else
1470 {
1471 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1472 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1473 dblink_get_conn(conname, &conn, &conname, &freeconn);
1474 }
1475 }
1476 else if (PG_NARGS() == 1)
1477 {
1478 /* must be single text argument */
1479 conn = pconn->conn;
1480 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1481 }
1482 else
1483 /* shouldn't happen */
1484 elog(ERROR, "wrong number of arguments");
1485
1486 if (!conn)
1487 dblink_conn_not_avail(conname);
1488
1489 res = PQexec(conn, sql);
1490 if (!res ||
1491 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1492 PQresultStatus(res) != PGRES_TUPLES_OK))
1493 {
1494 dblink_res_error(conn, conname, res, fail,
1495 "while executing command");
1496
1497 /*
1498 * and save a copy of the command status string to return as our
1499 * result tuple
1500 */
1501 sql_cmd_status = cstring_to_text("ERROR");
1502 }
1503 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1504 {
1505 /*
1506 * and save a copy of the command status string to return as our
1507 * result tuple
1508 */
1509 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1510 PQclear(res);
1511 }
1512 else
1513 {
1514 PQclear(res);
1515 ereport(ERROR,
1516 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1517 errmsg("statement returning results not allowed")));
1518 }
1519 }
1520 PG_FINALLY();
1521 {
1522 /* if needed, close the connection to the database */
1523 if (freeconn)
1524 {
1525 PQfinish(conn);
1526 ReleaseExternalFD();
1527 }
1528 }
1529 PG_END_TRY();
1530
1531 PG_RETURN_TEXT_P(sql_cmd_status);
1532 }
1533
1534
1535 /*
1536 * dblink_get_pkey
1537 *
1538 * Return list of primary key fields for the supplied relation,
1539 * or NULL if none exists.
1540 */
1541 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1542 Datum
dblink_get_pkey(PG_FUNCTION_ARGS)1543 dblink_get_pkey(PG_FUNCTION_ARGS)
1544 {
1545 int16 indnkeyatts;
1546 char **results;
1547 FuncCallContext *funcctx;
1548 int32 call_cntr;
1549 int32 max_calls;
1550 AttInMetadata *attinmeta;
1551 MemoryContext oldcontext;
1552
1553 /* stuff done only on the first call of the function */
1554 if (SRF_IS_FIRSTCALL())
1555 {
1556 Relation rel;
1557 TupleDesc tupdesc;
1558
1559 /* create a function context for cross-call persistence */
1560 funcctx = SRF_FIRSTCALL_INIT();
1561
1562 /*
1563 * switch to memory context appropriate for multiple function calls
1564 */
1565 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1566
1567 /* open target relation */
1568 rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1569
1570 /* get the array of attnums */
1571 results = get_pkey_attnames(rel, &indnkeyatts);
1572
1573 relation_close(rel, AccessShareLock);
1574
1575 /*
1576 * need a tuple descriptor representing one INT and one TEXT column
1577 */
1578 tupdesc = CreateTemplateTupleDesc(2);
1579 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1580 INT4OID, -1, 0);
1581 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1582 TEXTOID, -1, 0);
1583
1584 /*
1585 * Generate attribute metadata needed later to produce tuples from raw
1586 * C strings
1587 */
1588 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1589 funcctx->attinmeta = attinmeta;
1590
1591 if ((results != NULL) && (indnkeyatts > 0))
1592 {
1593 funcctx->max_calls = indnkeyatts;
1594
1595 /* got results, keep track of them */
1596 funcctx->user_fctx = results;
1597 }
1598 else
1599 {
1600 /* fast track when no results */
1601 MemoryContextSwitchTo(oldcontext);
1602 SRF_RETURN_DONE(funcctx);
1603 }
1604
1605 MemoryContextSwitchTo(oldcontext);
1606 }
1607
1608 /* stuff done on every call of the function */
1609 funcctx = SRF_PERCALL_SETUP();
1610
1611 /*
1612 * initialize per-call variables
1613 */
1614 call_cntr = funcctx->call_cntr;
1615 max_calls = funcctx->max_calls;
1616
1617 results = (char **) funcctx->user_fctx;
1618 attinmeta = funcctx->attinmeta;
1619
1620 if (call_cntr < max_calls) /* do when there is more left to send */
1621 {
1622 char **values;
1623 HeapTuple tuple;
1624 Datum result;
1625
1626 values = (char **) palloc(2 * sizeof(char *));
1627 values[0] = psprintf("%d", call_cntr + 1);
1628 values[1] = results[call_cntr];
1629
1630 /* build the tuple */
1631 tuple = BuildTupleFromCStrings(attinmeta, values);
1632
1633 /* make the tuple into a datum */
1634 result = HeapTupleGetDatum(tuple);
1635
1636 SRF_RETURN_NEXT(funcctx, result);
1637 }
1638 else
1639 {
1640 /* do when there is no more left */
1641 SRF_RETURN_DONE(funcctx);
1642 }
1643 }
1644
1645
1646 /*
1647 * dblink_build_sql_insert
1648 *
1649 * Used to generate an SQL insert statement
1650 * based on an existing tuple in a local relation.
1651 * This is useful for selectively replicating data
1652 * to another server via dblink.
1653 *
1654 * API:
1655 * <relname> - name of local table of interest
1656 * <pkattnums> - an int2vector of attnums which will be used
1657 * to identify the local tuple of interest
1658 * <pknumatts> - number of attnums in pkattnums
1659 * <src_pkattvals_arry> - text array of key values which will be used
1660 * to identify the local tuple of interest
1661 * <tgt_pkattvals_arry> - text array of key values which will be used
1662 * to build the string for execution remotely. These are substituted
1663 * for their counterparts in src_pkattvals_arry
1664 */
1665 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1666 Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)1667 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1668 {
1669 text *relname_text = PG_GETARG_TEXT_PP(0);
1670 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1671 int32 pknumatts_arg = PG_GETARG_INT32(2);
1672 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1673 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1674 Relation rel;
1675 int *pkattnums;
1676 int pknumatts;
1677 char **src_pkattvals;
1678 char **tgt_pkattvals;
1679 int src_nitems;
1680 int tgt_nitems;
1681 char *sql;
1682
1683 /*
1684 * Open target relation.
1685 */
1686 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1687
1688 /*
1689 * Process pkattnums argument.
1690 */
1691 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1692 &pkattnums, &pknumatts);
1693
1694 /*
1695 * Source array is made up of key values that will be used to locate the
1696 * tuple of interest from the local system.
1697 */
1698 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1699
1700 /*
1701 * There should be one source array key value for each key attnum
1702 */
1703 if (src_nitems != pknumatts)
1704 ereport(ERROR,
1705 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1706 errmsg("source key array length must match number of key attributes")));
1707
1708 /*
1709 * Target array is made up of key values that will be used to build the
1710 * SQL string for use on the remote system.
1711 */
1712 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1713
1714 /*
1715 * There should be one target array key value for each key attnum
1716 */
1717 if (tgt_nitems != pknumatts)
1718 ereport(ERROR,
1719 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1720 errmsg("target key array length must match number of key attributes")));
1721
1722 /*
1723 * Prep work is finally done. Go get the SQL string.
1724 */
1725 sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1726
1727 /*
1728 * Now we can close the relation.
1729 */
1730 relation_close(rel, AccessShareLock);
1731
1732 /*
1733 * And send it
1734 */
1735 PG_RETURN_TEXT_P(cstring_to_text(sql));
1736 }
1737
1738
1739 /*
1740 * dblink_build_sql_delete
1741 *
1742 * Used to generate an SQL delete statement.
1743 * This is useful for selectively replicating a
1744 * delete to another server via dblink.
1745 *
1746 * API:
1747 * <relname> - name of remote table of interest
1748 * <pkattnums> - an int2vector of attnums which will be used
1749 * to identify the remote tuple of interest
1750 * <pknumatts> - number of attnums in pkattnums
1751 * <tgt_pkattvals_arry> - text array of key values which will be used
1752 * to build the string for execution remotely.
1753 */
1754 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1755 Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)1756 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1757 {
1758 text *relname_text = PG_GETARG_TEXT_PP(0);
1759 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1760 int32 pknumatts_arg = PG_GETARG_INT32(2);
1761 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1762 Relation rel;
1763 int *pkattnums;
1764 int pknumatts;
1765 char **tgt_pkattvals;
1766 int tgt_nitems;
1767 char *sql;
1768
1769 /*
1770 * Open target relation.
1771 */
1772 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1773
1774 /*
1775 * Process pkattnums argument.
1776 */
1777 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1778 &pkattnums, &pknumatts);
1779
1780 /*
1781 * Target array is made up of key values that will be used to build the
1782 * SQL string for use on the remote system.
1783 */
1784 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1785
1786 /*
1787 * There should be one target array key value for each key attnum
1788 */
1789 if (tgt_nitems != pknumatts)
1790 ereport(ERROR,
1791 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1792 errmsg("target key array length must match number of key attributes")));
1793
1794 /*
1795 * Prep work is finally done. Go get the SQL string.
1796 */
1797 sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1798
1799 /*
1800 * Now we can close the relation.
1801 */
1802 relation_close(rel, AccessShareLock);
1803
1804 /*
1805 * And send it
1806 */
1807 PG_RETURN_TEXT_P(cstring_to_text(sql));
1808 }
1809
1810
1811 /*
1812 * dblink_build_sql_update
1813 *
1814 * Used to generate an SQL update statement
1815 * based on an existing tuple in a local relation.
1816 * This is useful for selectively replicating data
1817 * to another server via dblink.
1818 *
1819 * API:
1820 * <relname> - name of local table of interest
1821 * <pkattnums> - an int2vector of attnums which will be used
1822 * to identify the local tuple of interest
1823 * <pknumatts> - number of attnums in pkattnums
1824 * <src_pkattvals_arry> - text array of key values which will be used
1825 * to identify the local tuple of interest
1826 * <tgt_pkattvals_arry> - text array of key values which will be used
1827 * to build the string for execution remotely. These are substituted
1828 * for their counterparts in src_pkattvals_arry
1829 */
1830 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1831 Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)1832 dblink_build_sql_update(PG_FUNCTION_ARGS)
1833 {
1834 text *relname_text = PG_GETARG_TEXT_PP(0);
1835 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1836 int32 pknumatts_arg = PG_GETARG_INT32(2);
1837 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1838 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1839 Relation rel;
1840 int *pkattnums;
1841 int pknumatts;
1842 char **src_pkattvals;
1843 char **tgt_pkattvals;
1844 int src_nitems;
1845 int tgt_nitems;
1846 char *sql;
1847
1848 /*
1849 * Open target relation.
1850 */
1851 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1852
1853 /*
1854 * Process pkattnums argument.
1855 */
1856 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1857 &pkattnums, &pknumatts);
1858
1859 /*
1860 * Source array is made up of key values that will be used to locate the
1861 * tuple of interest from the local system.
1862 */
1863 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1864
1865 /*
1866 * There should be one source array key value for each key attnum
1867 */
1868 if (src_nitems != pknumatts)
1869 ereport(ERROR,
1870 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1871 errmsg("source key array length must match number of key attributes")));
1872
1873 /*
1874 * Target array is made up of key values that will be used to build the
1875 * SQL string for use on the remote system.
1876 */
1877 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1878
1879 /*
1880 * There should be one target array key value for each key attnum
1881 */
1882 if (tgt_nitems != pknumatts)
1883 ereport(ERROR,
1884 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1885 errmsg("target key array length must match number of key attributes")));
1886
1887 /*
1888 * Prep work is finally done. Go get the SQL string.
1889 */
1890 sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1891
1892 /*
1893 * Now we can close the relation.
1894 */
1895 relation_close(rel, AccessShareLock);
1896
1897 /*
1898 * And send it
1899 */
1900 PG_RETURN_TEXT_P(cstring_to_text(sql));
1901 }
1902
1903 /*
1904 * dblink_current_query
1905 * return the current query string
1906 * to allow its use in (among other things)
1907 * rewrite rules
1908 */
1909 PG_FUNCTION_INFO_V1(dblink_current_query);
1910 Datum
dblink_current_query(PG_FUNCTION_ARGS)1911 dblink_current_query(PG_FUNCTION_ARGS)
1912 {
1913 /* This is now just an alias for the built-in function current_query() */
1914 PG_RETURN_DATUM(current_query(fcinfo));
1915 }
1916
1917 /*
1918 * Retrieve async notifications for a connection.
1919 *
1920 * Returns a setof record of notifications, or an empty set if none received.
1921 * Can optionally take a named connection as parameter, but uses the unnamed
1922 * connection per default.
1923 *
1924 */
1925 #define DBLINK_NOTIFY_COLS 3
1926
1927 PG_FUNCTION_INFO_V1(dblink_get_notify);
1928 Datum
dblink_get_notify(PG_FUNCTION_ARGS)1929 dblink_get_notify(PG_FUNCTION_ARGS)
1930 {
1931 PGconn *conn;
1932 PGnotify *notify;
1933 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1934 TupleDesc tupdesc;
1935 Tuplestorestate *tupstore;
1936 MemoryContext per_query_ctx;
1937 MemoryContext oldcontext;
1938
1939 prepTuplestoreResult(fcinfo);
1940
1941 dblink_init();
1942 if (PG_NARGS() == 1)
1943 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1944 else
1945 conn = pconn->conn;
1946
1947 /* create the tuplestore in per-query memory */
1948 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1949 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1950
1951 tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS);
1952 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1953 TEXTOID, -1, 0);
1954 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1955 INT4OID, -1, 0);
1956 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1957 TEXTOID, -1, 0);
1958
1959 tupstore = tuplestore_begin_heap(true, false, work_mem);
1960 rsinfo->setResult = tupstore;
1961 rsinfo->setDesc = tupdesc;
1962
1963 MemoryContextSwitchTo(oldcontext);
1964
1965 PQconsumeInput(conn);
1966 while ((notify = PQnotifies(conn)) != NULL)
1967 {
1968 Datum values[DBLINK_NOTIFY_COLS];
1969 bool nulls[DBLINK_NOTIFY_COLS];
1970
1971 memset(values, 0, sizeof(values));
1972 memset(nulls, 0, sizeof(nulls));
1973
1974 if (notify->relname != NULL)
1975 values[0] = CStringGetTextDatum(notify->relname);
1976 else
1977 nulls[0] = true;
1978
1979 values[1] = Int32GetDatum(notify->be_pid);
1980
1981 if (notify->extra != NULL)
1982 values[2] = CStringGetTextDatum(notify->extra);
1983 else
1984 nulls[2] = true;
1985
1986 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1987
1988 PQfreemem(notify);
1989 PQconsumeInput(conn);
1990 }
1991
1992 /* clean up and return the tuplestore */
1993 tuplestore_donestoring(tupstore);
1994
1995 return (Datum) 0;
1996 }
1997
1998 /*
1999 * Validate the options given to a dblink foreign server or user mapping.
2000 * Raise an error if any option is invalid.
2001 *
2002 * We just check the names of options here, so semantic errors in options,
2003 * such as invalid numeric format, will be detected at the attempt to connect.
2004 */
2005 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
2006 Datum
dblink_fdw_validator(PG_FUNCTION_ARGS)2007 dblink_fdw_validator(PG_FUNCTION_ARGS)
2008 {
2009 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
2010 Oid context = PG_GETARG_OID(1);
2011 ListCell *cell;
2012
2013 static const PQconninfoOption *options = NULL;
2014
2015 /*
2016 * Get list of valid libpq options.
2017 *
2018 * To avoid unnecessary work, we get the list once and use it throughout
2019 * the lifetime of this backend process. We don't need to care about
2020 * memory context issues, because PQconndefaults allocates with malloc.
2021 */
2022 if (!options)
2023 {
2024 options = PQconndefaults();
2025 if (!options) /* assume reason for failure is OOM */
2026 ereport(ERROR,
2027 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2028 errmsg("out of memory"),
2029 errdetail("Could not get libpq's default connection options.")));
2030 }
2031
2032 /* Validate each supplied option. */
2033 foreach(cell, options_list)
2034 {
2035 DefElem *def = (DefElem *) lfirst(cell);
2036
2037 if (!is_valid_dblink_option(options, def->defname, context))
2038 {
2039 /*
2040 * Unknown option, or invalid option for the context specified, so
2041 * complain about it. Provide a hint with list of valid options
2042 * for the context.
2043 */
2044 StringInfoData buf;
2045 const PQconninfoOption *opt;
2046
2047 initStringInfo(&buf);
2048 for (opt = options; opt->keyword; opt++)
2049 {
2050 if (is_valid_dblink_option(options, opt->keyword, context))
2051 appendStringInfo(&buf, "%s%s",
2052 (buf.len > 0) ? ", " : "",
2053 opt->keyword);
2054 }
2055 ereport(ERROR,
2056 (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2057 errmsg("invalid option \"%s\"", def->defname),
2058 errhint("Valid options in this context are: %s",
2059 buf.data)));
2060 }
2061 }
2062
2063 PG_RETURN_VOID();
2064 }
2065
2066
2067 /*************************************************************
2068 * internal functions
2069 */
2070
2071
2072 /*
2073 * get_pkey_attnames
2074 *
2075 * Get the primary key attnames for the given relation.
2076 * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2077 */
2078 static char **
get_pkey_attnames(Relation rel,int16 * indnkeyatts)2079 get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2080 {
2081 Relation indexRelation;
2082 ScanKeyData skey;
2083 SysScanDesc scan;
2084 HeapTuple indexTuple;
2085 int i;
2086 char **result = NULL;
2087 TupleDesc tupdesc;
2088
2089 /* initialize indnkeyatts to 0 in case no primary key exists */
2090 *indnkeyatts = 0;
2091
2092 tupdesc = rel->rd_att;
2093
2094 /* Prepare to scan pg_index for entries having indrelid = this rel. */
2095 indexRelation = table_open(IndexRelationId, AccessShareLock);
2096 ScanKeyInit(&skey,
2097 Anum_pg_index_indrelid,
2098 BTEqualStrategyNumber, F_OIDEQ,
2099 ObjectIdGetDatum(RelationGetRelid(rel)));
2100
2101 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2102 NULL, 1, &skey);
2103
2104 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2105 {
2106 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2107
2108 /* we're only interested if it is the primary key */
2109 if (index->indisprimary)
2110 {
2111 *indnkeyatts = index->indnkeyatts;
2112 if (*indnkeyatts > 0)
2113 {
2114 result = (char **) palloc(*indnkeyatts * sizeof(char *));
2115
2116 for (i = 0; i < *indnkeyatts; i++)
2117 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2118 }
2119 break;
2120 }
2121 }
2122
2123 systable_endscan(scan);
2124 table_close(indexRelation, AccessShareLock);
2125
2126 return result;
2127 }
2128
2129 /*
2130 * Deconstruct a text[] into C-strings (note any NULL elements will be
2131 * returned as NULL pointers)
2132 */
2133 static char **
get_text_array_contents(ArrayType * array,int * numitems)2134 get_text_array_contents(ArrayType *array, int *numitems)
2135 {
2136 int ndim = ARR_NDIM(array);
2137 int *dims = ARR_DIMS(array);
2138 int nitems;
2139 int16 typlen;
2140 bool typbyval;
2141 char typalign;
2142 char **values;
2143 char *ptr;
2144 bits8 *bitmap;
2145 int bitmask;
2146 int i;
2147
2148 Assert(ARR_ELEMTYPE(array) == TEXTOID);
2149
2150 *numitems = nitems = ArrayGetNItems(ndim, dims);
2151
2152 get_typlenbyvalalign(ARR_ELEMTYPE(array),
2153 &typlen, &typbyval, &typalign);
2154
2155 values = (char **) palloc(nitems * sizeof(char *));
2156
2157 ptr = ARR_DATA_PTR(array);
2158 bitmap = ARR_NULLBITMAP(array);
2159 bitmask = 1;
2160
2161 for (i = 0; i < nitems; i++)
2162 {
2163 if (bitmap && (*bitmap & bitmask) == 0)
2164 {
2165 values[i] = NULL;
2166 }
2167 else
2168 {
2169 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2170 ptr = att_addlength_pointer(ptr, typlen, ptr);
2171 ptr = (char *) att_align_nominal(ptr, typalign);
2172 }
2173
2174 /* advance bitmap pointer if any */
2175 if (bitmap)
2176 {
2177 bitmask <<= 1;
2178 if (bitmask == 0x100)
2179 {
2180 bitmap++;
2181 bitmask = 1;
2182 }
2183 }
2184 }
2185
2186 return values;
2187 }
2188
2189 static char *
get_sql_insert(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2190 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2191 {
2192 char *relname;
2193 HeapTuple tuple;
2194 TupleDesc tupdesc;
2195 int natts;
2196 StringInfoData buf;
2197 char *val;
2198 int key;
2199 int i;
2200 bool needComma;
2201
2202 initStringInfo(&buf);
2203
2204 /* get relation name including any needed schema prefix and quoting */
2205 relname = generate_relation_name(rel);
2206
2207 tupdesc = rel->rd_att;
2208 natts = tupdesc->natts;
2209
2210 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2211 if (!tuple)
2212 ereport(ERROR,
2213 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2214 errmsg("source row not found")));
2215
2216 appendStringInfo(&buf, "INSERT INTO %s(", relname);
2217
2218 needComma = false;
2219 for (i = 0; i < natts; i++)
2220 {
2221 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2222
2223 if (att->attisdropped)
2224 continue;
2225
2226 if (needComma)
2227 appendStringInfoChar(&buf, ',');
2228
2229 appendStringInfoString(&buf,
2230 quote_ident_cstr(NameStr(att->attname)));
2231 needComma = true;
2232 }
2233
2234 appendStringInfoString(&buf, ") VALUES(");
2235
2236 /*
2237 * Note: i is physical column number (counting from 0).
2238 */
2239 needComma = false;
2240 for (i = 0; i < natts; i++)
2241 {
2242 if (TupleDescAttr(tupdesc, i)->attisdropped)
2243 continue;
2244
2245 if (needComma)
2246 appendStringInfoChar(&buf, ',');
2247
2248 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2249
2250 if (key >= 0)
2251 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2252 else
2253 val = SPI_getvalue(tuple, tupdesc, i + 1);
2254
2255 if (val != NULL)
2256 {
2257 appendStringInfoString(&buf, quote_literal_cstr(val));
2258 pfree(val);
2259 }
2260 else
2261 appendStringInfoString(&buf, "NULL");
2262 needComma = true;
2263 }
2264 appendStringInfoChar(&buf, ')');
2265
2266 return buf.data;
2267 }
2268
2269 static char *
get_sql_delete(Relation rel,int * pkattnums,int pknumatts,char ** tgt_pkattvals)2270 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2271 {
2272 char *relname;
2273 TupleDesc tupdesc;
2274 StringInfoData buf;
2275 int i;
2276
2277 initStringInfo(&buf);
2278
2279 /* get relation name including any needed schema prefix and quoting */
2280 relname = generate_relation_name(rel);
2281
2282 tupdesc = rel->rd_att;
2283
2284 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2285 for (i = 0; i < pknumatts; i++)
2286 {
2287 int pkattnum = pkattnums[i];
2288 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2289
2290 if (i > 0)
2291 appendStringInfoString(&buf, " AND ");
2292
2293 appendStringInfoString(&buf,
2294 quote_ident_cstr(NameStr(attr->attname)));
2295
2296 if (tgt_pkattvals[i] != NULL)
2297 appendStringInfo(&buf, " = %s",
2298 quote_literal_cstr(tgt_pkattvals[i]));
2299 else
2300 appendStringInfoString(&buf, " IS NULL");
2301 }
2302
2303 return buf.data;
2304 }
2305
2306 static char *
get_sql_update(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2307 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2308 {
2309 char *relname;
2310 HeapTuple tuple;
2311 TupleDesc tupdesc;
2312 int natts;
2313 StringInfoData buf;
2314 char *val;
2315 int key;
2316 int i;
2317 bool needComma;
2318
2319 initStringInfo(&buf);
2320
2321 /* get relation name including any needed schema prefix and quoting */
2322 relname = generate_relation_name(rel);
2323
2324 tupdesc = rel->rd_att;
2325 natts = tupdesc->natts;
2326
2327 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2328 if (!tuple)
2329 ereport(ERROR,
2330 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2331 errmsg("source row not found")));
2332
2333 appendStringInfo(&buf, "UPDATE %s SET ", relname);
2334
2335 /*
2336 * Note: i is physical column number (counting from 0).
2337 */
2338 needComma = false;
2339 for (i = 0; i < natts; i++)
2340 {
2341 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2342
2343 if (attr->attisdropped)
2344 continue;
2345
2346 if (needComma)
2347 appendStringInfoString(&buf, ", ");
2348
2349 appendStringInfo(&buf, "%s = ",
2350 quote_ident_cstr(NameStr(attr->attname)));
2351
2352 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2353
2354 if (key >= 0)
2355 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2356 else
2357 val = SPI_getvalue(tuple, tupdesc, i + 1);
2358
2359 if (val != NULL)
2360 {
2361 appendStringInfoString(&buf, quote_literal_cstr(val));
2362 pfree(val);
2363 }
2364 else
2365 appendStringInfoString(&buf, "NULL");
2366 needComma = true;
2367 }
2368
2369 appendStringInfoString(&buf, " WHERE ");
2370
2371 for (i = 0; i < pknumatts; i++)
2372 {
2373 int pkattnum = pkattnums[i];
2374 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2375
2376 if (i > 0)
2377 appendStringInfoString(&buf, " AND ");
2378
2379 appendStringInfoString(&buf,
2380 quote_ident_cstr(NameStr(attr->attname)));
2381
2382 val = tgt_pkattvals[i];
2383
2384 if (val != NULL)
2385 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2386 else
2387 appendStringInfoString(&buf, " IS NULL");
2388 }
2389
2390 return buf.data;
2391 }
2392
2393 /*
2394 * Return a properly quoted identifier.
2395 * Uses quote_ident in quote.c
2396 */
2397 static char *
quote_ident_cstr(char * rawstr)2398 quote_ident_cstr(char *rawstr)
2399 {
2400 text *rawstr_text;
2401 text *result_text;
2402 char *result;
2403
2404 rawstr_text = cstring_to_text(rawstr);
2405 result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2406 PointerGetDatum(rawstr_text)));
2407 result = text_to_cstring(result_text);
2408
2409 return result;
2410 }
2411
2412 static int
get_attnum_pk_pos(int * pkattnums,int pknumatts,int key)2413 get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2414 {
2415 int i;
2416
2417 /*
2418 * Not likely a long list anyway, so just scan for the value
2419 */
2420 for (i = 0; i < pknumatts; i++)
2421 if (key == pkattnums[i])
2422 return i;
2423
2424 return -1;
2425 }
2426
2427 static HeapTuple
get_tuple_of_interest(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals)2428 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2429 {
2430 char *relname;
2431 TupleDesc tupdesc;
2432 int natts;
2433 StringInfoData buf;
2434 int ret;
2435 HeapTuple tuple;
2436 int i;
2437
2438 /*
2439 * Connect to SPI manager
2440 */
2441 if ((ret = SPI_connect()) < 0)
2442 /* internal error */
2443 elog(ERROR, "SPI connect failure - returned %d", ret);
2444
2445 initStringInfo(&buf);
2446
2447 /* get relation name including any needed schema prefix and quoting */
2448 relname = generate_relation_name(rel);
2449
2450 tupdesc = rel->rd_att;
2451 natts = tupdesc->natts;
2452
2453 /*
2454 * Build sql statement to look up tuple of interest, ie, the one matching
2455 * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2456 * generate a result tuple that matches the table's physical structure,
2457 * with NULLs for any dropped columns. Otherwise we have to deal with two
2458 * different tupdescs and everything's very confusing.
2459 */
2460 appendStringInfoString(&buf, "SELECT ");
2461
2462 for (i = 0; i < natts; i++)
2463 {
2464 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2465
2466 if (i > 0)
2467 appendStringInfoString(&buf, ", ");
2468
2469 if (attr->attisdropped)
2470 appendStringInfoString(&buf, "NULL");
2471 else
2472 appendStringInfoString(&buf,
2473 quote_ident_cstr(NameStr(attr->attname)));
2474 }
2475
2476 appendStringInfo(&buf, " FROM %s WHERE ", relname);
2477
2478 for (i = 0; i < pknumatts; i++)
2479 {
2480 int pkattnum = pkattnums[i];
2481 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2482
2483 if (i > 0)
2484 appendStringInfoString(&buf, " AND ");
2485
2486 appendStringInfoString(&buf,
2487 quote_ident_cstr(NameStr(attr->attname)));
2488
2489 if (src_pkattvals[i] != NULL)
2490 appendStringInfo(&buf, " = %s",
2491 quote_literal_cstr(src_pkattvals[i]));
2492 else
2493 appendStringInfoString(&buf, " IS NULL");
2494 }
2495
2496 /*
2497 * Retrieve the desired tuple
2498 */
2499 ret = SPI_exec(buf.data, 0);
2500 pfree(buf.data);
2501
2502 /*
2503 * Only allow one qualifying tuple
2504 */
2505 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2506 ereport(ERROR,
2507 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2508 errmsg("source criteria matched more than one record")));
2509
2510 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2511 {
2512 SPITupleTable *tuptable = SPI_tuptable;
2513
2514 tuple = SPI_copytuple(tuptable->vals[0]);
2515 SPI_finish();
2516
2517 return tuple;
2518 }
2519 else
2520 {
2521 /*
2522 * no qualifying tuples
2523 */
2524 SPI_finish();
2525
2526 return NULL;
2527 }
2528
2529 /*
2530 * never reached, but keep compiler quiet
2531 */
2532 return NULL;
2533 }
2534
2535 /*
2536 * Open the relation named by relname_text, acquire specified type of lock,
2537 * verify we have specified permissions.
2538 * Caller must close rel when done with it.
2539 */
2540 static Relation
get_rel_from_relname(text * relname_text,LOCKMODE lockmode,AclMode aclmode)2541 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2542 {
2543 RangeVar *relvar;
2544 Relation rel;
2545 AclResult aclresult;
2546
2547 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2548 rel = table_openrv(relvar, lockmode);
2549
2550 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2551 aclmode);
2552 if (aclresult != ACLCHECK_OK)
2553 aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2554 RelationGetRelationName(rel));
2555
2556 return rel;
2557 }
2558
2559 /*
2560 * generate_relation_name - copied from ruleutils.c
2561 * Compute the name to display for a relation
2562 *
2563 * The result includes all necessary quoting and schema-prefixing.
2564 */
2565 static char *
generate_relation_name(Relation rel)2566 generate_relation_name(Relation rel)
2567 {
2568 char *nspname;
2569 char *result;
2570
2571 /* Qualify the name if not visible in search path */
2572 if (RelationIsVisible(RelationGetRelid(rel)))
2573 nspname = NULL;
2574 else
2575 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2576
2577 result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2578
2579 return result;
2580 }
2581
2582
2583 static remoteConn *
getConnectionByName(const char * name)2584 getConnectionByName(const char *name)
2585 {
2586 remoteConnHashEnt *hentry;
2587 char *key;
2588
2589 if (!remoteConnHash)
2590 remoteConnHash = createConnHash();
2591
2592 key = pstrdup(name);
2593 truncate_identifier(key, strlen(key), false);
2594 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2595 key, HASH_FIND, NULL);
2596
2597 if (hentry)
2598 return hentry->rconn;
2599
2600 return NULL;
2601 }
2602
2603 static HTAB *
createConnHash(void)2604 createConnHash(void)
2605 {
2606 HASHCTL ctl;
2607
2608 ctl.keysize = NAMEDATALEN;
2609 ctl.entrysize = sizeof(remoteConnHashEnt);
2610
2611 return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2612 }
2613
2614 static void
createNewConnection(const char * name,remoteConn * rconn)2615 createNewConnection(const char *name, remoteConn *rconn)
2616 {
2617 remoteConnHashEnt *hentry;
2618 bool found;
2619 char *key;
2620
2621 if (!remoteConnHash)
2622 remoteConnHash = createConnHash();
2623
2624 key = pstrdup(name);
2625 truncate_identifier(key, strlen(key), true);
2626 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2627 HASH_ENTER, &found);
2628
2629 if (found)
2630 {
2631 PQfinish(rconn->conn);
2632 ReleaseExternalFD();
2633 pfree(rconn);
2634
2635 ereport(ERROR,
2636 (errcode(ERRCODE_DUPLICATE_OBJECT),
2637 errmsg("duplicate connection name")));
2638 }
2639
2640 hentry->rconn = rconn;
2641 strlcpy(hentry->name, name, sizeof(hentry->name));
2642 }
2643
2644 static void
deleteConnection(const char * name)2645 deleteConnection(const char *name)
2646 {
2647 remoteConnHashEnt *hentry;
2648 bool found;
2649 char *key;
2650
2651 if (!remoteConnHash)
2652 remoteConnHash = createConnHash();
2653
2654 key = pstrdup(name);
2655 truncate_identifier(key, strlen(key), false);
2656 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2657 key, HASH_REMOVE, &found);
2658
2659 if (!hentry)
2660 ereport(ERROR,
2661 (errcode(ERRCODE_UNDEFINED_OBJECT),
2662 errmsg("undefined connection name")));
2663
2664 }
2665
2666 static void
dblink_security_check(PGconn * conn,remoteConn * rconn)2667 dblink_security_check(PGconn *conn, remoteConn *rconn)
2668 {
2669 if (!superuser())
2670 {
2671 if (!PQconnectionUsedPassword(conn))
2672 {
2673 PQfinish(conn);
2674 ReleaseExternalFD();
2675 if (rconn)
2676 pfree(rconn);
2677
2678 ereport(ERROR,
2679 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2680 errmsg("password is required"),
2681 errdetail("Non-superuser cannot connect if the server does not request a password."),
2682 errhint("Target server's authentication method must be changed.")));
2683 }
2684 }
2685 }
2686
2687 /*
2688 * For non-superusers, insist that the connstr specify a password. This
2689 * prevents a password from being picked up from .pgpass, a service file,
2690 * the environment, etc. We don't want the postgres user's passwords
2691 * to be accessible to non-superusers.
2692 */
2693 static void
dblink_connstr_check(const char * connstr)2694 dblink_connstr_check(const char *connstr)
2695 {
2696 if (!superuser())
2697 {
2698 PQconninfoOption *options;
2699 PQconninfoOption *option;
2700 bool connstr_gives_password = false;
2701
2702 options = PQconninfoParse(connstr, NULL);
2703 if (options)
2704 {
2705 for (option = options; option->keyword != NULL; option++)
2706 {
2707 if (strcmp(option->keyword, "password") == 0)
2708 {
2709 if (option->val != NULL && option->val[0] != '\0')
2710 {
2711 connstr_gives_password = true;
2712 break;
2713 }
2714 }
2715 }
2716 PQconninfoFree(options);
2717 }
2718
2719 if (!connstr_gives_password)
2720 ereport(ERROR,
2721 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2722 errmsg("password is required"),
2723 errdetail("Non-superusers must provide a password in the connection string.")));
2724 }
2725 }
2726
2727 /*
2728 * Report an error received from the remote server
2729 *
2730 * res: the received error result (will be freed)
2731 * fail: true for ERROR ereport, false for NOTICE
2732 * fmt and following args: sprintf-style format and values for errcontext;
2733 * the resulting string should be worded like "while <some action>"
2734 */
2735 static void
dblink_res_error(PGconn * conn,const char * conname,PGresult * res,bool fail,const char * fmt,...)2736 dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2737 bool fail, const char *fmt,...)
2738 {
2739 int level;
2740 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2741 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2742 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2743 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2744 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2745 int sqlstate;
2746 char *message_primary;
2747 char *message_detail;
2748 char *message_hint;
2749 char *message_context;
2750 va_list ap;
2751 char dblink_context_msg[512];
2752
2753 if (fail)
2754 level = ERROR;
2755 else
2756 level = NOTICE;
2757
2758 if (pg_diag_sqlstate)
2759 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2760 pg_diag_sqlstate[1],
2761 pg_diag_sqlstate[2],
2762 pg_diag_sqlstate[3],
2763 pg_diag_sqlstate[4]);
2764 else
2765 sqlstate = ERRCODE_CONNECTION_FAILURE;
2766
2767 message_primary = xpstrdup(pg_diag_message_primary);
2768 message_detail = xpstrdup(pg_diag_message_detail);
2769 message_hint = xpstrdup(pg_diag_message_hint);
2770 message_context = xpstrdup(pg_diag_context);
2771
2772 /*
2773 * If we don't get a message from the PGresult, try the PGconn. This is
2774 * needed because for connection-level failures, PQexec may just return
2775 * NULL, not a PGresult at all.
2776 */
2777 if (message_primary == NULL)
2778 message_primary = pchomp(PQerrorMessage(conn));
2779
2780 /*
2781 * Now that we've copied all the data we need out of the PGresult, it's
2782 * safe to free it. We must do this to avoid PGresult leakage. We're
2783 * leaking all the strings too, but those are in palloc'd memory that will
2784 * get cleaned up eventually.
2785 */
2786 if (res)
2787 PQclear(res);
2788
2789 /*
2790 * Format the basic errcontext string. Below, we'll add on something
2791 * about the connection name. That's a violation of the translatability
2792 * guidelines about constructing error messages out of parts, but since
2793 * there's no translation support for dblink, there's no need to worry
2794 * about that (yet).
2795 */
2796 va_start(ap, fmt);
2797 vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2798 va_end(ap);
2799
2800 ereport(level,
2801 (errcode(sqlstate),
2802 message_primary ? errmsg_internal("%s", message_primary) :
2803 errmsg("could not obtain message string for remote error"),
2804 message_detail ? errdetail_internal("%s", message_detail) : 0,
2805 message_hint ? errhint("%s", message_hint) : 0,
2806 message_context ? (errcontext("%s", message_context)) : 0,
2807 conname ?
2808 (errcontext("%s on dblink connection named \"%s\"",
2809 dblink_context_msg, conname)) :
2810 (errcontext("%s on unnamed dblink connection",
2811 dblink_context_msg))));
2812 }
2813
2814 /*
2815 * Obtain connection string for a foreign server
2816 */
2817 static char *
get_connect_string(const char * servername)2818 get_connect_string(const char *servername)
2819 {
2820 ForeignServer *foreign_server = NULL;
2821 UserMapping *user_mapping;
2822 ListCell *cell;
2823 StringInfoData buf;
2824 ForeignDataWrapper *fdw;
2825 AclResult aclresult;
2826 char *srvname;
2827
2828 static const PQconninfoOption *options = NULL;
2829
2830 initStringInfo(&buf);
2831
2832 /*
2833 * Get list of valid libpq options.
2834 *
2835 * To avoid unnecessary work, we get the list once and use it throughout
2836 * the lifetime of this backend process. We don't need to care about
2837 * memory context issues, because PQconndefaults allocates with malloc.
2838 */
2839 if (!options)
2840 {
2841 options = PQconndefaults();
2842 if (!options) /* assume reason for failure is OOM */
2843 ereport(ERROR,
2844 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2845 errmsg("out of memory"),
2846 errdetail("Could not get libpq's default connection options.")));
2847 }
2848
2849 /* first gather the server connstr options */
2850 srvname = pstrdup(servername);
2851 truncate_identifier(srvname, strlen(srvname), false);
2852 foreign_server = GetForeignServerByName(srvname, true);
2853
2854 if (foreign_server)
2855 {
2856 Oid serverid = foreign_server->serverid;
2857 Oid fdwid = foreign_server->fdwid;
2858 Oid userid = GetUserId();
2859
2860 user_mapping = GetUserMapping(userid, serverid);
2861 fdw = GetForeignDataWrapper(fdwid);
2862
2863 /* Check permissions, user must have usage on the server. */
2864 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2865 if (aclresult != ACLCHECK_OK)
2866 aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2867
2868 foreach(cell, fdw->options)
2869 {
2870 DefElem *def = lfirst(cell);
2871
2872 if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2873 appendStringInfo(&buf, "%s='%s' ", def->defname,
2874 escape_param_str(strVal(def->arg)));
2875 }
2876
2877 foreach(cell, foreign_server->options)
2878 {
2879 DefElem *def = lfirst(cell);
2880
2881 if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2882 appendStringInfo(&buf, "%s='%s' ", def->defname,
2883 escape_param_str(strVal(def->arg)));
2884 }
2885
2886 foreach(cell, user_mapping->options)
2887 {
2888
2889 DefElem *def = lfirst(cell);
2890
2891 if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2892 appendStringInfo(&buf, "%s='%s' ", def->defname,
2893 escape_param_str(strVal(def->arg)));
2894 }
2895
2896 return buf.data;
2897 }
2898 else
2899 return NULL;
2900 }
2901
2902 /*
2903 * Escaping libpq connect parameter strings.
2904 *
2905 * Replaces "'" with "\'" and "\" with "\\".
2906 */
2907 static char *
escape_param_str(const char * str)2908 escape_param_str(const char *str)
2909 {
2910 const char *cp;
2911 StringInfoData buf;
2912
2913 initStringInfo(&buf);
2914
2915 for (cp = str; *cp; cp++)
2916 {
2917 if (*cp == '\\' || *cp == '\'')
2918 appendStringInfoChar(&buf, '\\');
2919 appendStringInfoChar(&buf, *cp);
2920 }
2921
2922 return buf.data;
2923 }
2924
2925 /*
2926 * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2927 * functions, and translate to the internal representation.
2928 *
2929 * The user supplies an int2vector of 1-based logical attnums, plus a count
2930 * argument (the need for the separate count argument is historical, but we
2931 * still check it). We check that each attnum corresponds to a valid,
2932 * non-dropped attribute of the rel. We do *not* prevent attnums from being
2933 * listed twice, though the actual use-case for such things is dubious.
2934 * Note that before Postgres 9.0, the user's attnums were interpreted as
2935 * physical not logical column numbers; this was changed for future-proofing.
2936 *
2937 * The internal representation is a palloc'd int array of 0-based physical
2938 * attnums.
2939 */
2940 static void
validate_pkattnums(Relation rel,int2vector * pkattnums_arg,int32 pknumatts_arg,int ** pkattnums,int * pknumatts)2941 validate_pkattnums(Relation rel,
2942 int2vector *pkattnums_arg, int32 pknumatts_arg,
2943 int **pkattnums, int *pknumatts)
2944 {
2945 TupleDesc tupdesc = rel->rd_att;
2946 int natts = tupdesc->natts;
2947 int i;
2948
2949 /* Don't take more array elements than there are */
2950 pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2951
2952 /* Must have at least one pk attnum selected */
2953 if (pknumatts_arg <= 0)
2954 ereport(ERROR,
2955 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2956 errmsg("number of key attributes must be > 0")));
2957
2958 /* Allocate output array */
2959 *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2960 *pknumatts = pknumatts_arg;
2961
2962 /* Validate attnums and convert to internal form */
2963 for (i = 0; i < pknumatts_arg; i++)
2964 {
2965 int pkattnum = pkattnums_arg->values[i];
2966 int lnum;
2967 int j;
2968
2969 /* Can throw error immediately if out of range */
2970 if (pkattnum <= 0 || pkattnum > natts)
2971 ereport(ERROR,
2972 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2973 errmsg("invalid attribute number %d", pkattnum)));
2974
2975 /* Identify which physical column has this logical number */
2976 lnum = 0;
2977 for (j = 0; j < natts; j++)
2978 {
2979 /* dropped columns don't count */
2980 if (TupleDescAttr(tupdesc, j)->attisdropped)
2981 continue;
2982
2983 if (++lnum == pkattnum)
2984 break;
2985 }
2986
2987 if (j < natts)
2988 (*pkattnums)[i] = j;
2989 else
2990 ereport(ERROR,
2991 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2992 errmsg("invalid attribute number %d", pkattnum)));
2993 }
2994 }
2995
2996 /*
2997 * Check if the specified connection option is valid.
2998 *
2999 * We basically allow whatever libpq thinks is an option, with these
3000 * restrictions:
3001 * debug options: disallowed
3002 * "client_encoding": disallowed
3003 * "user": valid only in USER MAPPING options
3004 * secure options (eg password): valid only in USER MAPPING options
3005 * others: valid only in FOREIGN SERVER options
3006 *
3007 * We disallow client_encoding because it would be overridden anyway via
3008 * PQclientEncoding; allowing it to be specified would merely promote
3009 * confusion.
3010 */
3011 static bool
is_valid_dblink_option(const PQconninfoOption * options,const char * option,Oid context)3012 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3013 Oid context)
3014 {
3015 const PQconninfoOption *opt;
3016
3017 /* Look up the option in libpq result */
3018 for (opt = options; opt->keyword; opt++)
3019 {
3020 if (strcmp(opt->keyword, option) == 0)
3021 break;
3022 }
3023 if (opt->keyword == NULL)
3024 return false;
3025
3026 /* Disallow debug options (particularly "replication") */
3027 if (strchr(opt->dispchar, 'D'))
3028 return false;
3029
3030 /* Disallow "client_encoding" */
3031 if (strcmp(opt->keyword, "client_encoding") == 0)
3032 return false;
3033
3034 /*
3035 * If the option is "user" or marked secure, it should be specified only
3036 * in USER MAPPING. Others should be specified only in SERVER.
3037 */
3038 if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3039 {
3040 if (context != UserMappingRelationId)
3041 return false;
3042 }
3043 else
3044 {
3045 if (context != ForeignServerRelationId)
3046 return false;
3047 }
3048
3049 return true;
3050 }
3051
3052 /*
3053 * Copy the remote session's values of GUCs that affect datatype I/O
3054 * and apply them locally in a new GUC nesting level. Returns the new
3055 * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3056 * or -1 if no new nestlevel was needed.
3057 *
3058 * We use the equivalent of a function SET option to allow the settings to
3059 * persist only until the caller calls restoreLocalGucs. If an error is
3060 * thrown in between, guc.c will take care of undoing the settings.
3061 */
3062 static int
applyRemoteGucs(PGconn * conn)3063 applyRemoteGucs(PGconn *conn)
3064 {
3065 static const char *const GUCsAffectingIO[] = {
3066 "DateStyle",
3067 "IntervalStyle"
3068 };
3069
3070 int nestlevel = -1;
3071 int i;
3072
3073 for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3074 {
3075 const char *gucName = GUCsAffectingIO[i];
3076 const char *remoteVal = PQparameterStatus(conn, gucName);
3077 const char *localVal;
3078
3079 /*
3080 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3081 * that's okay because its output format won't be ambiguous. So just
3082 * skip the GUC if we don't get a value for it. (We might eventually
3083 * need more complicated logic with remote-version checks here.)
3084 */
3085 if (remoteVal == NULL)
3086 continue;
3087
3088 /*
3089 * Avoid GUC-setting overhead if the remote and local GUCs already
3090 * have the same value.
3091 */
3092 localVal = GetConfigOption(gucName, false, false);
3093 Assert(localVal != NULL);
3094
3095 if (strcmp(remoteVal, localVal) == 0)
3096 continue;
3097
3098 /* Create new GUC nest level if we didn't already */
3099 if (nestlevel < 0)
3100 nestlevel = NewGUCNestLevel();
3101
3102 /* Apply the option (this will throw error on failure) */
3103 (void) set_config_option(gucName, remoteVal,
3104 PGC_USERSET, PGC_S_SESSION,
3105 GUC_ACTION_SAVE, true, 0, false);
3106 }
3107
3108 return nestlevel;
3109 }
3110
3111 /*
3112 * Restore local GUCs after they have been overlaid with remote settings.
3113 */
3114 static void
restoreLocalGucs(int nestlevel)3115 restoreLocalGucs(int nestlevel)
3116 {
3117 /* Do nothing if no new nestlevel was created */
3118 if (nestlevel > 0)
3119 AtEOXact_GUC(true, nestlevel);
3120 }
3121