1 /*
2 * dblink.c
3 *
4 * Functions returning results from a remote database
5 *
6 * Joe Conway <mail@joeconway.com>
7 * And contributors:
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 *
11 * contrib/dblink/dblink.c
12 * Copyright (c) 2001-2021, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
14 *
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
19 *
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 *
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 *
32 */
33 #include "postgres.h"
34
35 #include <limits.h>
36
37 #include "access/htup_details.h"
38 #include "access/relation.h"
39 #include "access/reloptions.h"
40 #include "access/table.h"
41 #include "catalog/namespace.h"
42 #include "catalog/pg_foreign_data_wrapper.h"
43 #include "catalog/pg_foreign_server.h"
44 #include "catalog/pg_type.h"
45 #include "catalog/pg_user_mapping.h"
46 #include "executor/spi.h"
47 #include "foreign/foreign.h"
48 #include "funcapi.h"
49 #include "lib/stringinfo.h"
50 #include "libpq-fe.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "parser/scansup.h"
54 #include "utils/acl.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/guc.h"
58 #include "utils/lsyscache.h"
59 #include "utils/memutils.h"
60 #include "utils/rel.h"
61 #include "utils/varlena.h"
62
63 PG_MODULE_MAGIC;
64
65 typedef struct remoteConn
66 {
67 PGconn *conn; /* Hold the remote connection */
68 int openCursorCount; /* The number of open cursors */
69 bool newXactForCursor; /* Opened a transaction for a cursor */
70 } remoteConn;
71
72 typedef struct storeInfo
73 {
74 FunctionCallInfo fcinfo;
75 Tuplestorestate *tuplestore;
76 AttInMetadata *attinmeta;
77 MemoryContext tmpcontext;
78 char **cstrs;
79 /* temp storage for results to avoid leaks on exception */
80 PGresult *last_res;
81 PGresult *cur_res;
82 } storeInfo;
83
84 /*
85 * Internal declarations
86 */
87 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
88 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
89 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
90 PGresult *res);
91 static void materializeQueryResult(FunctionCallInfo fcinfo,
92 PGconn *conn,
93 const char *conname,
94 const char *sql,
95 bool fail);
96 static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
97 static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
98 static remoteConn *getConnectionByName(const char *name);
99 static HTAB *createConnHash(void);
100 static void createNewConnection(const char *name, remoteConn *rconn);
101 static void deleteConnection(const char *name);
102 static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
103 static char **get_text_array_contents(ArrayType *array, int *numitems);
104 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
105 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
106 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
107 static char *quote_ident_cstr(char *rawstr);
108 static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
109 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
110 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
111 static char *generate_relation_name(Relation rel);
112 static void dblink_connstr_check(const char *connstr);
113 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
114 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
115 bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
116 static char *get_connect_string(const char *servername);
117 static char *escape_param_str(const char *from);
118 static void validate_pkattnums(Relation rel,
119 int2vector *pkattnums_arg, int32 pknumatts_arg,
120 int **pkattnums, int *pknumatts);
121 static bool is_valid_dblink_option(const PQconninfoOption *options,
122 const char *option, Oid context);
123 static int applyRemoteGucs(PGconn *conn);
124 static void restoreLocalGucs(int nestlevel);
125
126 /* Global */
127 static remoteConn *pconn = NULL;
128 static HTAB *remoteConnHash = NULL;
129
130 /*
131 * Following is list that holds multiple remote connections.
132 * Calling convention of each dblink function changes to accept
133 * connection name as the first parameter. The connection list is
134 * much like ecpg e.g. a mapping between a name and a PGconn object.
135 */
136
137 typedef struct remoteConnHashEnt
138 {
139 char name[NAMEDATALEN];
140 remoteConn *rconn;
141 } remoteConnHashEnt;
142
143 /* initial number of connection hashes */
144 #define NUMCONN 16
145
146 static char *
xpstrdup(const char * in)147 xpstrdup(const char *in)
148 {
149 if (in == NULL)
150 return NULL;
151 return pstrdup(in);
152 }
153
154 static void
pg_attribute_noreturn()155 pg_attribute_noreturn()
156 dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
157 {
158 char *msg = pchomp(PQerrorMessage(conn));
159
160 if (res)
161 PQclear(res);
162 elog(ERROR, "%s: %s", p2, msg);
163 }
164
165 static void
pg_attribute_noreturn()166 pg_attribute_noreturn()
167 dblink_conn_not_avail(const char *conname)
168 {
169 if (conname)
170 ereport(ERROR,
171 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
172 errmsg("connection \"%s\" not available", conname)));
173 else
174 ereport(ERROR,
175 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
176 errmsg("connection not available")));
177 }
178
179 static void
dblink_get_conn(char * conname_or_str,PGconn * volatile * conn_p,char ** conname_p,volatile bool * freeconn_p)180 dblink_get_conn(char *conname_or_str,
181 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
182 {
183 remoteConn *rconn = getConnectionByName(conname_or_str);
184 PGconn *conn;
185 char *conname;
186 bool freeconn;
187
188 if (rconn)
189 {
190 conn = rconn->conn;
191 conname = conname_or_str;
192 freeconn = false;
193 }
194 else
195 {
196 const char *connstr;
197
198 connstr = get_connect_string(conname_or_str);
199 if (connstr == NULL)
200 connstr = conname_or_str;
201 dblink_connstr_check(connstr);
202
203 /*
204 * We must obey fd.c's limit on non-virtual file descriptors. Assume
205 * that a PGconn represents one long-lived FD. (Doing this here also
206 * ensures that VFDs are closed if needed to make room.)
207 */
208 if (!AcquireExternalFD())
209 {
210 #ifndef WIN32 /* can't write #if within ereport() macro */
211 ereport(ERROR,
212 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
213 errmsg("could not establish connection"),
214 errdetail("There are too many open files on the local server."),
215 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
216 #else
217 ereport(ERROR,
218 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
219 errmsg("could not establish connection"),
220 errdetail("There are too many open files on the local server."),
221 errhint("Raise the server's max_files_per_process setting.")));
222 #endif
223 }
224
225 /* OK to make connection */
226 conn = PQconnectdb(connstr);
227
228 if (PQstatus(conn) == CONNECTION_BAD)
229 {
230 char *msg = pchomp(PQerrorMessage(conn));
231
232 PQfinish(conn);
233 ReleaseExternalFD();
234 ereport(ERROR,
235 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
236 errmsg("could not establish connection"),
237 errdetail_internal("%s", msg)));
238 }
239 dblink_security_check(conn, rconn);
240 if (PQclientEncoding(conn) != GetDatabaseEncoding())
241 PQsetClientEncoding(conn, GetDatabaseEncodingName());
242 freeconn = true;
243 conname = NULL;
244 }
245
246 *conn_p = conn;
247 *conname_p = conname;
248 *freeconn_p = freeconn;
249 }
250
251 static PGconn *
dblink_get_named_conn(const char * conname)252 dblink_get_named_conn(const char *conname)
253 {
254 remoteConn *rconn = getConnectionByName(conname);
255
256 if (rconn)
257 return rconn->conn;
258
259 dblink_conn_not_avail(conname);
260 return NULL; /* keep compiler quiet */
261 }
262
263 static void
dblink_init(void)264 dblink_init(void)
265 {
266 if (!pconn)
267 {
268 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
269 pconn->conn = NULL;
270 pconn->openCursorCount = 0;
271 pconn->newXactForCursor = false;
272 }
273 }
274
275 /*
276 * Create a persistent connection to another database
277 */
278 PG_FUNCTION_INFO_V1(dblink_connect);
279 Datum
dblink_connect(PG_FUNCTION_ARGS)280 dblink_connect(PG_FUNCTION_ARGS)
281 {
282 char *conname_or_str = NULL;
283 char *connstr = NULL;
284 char *connname = NULL;
285 char *msg;
286 PGconn *conn = NULL;
287 remoteConn *rconn = NULL;
288
289 dblink_init();
290
291 if (PG_NARGS() == 2)
292 {
293 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
294 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
295 }
296 else if (PG_NARGS() == 1)
297 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
298
299 if (connname)
300 {
301 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
302 sizeof(remoteConn));
303 rconn->conn = NULL;
304 rconn->openCursorCount = 0;
305 rconn->newXactForCursor = false;
306 }
307
308 /* first check for valid foreign data server */
309 connstr = get_connect_string(conname_or_str);
310 if (connstr == NULL)
311 connstr = conname_or_str;
312
313 /* check password in connection string if not superuser */
314 dblink_connstr_check(connstr);
315
316 /*
317 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
318 * a PGconn represents one long-lived FD. (Doing this here also ensures
319 * that VFDs are closed if needed to make room.)
320 */
321 if (!AcquireExternalFD())
322 {
323 #ifndef WIN32 /* can't write #if within ereport() macro */
324 ereport(ERROR,
325 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
326 errmsg("could not establish connection"),
327 errdetail("There are too many open files on the local server."),
328 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
329 #else
330 ereport(ERROR,
331 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
332 errmsg("could not establish connection"),
333 errdetail("There are too many open files on the local server."),
334 errhint("Raise the server's max_files_per_process setting.")));
335 #endif
336 }
337
338 /* OK to make connection */
339 conn = PQconnectdb(connstr);
340
341 if (PQstatus(conn) == CONNECTION_BAD)
342 {
343 msg = pchomp(PQerrorMessage(conn));
344 PQfinish(conn);
345 ReleaseExternalFD();
346 if (rconn)
347 pfree(rconn);
348
349 ereport(ERROR,
350 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
351 errmsg("could not establish connection"),
352 errdetail_internal("%s", msg)));
353 }
354
355 /* check password actually used if not superuser */
356 dblink_security_check(conn, rconn);
357
358 /* attempt to set client encoding to match server encoding, if needed */
359 if (PQclientEncoding(conn) != GetDatabaseEncoding())
360 PQsetClientEncoding(conn, GetDatabaseEncodingName());
361
362 if (connname)
363 {
364 rconn->conn = conn;
365 createNewConnection(connname, rconn);
366 }
367 else
368 {
369 if (pconn->conn)
370 {
371 PQfinish(pconn->conn);
372 ReleaseExternalFD();
373 }
374 pconn->conn = conn;
375 }
376
377 PG_RETURN_TEXT_P(cstring_to_text("OK"));
378 }
379
380 /*
381 * Clear a persistent connection to another database
382 */
383 PG_FUNCTION_INFO_V1(dblink_disconnect);
384 Datum
dblink_disconnect(PG_FUNCTION_ARGS)385 dblink_disconnect(PG_FUNCTION_ARGS)
386 {
387 char *conname = NULL;
388 remoteConn *rconn = NULL;
389 PGconn *conn = NULL;
390
391 dblink_init();
392
393 if (PG_NARGS() == 1)
394 {
395 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
396 rconn = getConnectionByName(conname);
397 if (rconn)
398 conn = rconn->conn;
399 }
400 else
401 conn = pconn->conn;
402
403 if (!conn)
404 dblink_conn_not_avail(conname);
405
406 PQfinish(conn);
407 ReleaseExternalFD();
408 if (rconn)
409 {
410 deleteConnection(conname);
411 pfree(rconn);
412 }
413 else
414 pconn->conn = NULL;
415
416 PG_RETURN_TEXT_P(cstring_to_text("OK"));
417 }
418
419 /*
420 * opens a cursor using a persistent connection
421 */
422 PG_FUNCTION_INFO_V1(dblink_open);
423 Datum
dblink_open(PG_FUNCTION_ARGS)424 dblink_open(PG_FUNCTION_ARGS)
425 {
426 PGresult *res = NULL;
427 PGconn *conn;
428 char *curname = NULL;
429 char *sql = NULL;
430 char *conname = NULL;
431 StringInfoData buf;
432 remoteConn *rconn = NULL;
433 bool fail = true; /* default to backward compatible behavior */
434
435 dblink_init();
436 initStringInfo(&buf);
437
438 if (PG_NARGS() == 2)
439 {
440 /* text,text */
441 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
442 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
443 rconn = pconn;
444 }
445 else if (PG_NARGS() == 3)
446 {
447 /* might be text,text,text or text,text,bool */
448 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
449 {
450 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
451 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
452 fail = PG_GETARG_BOOL(2);
453 rconn = pconn;
454 }
455 else
456 {
457 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
458 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
459 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
460 rconn = getConnectionByName(conname);
461 }
462 }
463 else if (PG_NARGS() == 4)
464 {
465 /* text,text,text,bool */
466 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
467 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
468 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
469 fail = PG_GETARG_BOOL(3);
470 rconn = getConnectionByName(conname);
471 }
472
473 if (!rconn || !rconn->conn)
474 dblink_conn_not_avail(conname);
475
476 conn = rconn->conn;
477
478 /* If we are not in a transaction, start one */
479 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
480 {
481 res = PQexec(conn, "BEGIN");
482 if (PQresultStatus(res) != PGRES_COMMAND_OK)
483 dblink_res_internalerror(conn, res, "begin error");
484 PQclear(res);
485 rconn->newXactForCursor = true;
486
487 /*
488 * Since transaction state was IDLE, we force cursor count to
489 * initially be 0. This is needed as a previous ABORT might have wiped
490 * out our transaction without maintaining the cursor count for us.
491 */
492 rconn->openCursorCount = 0;
493 }
494
495 /* if we started a transaction, increment cursor count */
496 if (rconn->newXactForCursor)
497 (rconn->openCursorCount)++;
498
499 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
500 res = PQexec(conn, buf.data);
501 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
502 {
503 dblink_res_error(conn, conname, res, fail,
504 "while opening cursor \"%s\"", curname);
505 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
506 }
507
508 PQclear(res);
509 PG_RETURN_TEXT_P(cstring_to_text("OK"));
510 }
511
512 /*
513 * closes a cursor
514 */
515 PG_FUNCTION_INFO_V1(dblink_close);
516 Datum
dblink_close(PG_FUNCTION_ARGS)517 dblink_close(PG_FUNCTION_ARGS)
518 {
519 PGconn *conn;
520 PGresult *res = NULL;
521 char *curname = NULL;
522 char *conname = NULL;
523 StringInfoData buf;
524 remoteConn *rconn = NULL;
525 bool fail = true; /* default to backward compatible behavior */
526
527 dblink_init();
528 initStringInfo(&buf);
529
530 if (PG_NARGS() == 1)
531 {
532 /* text */
533 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
534 rconn = pconn;
535 }
536 else if (PG_NARGS() == 2)
537 {
538 /* might be text,text or text,bool */
539 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
540 {
541 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
542 fail = PG_GETARG_BOOL(1);
543 rconn = pconn;
544 }
545 else
546 {
547 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
548 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
549 rconn = getConnectionByName(conname);
550 }
551 }
552 if (PG_NARGS() == 3)
553 {
554 /* text,text,bool */
555 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
556 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
557 fail = PG_GETARG_BOOL(2);
558 rconn = getConnectionByName(conname);
559 }
560
561 if (!rconn || !rconn->conn)
562 dblink_conn_not_avail(conname);
563
564 conn = rconn->conn;
565
566 appendStringInfo(&buf, "CLOSE %s", curname);
567
568 /* close the cursor */
569 res = PQexec(conn, buf.data);
570 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
571 {
572 dblink_res_error(conn, conname, res, fail,
573 "while closing cursor \"%s\"", curname);
574 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
575 }
576
577 PQclear(res);
578
579 /* if we started a transaction, decrement cursor count */
580 if (rconn->newXactForCursor)
581 {
582 (rconn->openCursorCount)--;
583
584 /* if count is zero, commit the transaction */
585 if (rconn->openCursorCount == 0)
586 {
587 rconn->newXactForCursor = false;
588
589 res = PQexec(conn, "COMMIT");
590 if (PQresultStatus(res) != PGRES_COMMAND_OK)
591 dblink_res_internalerror(conn, res, "commit error");
592 PQclear(res);
593 }
594 }
595
596 PG_RETURN_TEXT_P(cstring_to_text("OK"));
597 }
598
599 /*
600 * Fetch results from an open cursor
601 */
602 PG_FUNCTION_INFO_V1(dblink_fetch);
603 Datum
dblink_fetch(PG_FUNCTION_ARGS)604 dblink_fetch(PG_FUNCTION_ARGS)
605 {
606 PGresult *res = NULL;
607 char *conname = NULL;
608 remoteConn *rconn = NULL;
609 PGconn *conn = NULL;
610 StringInfoData buf;
611 char *curname = NULL;
612 int howmany = 0;
613 bool fail = true; /* default to backward compatible */
614
615 prepTuplestoreResult(fcinfo);
616
617 dblink_init();
618
619 if (PG_NARGS() == 4)
620 {
621 /* text,text,int,bool */
622 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
623 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
624 howmany = PG_GETARG_INT32(2);
625 fail = PG_GETARG_BOOL(3);
626
627 rconn = getConnectionByName(conname);
628 if (rconn)
629 conn = rconn->conn;
630 }
631 else if (PG_NARGS() == 3)
632 {
633 /* text,text,int or text,int,bool */
634 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
635 {
636 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
637 howmany = PG_GETARG_INT32(1);
638 fail = PG_GETARG_BOOL(2);
639 conn = pconn->conn;
640 }
641 else
642 {
643 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
644 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
645 howmany = PG_GETARG_INT32(2);
646
647 rconn = getConnectionByName(conname);
648 if (rconn)
649 conn = rconn->conn;
650 }
651 }
652 else if (PG_NARGS() == 2)
653 {
654 /* text,int */
655 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
656 howmany = PG_GETARG_INT32(1);
657 conn = pconn->conn;
658 }
659
660 if (!conn)
661 dblink_conn_not_avail(conname);
662
663 initStringInfo(&buf);
664 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
665
666 /*
667 * Try to execute the query. Note that since libpq uses malloc, the
668 * PGresult will be long-lived even though we are still in a short-lived
669 * memory context.
670 */
671 res = PQexec(conn, buf.data);
672 if (!res ||
673 (PQresultStatus(res) != PGRES_COMMAND_OK &&
674 PQresultStatus(res) != PGRES_TUPLES_OK))
675 {
676 dblink_res_error(conn, conname, res, fail,
677 "while fetching from cursor \"%s\"", curname);
678 return (Datum) 0;
679 }
680 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
681 {
682 /* cursor does not exist - closed already or bad name */
683 PQclear(res);
684 ereport(ERROR,
685 (errcode(ERRCODE_INVALID_CURSOR_NAME),
686 errmsg("cursor \"%s\" does not exist", curname)));
687 }
688
689 materializeResult(fcinfo, conn, res);
690 return (Datum) 0;
691 }
692
693 /*
694 * Note: this is the new preferred version of dblink
695 */
696 PG_FUNCTION_INFO_V1(dblink_record);
697 Datum
dblink_record(PG_FUNCTION_ARGS)698 dblink_record(PG_FUNCTION_ARGS)
699 {
700 return dblink_record_internal(fcinfo, false);
701 }
702
703 PG_FUNCTION_INFO_V1(dblink_send_query);
704 Datum
dblink_send_query(PG_FUNCTION_ARGS)705 dblink_send_query(PG_FUNCTION_ARGS)
706 {
707 PGconn *conn;
708 char *sql;
709 int retval;
710
711 if (PG_NARGS() == 2)
712 {
713 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
714 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
715 }
716 else
717 /* shouldn't happen */
718 elog(ERROR, "wrong number of arguments");
719
720 /* async query send */
721 retval = PQsendQuery(conn, sql);
722 if (retval != 1)
723 elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
724
725 PG_RETURN_INT32(retval);
726 }
727
728 PG_FUNCTION_INFO_V1(dblink_get_result);
729 Datum
dblink_get_result(PG_FUNCTION_ARGS)730 dblink_get_result(PG_FUNCTION_ARGS)
731 {
732 return dblink_record_internal(fcinfo, true);
733 }
734
735 static Datum
dblink_record_internal(FunctionCallInfo fcinfo,bool is_async)736 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
737 {
738 PGconn *volatile conn = NULL;
739 volatile bool freeconn = false;
740
741 prepTuplestoreResult(fcinfo);
742
743 dblink_init();
744
745 PG_TRY();
746 {
747 char *sql = NULL;
748 char *conname = NULL;
749 bool fail = true; /* default to backward compatible */
750
751 if (!is_async)
752 {
753 if (PG_NARGS() == 3)
754 {
755 /* text,text,bool */
756 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
757 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
758 fail = PG_GETARG_BOOL(2);
759 dblink_get_conn(conname, &conn, &conname, &freeconn);
760 }
761 else if (PG_NARGS() == 2)
762 {
763 /* text,text or text,bool */
764 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
765 {
766 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
767 fail = PG_GETARG_BOOL(1);
768 conn = pconn->conn;
769 }
770 else
771 {
772 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
773 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
774 dblink_get_conn(conname, &conn, &conname, &freeconn);
775 }
776 }
777 else if (PG_NARGS() == 1)
778 {
779 /* text */
780 conn = pconn->conn;
781 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
782 }
783 else
784 /* shouldn't happen */
785 elog(ERROR, "wrong number of arguments");
786 }
787 else /* is_async */
788 {
789 /* get async result */
790 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
791
792 if (PG_NARGS() == 2)
793 {
794 /* text,bool */
795 fail = PG_GETARG_BOOL(1);
796 conn = dblink_get_named_conn(conname);
797 }
798 else if (PG_NARGS() == 1)
799 {
800 /* text */
801 conn = dblink_get_named_conn(conname);
802 }
803 else
804 /* shouldn't happen */
805 elog(ERROR, "wrong number of arguments");
806 }
807
808 if (!conn)
809 dblink_conn_not_avail(conname);
810
811 if (!is_async)
812 {
813 /* synchronous query, use efficient tuple collection method */
814 materializeQueryResult(fcinfo, conn, conname, sql, fail);
815 }
816 else
817 {
818 /* async result retrieval, do it the old way */
819 PGresult *res = PQgetResult(conn);
820
821 /* NULL means we're all done with the async results */
822 if (res)
823 {
824 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
825 PQresultStatus(res) != PGRES_TUPLES_OK)
826 {
827 dblink_res_error(conn, conname, res, fail,
828 "while executing query");
829 /* if fail isn't set, we'll return an empty query result */
830 }
831 else
832 {
833 materializeResult(fcinfo, conn, res);
834 }
835 }
836 }
837 }
838 PG_FINALLY();
839 {
840 /* if needed, close the connection to the database */
841 if (freeconn)
842 {
843 PQfinish(conn);
844 ReleaseExternalFD();
845 }
846 }
847 PG_END_TRY();
848
849 return (Datum) 0;
850 }
851
852 /*
853 * Verify function caller can handle a tuplestore result, and set up for that.
854 *
855 * Note: if the caller returns without actually creating a tuplestore, the
856 * executor will treat the function result as an empty set.
857 */
858 static void
prepTuplestoreResult(FunctionCallInfo fcinfo)859 prepTuplestoreResult(FunctionCallInfo fcinfo)
860 {
861 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
862
863 /* check to see if query supports us returning a tuplestore */
864 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
865 ereport(ERROR,
866 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
867 errmsg("set-valued function called in context that cannot accept a set")));
868 if (!(rsinfo->allowedModes & SFRM_Materialize))
869 ereport(ERROR,
870 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
871 errmsg("materialize mode required, but it is not allowed in this context")));
872
873 /* let the executor know we're sending back a tuplestore */
874 rsinfo->returnMode = SFRM_Materialize;
875
876 /* caller must fill these to return a non-empty result */
877 rsinfo->setResult = NULL;
878 rsinfo->setDesc = NULL;
879 }
880
881 /*
882 * Copy the contents of the PGresult into a tuplestore to be returned
883 * as the result of the current function.
884 * The PGresult will be released in this function.
885 */
886 static void
materializeResult(FunctionCallInfo fcinfo,PGconn * conn,PGresult * res)887 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
888 {
889 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
890
891 /* prepTuplestoreResult must have been called previously */
892 Assert(rsinfo->returnMode == SFRM_Materialize);
893
894 PG_TRY();
895 {
896 TupleDesc tupdesc;
897 bool is_sql_cmd;
898 int ntuples;
899 int nfields;
900
901 if (PQresultStatus(res) == PGRES_COMMAND_OK)
902 {
903 is_sql_cmd = true;
904
905 /*
906 * need a tuple descriptor representing one TEXT column to return
907 * the command status string as our result tuple
908 */
909 tupdesc = CreateTemplateTupleDesc(1);
910 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
911 TEXTOID, -1, 0);
912 ntuples = 1;
913 nfields = 1;
914 }
915 else
916 {
917 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
918
919 is_sql_cmd = false;
920
921 /* get a tuple descriptor for our result type */
922 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
923 {
924 case TYPEFUNC_COMPOSITE:
925 /* success */
926 break;
927 case TYPEFUNC_RECORD:
928 /* failed to determine actual type of RECORD */
929 ereport(ERROR,
930 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
931 errmsg("function returning record called in context "
932 "that cannot accept type record")));
933 break;
934 default:
935 /* result type isn't composite */
936 elog(ERROR, "return type must be a row type");
937 break;
938 }
939
940 /* make sure we have a persistent copy of the tupdesc */
941 tupdesc = CreateTupleDescCopy(tupdesc);
942 ntuples = PQntuples(res);
943 nfields = PQnfields(res);
944 }
945
946 /*
947 * check result and tuple descriptor have the same number of columns
948 */
949 if (nfields != tupdesc->natts)
950 ereport(ERROR,
951 (errcode(ERRCODE_DATATYPE_MISMATCH),
952 errmsg("remote query result rowtype does not match "
953 "the specified FROM clause rowtype")));
954
955 if (ntuples > 0)
956 {
957 AttInMetadata *attinmeta;
958 int nestlevel = -1;
959 Tuplestorestate *tupstore;
960 MemoryContext oldcontext;
961 int row;
962 char **values;
963
964 attinmeta = TupleDescGetAttInMetadata(tupdesc);
965
966 /* Set GUCs to ensure we read GUC-sensitive data types correctly */
967 if (!is_sql_cmd)
968 nestlevel = applyRemoteGucs(conn);
969
970 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
971 tupstore = tuplestore_begin_heap(true, false, work_mem);
972 rsinfo->setResult = tupstore;
973 rsinfo->setDesc = tupdesc;
974 MemoryContextSwitchTo(oldcontext);
975
976 values = (char **) palloc(nfields * sizeof(char *));
977
978 /* put all tuples into the tuplestore */
979 for (row = 0; row < ntuples; row++)
980 {
981 HeapTuple tuple;
982
983 if (!is_sql_cmd)
984 {
985 int i;
986
987 for (i = 0; i < nfields; i++)
988 {
989 if (PQgetisnull(res, row, i))
990 values[i] = NULL;
991 else
992 values[i] = PQgetvalue(res, row, i);
993 }
994 }
995 else
996 {
997 values[0] = PQcmdStatus(res);
998 }
999
1000 /* build the tuple and put it into the tuplestore. */
1001 tuple = BuildTupleFromCStrings(attinmeta, values);
1002 tuplestore_puttuple(tupstore, tuple);
1003 }
1004
1005 /* clean up GUC settings, if we changed any */
1006 restoreLocalGucs(nestlevel);
1007
1008 /* clean up and return the tuplestore */
1009 tuplestore_donestoring(tupstore);
1010 }
1011 }
1012 PG_FINALLY();
1013 {
1014 /* be sure to release the libpq result */
1015 PQclear(res);
1016 }
1017 PG_END_TRY();
1018 }
1019
1020 /*
1021 * Execute the given SQL command and store its results into a tuplestore
1022 * to be returned as the result of the current function.
1023 *
1024 * This is equivalent to PQexec followed by materializeResult, but we make
1025 * use of libpq's single-row mode to avoid accumulating the whole result
1026 * inside libpq before it gets transferred to the tuplestore.
1027 */
1028 static void
materializeQueryResult(FunctionCallInfo fcinfo,PGconn * conn,const char * conname,const char * sql,bool fail)1029 materializeQueryResult(FunctionCallInfo fcinfo,
1030 PGconn *conn,
1031 const char *conname,
1032 const char *sql,
1033 bool fail)
1034 {
1035 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1036 PGresult *volatile res = NULL;
1037 volatile storeInfo sinfo = {0};
1038
1039 /* prepTuplestoreResult must have been called previously */
1040 Assert(rsinfo->returnMode == SFRM_Materialize);
1041
1042 sinfo.fcinfo = fcinfo;
1043
1044 PG_TRY();
1045 {
1046 /* Create short-lived memory context for data conversions */
1047 sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1048 "dblink temporary context",
1049 ALLOCSET_DEFAULT_SIZES);
1050
1051 /* execute query, collecting any tuples into the tuplestore */
1052 res = storeQueryResult(&sinfo, conn, sql);
1053
1054 if (!res ||
1055 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1056 PQresultStatus(res) != PGRES_TUPLES_OK))
1057 {
1058 /*
1059 * dblink_res_error will clear the passed PGresult, so we need
1060 * this ugly dance to avoid doing so twice during error exit
1061 */
1062 PGresult *res1 = res;
1063
1064 res = NULL;
1065 dblink_res_error(conn, conname, res1, fail,
1066 "while executing query");
1067 /* if fail isn't set, we'll return an empty query result */
1068 }
1069 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1070 {
1071 /*
1072 * storeRow didn't get called, so we need to convert the command
1073 * status string to a tuple manually
1074 */
1075 TupleDesc tupdesc;
1076 AttInMetadata *attinmeta;
1077 Tuplestorestate *tupstore;
1078 HeapTuple tuple;
1079 char *values[1];
1080 MemoryContext oldcontext;
1081
1082 /*
1083 * need a tuple descriptor representing one TEXT column to return
1084 * the command status string as our result tuple
1085 */
1086 tupdesc = CreateTemplateTupleDesc(1);
1087 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1088 TEXTOID, -1, 0);
1089 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1090
1091 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1092 tupstore = tuplestore_begin_heap(true, false, work_mem);
1093 rsinfo->setResult = tupstore;
1094 rsinfo->setDesc = tupdesc;
1095 MemoryContextSwitchTo(oldcontext);
1096
1097 values[0] = PQcmdStatus(res);
1098
1099 /* build the tuple and put it into the tuplestore. */
1100 tuple = BuildTupleFromCStrings(attinmeta, values);
1101 tuplestore_puttuple(tupstore, tuple);
1102
1103 PQclear(res);
1104 res = NULL;
1105 }
1106 else
1107 {
1108 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1109 /* storeRow should have created a tuplestore */
1110 Assert(rsinfo->setResult != NULL);
1111
1112 PQclear(res);
1113 res = NULL;
1114 }
1115
1116 /* clean up data conversion short-lived memory context */
1117 if (sinfo.tmpcontext != NULL)
1118 MemoryContextDelete(sinfo.tmpcontext);
1119 sinfo.tmpcontext = NULL;
1120
1121 PQclear(sinfo.last_res);
1122 sinfo.last_res = NULL;
1123 PQclear(sinfo.cur_res);
1124 sinfo.cur_res = NULL;
1125 }
1126 PG_CATCH();
1127 {
1128 /* be sure to release any libpq result we collected */
1129 PQclear(res);
1130 PQclear(sinfo.last_res);
1131 PQclear(sinfo.cur_res);
1132 /* and clear out any pending data in libpq */
1133 while ((res = PQgetResult(conn)) != NULL)
1134 PQclear(res);
1135 PG_RE_THROW();
1136 }
1137 PG_END_TRY();
1138 }
1139
1140 /*
1141 * Execute query, and send any result rows to sinfo->tuplestore.
1142 */
1143 static PGresult *
storeQueryResult(volatile storeInfo * sinfo,PGconn * conn,const char * sql)1144 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1145 {
1146 bool first = true;
1147 int nestlevel = -1;
1148 PGresult *res;
1149
1150 if (!PQsendQuery(conn, sql))
1151 elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1152
1153 if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1154 elog(ERROR, "failed to set single-row mode for dblink query");
1155
1156 for (;;)
1157 {
1158 CHECK_FOR_INTERRUPTS();
1159
1160 sinfo->cur_res = PQgetResult(conn);
1161 if (!sinfo->cur_res)
1162 break;
1163
1164 if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1165 {
1166 /* got one row from possibly-bigger resultset */
1167
1168 /*
1169 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1170 * We shouldn't do this until we have a row in hand, to ensure
1171 * libpq has seen any earlier ParameterStatus protocol messages.
1172 */
1173 if (first && nestlevel < 0)
1174 nestlevel = applyRemoteGucs(conn);
1175
1176 storeRow(sinfo, sinfo->cur_res, first);
1177
1178 PQclear(sinfo->cur_res);
1179 sinfo->cur_res = NULL;
1180 first = false;
1181 }
1182 else
1183 {
1184 /* if empty resultset, fill tuplestore header */
1185 if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1186 storeRow(sinfo, sinfo->cur_res, first);
1187
1188 /* store completed result at last_res */
1189 PQclear(sinfo->last_res);
1190 sinfo->last_res = sinfo->cur_res;
1191 sinfo->cur_res = NULL;
1192 first = true;
1193 }
1194 }
1195
1196 /* clean up GUC settings, if we changed any */
1197 restoreLocalGucs(nestlevel);
1198
1199 /* return last_res */
1200 res = sinfo->last_res;
1201 sinfo->last_res = NULL;
1202 return res;
1203 }
1204
1205 /*
1206 * Send single row to sinfo->tuplestore.
1207 *
1208 * If "first" is true, create the tuplestore using PGresult's metadata
1209 * (in this case the PGresult might contain either zero or one row).
1210 */
1211 static void
storeRow(volatile storeInfo * sinfo,PGresult * res,bool first)1212 storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1213 {
1214 int nfields = PQnfields(res);
1215 HeapTuple tuple;
1216 int i;
1217 MemoryContext oldcontext;
1218
1219 if (first)
1220 {
1221 /* Prepare for new result set */
1222 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1223 TupleDesc tupdesc;
1224
1225 /*
1226 * It's possible to get more than one result set if the query string
1227 * contained multiple SQL commands. In that case, we follow PQexec's
1228 * traditional behavior of throwing away all but the last result.
1229 */
1230 if (sinfo->tuplestore)
1231 tuplestore_end(sinfo->tuplestore);
1232 sinfo->tuplestore = NULL;
1233
1234 /* get a tuple descriptor for our result type */
1235 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1236 {
1237 case TYPEFUNC_COMPOSITE:
1238 /* success */
1239 break;
1240 case TYPEFUNC_RECORD:
1241 /* failed to determine actual type of RECORD */
1242 ereport(ERROR,
1243 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1244 errmsg("function returning record called in context "
1245 "that cannot accept type record")));
1246 break;
1247 default:
1248 /* result type isn't composite */
1249 elog(ERROR, "return type must be a row type");
1250 break;
1251 }
1252
1253 /* make sure we have a persistent copy of the tupdesc */
1254 tupdesc = CreateTupleDescCopy(tupdesc);
1255
1256 /* check result and tuple descriptor have the same number of columns */
1257 if (nfields != tupdesc->natts)
1258 ereport(ERROR,
1259 (errcode(ERRCODE_DATATYPE_MISMATCH),
1260 errmsg("remote query result rowtype does not match "
1261 "the specified FROM clause rowtype")));
1262
1263 /* Prepare attinmeta for later data conversions */
1264 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1265
1266 /* Create a new, empty tuplestore */
1267 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1268 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1269 rsinfo->setResult = sinfo->tuplestore;
1270 rsinfo->setDesc = tupdesc;
1271 MemoryContextSwitchTo(oldcontext);
1272
1273 /* Done if empty resultset */
1274 if (PQntuples(res) == 0)
1275 return;
1276
1277 /*
1278 * Set up sufficiently-wide string pointers array; this won't change
1279 * in size so it's easy to preallocate.
1280 */
1281 if (sinfo->cstrs)
1282 pfree(sinfo->cstrs);
1283 sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
1284 }
1285
1286 /* Should have a single-row result if we get here */
1287 Assert(PQntuples(res) == 1);
1288
1289 /*
1290 * Do the following work in a temp context that we reset after each tuple.
1291 * This cleans up not only the data we have direct access to, but any
1292 * cruft the I/O functions might leak.
1293 */
1294 oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1295
1296 /*
1297 * Fill cstrs with null-terminated strings of column values.
1298 */
1299 for (i = 0; i < nfields; i++)
1300 {
1301 if (PQgetisnull(res, 0, i))
1302 sinfo->cstrs[i] = NULL;
1303 else
1304 sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1305 }
1306
1307 /* Convert row to a tuple, and add it to the tuplestore */
1308 tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1309
1310 tuplestore_puttuple(sinfo->tuplestore, tuple);
1311
1312 /* Clean up */
1313 MemoryContextSwitchTo(oldcontext);
1314 MemoryContextReset(sinfo->tmpcontext);
1315 }
1316
1317 /*
1318 * List all open dblink connections by name.
1319 * Returns an array of all connection names.
1320 * Takes no params
1321 */
1322 PG_FUNCTION_INFO_V1(dblink_get_connections);
1323 Datum
dblink_get_connections(PG_FUNCTION_ARGS)1324 dblink_get_connections(PG_FUNCTION_ARGS)
1325 {
1326 HASH_SEQ_STATUS status;
1327 remoteConnHashEnt *hentry;
1328 ArrayBuildState *astate = NULL;
1329
1330 if (remoteConnHash)
1331 {
1332 hash_seq_init(&status, remoteConnHash);
1333 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1334 {
1335 /* stash away current value */
1336 astate = accumArrayResult(astate,
1337 CStringGetTextDatum(hentry->name),
1338 false, TEXTOID, CurrentMemoryContext);
1339 }
1340 }
1341
1342 if (astate)
1343 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1344 CurrentMemoryContext));
1345 else
1346 PG_RETURN_NULL();
1347 }
1348
1349 /*
1350 * Checks if a given remote connection is busy
1351 *
1352 * Returns 1 if the connection is busy, 0 otherwise
1353 * Params:
1354 * text connection_name - name of the connection to check
1355 *
1356 */
1357 PG_FUNCTION_INFO_V1(dblink_is_busy);
1358 Datum
dblink_is_busy(PG_FUNCTION_ARGS)1359 dblink_is_busy(PG_FUNCTION_ARGS)
1360 {
1361 PGconn *conn;
1362
1363 dblink_init();
1364 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1365
1366 PQconsumeInput(conn);
1367 PG_RETURN_INT32(PQisBusy(conn));
1368 }
1369
1370 /*
1371 * Cancels a running request on a connection
1372 *
1373 * Returns text:
1374 * "OK" if the cancel request has been sent correctly,
1375 * an error message otherwise
1376 *
1377 * Params:
1378 * text connection_name - name of the connection to check
1379 *
1380 */
1381 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1382 Datum
dblink_cancel_query(PG_FUNCTION_ARGS)1383 dblink_cancel_query(PG_FUNCTION_ARGS)
1384 {
1385 int res;
1386 PGconn *conn;
1387 PGcancel *cancel;
1388 char errbuf[256];
1389
1390 dblink_init();
1391 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1392 cancel = PQgetCancel(conn);
1393
1394 res = PQcancel(cancel, errbuf, 256);
1395 PQfreeCancel(cancel);
1396
1397 if (res == 1)
1398 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1399 else
1400 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1401 }
1402
1403
1404 /*
1405 * Get error message from a connection
1406 *
1407 * Returns text:
1408 * "OK" if no error, an error message otherwise
1409 *
1410 * Params:
1411 * text connection_name - name of the connection to check
1412 *
1413 */
1414 PG_FUNCTION_INFO_V1(dblink_error_message);
1415 Datum
dblink_error_message(PG_FUNCTION_ARGS)1416 dblink_error_message(PG_FUNCTION_ARGS)
1417 {
1418 char *msg;
1419 PGconn *conn;
1420
1421 dblink_init();
1422 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1423
1424 msg = PQerrorMessage(conn);
1425 if (msg == NULL || msg[0] == '\0')
1426 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1427 else
1428 PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1429 }
1430
1431 /*
1432 * Execute an SQL non-SELECT command
1433 */
1434 PG_FUNCTION_INFO_V1(dblink_exec);
1435 Datum
dblink_exec(PG_FUNCTION_ARGS)1436 dblink_exec(PG_FUNCTION_ARGS)
1437 {
1438 text *volatile sql_cmd_status = NULL;
1439 PGconn *volatile conn = NULL;
1440 volatile bool freeconn = false;
1441
1442 dblink_init();
1443
1444 PG_TRY();
1445 {
1446 PGresult *res = NULL;
1447 char *sql = NULL;
1448 char *conname = NULL;
1449 bool fail = true; /* default to backward compatible behavior */
1450
1451 if (PG_NARGS() == 3)
1452 {
1453 /* must be text,text,bool */
1454 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1455 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1456 fail = PG_GETARG_BOOL(2);
1457 dblink_get_conn(conname, &conn, &conname, &freeconn);
1458 }
1459 else if (PG_NARGS() == 2)
1460 {
1461 /* might be text,text or text,bool */
1462 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1463 {
1464 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1465 fail = PG_GETARG_BOOL(1);
1466 conn = pconn->conn;
1467 }
1468 else
1469 {
1470 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1471 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1472 dblink_get_conn(conname, &conn, &conname, &freeconn);
1473 }
1474 }
1475 else if (PG_NARGS() == 1)
1476 {
1477 /* must be single text argument */
1478 conn = pconn->conn;
1479 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1480 }
1481 else
1482 /* shouldn't happen */
1483 elog(ERROR, "wrong number of arguments");
1484
1485 if (!conn)
1486 dblink_conn_not_avail(conname);
1487
1488 res = PQexec(conn, sql);
1489 if (!res ||
1490 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1491 PQresultStatus(res) != PGRES_TUPLES_OK))
1492 {
1493 dblink_res_error(conn, conname, res, fail,
1494 "while executing command");
1495
1496 /*
1497 * and save a copy of the command status string to return as our
1498 * result tuple
1499 */
1500 sql_cmd_status = cstring_to_text("ERROR");
1501 }
1502 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1503 {
1504 /*
1505 * and save a copy of the command status string to return as our
1506 * result tuple
1507 */
1508 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1509 PQclear(res);
1510 }
1511 else
1512 {
1513 PQclear(res);
1514 ereport(ERROR,
1515 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1516 errmsg("statement returning results not allowed")));
1517 }
1518 }
1519 PG_FINALLY();
1520 {
1521 /* if needed, close the connection to the database */
1522 if (freeconn)
1523 {
1524 PQfinish(conn);
1525 ReleaseExternalFD();
1526 }
1527 }
1528 PG_END_TRY();
1529
1530 PG_RETURN_TEXT_P(sql_cmd_status);
1531 }
1532
1533
1534 /*
1535 * dblink_get_pkey
1536 *
1537 * Return list of primary key fields for the supplied relation,
1538 * or NULL if none exists.
1539 */
1540 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1541 Datum
dblink_get_pkey(PG_FUNCTION_ARGS)1542 dblink_get_pkey(PG_FUNCTION_ARGS)
1543 {
1544 int16 indnkeyatts;
1545 char **results;
1546 FuncCallContext *funcctx;
1547 int32 call_cntr;
1548 int32 max_calls;
1549 AttInMetadata *attinmeta;
1550 MemoryContext oldcontext;
1551
1552 /* stuff done only on the first call of the function */
1553 if (SRF_IS_FIRSTCALL())
1554 {
1555 Relation rel;
1556 TupleDesc tupdesc;
1557
1558 /* create a function context for cross-call persistence */
1559 funcctx = SRF_FIRSTCALL_INIT();
1560
1561 /*
1562 * switch to memory context appropriate for multiple function calls
1563 */
1564 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1565
1566 /* open target relation */
1567 rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1568
1569 /* get the array of attnums */
1570 results = get_pkey_attnames(rel, &indnkeyatts);
1571
1572 relation_close(rel, AccessShareLock);
1573
1574 /*
1575 * need a tuple descriptor representing one INT and one TEXT column
1576 */
1577 tupdesc = CreateTemplateTupleDesc(2);
1578 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1579 INT4OID, -1, 0);
1580 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1581 TEXTOID, -1, 0);
1582
1583 /*
1584 * Generate attribute metadata needed later to produce tuples from raw
1585 * C strings
1586 */
1587 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1588 funcctx->attinmeta = attinmeta;
1589
1590 if ((results != NULL) && (indnkeyatts > 0))
1591 {
1592 funcctx->max_calls = indnkeyatts;
1593
1594 /* got results, keep track of them */
1595 funcctx->user_fctx = results;
1596 }
1597 else
1598 {
1599 /* fast track when no results */
1600 MemoryContextSwitchTo(oldcontext);
1601 SRF_RETURN_DONE(funcctx);
1602 }
1603
1604 MemoryContextSwitchTo(oldcontext);
1605 }
1606
1607 /* stuff done on every call of the function */
1608 funcctx = SRF_PERCALL_SETUP();
1609
1610 /*
1611 * initialize per-call variables
1612 */
1613 call_cntr = funcctx->call_cntr;
1614 max_calls = funcctx->max_calls;
1615
1616 results = (char **) funcctx->user_fctx;
1617 attinmeta = funcctx->attinmeta;
1618
1619 if (call_cntr < max_calls) /* do when there is more left to send */
1620 {
1621 char **values;
1622 HeapTuple tuple;
1623 Datum result;
1624
1625 values = (char **) palloc(2 * sizeof(char *));
1626 values[0] = psprintf("%d", call_cntr + 1);
1627 values[1] = results[call_cntr];
1628
1629 /* build the tuple */
1630 tuple = BuildTupleFromCStrings(attinmeta, values);
1631
1632 /* make the tuple into a datum */
1633 result = HeapTupleGetDatum(tuple);
1634
1635 SRF_RETURN_NEXT(funcctx, result);
1636 }
1637 else
1638 {
1639 /* do when there is no more left */
1640 SRF_RETURN_DONE(funcctx);
1641 }
1642 }
1643
1644
1645 /*
1646 * dblink_build_sql_insert
1647 *
1648 * Used to generate an SQL insert statement
1649 * based on an existing tuple in a local relation.
1650 * This is useful for selectively replicating data
1651 * to another server via dblink.
1652 *
1653 * API:
1654 * <relname> - name of local table of interest
1655 * <pkattnums> - an int2vector of attnums which will be used
1656 * to identify the local tuple of interest
1657 * <pknumatts> - number of attnums in pkattnums
1658 * <src_pkattvals_arry> - text array of key values which will be used
1659 * to identify the local tuple of interest
1660 * <tgt_pkattvals_arry> - text array of key values which will be used
1661 * to build the string for execution remotely. These are substituted
1662 * for their counterparts in src_pkattvals_arry
1663 */
1664 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1665 Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)1666 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1667 {
1668 text *relname_text = PG_GETARG_TEXT_PP(0);
1669 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1670 int32 pknumatts_arg = PG_GETARG_INT32(2);
1671 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1672 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1673 Relation rel;
1674 int *pkattnums;
1675 int pknumatts;
1676 char **src_pkattvals;
1677 char **tgt_pkattvals;
1678 int src_nitems;
1679 int tgt_nitems;
1680 char *sql;
1681
1682 /*
1683 * Open target relation.
1684 */
1685 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1686
1687 /*
1688 * Process pkattnums argument.
1689 */
1690 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1691 &pkattnums, &pknumatts);
1692
1693 /*
1694 * Source array is made up of key values that will be used to locate the
1695 * tuple of interest from the local system.
1696 */
1697 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1698
1699 /*
1700 * There should be one source array key value for each key attnum
1701 */
1702 if (src_nitems != pknumatts)
1703 ereport(ERROR,
1704 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1705 errmsg("source key array length must match number of key attributes")));
1706
1707 /*
1708 * Target array is made up of key values that will be used to build the
1709 * SQL string for use on the remote system.
1710 */
1711 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1712
1713 /*
1714 * There should be one target array key value for each key attnum
1715 */
1716 if (tgt_nitems != pknumatts)
1717 ereport(ERROR,
1718 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1719 errmsg("target key array length must match number of key attributes")));
1720
1721 /*
1722 * Prep work is finally done. Go get the SQL string.
1723 */
1724 sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1725
1726 /*
1727 * Now we can close the relation.
1728 */
1729 relation_close(rel, AccessShareLock);
1730
1731 /*
1732 * And send it
1733 */
1734 PG_RETURN_TEXT_P(cstring_to_text(sql));
1735 }
1736
1737
1738 /*
1739 * dblink_build_sql_delete
1740 *
1741 * Used to generate an SQL delete statement.
1742 * This is useful for selectively replicating a
1743 * delete to another server via dblink.
1744 *
1745 * API:
1746 * <relname> - name of remote table of interest
1747 * <pkattnums> - an int2vector of attnums which will be used
1748 * to identify the remote tuple of interest
1749 * <pknumatts> - number of attnums in pkattnums
1750 * <tgt_pkattvals_arry> - text array of key values which will be used
1751 * to build the string for execution remotely.
1752 */
1753 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1754 Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)1755 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1756 {
1757 text *relname_text = PG_GETARG_TEXT_PP(0);
1758 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1759 int32 pknumatts_arg = PG_GETARG_INT32(2);
1760 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1761 Relation rel;
1762 int *pkattnums;
1763 int pknumatts;
1764 char **tgt_pkattvals;
1765 int tgt_nitems;
1766 char *sql;
1767
1768 /*
1769 * Open target relation.
1770 */
1771 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1772
1773 /*
1774 * Process pkattnums argument.
1775 */
1776 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1777 &pkattnums, &pknumatts);
1778
1779 /*
1780 * Target array is made up of key values that will be used to build the
1781 * SQL string for use on the remote system.
1782 */
1783 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1784
1785 /*
1786 * There should be one target array key value for each key attnum
1787 */
1788 if (tgt_nitems != pknumatts)
1789 ereport(ERROR,
1790 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1791 errmsg("target key array length must match number of key attributes")));
1792
1793 /*
1794 * Prep work is finally done. Go get the SQL string.
1795 */
1796 sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1797
1798 /*
1799 * Now we can close the relation.
1800 */
1801 relation_close(rel, AccessShareLock);
1802
1803 /*
1804 * And send it
1805 */
1806 PG_RETURN_TEXT_P(cstring_to_text(sql));
1807 }
1808
1809
1810 /*
1811 * dblink_build_sql_update
1812 *
1813 * Used to generate an SQL update statement
1814 * based on an existing tuple in a local relation.
1815 * This is useful for selectively replicating data
1816 * to another server via dblink.
1817 *
1818 * API:
1819 * <relname> - name of local table of interest
1820 * <pkattnums> - an int2vector of attnums which will be used
1821 * to identify the local tuple of interest
1822 * <pknumatts> - number of attnums in pkattnums
1823 * <src_pkattvals_arry> - text array of key values which will be used
1824 * to identify the local tuple of interest
1825 * <tgt_pkattvals_arry> - text array of key values which will be used
1826 * to build the string for execution remotely. These are substituted
1827 * for their counterparts in src_pkattvals_arry
1828 */
1829 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1830 Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)1831 dblink_build_sql_update(PG_FUNCTION_ARGS)
1832 {
1833 text *relname_text = PG_GETARG_TEXT_PP(0);
1834 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1835 int32 pknumatts_arg = PG_GETARG_INT32(2);
1836 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1837 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1838 Relation rel;
1839 int *pkattnums;
1840 int pknumatts;
1841 char **src_pkattvals;
1842 char **tgt_pkattvals;
1843 int src_nitems;
1844 int tgt_nitems;
1845 char *sql;
1846
1847 /*
1848 * Open target relation.
1849 */
1850 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1851
1852 /*
1853 * Process pkattnums argument.
1854 */
1855 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1856 &pkattnums, &pknumatts);
1857
1858 /*
1859 * Source array is made up of key values that will be used to locate the
1860 * tuple of interest from the local system.
1861 */
1862 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1863
1864 /*
1865 * There should be one source array key value for each key attnum
1866 */
1867 if (src_nitems != pknumatts)
1868 ereport(ERROR,
1869 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1870 errmsg("source key array length must match number of key attributes")));
1871
1872 /*
1873 * Target array is made up of key values that will be used to build the
1874 * SQL string for use on the remote system.
1875 */
1876 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1877
1878 /*
1879 * There should be one target array key value for each key attnum
1880 */
1881 if (tgt_nitems != pknumatts)
1882 ereport(ERROR,
1883 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1884 errmsg("target key array length must match number of key attributes")));
1885
1886 /*
1887 * Prep work is finally done. Go get the SQL string.
1888 */
1889 sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1890
1891 /*
1892 * Now we can close the relation.
1893 */
1894 relation_close(rel, AccessShareLock);
1895
1896 /*
1897 * And send it
1898 */
1899 PG_RETURN_TEXT_P(cstring_to_text(sql));
1900 }
1901
1902 /*
1903 * dblink_current_query
1904 * return the current query string
1905 * to allow its use in (among other things)
1906 * rewrite rules
1907 */
1908 PG_FUNCTION_INFO_V1(dblink_current_query);
1909 Datum
dblink_current_query(PG_FUNCTION_ARGS)1910 dblink_current_query(PG_FUNCTION_ARGS)
1911 {
1912 /* This is now just an alias for the built-in function current_query() */
1913 PG_RETURN_DATUM(current_query(fcinfo));
1914 }
1915
1916 /*
1917 * Retrieve async notifications for a connection.
1918 *
1919 * Returns a setof record of notifications, or an empty set if none received.
1920 * Can optionally take a named connection as parameter, but uses the unnamed
1921 * connection per default.
1922 *
1923 */
1924 #define DBLINK_NOTIFY_COLS 3
1925
1926 PG_FUNCTION_INFO_V1(dblink_get_notify);
1927 Datum
dblink_get_notify(PG_FUNCTION_ARGS)1928 dblink_get_notify(PG_FUNCTION_ARGS)
1929 {
1930 PGconn *conn;
1931 PGnotify *notify;
1932 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1933 TupleDesc tupdesc;
1934 Tuplestorestate *tupstore;
1935 MemoryContext per_query_ctx;
1936 MemoryContext oldcontext;
1937
1938 prepTuplestoreResult(fcinfo);
1939
1940 dblink_init();
1941 if (PG_NARGS() == 1)
1942 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1943 else
1944 conn = pconn->conn;
1945
1946 /* create the tuplestore in per-query memory */
1947 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1948 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1949
1950 tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS);
1951 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1952 TEXTOID, -1, 0);
1953 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1954 INT4OID, -1, 0);
1955 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1956 TEXTOID, -1, 0);
1957
1958 tupstore = tuplestore_begin_heap(true, false, work_mem);
1959 rsinfo->setResult = tupstore;
1960 rsinfo->setDesc = tupdesc;
1961
1962 MemoryContextSwitchTo(oldcontext);
1963
1964 PQconsumeInput(conn);
1965 while ((notify = PQnotifies(conn)) != NULL)
1966 {
1967 Datum values[DBLINK_NOTIFY_COLS];
1968 bool nulls[DBLINK_NOTIFY_COLS];
1969
1970 memset(values, 0, sizeof(values));
1971 memset(nulls, 0, sizeof(nulls));
1972
1973 if (notify->relname != NULL)
1974 values[0] = CStringGetTextDatum(notify->relname);
1975 else
1976 nulls[0] = true;
1977
1978 values[1] = Int32GetDatum(notify->be_pid);
1979
1980 if (notify->extra != NULL)
1981 values[2] = CStringGetTextDatum(notify->extra);
1982 else
1983 nulls[2] = true;
1984
1985 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1986
1987 PQfreemem(notify);
1988 PQconsumeInput(conn);
1989 }
1990
1991 /* clean up and return the tuplestore */
1992 tuplestore_donestoring(tupstore);
1993
1994 return (Datum) 0;
1995 }
1996
1997 /*
1998 * Validate the options given to a dblink foreign server or user mapping.
1999 * Raise an error if any option is invalid.
2000 *
2001 * We just check the names of options here, so semantic errors in options,
2002 * such as invalid numeric format, will be detected at the attempt to connect.
2003 */
2004 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
2005 Datum
dblink_fdw_validator(PG_FUNCTION_ARGS)2006 dblink_fdw_validator(PG_FUNCTION_ARGS)
2007 {
2008 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
2009 Oid context = PG_GETARG_OID(1);
2010 ListCell *cell;
2011
2012 static const PQconninfoOption *options = NULL;
2013
2014 /*
2015 * Get list of valid libpq options.
2016 *
2017 * To avoid unnecessary work, we get the list once and use it throughout
2018 * the lifetime of this backend process. We don't need to care about
2019 * memory context issues, because PQconndefaults allocates with malloc.
2020 */
2021 if (!options)
2022 {
2023 options = PQconndefaults();
2024 if (!options) /* assume reason for failure is OOM */
2025 ereport(ERROR,
2026 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2027 errmsg("out of memory"),
2028 errdetail("Could not get libpq's default connection options.")));
2029 }
2030
2031 /* Validate each supplied option. */
2032 foreach(cell, options_list)
2033 {
2034 DefElem *def = (DefElem *) lfirst(cell);
2035
2036 if (!is_valid_dblink_option(options, def->defname, context))
2037 {
2038 /*
2039 * Unknown option, or invalid option for the context specified, so
2040 * complain about it. Provide a hint with list of valid options
2041 * for the context.
2042 */
2043 StringInfoData buf;
2044 const PQconninfoOption *opt;
2045
2046 initStringInfo(&buf);
2047 for (opt = options; opt->keyword; opt++)
2048 {
2049 if (is_valid_dblink_option(options, opt->keyword, context))
2050 appendStringInfo(&buf, "%s%s",
2051 (buf.len > 0) ? ", " : "",
2052 opt->keyword);
2053 }
2054 ereport(ERROR,
2055 (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2056 errmsg("invalid option \"%s\"", def->defname),
2057 errhint("Valid options in this context are: %s",
2058 buf.data)));
2059 }
2060 }
2061
2062 PG_RETURN_VOID();
2063 }
2064
2065
2066 /*************************************************************
2067 * internal functions
2068 */
2069
2070
2071 /*
2072 * get_pkey_attnames
2073 *
2074 * Get the primary key attnames for the given relation.
2075 * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2076 */
2077 static char **
get_pkey_attnames(Relation rel,int16 * indnkeyatts)2078 get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2079 {
2080 Relation indexRelation;
2081 ScanKeyData skey;
2082 SysScanDesc scan;
2083 HeapTuple indexTuple;
2084 int i;
2085 char **result = NULL;
2086 TupleDesc tupdesc;
2087
2088 /* initialize indnkeyatts to 0 in case no primary key exists */
2089 *indnkeyatts = 0;
2090
2091 tupdesc = rel->rd_att;
2092
2093 /* Prepare to scan pg_index for entries having indrelid = this rel. */
2094 indexRelation = table_open(IndexRelationId, AccessShareLock);
2095 ScanKeyInit(&skey,
2096 Anum_pg_index_indrelid,
2097 BTEqualStrategyNumber, F_OIDEQ,
2098 ObjectIdGetDatum(RelationGetRelid(rel)));
2099
2100 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2101 NULL, 1, &skey);
2102
2103 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2104 {
2105 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2106
2107 /* we're only interested if it is the primary key */
2108 if (index->indisprimary)
2109 {
2110 *indnkeyatts = index->indnkeyatts;
2111 if (*indnkeyatts > 0)
2112 {
2113 result = (char **) palloc(*indnkeyatts * sizeof(char *));
2114
2115 for (i = 0; i < *indnkeyatts; i++)
2116 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2117 }
2118 break;
2119 }
2120 }
2121
2122 systable_endscan(scan);
2123 table_close(indexRelation, AccessShareLock);
2124
2125 return result;
2126 }
2127
2128 /*
2129 * Deconstruct a text[] into C-strings (note any NULL elements will be
2130 * returned as NULL pointers)
2131 */
2132 static char **
get_text_array_contents(ArrayType * array,int * numitems)2133 get_text_array_contents(ArrayType *array, int *numitems)
2134 {
2135 int ndim = ARR_NDIM(array);
2136 int *dims = ARR_DIMS(array);
2137 int nitems;
2138 int16 typlen;
2139 bool typbyval;
2140 char typalign;
2141 char **values;
2142 char *ptr;
2143 bits8 *bitmap;
2144 int bitmask;
2145 int i;
2146
2147 Assert(ARR_ELEMTYPE(array) == TEXTOID);
2148
2149 *numitems = nitems = ArrayGetNItems(ndim, dims);
2150
2151 get_typlenbyvalalign(ARR_ELEMTYPE(array),
2152 &typlen, &typbyval, &typalign);
2153
2154 values = (char **) palloc(nitems * sizeof(char *));
2155
2156 ptr = ARR_DATA_PTR(array);
2157 bitmap = ARR_NULLBITMAP(array);
2158 bitmask = 1;
2159
2160 for (i = 0; i < nitems; i++)
2161 {
2162 if (bitmap && (*bitmap & bitmask) == 0)
2163 {
2164 values[i] = NULL;
2165 }
2166 else
2167 {
2168 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2169 ptr = att_addlength_pointer(ptr, typlen, ptr);
2170 ptr = (char *) att_align_nominal(ptr, typalign);
2171 }
2172
2173 /* advance bitmap pointer if any */
2174 if (bitmap)
2175 {
2176 bitmask <<= 1;
2177 if (bitmask == 0x100)
2178 {
2179 bitmap++;
2180 bitmask = 1;
2181 }
2182 }
2183 }
2184
2185 return values;
2186 }
2187
2188 static char *
get_sql_insert(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2189 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2190 {
2191 char *relname;
2192 HeapTuple tuple;
2193 TupleDesc tupdesc;
2194 int natts;
2195 StringInfoData buf;
2196 char *val;
2197 int key;
2198 int i;
2199 bool needComma;
2200
2201 initStringInfo(&buf);
2202
2203 /* get relation name including any needed schema prefix and quoting */
2204 relname = generate_relation_name(rel);
2205
2206 tupdesc = rel->rd_att;
2207 natts = tupdesc->natts;
2208
2209 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2210 if (!tuple)
2211 ereport(ERROR,
2212 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2213 errmsg("source row not found")));
2214
2215 appendStringInfo(&buf, "INSERT INTO %s(", relname);
2216
2217 needComma = false;
2218 for (i = 0; i < natts; i++)
2219 {
2220 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2221
2222 if (att->attisdropped)
2223 continue;
2224
2225 if (needComma)
2226 appendStringInfoChar(&buf, ',');
2227
2228 appendStringInfoString(&buf,
2229 quote_ident_cstr(NameStr(att->attname)));
2230 needComma = true;
2231 }
2232
2233 appendStringInfoString(&buf, ") VALUES(");
2234
2235 /*
2236 * Note: i is physical column number (counting from 0).
2237 */
2238 needComma = false;
2239 for (i = 0; i < natts; i++)
2240 {
2241 if (TupleDescAttr(tupdesc, i)->attisdropped)
2242 continue;
2243
2244 if (needComma)
2245 appendStringInfoChar(&buf, ',');
2246
2247 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2248
2249 if (key >= 0)
2250 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2251 else
2252 val = SPI_getvalue(tuple, tupdesc, i + 1);
2253
2254 if (val != NULL)
2255 {
2256 appendStringInfoString(&buf, quote_literal_cstr(val));
2257 pfree(val);
2258 }
2259 else
2260 appendStringInfoString(&buf, "NULL");
2261 needComma = true;
2262 }
2263 appendStringInfoChar(&buf, ')');
2264
2265 return buf.data;
2266 }
2267
2268 static char *
get_sql_delete(Relation rel,int * pkattnums,int pknumatts,char ** tgt_pkattvals)2269 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2270 {
2271 char *relname;
2272 TupleDesc tupdesc;
2273 StringInfoData buf;
2274 int i;
2275
2276 initStringInfo(&buf);
2277
2278 /* get relation name including any needed schema prefix and quoting */
2279 relname = generate_relation_name(rel);
2280
2281 tupdesc = rel->rd_att;
2282
2283 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2284 for (i = 0; i < pknumatts; i++)
2285 {
2286 int pkattnum = pkattnums[i];
2287 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2288
2289 if (i > 0)
2290 appendStringInfoString(&buf, " AND ");
2291
2292 appendStringInfoString(&buf,
2293 quote_ident_cstr(NameStr(attr->attname)));
2294
2295 if (tgt_pkattvals[i] != NULL)
2296 appendStringInfo(&buf, " = %s",
2297 quote_literal_cstr(tgt_pkattvals[i]));
2298 else
2299 appendStringInfoString(&buf, " IS NULL");
2300 }
2301
2302 return buf.data;
2303 }
2304
2305 static char *
get_sql_update(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2306 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2307 {
2308 char *relname;
2309 HeapTuple tuple;
2310 TupleDesc tupdesc;
2311 int natts;
2312 StringInfoData buf;
2313 char *val;
2314 int key;
2315 int i;
2316 bool needComma;
2317
2318 initStringInfo(&buf);
2319
2320 /* get relation name including any needed schema prefix and quoting */
2321 relname = generate_relation_name(rel);
2322
2323 tupdesc = rel->rd_att;
2324 natts = tupdesc->natts;
2325
2326 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2327 if (!tuple)
2328 ereport(ERROR,
2329 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2330 errmsg("source row not found")));
2331
2332 appendStringInfo(&buf, "UPDATE %s SET ", relname);
2333
2334 /*
2335 * Note: i is physical column number (counting from 0).
2336 */
2337 needComma = false;
2338 for (i = 0; i < natts; i++)
2339 {
2340 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2341
2342 if (attr->attisdropped)
2343 continue;
2344
2345 if (needComma)
2346 appendStringInfoString(&buf, ", ");
2347
2348 appendStringInfo(&buf, "%s = ",
2349 quote_ident_cstr(NameStr(attr->attname)));
2350
2351 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2352
2353 if (key >= 0)
2354 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2355 else
2356 val = SPI_getvalue(tuple, tupdesc, i + 1);
2357
2358 if (val != NULL)
2359 {
2360 appendStringInfoString(&buf, quote_literal_cstr(val));
2361 pfree(val);
2362 }
2363 else
2364 appendStringInfoString(&buf, "NULL");
2365 needComma = true;
2366 }
2367
2368 appendStringInfoString(&buf, " WHERE ");
2369
2370 for (i = 0; i < pknumatts; i++)
2371 {
2372 int pkattnum = pkattnums[i];
2373 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2374
2375 if (i > 0)
2376 appendStringInfoString(&buf, " AND ");
2377
2378 appendStringInfoString(&buf,
2379 quote_ident_cstr(NameStr(attr->attname)));
2380
2381 val = tgt_pkattvals[i];
2382
2383 if (val != NULL)
2384 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2385 else
2386 appendStringInfoString(&buf, " IS NULL");
2387 }
2388
2389 return buf.data;
2390 }
2391
2392 /*
2393 * Return a properly quoted identifier.
2394 * Uses quote_ident in quote.c
2395 */
2396 static char *
quote_ident_cstr(char * rawstr)2397 quote_ident_cstr(char *rawstr)
2398 {
2399 text *rawstr_text;
2400 text *result_text;
2401 char *result;
2402
2403 rawstr_text = cstring_to_text(rawstr);
2404 result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2405 PointerGetDatum(rawstr_text)));
2406 result = text_to_cstring(result_text);
2407
2408 return result;
2409 }
2410
2411 static int
get_attnum_pk_pos(int * pkattnums,int pknumatts,int key)2412 get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2413 {
2414 int i;
2415
2416 /*
2417 * Not likely a long list anyway, so just scan for the value
2418 */
2419 for (i = 0; i < pknumatts; i++)
2420 if (key == pkattnums[i])
2421 return i;
2422
2423 return -1;
2424 }
2425
2426 static HeapTuple
get_tuple_of_interest(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals)2427 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2428 {
2429 char *relname;
2430 TupleDesc tupdesc;
2431 int natts;
2432 StringInfoData buf;
2433 int ret;
2434 HeapTuple tuple;
2435 int i;
2436
2437 /*
2438 * Connect to SPI manager
2439 */
2440 if ((ret = SPI_connect()) < 0)
2441 /* internal error */
2442 elog(ERROR, "SPI connect failure - returned %d", ret);
2443
2444 initStringInfo(&buf);
2445
2446 /* get relation name including any needed schema prefix and quoting */
2447 relname = generate_relation_name(rel);
2448
2449 tupdesc = rel->rd_att;
2450 natts = tupdesc->natts;
2451
2452 /*
2453 * Build sql statement to look up tuple of interest, ie, the one matching
2454 * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2455 * generate a result tuple that matches the table's physical structure,
2456 * with NULLs for any dropped columns. Otherwise we have to deal with two
2457 * different tupdescs and everything's very confusing.
2458 */
2459 appendStringInfoString(&buf, "SELECT ");
2460
2461 for (i = 0; i < natts; i++)
2462 {
2463 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2464
2465 if (i > 0)
2466 appendStringInfoString(&buf, ", ");
2467
2468 if (attr->attisdropped)
2469 appendStringInfoString(&buf, "NULL");
2470 else
2471 appendStringInfoString(&buf,
2472 quote_ident_cstr(NameStr(attr->attname)));
2473 }
2474
2475 appendStringInfo(&buf, " FROM %s WHERE ", relname);
2476
2477 for (i = 0; i < pknumatts; i++)
2478 {
2479 int pkattnum = pkattnums[i];
2480 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2481
2482 if (i > 0)
2483 appendStringInfoString(&buf, " AND ");
2484
2485 appendStringInfoString(&buf,
2486 quote_ident_cstr(NameStr(attr->attname)));
2487
2488 if (src_pkattvals[i] != NULL)
2489 appendStringInfo(&buf, " = %s",
2490 quote_literal_cstr(src_pkattvals[i]));
2491 else
2492 appendStringInfoString(&buf, " IS NULL");
2493 }
2494
2495 /*
2496 * Retrieve the desired tuple
2497 */
2498 ret = SPI_exec(buf.data, 0);
2499 pfree(buf.data);
2500
2501 /*
2502 * Only allow one qualifying tuple
2503 */
2504 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2505 ereport(ERROR,
2506 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2507 errmsg("source criteria matched more than one record")));
2508
2509 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2510 {
2511 SPITupleTable *tuptable = SPI_tuptable;
2512
2513 tuple = SPI_copytuple(tuptable->vals[0]);
2514 SPI_finish();
2515
2516 return tuple;
2517 }
2518 else
2519 {
2520 /*
2521 * no qualifying tuples
2522 */
2523 SPI_finish();
2524
2525 return NULL;
2526 }
2527
2528 /*
2529 * never reached, but keep compiler quiet
2530 */
2531 return NULL;
2532 }
2533
2534 /*
2535 * Open the relation named by relname_text, acquire specified type of lock,
2536 * verify we have specified permissions.
2537 * Caller must close rel when done with it.
2538 */
2539 static Relation
get_rel_from_relname(text * relname_text,LOCKMODE lockmode,AclMode aclmode)2540 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2541 {
2542 RangeVar *relvar;
2543 Relation rel;
2544 AclResult aclresult;
2545
2546 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2547 rel = table_openrv(relvar, lockmode);
2548
2549 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2550 aclmode);
2551 if (aclresult != ACLCHECK_OK)
2552 aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2553 RelationGetRelationName(rel));
2554
2555 return rel;
2556 }
2557
2558 /*
2559 * generate_relation_name - copied from ruleutils.c
2560 * Compute the name to display for a relation
2561 *
2562 * The result includes all necessary quoting and schema-prefixing.
2563 */
2564 static char *
generate_relation_name(Relation rel)2565 generate_relation_name(Relation rel)
2566 {
2567 char *nspname;
2568 char *result;
2569
2570 /* Qualify the name if not visible in search path */
2571 if (RelationIsVisible(RelationGetRelid(rel)))
2572 nspname = NULL;
2573 else
2574 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2575
2576 result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2577
2578 return result;
2579 }
2580
2581
2582 static remoteConn *
getConnectionByName(const char * name)2583 getConnectionByName(const char *name)
2584 {
2585 remoteConnHashEnt *hentry;
2586 char *key;
2587
2588 if (!remoteConnHash)
2589 remoteConnHash = createConnHash();
2590
2591 key = pstrdup(name);
2592 truncate_identifier(key, strlen(key), false);
2593 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2594 key, HASH_FIND, NULL);
2595
2596 if (hentry)
2597 return hentry->rconn;
2598
2599 return NULL;
2600 }
2601
2602 static HTAB *
createConnHash(void)2603 createConnHash(void)
2604 {
2605 HASHCTL ctl;
2606
2607 ctl.keysize = NAMEDATALEN;
2608 ctl.entrysize = sizeof(remoteConnHashEnt);
2609
2610 return hash_create("Remote Con hash", NUMCONN, &ctl,
2611 HASH_ELEM | HASH_STRINGS);
2612 }
2613
2614 static void
createNewConnection(const char * name,remoteConn * rconn)2615 createNewConnection(const char *name, remoteConn *rconn)
2616 {
2617 remoteConnHashEnt *hentry;
2618 bool found;
2619 char *key;
2620
2621 if (!remoteConnHash)
2622 remoteConnHash = createConnHash();
2623
2624 key = pstrdup(name);
2625 truncate_identifier(key, strlen(key), true);
2626 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2627 HASH_ENTER, &found);
2628
2629 if (found)
2630 {
2631 PQfinish(rconn->conn);
2632 ReleaseExternalFD();
2633 pfree(rconn);
2634
2635 ereport(ERROR,
2636 (errcode(ERRCODE_DUPLICATE_OBJECT),
2637 errmsg("duplicate connection name")));
2638 }
2639
2640 hentry->rconn = rconn;
2641 strlcpy(hentry->name, name, sizeof(hentry->name));
2642 }
2643
2644 static void
deleteConnection(const char * name)2645 deleteConnection(const char *name)
2646 {
2647 remoteConnHashEnt *hentry;
2648 bool found;
2649 char *key;
2650
2651 if (!remoteConnHash)
2652 remoteConnHash = createConnHash();
2653
2654 key = pstrdup(name);
2655 truncate_identifier(key, strlen(key), false);
2656 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2657 key, HASH_REMOVE, &found);
2658
2659 if (!hentry)
2660 ereport(ERROR,
2661 (errcode(ERRCODE_UNDEFINED_OBJECT),
2662 errmsg("undefined connection name")));
2663
2664 }
2665
2666 static void
dblink_security_check(PGconn * conn,remoteConn * rconn)2667 dblink_security_check(PGconn *conn, remoteConn *rconn)
2668 {
2669 if (!superuser())
2670 {
2671 if (!PQconnectionUsedPassword(conn))
2672 {
2673 PQfinish(conn);
2674 ReleaseExternalFD();
2675 if (rconn)
2676 pfree(rconn);
2677
2678 ereport(ERROR,
2679 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2680 errmsg("password is required"),
2681 errdetail("Non-superuser cannot connect if the server does not request a password."),
2682 errhint("Target server's authentication method must be changed.")));
2683 }
2684 }
2685 }
2686
2687 /*
2688 * For non-superusers, insist that the connstr specify a password. This
2689 * prevents a password from being picked up from .pgpass, a service file,
2690 * the environment, etc. We don't want the postgres user's passwords
2691 * to be accessible to non-superusers.
2692 */
2693 static void
dblink_connstr_check(const char * connstr)2694 dblink_connstr_check(const char *connstr)
2695 {
2696 if (!superuser())
2697 {
2698 PQconninfoOption *options;
2699 PQconninfoOption *option;
2700 bool connstr_gives_password = false;
2701
2702 options = PQconninfoParse(connstr, NULL);
2703 if (options)
2704 {
2705 for (option = options; option->keyword != NULL; option++)
2706 {
2707 if (strcmp(option->keyword, "password") == 0)
2708 {
2709 if (option->val != NULL && option->val[0] != '\0')
2710 {
2711 connstr_gives_password = true;
2712 break;
2713 }
2714 }
2715 }
2716 PQconninfoFree(options);
2717 }
2718
2719 if (!connstr_gives_password)
2720 ereport(ERROR,
2721 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2722 errmsg("password is required"),
2723 errdetail("Non-superusers must provide a password in the connection string.")));
2724 }
2725 }
2726
2727 /*
2728 * Report an error received from the remote server
2729 *
2730 * res: the received error result (will be freed)
2731 * fail: true for ERROR ereport, false for NOTICE
2732 * fmt and following args: sprintf-style format and values for errcontext;
2733 * the resulting string should be worded like "while <some action>"
2734 */
2735 static void
dblink_res_error(PGconn * conn,const char * conname,PGresult * res,bool fail,const char * fmt,...)2736 dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2737 bool fail, const char *fmt,...)
2738 {
2739 int level;
2740 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2741 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2742 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2743 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2744 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2745 int sqlstate;
2746 char *message_primary;
2747 char *message_detail;
2748 char *message_hint;
2749 char *message_context;
2750 va_list ap;
2751 char dblink_context_msg[512];
2752
2753 if (fail)
2754 level = ERROR;
2755 else
2756 level = NOTICE;
2757
2758 if (pg_diag_sqlstate)
2759 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2760 pg_diag_sqlstate[1],
2761 pg_diag_sqlstate[2],
2762 pg_diag_sqlstate[3],
2763 pg_diag_sqlstate[4]);
2764 else
2765 sqlstate = ERRCODE_CONNECTION_FAILURE;
2766
2767 message_primary = xpstrdup(pg_diag_message_primary);
2768 message_detail = xpstrdup(pg_diag_message_detail);
2769 message_hint = xpstrdup(pg_diag_message_hint);
2770 message_context = xpstrdup(pg_diag_context);
2771
2772 /*
2773 * If we don't get a message from the PGresult, try the PGconn. This is
2774 * needed because for connection-level failures, PQexec may just return
2775 * NULL, not a PGresult at all.
2776 */
2777 if (message_primary == NULL)
2778 message_primary = pchomp(PQerrorMessage(conn));
2779
2780 /*
2781 * Now that we've copied all the data we need out of the PGresult, it's
2782 * safe to free it. We must do this to avoid PGresult leakage. We're
2783 * leaking all the strings too, but those are in palloc'd memory that will
2784 * get cleaned up eventually.
2785 */
2786 if (res)
2787 PQclear(res);
2788
2789 /*
2790 * Format the basic errcontext string. Below, we'll add on something
2791 * about the connection name. That's a violation of the translatability
2792 * guidelines about constructing error messages out of parts, but since
2793 * there's no translation support for dblink, there's no need to worry
2794 * about that (yet).
2795 */
2796 va_start(ap, fmt);
2797 vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2798 va_end(ap);
2799
2800 ereport(level,
2801 (errcode(sqlstate),
2802 message_primary ? errmsg_internal("%s", message_primary) :
2803 errmsg("could not obtain message string for remote error"),
2804 message_detail ? errdetail_internal("%s", message_detail) : 0,
2805 message_hint ? errhint("%s", message_hint) : 0,
2806 message_context ? (errcontext("%s", message_context)) : 0,
2807 conname ?
2808 (errcontext("%s on dblink connection named \"%s\"",
2809 dblink_context_msg, conname)) :
2810 (errcontext("%s on unnamed dblink connection",
2811 dblink_context_msg))));
2812 }
2813
2814 /*
2815 * Obtain connection string for a foreign server
2816 */
2817 static char *
get_connect_string(const char * servername)2818 get_connect_string(const char *servername)
2819 {
2820 ForeignServer *foreign_server = NULL;
2821 UserMapping *user_mapping;
2822 ListCell *cell;
2823 StringInfoData buf;
2824 ForeignDataWrapper *fdw;
2825 AclResult aclresult;
2826 char *srvname;
2827
2828 static const PQconninfoOption *options = NULL;
2829
2830 initStringInfo(&buf);
2831
2832 /*
2833 * Get list of valid libpq options.
2834 *
2835 * To avoid unnecessary work, we get the list once and use it throughout
2836 * the lifetime of this backend process. We don't need to care about
2837 * memory context issues, because PQconndefaults allocates with malloc.
2838 */
2839 if (!options)
2840 {
2841 options = PQconndefaults();
2842 if (!options) /* assume reason for failure is OOM */
2843 ereport(ERROR,
2844 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2845 errmsg("out of memory"),
2846 errdetail("Could not get libpq's default connection options.")));
2847 }
2848
2849 /* first gather the server connstr options */
2850 srvname = pstrdup(servername);
2851 truncate_identifier(srvname, strlen(srvname), false);
2852 foreign_server = GetForeignServerByName(srvname, true);
2853
2854 if (foreign_server)
2855 {
2856 Oid serverid = foreign_server->serverid;
2857 Oid fdwid = foreign_server->fdwid;
2858 Oid userid = GetUserId();
2859
2860 user_mapping = GetUserMapping(userid, serverid);
2861 fdw = GetForeignDataWrapper(fdwid);
2862
2863 /* Check permissions, user must have usage on the server. */
2864 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2865 if (aclresult != ACLCHECK_OK)
2866 aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2867
2868 foreach(cell, fdw->options)
2869 {
2870 DefElem *def = lfirst(cell);
2871
2872 if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2873 appendStringInfo(&buf, "%s='%s' ", def->defname,
2874 escape_param_str(strVal(def->arg)));
2875 }
2876
2877 foreach(cell, foreign_server->options)
2878 {
2879 DefElem *def = lfirst(cell);
2880
2881 if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2882 appendStringInfo(&buf, "%s='%s' ", def->defname,
2883 escape_param_str(strVal(def->arg)));
2884 }
2885
2886 foreach(cell, user_mapping->options)
2887 {
2888
2889 DefElem *def = lfirst(cell);
2890
2891 if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2892 appendStringInfo(&buf, "%s='%s' ", def->defname,
2893 escape_param_str(strVal(def->arg)));
2894 }
2895
2896 return buf.data;
2897 }
2898 else
2899 return NULL;
2900 }
2901
2902 /*
2903 * Escaping libpq connect parameter strings.
2904 *
2905 * Replaces "'" with "\'" and "\" with "\\".
2906 */
2907 static char *
escape_param_str(const char * str)2908 escape_param_str(const char *str)
2909 {
2910 const char *cp;
2911 StringInfoData buf;
2912
2913 initStringInfo(&buf);
2914
2915 for (cp = str; *cp; cp++)
2916 {
2917 if (*cp == '\\' || *cp == '\'')
2918 appendStringInfoChar(&buf, '\\');
2919 appendStringInfoChar(&buf, *cp);
2920 }
2921
2922 return buf.data;
2923 }
2924
2925 /*
2926 * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2927 * functions, and translate to the internal representation.
2928 *
2929 * The user supplies an int2vector of 1-based logical attnums, plus a count
2930 * argument (the need for the separate count argument is historical, but we
2931 * still check it). We check that each attnum corresponds to a valid,
2932 * non-dropped attribute of the rel. We do *not* prevent attnums from being
2933 * listed twice, though the actual use-case for such things is dubious.
2934 * Note that before Postgres 9.0, the user's attnums were interpreted as
2935 * physical not logical column numbers; this was changed for future-proofing.
2936 *
2937 * The internal representation is a palloc'd int array of 0-based physical
2938 * attnums.
2939 */
2940 static void
validate_pkattnums(Relation rel,int2vector * pkattnums_arg,int32 pknumatts_arg,int ** pkattnums,int * pknumatts)2941 validate_pkattnums(Relation rel,
2942 int2vector *pkattnums_arg, int32 pknumatts_arg,
2943 int **pkattnums, int *pknumatts)
2944 {
2945 TupleDesc tupdesc = rel->rd_att;
2946 int natts = tupdesc->natts;
2947 int i;
2948
2949 /* Don't take more array elements than there are */
2950 pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2951
2952 /* Must have at least one pk attnum selected */
2953 if (pknumatts_arg <= 0)
2954 ereport(ERROR,
2955 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2956 errmsg("number of key attributes must be > 0")));
2957
2958 /* Allocate output array */
2959 *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2960 *pknumatts = pknumatts_arg;
2961
2962 /* Validate attnums and convert to internal form */
2963 for (i = 0; i < pknumatts_arg; i++)
2964 {
2965 int pkattnum = pkattnums_arg->values[i];
2966 int lnum;
2967 int j;
2968
2969 /* Can throw error immediately if out of range */
2970 if (pkattnum <= 0 || pkattnum > natts)
2971 ereport(ERROR,
2972 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2973 errmsg("invalid attribute number %d", pkattnum)));
2974
2975 /* Identify which physical column has this logical number */
2976 lnum = 0;
2977 for (j = 0; j < natts; j++)
2978 {
2979 /* dropped columns don't count */
2980 if (TupleDescAttr(tupdesc, j)->attisdropped)
2981 continue;
2982
2983 if (++lnum == pkattnum)
2984 break;
2985 }
2986
2987 if (j < natts)
2988 (*pkattnums)[i] = j;
2989 else
2990 ereport(ERROR,
2991 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2992 errmsg("invalid attribute number %d", pkattnum)));
2993 }
2994 }
2995
2996 /*
2997 * Check if the specified connection option is valid.
2998 *
2999 * We basically allow whatever libpq thinks is an option, with these
3000 * restrictions:
3001 * debug options: disallowed
3002 * "client_encoding": disallowed
3003 * "user": valid only in USER MAPPING options
3004 * secure options (eg password): valid only in USER MAPPING options
3005 * others: valid only in FOREIGN SERVER options
3006 *
3007 * We disallow client_encoding because it would be overridden anyway via
3008 * PQclientEncoding; allowing it to be specified would merely promote
3009 * confusion.
3010 */
3011 static bool
is_valid_dblink_option(const PQconninfoOption * options,const char * option,Oid context)3012 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3013 Oid context)
3014 {
3015 const PQconninfoOption *opt;
3016
3017 /* Look up the option in libpq result */
3018 for (opt = options; opt->keyword; opt++)
3019 {
3020 if (strcmp(opt->keyword, option) == 0)
3021 break;
3022 }
3023 if (opt->keyword == NULL)
3024 return false;
3025
3026 /* Disallow debug options (particularly "replication") */
3027 if (strchr(opt->dispchar, 'D'))
3028 return false;
3029
3030 /* Disallow "client_encoding" */
3031 if (strcmp(opt->keyword, "client_encoding") == 0)
3032 return false;
3033
3034 /*
3035 * If the option is "user" or marked secure, it should be specified only
3036 * in USER MAPPING. Others should be specified only in SERVER.
3037 */
3038 if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3039 {
3040 if (context != UserMappingRelationId)
3041 return false;
3042 }
3043 else
3044 {
3045 if (context != ForeignServerRelationId)
3046 return false;
3047 }
3048
3049 return true;
3050 }
3051
3052 /*
3053 * Copy the remote session's values of GUCs that affect datatype I/O
3054 * and apply them locally in a new GUC nesting level. Returns the new
3055 * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3056 * or -1 if no new nestlevel was needed.
3057 *
3058 * We use the equivalent of a function SET option to allow the settings to
3059 * persist only until the caller calls restoreLocalGucs. If an error is
3060 * thrown in between, guc.c will take care of undoing the settings.
3061 */
3062 static int
applyRemoteGucs(PGconn * conn)3063 applyRemoteGucs(PGconn *conn)
3064 {
3065 static const char *const GUCsAffectingIO[] = {
3066 "DateStyle",
3067 "IntervalStyle"
3068 };
3069
3070 int nestlevel = -1;
3071 int i;
3072
3073 for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3074 {
3075 const char *gucName = GUCsAffectingIO[i];
3076 const char *remoteVal = PQparameterStatus(conn, gucName);
3077 const char *localVal;
3078
3079 /*
3080 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3081 * that's okay because its output format won't be ambiguous. So just
3082 * skip the GUC if we don't get a value for it. (We might eventually
3083 * need more complicated logic with remote-version checks here.)
3084 */
3085 if (remoteVal == NULL)
3086 continue;
3087
3088 /*
3089 * Avoid GUC-setting overhead if the remote and local GUCs already
3090 * have the same value.
3091 */
3092 localVal = GetConfigOption(gucName, false, false);
3093 Assert(localVal != NULL);
3094
3095 if (strcmp(remoteVal, localVal) == 0)
3096 continue;
3097
3098 /* Create new GUC nest level if we didn't already */
3099 if (nestlevel < 0)
3100 nestlevel = NewGUCNestLevel();
3101
3102 /* Apply the option (this will throw error on failure) */
3103 (void) set_config_option(gucName, remoteVal,
3104 PGC_USERSET, PGC_S_SESSION,
3105 GUC_ACTION_SAVE, true, 0, false);
3106 }
3107
3108 return nestlevel;
3109 }
3110
3111 /*
3112 * Restore local GUCs after they have been overlaid with remote settings.
3113 */
3114 static void
restoreLocalGucs(int nestlevel)3115 restoreLocalGucs(int nestlevel)
3116 {
3117 /* Do nothing if no new nestlevel was created */
3118 if (nestlevel > 0)
3119 AtEOXact_GUC(true, nestlevel);
3120 }
3121