1 /*
2  * dblink.c
3  *
4  * Functions returning results from a remote database
5  *
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Darko Prenosil <Darko.Prenosil@finteh.hr>
9  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10  *
11  * contrib/dblink/dblink.c
12  * Copyright (c) 2001-2018, PostgreSQL Global Development Group
13  * ALL RIGHTS RESERVED;
14  *
15  * Permission to use, copy, modify, and distribute this software and its
16  * documentation for any purpose, without fee, and without a written agreement
17  * is hereby granted, provided that the above copyright notice and this
18  * paragraph and the following two paragraphs appear in all copies.
19  *
20  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
29  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31  *
32  */
33 #include "postgres.h"
34 
35 #include <limits.h>
36 
37 #include "libpq-fe.h"
38 
39 #include "access/htup_details.h"
40 #include "access/reloptions.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/pg_foreign_data_wrapper.h"
44 #include "catalog/pg_foreign_server.h"
45 #include "catalog/pg_type.h"
46 #include "catalog/pg_user_mapping.h"
47 #include "executor/spi.h"
48 #include "foreign/foreign.h"
49 #include "funcapi.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "parser/scansup.h"
54 #include "utils/acl.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/guc.h"
58 #include "utils/lsyscache.h"
59 #include "utils/memutils.h"
60 #include "utils/rel.h"
61 #include "utils/tqual.h"
62 #include "utils/varlena.h"
63 
64 PG_MODULE_MAGIC;
65 
66 typedef struct remoteConn
67 {
68 	PGconn	   *conn;			/* Hold the remote connection */
69 	int			openCursorCount;	/* The number of open cursors */
70 	bool		newXactForCursor;	/* Opened a transaction for a cursor */
71 } remoteConn;
72 
73 typedef struct storeInfo
74 {
75 	FunctionCallInfo fcinfo;
76 	Tuplestorestate *tuplestore;
77 	AttInMetadata *attinmeta;
78 	MemoryContext tmpcontext;
79 	char	  **cstrs;
80 	/* temp storage for results to avoid leaks on exception */
81 	PGresult   *last_res;
82 	PGresult   *cur_res;
83 } storeInfo;
84 
85 /*
86  * Internal declarations
87  */
88 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
89 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
90 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
91 				  PGresult *res);
92 static void materializeQueryResult(FunctionCallInfo fcinfo,
93 					   PGconn *conn,
94 					   const char *conname,
95 					   const char *sql,
96 					   bool fail);
97 static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
98 static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
99 static remoteConn *getConnectionByName(const char *name);
100 static HTAB *createConnHash(void);
101 static void createNewConnection(const char *name, remoteConn *rconn);
102 static void deleteConnection(const char *name);
103 static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
104 static char **get_text_array_contents(ArrayType *array, int *numitems);
105 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
106 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
107 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
108 static char *quote_ident_cstr(char *rawstr);
109 static int	get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
110 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
111 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
112 static char *generate_relation_name(Relation rel);
113 static void dblink_connstr_check(const char *connstr);
114 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
115 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
116 				 bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
117 static char *get_connect_string(const char *servername);
118 static char *escape_param_str(const char *from);
119 static void validate_pkattnums(Relation rel,
120 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
121 				   int **pkattnums, int *pknumatts);
122 static bool is_valid_dblink_option(const PQconninfoOption *options,
123 					   const char *option, Oid context);
124 static int	applyRemoteGucs(PGconn *conn);
125 static void restoreLocalGucs(int nestlevel);
126 
127 /* Global */
128 static remoteConn *pconn = NULL;
129 static HTAB *remoteConnHash = NULL;
130 
131 /*
132  *	Following is list that holds multiple remote connections.
133  *	Calling convention of each dblink function changes to accept
134  *	connection name as the first parameter. The connection list is
135  *	much like ecpg e.g. a mapping between a name and a PGconn object.
136  */
137 
138 typedef struct remoteConnHashEnt
139 {
140 	char		name[NAMEDATALEN];
141 	remoteConn *rconn;
142 } remoteConnHashEnt;
143 
144 /* initial number of connection hashes */
145 #define NUMCONN 16
146 
147 static char *
xpstrdup(const char * in)148 xpstrdup(const char *in)
149 {
150 	if (in == NULL)
151 		return NULL;
152 	return pstrdup(in);
153 }
154 
155 static void
pg_attribute_noreturn()156 pg_attribute_noreturn()
157 dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
158 {
159 	char	   *msg = pchomp(PQerrorMessage(conn));
160 
161 	if (res)
162 		PQclear(res);
163 	elog(ERROR, "%s: %s", p2, msg);
164 }
165 
166 static void
pg_attribute_noreturn()167 pg_attribute_noreturn()
168 dblink_conn_not_avail(const char *conname)
169 {
170 	if (conname)
171 		ereport(ERROR,
172 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
173 				 errmsg("connection \"%s\" not available", conname)));
174 	else
175 		ereport(ERROR,
176 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
177 				 errmsg("connection not available")));
178 }
179 
180 static void
dblink_get_conn(char * conname_or_str,PGconn * volatile * conn_p,char ** conname_p,volatile bool * freeconn_p)181 dblink_get_conn(char *conname_or_str,
182 				PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
183 {
184 	remoteConn *rconn = getConnectionByName(conname_or_str);
185 	PGconn	   *conn;
186 	char	   *conname;
187 	bool		freeconn;
188 
189 	if (rconn)
190 	{
191 		conn = rconn->conn;
192 		conname = conname_or_str;
193 		freeconn = false;
194 	}
195 	else
196 	{
197 		const char *connstr;
198 
199 		connstr = get_connect_string(conname_or_str);
200 		if (connstr == NULL)
201 			connstr = conname_or_str;
202 		dblink_connstr_check(connstr);
203 		conn = PQconnectdb(connstr);
204 		if (PQstatus(conn) == CONNECTION_BAD)
205 		{
206 			char	   *msg = pchomp(PQerrorMessage(conn));
207 
208 			PQfinish(conn);
209 			ereport(ERROR,
210 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
211 					 errmsg("could not establish connection"),
212 					 errdetail_internal("%s", msg)));
213 		}
214 		dblink_security_check(conn, rconn);
215 		if (PQclientEncoding(conn) != GetDatabaseEncoding())
216 			PQsetClientEncoding(conn, GetDatabaseEncodingName());
217 		freeconn = true;
218 		conname = NULL;
219 	}
220 
221 	*conn_p = conn;
222 	*conname_p = conname;
223 	*freeconn_p = freeconn;
224 }
225 
226 static PGconn *
dblink_get_named_conn(const char * conname)227 dblink_get_named_conn(const char *conname)
228 {
229 	remoteConn *rconn = getConnectionByName(conname);
230 
231 	if (rconn)
232 		return rconn->conn;
233 
234 	dblink_conn_not_avail(conname);
235 	return NULL;				/* keep compiler quiet */
236 }
237 
238 static void
dblink_init(void)239 dblink_init(void)
240 {
241 	if (!pconn)
242 	{
243 		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
244 		pconn->conn = NULL;
245 		pconn->openCursorCount = 0;
246 		pconn->newXactForCursor = false;
247 	}
248 }
249 
250 /*
251  * Create a persistent connection to another database
252  */
253 PG_FUNCTION_INFO_V1(dblink_connect);
254 Datum
dblink_connect(PG_FUNCTION_ARGS)255 dblink_connect(PG_FUNCTION_ARGS)
256 {
257 	char	   *conname_or_str = NULL;
258 	char	   *connstr = NULL;
259 	char	   *connname = NULL;
260 	char	   *msg;
261 	PGconn	   *conn = NULL;
262 	remoteConn *rconn = NULL;
263 
264 	dblink_init();
265 
266 	if (PG_NARGS() == 2)
267 	{
268 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
269 		connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
270 	}
271 	else if (PG_NARGS() == 1)
272 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
273 
274 	if (connname)
275 	{
276 		rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
277 												  sizeof(remoteConn));
278 		rconn->conn = NULL;
279 		rconn->openCursorCount = 0;
280 		rconn->newXactForCursor = false;
281 	}
282 
283 	/* first check for valid foreign data server */
284 	connstr = get_connect_string(conname_or_str);
285 	if (connstr == NULL)
286 		connstr = conname_or_str;
287 
288 	/* check password in connection string if not superuser */
289 	dblink_connstr_check(connstr);
290 	conn = PQconnectdb(connstr);
291 
292 	if (PQstatus(conn) == CONNECTION_BAD)
293 	{
294 		msg = pchomp(PQerrorMessage(conn));
295 		PQfinish(conn);
296 		if (rconn)
297 			pfree(rconn);
298 
299 		ereport(ERROR,
300 				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
301 				 errmsg("could not establish connection"),
302 				 errdetail_internal("%s", msg)));
303 	}
304 
305 	/* check password actually used if not superuser */
306 	dblink_security_check(conn, rconn);
307 
308 	/* attempt to set client encoding to match server encoding, if needed */
309 	if (PQclientEncoding(conn) != GetDatabaseEncoding())
310 		PQsetClientEncoding(conn, GetDatabaseEncodingName());
311 
312 	if (connname)
313 	{
314 		rconn->conn = conn;
315 		createNewConnection(connname, rconn);
316 	}
317 	else
318 	{
319 		if (pconn->conn)
320 			PQfinish(pconn->conn);
321 		pconn->conn = conn;
322 	}
323 
324 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
325 }
326 
327 /*
328  * Clear a persistent connection to another database
329  */
330 PG_FUNCTION_INFO_V1(dblink_disconnect);
331 Datum
dblink_disconnect(PG_FUNCTION_ARGS)332 dblink_disconnect(PG_FUNCTION_ARGS)
333 {
334 	char	   *conname = NULL;
335 	remoteConn *rconn = NULL;
336 	PGconn	   *conn = NULL;
337 
338 	dblink_init();
339 
340 	if (PG_NARGS() == 1)
341 	{
342 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
343 		rconn = getConnectionByName(conname);
344 		if (rconn)
345 			conn = rconn->conn;
346 	}
347 	else
348 		conn = pconn->conn;
349 
350 	if (!conn)
351 		dblink_conn_not_avail(conname);
352 
353 	PQfinish(conn);
354 	if (rconn)
355 	{
356 		deleteConnection(conname);
357 		pfree(rconn);
358 	}
359 	else
360 		pconn->conn = NULL;
361 
362 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
363 }
364 
365 /*
366  * opens a cursor using a persistent connection
367  */
368 PG_FUNCTION_INFO_V1(dblink_open);
369 Datum
dblink_open(PG_FUNCTION_ARGS)370 dblink_open(PG_FUNCTION_ARGS)
371 {
372 	PGresult   *res = NULL;
373 	PGconn	   *conn;
374 	char	   *curname = NULL;
375 	char	   *sql = NULL;
376 	char	   *conname = NULL;
377 	StringInfoData buf;
378 	remoteConn *rconn = NULL;
379 	bool		fail = true;	/* default to backward compatible behavior */
380 
381 	dblink_init();
382 	initStringInfo(&buf);
383 
384 	if (PG_NARGS() == 2)
385 	{
386 		/* text,text */
387 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
388 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
389 		rconn = pconn;
390 	}
391 	else if (PG_NARGS() == 3)
392 	{
393 		/* might be text,text,text or text,text,bool */
394 		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
395 		{
396 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
397 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
398 			fail = PG_GETARG_BOOL(2);
399 			rconn = pconn;
400 		}
401 		else
402 		{
403 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
404 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
405 			sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
406 			rconn = getConnectionByName(conname);
407 		}
408 	}
409 	else if (PG_NARGS() == 4)
410 	{
411 		/* text,text,text,bool */
412 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
413 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
414 		sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
415 		fail = PG_GETARG_BOOL(3);
416 		rconn = getConnectionByName(conname);
417 	}
418 
419 	if (!rconn || !rconn->conn)
420 		dblink_conn_not_avail(conname);
421 
422 	conn = rconn->conn;
423 
424 	/* If we are not in a transaction, start one */
425 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
426 	{
427 		res = PQexec(conn, "BEGIN");
428 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
429 			dblink_res_internalerror(conn, res, "begin error");
430 		PQclear(res);
431 		rconn->newXactForCursor = true;
432 
433 		/*
434 		 * Since transaction state was IDLE, we force cursor count to
435 		 * initially be 0. This is needed as a previous ABORT might have wiped
436 		 * out our transaction without maintaining the cursor count for us.
437 		 */
438 		rconn->openCursorCount = 0;
439 	}
440 
441 	/* if we started a transaction, increment cursor count */
442 	if (rconn->newXactForCursor)
443 		(rconn->openCursorCount)++;
444 
445 	appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
446 	res = PQexec(conn, buf.data);
447 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
448 	{
449 		dblink_res_error(conn, conname, res, fail,
450 						 "while opening cursor \"%s\"", curname);
451 		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
452 	}
453 
454 	PQclear(res);
455 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
456 }
457 
458 /*
459  * closes a cursor
460  */
461 PG_FUNCTION_INFO_V1(dblink_close);
462 Datum
dblink_close(PG_FUNCTION_ARGS)463 dblink_close(PG_FUNCTION_ARGS)
464 {
465 	PGconn	   *conn;
466 	PGresult   *res = NULL;
467 	char	   *curname = NULL;
468 	char	   *conname = NULL;
469 	StringInfoData buf;
470 	remoteConn *rconn = NULL;
471 	bool		fail = true;	/* default to backward compatible behavior */
472 
473 	dblink_init();
474 	initStringInfo(&buf);
475 
476 	if (PG_NARGS() == 1)
477 	{
478 		/* text */
479 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
480 		rconn = pconn;
481 	}
482 	else if (PG_NARGS() == 2)
483 	{
484 		/* might be text,text or text,bool */
485 		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
486 		{
487 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
488 			fail = PG_GETARG_BOOL(1);
489 			rconn = pconn;
490 		}
491 		else
492 		{
493 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
494 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
495 			rconn = getConnectionByName(conname);
496 		}
497 	}
498 	if (PG_NARGS() == 3)
499 	{
500 		/* text,text,bool */
501 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
502 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
503 		fail = PG_GETARG_BOOL(2);
504 		rconn = getConnectionByName(conname);
505 	}
506 
507 	if (!rconn || !rconn->conn)
508 		dblink_conn_not_avail(conname);
509 
510 	conn = rconn->conn;
511 
512 	appendStringInfo(&buf, "CLOSE %s", curname);
513 
514 	/* close the cursor */
515 	res = PQexec(conn, buf.data);
516 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
517 	{
518 		dblink_res_error(conn, conname, res, fail,
519 						 "while closing cursor \"%s\"", curname);
520 		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
521 	}
522 
523 	PQclear(res);
524 
525 	/* if we started a transaction, decrement cursor count */
526 	if (rconn->newXactForCursor)
527 	{
528 		(rconn->openCursorCount)--;
529 
530 		/* if count is zero, commit the transaction */
531 		if (rconn->openCursorCount == 0)
532 		{
533 			rconn->newXactForCursor = false;
534 
535 			res = PQexec(conn, "COMMIT");
536 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
537 				dblink_res_internalerror(conn, res, "commit error");
538 			PQclear(res);
539 		}
540 	}
541 
542 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
543 }
544 
545 /*
546  * Fetch results from an open cursor
547  */
548 PG_FUNCTION_INFO_V1(dblink_fetch);
549 Datum
dblink_fetch(PG_FUNCTION_ARGS)550 dblink_fetch(PG_FUNCTION_ARGS)
551 {
552 	PGresult   *res = NULL;
553 	char	   *conname = NULL;
554 	remoteConn *rconn = NULL;
555 	PGconn	   *conn = NULL;
556 	StringInfoData buf;
557 	char	   *curname = NULL;
558 	int			howmany = 0;
559 	bool		fail = true;	/* default to backward compatible */
560 
561 	prepTuplestoreResult(fcinfo);
562 
563 	dblink_init();
564 
565 	if (PG_NARGS() == 4)
566 	{
567 		/* text,text,int,bool */
568 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
569 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
570 		howmany = PG_GETARG_INT32(2);
571 		fail = PG_GETARG_BOOL(3);
572 
573 		rconn = getConnectionByName(conname);
574 		if (rconn)
575 			conn = rconn->conn;
576 	}
577 	else if (PG_NARGS() == 3)
578 	{
579 		/* text,text,int or text,int,bool */
580 		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
581 		{
582 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
583 			howmany = PG_GETARG_INT32(1);
584 			fail = PG_GETARG_BOOL(2);
585 			conn = pconn->conn;
586 		}
587 		else
588 		{
589 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
590 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
591 			howmany = PG_GETARG_INT32(2);
592 
593 			rconn = getConnectionByName(conname);
594 			if (rconn)
595 				conn = rconn->conn;
596 		}
597 	}
598 	else if (PG_NARGS() == 2)
599 	{
600 		/* text,int */
601 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
602 		howmany = PG_GETARG_INT32(1);
603 		conn = pconn->conn;
604 	}
605 
606 	if (!conn)
607 		dblink_conn_not_avail(conname);
608 
609 	initStringInfo(&buf);
610 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
611 
612 	/*
613 	 * Try to execute the query.  Note that since libpq uses malloc, the
614 	 * PGresult will be long-lived even though we are still in a short-lived
615 	 * memory context.
616 	 */
617 	res = PQexec(conn, buf.data);
618 	if (!res ||
619 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
620 		 PQresultStatus(res) != PGRES_TUPLES_OK))
621 	{
622 		dblink_res_error(conn, conname, res, fail,
623 						 "while fetching from cursor \"%s\"", curname);
624 		return (Datum) 0;
625 	}
626 	else if (PQresultStatus(res) == PGRES_COMMAND_OK)
627 	{
628 		/* cursor does not exist - closed already or bad name */
629 		PQclear(res);
630 		ereport(ERROR,
631 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
632 				 errmsg("cursor \"%s\" does not exist", curname)));
633 	}
634 
635 	materializeResult(fcinfo, conn, res);
636 	return (Datum) 0;
637 }
638 
639 /*
640  * Note: this is the new preferred version of dblink
641  */
642 PG_FUNCTION_INFO_V1(dblink_record);
643 Datum
dblink_record(PG_FUNCTION_ARGS)644 dblink_record(PG_FUNCTION_ARGS)
645 {
646 	return dblink_record_internal(fcinfo, false);
647 }
648 
649 PG_FUNCTION_INFO_V1(dblink_send_query);
650 Datum
dblink_send_query(PG_FUNCTION_ARGS)651 dblink_send_query(PG_FUNCTION_ARGS)
652 {
653 	PGconn	   *conn;
654 	char	   *sql;
655 	int			retval;
656 
657 	if (PG_NARGS() == 2)
658 	{
659 		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
660 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
661 	}
662 	else
663 		/* shouldn't happen */
664 		elog(ERROR, "wrong number of arguments");
665 
666 	/* async query send */
667 	retval = PQsendQuery(conn, sql);
668 	if (retval != 1)
669 		elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
670 
671 	PG_RETURN_INT32(retval);
672 }
673 
674 PG_FUNCTION_INFO_V1(dblink_get_result);
675 Datum
dblink_get_result(PG_FUNCTION_ARGS)676 dblink_get_result(PG_FUNCTION_ARGS)
677 {
678 	return dblink_record_internal(fcinfo, true);
679 }
680 
681 static Datum
dblink_record_internal(FunctionCallInfo fcinfo,bool is_async)682 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
683 {
684 	PGconn	   *volatile conn = NULL;
685 	volatile bool freeconn = false;
686 
687 	prepTuplestoreResult(fcinfo);
688 
689 	dblink_init();
690 
691 	PG_TRY();
692 	{
693 		char	   *sql = NULL;
694 		char	   *conname = NULL;
695 		bool		fail = true;	/* default to backward compatible */
696 
697 		if (!is_async)
698 		{
699 			if (PG_NARGS() == 3)
700 			{
701 				/* text,text,bool */
702 				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
703 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
704 				fail = PG_GETARG_BOOL(2);
705 				dblink_get_conn(conname, &conn, &conname, &freeconn);
706 			}
707 			else if (PG_NARGS() == 2)
708 			{
709 				/* text,text or text,bool */
710 				if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
711 				{
712 					sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
713 					fail = PG_GETARG_BOOL(1);
714 					conn = pconn->conn;
715 				}
716 				else
717 				{
718 					conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
719 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
720 					dblink_get_conn(conname, &conn, &conname, &freeconn);
721 				}
722 			}
723 			else if (PG_NARGS() == 1)
724 			{
725 				/* text */
726 				conn = pconn->conn;
727 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
728 			}
729 			else
730 				/* shouldn't happen */
731 				elog(ERROR, "wrong number of arguments");
732 		}
733 		else					/* is_async */
734 		{
735 			/* get async result */
736 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
737 
738 			if (PG_NARGS() == 2)
739 			{
740 				/* text,bool */
741 				fail = PG_GETARG_BOOL(1);
742 				conn = dblink_get_named_conn(conname);
743 			}
744 			else if (PG_NARGS() == 1)
745 			{
746 				/* text */
747 				conn = dblink_get_named_conn(conname);
748 			}
749 			else
750 				/* shouldn't happen */
751 				elog(ERROR, "wrong number of arguments");
752 		}
753 
754 		if (!conn)
755 			dblink_conn_not_avail(conname);
756 
757 		if (!is_async)
758 		{
759 			/* synchronous query, use efficient tuple collection method */
760 			materializeQueryResult(fcinfo, conn, conname, sql, fail);
761 		}
762 		else
763 		{
764 			/* async result retrieval, do it the old way */
765 			PGresult   *res = PQgetResult(conn);
766 
767 			/* NULL means we're all done with the async results */
768 			if (res)
769 			{
770 				if (PQresultStatus(res) != PGRES_COMMAND_OK &&
771 					PQresultStatus(res) != PGRES_TUPLES_OK)
772 				{
773 					dblink_res_error(conn, conname, res, fail,
774 									 "while executing query");
775 					/* if fail isn't set, we'll return an empty query result */
776 				}
777 				else
778 				{
779 					materializeResult(fcinfo, conn, res);
780 				}
781 			}
782 		}
783 	}
784 	PG_CATCH();
785 	{
786 		/* if needed, close the connection to the database */
787 		if (freeconn)
788 			PQfinish(conn);
789 		PG_RE_THROW();
790 	}
791 	PG_END_TRY();
792 
793 	/* if needed, close the connection to the database */
794 	if (freeconn)
795 		PQfinish(conn);
796 
797 	return (Datum) 0;
798 }
799 
800 /*
801  * Verify function caller can handle a tuplestore result, and set up for that.
802  *
803  * Note: if the caller returns without actually creating a tuplestore, the
804  * executor will treat the function result as an empty set.
805  */
806 static void
prepTuplestoreResult(FunctionCallInfo fcinfo)807 prepTuplestoreResult(FunctionCallInfo fcinfo)
808 {
809 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
810 
811 	/* check to see if query supports us returning a tuplestore */
812 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
813 		ereport(ERROR,
814 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
815 				 errmsg("set-valued function called in context that cannot accept a set")));
816 	if (!(rsinfo->allowedModes & SFRM_Materialize))
817 		ereport(ERROR,
818 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
819 				 errmsg("materialize mode required, but it is not allowed in this context")));
820 
821 	/* let the executor know we're sending back a tuplestore */
822 	rsinfo->returnMode = SFRM_Materialize;
823 
824 	/* caller must fill these to return a non-empty result */
825 	rsinfo->setResult = NULL;
826 	rsinfo->setDesc = NULL;
827 }
828 
829 /*
830  * Copy the contents of the PGresult into a tuplestore to be returned
831  * as the result of the current function.
832  * The PGresult will be released in this function.
833  */
834 static void
materializeResult(FunctionCallInfo fcinfo,PGconn * conn,PGresult * res)835 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
836 {
837 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
838 
839 	/* prepTuplestoreResult must have been called previously */
840 	Assert(rsinfo->returnMode == SFRM_Materialize);
841 
842 	PG_TRY();
843 	{
844 		TupleDesc	tupdesc;
845 		bool		is_sql_cmd;
846 		int			ntuples;
847 		int			nfields;
848 
849 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
850 		{
851 			is_sql_cmd = true;
852 
853 			/*
854 			 * need a tuple descriptor representing one TEXT column to return
855 			 * the command status string as our result tuple
856 			 */
857 			tupdesc = CreateTemplateTupleDesc(1, false);
858 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
859 							   TEXTOID, -1, 0);
860 			ntuples = 1;
861 			nfields = 1;
862 		}
863 		else
864 		{
865 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
866 
867 			is_sql_cmd = false;
868 
869 			/* get a tuple descriptor for our result type */
870 			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
871 			{
872 				case TYPEFUNC_COMPOSITE:
873 					/* success */
874 					break;
875 				case TYPEFUNC_RECORD:
876 					/* failed to determine actual type of RECORD */
877 					ereport(ERROR,
878 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879 							 errmsg("function returning record called in context "
880 									"that cannot accept type record")));
881 					break;
882 				default:
883 					/* result type isn't composite */
884 					elog(ERROR, "return type must be a row type");
885 					break;
886 			}
887 
888 			/* make sure we have a persistent copy of the tupdesc */
889 			tupdesc = CreateTupleDescCopy(tupdesc);
890 			ntuples = PQntuples(res);
891 			nfields = PQnfields(res);
892 		}
893 
894 		/*
895 		 * check result and tuple descriptor have the same number of columns
896 		 */
897 		if (nfields != tupdesc->natts)
898 			ereport(ERROR,
899 					(errcode(ERRCODE_DATATYPE_MISMATCH),
900 					 errmsg("remote query result rowtype does not match "
901 							"the specified FROM clause rowtype")));
902 
903 		if (ntuples > 0)
904 		{
905 			AttInMetadata *attinmeta;
906 			int			nestlevel = -1;
907 			Tuplestorestate *tupstore;
908 			MemoryContext oldcontext;
909 			int			row;
910 			char	  **values;
911 
912 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
913 
914 			/* Set GUCs to ensure we read GUC-sensitive data types correctly */
915 			if (!is_sql_cmd)
916 				nestlevel = applyRemoteGucs(conn);
917 
918 			oldcontext = MemoryContextSwitchTo(
919 											   rsinfo->econtext->ecxt_per_query_memory);
920 			tupstore = tuplestore_begin_heap(true, false, work_mem);
921 			rsinfo->setResult = tupstore;
922 			rsinfo->setDesc = tupdesc;
923 			MemoryContextSwitchTo(oldcontext);
924 
925 			values = (char **) palloc(nfields * sizeof(char *));
926 
927 			/* put all tuples into the tuplestore */
928 			for (row = 0; row < ntuples; row++)
929 			{
930 				HeapTuple	tuple;
931 
932 				if (!is_sql_cmd)
933 				{
934 					int			i;
935 
936 					for (i = 0; i < nfields; i++)
937 					{
938 						if (PQgetisnull(res, row, i))
939 							values[i] = NULL;
940 						else
941 							values[i] = PQgetvalue(res, row, i);
942 					}
943 				}
944 				else
945 				{
946 					values[0] = PQcmdStatus(res);
947 				}
948 
949 				/* build the tuple and put it into the tuplestore. */
950 				tuple = BuildTupleFromCStrings(attinmeta, values);
951 				tuplestore_puttuple(tupstore, tuple);
952 			}
953 
954 			/* clean up GUC settings, if we changed any */
955 			restoreLocalGucs(nestlevel);
956 
957 			/* clean up and return the tuplestore */
958 			tuplestore_donestoring(tupstore);
959 		}
960 
961 		PQclear(res);
962 	}
963 	PG_CATCH();
964 	{
965 		/* be sure to release the libpq result */
966 		PQclear(res);
967 		PG_RE_THROW();
968 	}
969 	PG_END_TRY();
970 }
971 
972 /*
973  * Execute the given SQL command and store its results into a tuplestore
974  * to be returned as the result of the current function.
975  *
976  * This is equivalent to PQexec followed by materializeResult, but we make
977  * use of libpq's single-row mode to avoid accumulating the whole result
978  * inside libpq before it gets transferred to the tuplestore.
979  */
980 static void
materializeQueryResult(FunctionCallInfo fcinfo,PGconn * conn,const char * conname,const char * sql,bool fail)981 materializeQueryResult(FunctionCallInfo fcinfo,
982 					   PGconn *conn,
983 					   const char *conname,
984 					   const char *sql,
985 					   bool fail)
986 {
987 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
988 	PGresult   *volatile res = NULL;
989 	volatile storeInfo sinfo;
990 
991 	/* prepTuplestoreResult must have been called previously */
992 	Assert(rsinfo->returnMode == SFRM_Materialize);
993 
994 	/* initialize storeInfo to empty */
995 	memset((void *) &sinfo, 0, sizeof(sinfo));
996 	sinfo.fcinfo = fcinfo;
997 
998 	PG_TRY();
999 	{
1000 		/* Create short-lived memory context for data conversions */
1001 		sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1002 												 "dblink temporary context",
1003 												 ALLOCSET_DEFAULT_SIZES);
1004 
1005 		/* execute query, collecting any tuples into the tuplestore */
1006 		res = storeQueryResult(&sinfo, conn, sql);
1007 
1008 		if (!res ||
1009 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
1010 			 PQresultStatus(res) != PGRES_TUPLES_OK))
1011 		{
1012 			/*
1013 			 * dblink_res_error will clear the passed PGresult, so we need
1014 			 * this ugly dance to avoid doing so twice during error exit
1015 			 */
1016 			PGresult   *res1 = res;
1017 
1018 			res = NULL;
1019 			dblink_res_error(conn, conname, res1, fail,
1020 							 "while executing query");
1021 			/* if fail isn't set, we'll return an empty query result */
1022 		}
1023 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1024 		{
1025 			/*
1026 			 * storeRow didn't get called, so we need to convert the command
1027 			 * status string to a tuple manually
1028 			 */
1029 			TupleDesc	tupdesc;
1030 			AttInMetadata *attinmeta;
1031 			Tuplestorestate *tupstore;
1032 			HeapTuple	tuple;
1033 			char	   *values[1];
1034 			MemoryContext oldcontext;
1035 
1036 			/*
1037 			 * need a tuple descriptor representing one TEXT column to return
1038 			 * the command status string as our result tuple
1039 			 */
1040 			tupdesc = CreateTemplateTupleDesc(1, false);
1041 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1042 							   TEXTOID, -1, 0);
1043 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
1044 
1045 			oldcontext = MemoryContextSwitchTo(
1046 											   rsinfo->econtext->ecxt_per_query_memory);
1047 			tupstore = tuplestore_begin_heap(true, false, work_mem);
1048 			rsinfo->setResult = tupstore;
1049 			rsinfo->setDesc = tupdesc;
1050 			MemoryContextSwitchTo(oldcontext);
1051 
1052 			values[0] = PQcmdStatus(res);
1053 
1054 			/* build the tuple and put it into the tuplestore. */
1055 			tuple = BuildTupleFromCStrings(attinmeta, values);
1056 			tuplestore_puttuple(tupstore, tuple);
1057 
1058 			PQclear(res);
1059 			res = NULL;
1060 		}
1061 		else
1062 		{
1063 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1064 			/* storeRow should have created a tuplestore */
1065 			Assert(rsinfo->setResult != NULL);
1066 
1067 			PQclear(res);
1068 			res = NULL;
1069 		}
1070 
1071 		/* clean up data conversion short-lived memory context */
1072 		if (sinfo.tmpcontext != NULL)
1073 			MemoryContextDelete(sinfo.tmpcontext);
1074 		sinfo.tmpcontext = NULL;
1075 
1076 		PQclear(sinfo.last_res);
1077 		sinfo.last_res = NULL;
1078 		PQclear(sinfo.cur_res);
1079 		sinfo.cur_res = NULL;
1080 	}
1081 	PG_CATCH();
1082 	{
1083 		/* be sure to release any libpq result we collected */
1084 		PQclear(res);
1085 		PQclear(sinfo.last_res);
1086 		PQclear(sinfo.cur_res);
1087 		/* and clear out any pending data in libpq */
1088 		while ((res = PQgetResult(conn)) != NULL)
1089 			PQclear(res);
1090 		PG_RE_THROW();
1091 	}
1092 	PG_END_TRY();
1093 }
1094 
1095 /*
1096  * Execute query, and send any result rows to sinfo->tuplestore.
1097  */
1098 static PGresult *
storeQueryResult(volatile storeInfo * sinfo,PGconn * conn,const char * sql)1099 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1100 {
1101 	bool		first = true;
1102 	int			nestlevel = -1;
1103 	PGresult   *res;
1104 
1105 	if (!PQsendQuery(conn, sql))
1106 		elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1107 
1108 	if (!PQsetSingleRowMode(conn))	/* shouldn't fail */
1109 		elog(ERROR, "failed to set single-row mode for dblink query");
1110 
1111 	for (;;)
1112 	{
1113 		CHECK_FOR_INTERRUPTS();
1114 
1115 		sinfo->cur_res = PQgetResult(conn);
1116 		if (!sinfo->cur_res)
1117 			break;
1118 
1119 		if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1120 		{
1121 			/* got one row from possibly-bigger resultset */
1122 
1123 			/*
1124 			 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1125 			 * We shouldn't do this until we have a row in hand, to ensure
1126 			 * libpq has seen any earlier ParameterStatus protocol messages.
1127 			 */
1128 			if (first && nestlevel < 0)
1129 				nestlevel = applyRemoteGucs(conn);
1130 
1131 			storeRow(sinfo, sinfo->cur_res, first);
1132 
1133 			PQclear(sinfo->cur_res);
1134 			sinfo->cur_res = NULL;
1135 			first = false;
1136 		}
1137 		else
1138 		{
1139 			/* if empty resultset, fill tuplestore header */
1140 			if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1141 				storeRow(sinfo, sinfo->cur_res, first);
1142 
1143 			/* store completed result at last_res */
1144 			PQclear(sinfo->last_res);
1145 			sinfo->last_res = sinfo->cur_res;
1146 			sinfo->cur_res = NULL;
1147 			first = true;
1148 		}
1149 	}
1150 
1151 	/* clean up GUC settings, if we changed any */
1152 	restoreLocalGucs(nestlevel);
1153 
1154 	/* return last_res */
1155 	res = sinfo->last_res;
1156 	sinfo->last_res = NULL;
1157 	return res;
1158 }
1159 
1160 /*
1161  * Send single row to sinfo->tuplestore.
1162  *
1163  * If "first" is true, create the tuplestore using PGresult's metadata
1164  * (in this case the PGresult might contain either zero or one row).
1165  */
1166 static void
storeRow(volatile storeInfo * sinfo,PGresult * res,bool first)1167 storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1168 {
1169 	int			nfields = PQnfields(res);
1170 	HeapTuple	tuple;
1171 	int			i;
1172 	MemoryContext oldcontext;
1173 
1174 	if (first)
1175 	{
1176 		/* Prepare for new result set */
1177 		ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1178 		TupleDesc	tupdesc;
1179 
1180 		/*
1181 		 * It's possible to get more than one result set if the query string
1182 		 * contained multiple SQL commands.  In that case, we follow PQexec's
1183 		 * traditional behavior of throwing away all but the last result.
1184 		 */
1185 		if (sinfo->tuplestore)
1186 			tuplestore_end(sinfo->tuplestore);
1187 		sinfo->tuplestore = NULL;
1188 
1189 		/* get a tuple descriptor for our result type */
1190 		switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1191 		{
1192 			case TYPEFUNC_COMPOSITE:
1193 				/* success */
1194 				break;
1195 			case TYPEFUNC_RECORD:
1196 				/* failed to determine actual type of RECORD */
1197 				ereport(ERROR,
1198 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1199 						 errmsg("function returning record called in context "
1200 								"that cannot accept type record")));
1201 				break;
1202 			default:
1203 				/* result type isn't composite */
1204 				elog(ERROR, "return type must be a row type");
1205 				break;
1206 		}
1207 
1208 		/* make sure we have a persistent copy of the tupdesc */
1209 		tupdesc = CreateTupleDescCopy(tupdesc);
1210 
1211 		/* check result and tuple descriptor have the same number of columns */
1212 		if (nfields != tupdesc->natts)
1213 			ereport(ERROR,
1214 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1215 					 errmsg("remote query result rowtype does not match "
1216 							"the specified FROM clause rowtype")));
1217 
1218 		/* Prepare attinmeta for later data conversions */
1219 		sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1220 
1221 		/* Create a new, empty tuplestore */
1222 		oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1223 		sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1224 		rsinfo->setResult = sinfo->tuplestore;
1225 		rsinfo->setDesc = tupdesc;
1226 		MemoryContextSwitchTo(oldcontext);
1227 
1228 		/* Done if empty resultset */
1229 		if (PQntuples(res) == 0)
1230 			return;
1231 
1232 		/*
1233 		 * Set up sufficiently-wide string pointers array; this won't change
1234 		 * in size so it's easy to preallocate.
1235 		 */
1236 		if (sinfo->cstrs)
1237 			pfree(sinfo->cstrs);
1238 		sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
1239 	}
1240 
1241 	/* Should have a single-row result if we get here */
1242 	Assert(PQntuples(res) == 1);
1243 
1244 	/*
1245 	 * Do the following work in a temp context that we reset after each tuple.
1246 	 * This cleans up not only the data we have direct access to, but any
1247 	 * cruft the I/O functions might leak.
1248 	 */
1249 	oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1250 
1251 	/*
1252 	 * Fill cstrs with null-terminated strings of column values.
1253 	 */
1254 	for (i = 0; i < nfields; i++)
1255 	{
1256 		if (PQgetisnull(res, 0, i))
1257 			sinfo->cstrs[i] = NULL;
1258 		else
1259 			sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1260 	}
1261 
1262 	/* Convert row to a tuple, and add it to the tuplestore */
1263 	tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1264 
1265 	tuplestore_puttuple(sinfo->tuplestore, tuple);
1266 
1267 	/* Clean up */
1268 	MemoryContextSwitchTo(oldcontext);
1269 	MemoryContextReset(sinfo->tmpcontext);
1270 }
1271 
1272 /*
1273  * List all open dblink connections by name.
1274  * Returns an array of all connection names.
1275  * Takes no params
1276  */
1277 PG_FUNCTION_INFO_V1(dblink_get_connections);
1278 Datum
dblink_get_connections(PG_FUNCTION_ARGS)1279 dblink_get_connections(PG_FUNCTION_ARGS)
1280 {
1281 	HASH_SEQ_STATUS status;
1282 	remoteConnHashEnt *hentry;
1283 	ArrayBuildState *astate = NULL;
1284 
1285 	if (remoteConnHash)
1286 	{
1287 		hash_seq_init(&status, remoteConnHash);
1288 		while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1289 		{
1290 			/* stash away current value */
1291 			astate = accumArrayResult(astate,
1292 									  CStringGetTextDatum(hentry->name),
1293 									  false, TEXTOID, CurrentMemoryContext);
1294 		}
1295 	}
1296 
1297 	if (astate)
1298 		PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1299 											  CurrentMemoryContext));
1300 	else
1301 		PG_RETURN_NULL();
1302 }
1303 
1304 /*
1305  * Checks if a given remote connection is busy
1306  *
1307  * Returns 1 if the connection is busy, 0 otherwise
1308  * Params:
1309  *	text connection_name - name of the connection to check
1310  *
1311  */
1312 PG_FUNCTION_INFO_V1(dblink_is_busy);
1313 Datum
dblink_is_busy(PG_FUNCTION_ARGS)1314 dblink_is_busy(PG_FUNCTION_ARGS)
1315 {
1316 	PGconn	   *conn;
1317 
1318 	dblink_init();
1319 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1320 
1321 	PQconsumeInput(conn);
1322 	PG_RETURN_INT32(PQisBusy(conn));
1323 }
1324 
1325 /*
1326  * Cancels a running request on a connection
1327  *
1328  * Returns text:
1329  *	"OK" if the cancel request has been sent correctly,
1330  *		an error message otherwise
1331  *
1332  * Params:
1333  *	text connection_name - name of the connection to check
1334  *
1335  */
1336 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1337 Datum
dblink_cancel_query(PG_FUNCTION_ARGS)1338 dblink_cancel_query(PG_FUNCTION_ARGS)
1339 {
1340 	int			res;
1341 	PGconn	   *conn;
1342 	PGcancel   *cancel;
1343 	char		errbuf[256];
1344 
1345 	dblink_init();
1346 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1347 	cancel = PQgetCancel(conn);
1348 
1349 	res = PQcancel(cancel, errbuf, 256);
1350 	PQfreeCancel(cancel);
1351 
1352 	if (res == 1)
1353 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
1354 	else
1355 		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1356 }
1357 
1358 
1359 /*
1360  * Get error message from a connection
1361  *
1362  * Returns text:
1363  *	"OK" if no error, an error message otherwise
1364  *
1365  * Params:
1366  *	text connection_name - name of the connection to check
1367  *
1368  */
1369 PG_FUNCTION_INFO_V1(dblink_error_message);
1370 Datum
dblink_error_message(PG_FUNCTION_ARGS)1371 dblink_error_message(PG_FUNCTION_ARGS)
1372 {
1373 	char	   *msg;
1374 	PGconn	   *conn;
1375 
1376 	dblink_init();
1377 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1378 
1379 	msg = PQerrorMessage(conn);
1380 	if (msg == NULL || msg[0] == '\0')
1381 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
1382 	else
1383 		PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1384 }
1385 
1386 /*
1387  * Execute an SQL non-SELECT command
1388  */
1389 PG_FUNCTION_INFO_V1(dblink_exec);
1390 Datum
dblink_exec(PG_FUNCTION_ARGS)1391 dblink_exec(PG_FUNCTION_ARGS)
1392 {
1393 	text	   *volatile sql_cmd_status = NULL;
1394 	PGconn	   *volatile conn = NULL;
1395 	volatile bool freeconn = false;
1396 
1397 	dblink_init();
1398 
1399 	PG_TRY();
1400 	{
1401 		PGresult   *res = NULL;
1402 		char	   *sql = NULL;
1403 		char	   *conname = NULL;
1404 		bool		fail = true;	/* default to backward compatible behavior */
1405 
1406 		if (PG_NARGS() == 3)
1407 		{
1408 			/* must be text,text,bool */
1409 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1410 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1411 			fail = PG_GETARG_BOOL(2);
1412 			dblink_get_conn(conname, &conn, &conname, &freeconn);
1413 		}
1414 		else if (PG_NARGS() == 2)
1415 		{
1416 			/* might be text,text or text,bool */
1417 			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1418 			{
1419 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1420 				fail = PG_GETARG_BOOL(1);
1421 				conn = pconn->conn;
1422 			}
1423 			else
1424 			{
1425 				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1426 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1427 				dblink_get_conn(conname, &conn, &conname, &freeconn);
1428 			}
1429 		}
1430 		else if (PG_NARGS() == 1)
1431 		{
1432 			/* must be single text argument */
1433 			conn = pconn->conn;
1434 			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1435 		}
1436 		else
1437 			/* shouldn't happen */
1438 			elog(ERROR, "wrong number of arguments");
1439 
1440 		if (!conn)
1441 			dblink_conn_not_avail(conname);
1442 
1443 		res = PQexec(conn, sql);
1444 		if (!res ||
1445 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
1446 			 PQresultStatus(res) != PGRES_TUPLES_OK))
1447 		{
1448 			dblink_res_error(conn, conname, res, fail,
1449 							 "while executing command");
1450 
1451 			/*
1452 			 * and save a copy of the command status string to return as our
1453 			 * result tuple
1454 			 */
1455 			sql_cmd_status = cstring_to_text("ERROR");
1456 		}
1457 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1458 		{
1459 			/*
1460 			 * and save a copy of the command status string to return as our
1461 			 * result tuple
1462 			 */
1463 			sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1464 			PQclear(res);
1465 		}
1466 		else
1467 		{
1468 			PQclear(res);
1469 			ereport(ERROR,
1470 					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1471 					 errmsg("statement returning results not allowed")));
1472 		}
1473 	}
1474 	PG_CATCH();
1475 	{
1476 		/* if needed, close the connection to the database */
1477 		if (freeconn)
1478 			PQfinish(conn);
1479 		PG_RE_THROW();
1480 	}
1481 	PG_END_TRY();
1482 
1483 	/* if needed, close the connection to the database */
1484 	if (freeconn)
1485 		PQfinish(conn);
1486 
1487 	PG_RETURN_TEXT_P(sql_cmd_status);
1488 }
1489 
1490 
1491 /*
1492  * dblink_get_pkey
1493  *
1494  * Return list of primary key fields for the supplied relation,
1495  * or NULL if none exists.
1496  */
1497 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1498 Datum
dblink_get_pkey(PG_FUNCTION_ARGS)1499 dblink_get_pkey(PG_FUNCTION_ARGS)
1500 {
1501 	int16		indnkeyatts;
1502 	char	  **results;
1503 	FuncCallContext *funcctx;
1504 	int32		call_cntr;
1505 	int32		max_calls;
1506 	AttInMetadata *attinmeta;
1507 	MemoryContext oldcontext;
1508 
1509 	/* stuff done only on the first call of the function */
1510 	if (SRF_IS_FIRSTCALL())
1511 	{
1512 		Relation	rel;
1513 		TupleDesc	tupdesc;
1514 
1515 		/* create a function context for cross-call persistence */
1516 		funcctx = SRF_FIRSTCALL_INIT();
1517 
1518 		/*
1519 		 * switch to memory context appropriate for multiple function calls
1520 		 */
1521 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1522 
1523 		/* open target relation */
1524 		rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1525 
1526 		/* get the array of attnums */
1527 		results = get_pkey_attnames(rel, &indnkeyatts);
1528 
1529 		relation_close(rel, AccessShareLock);
1530 
1531 		/*
1532 		 * need a tuple descriptor representing one INT and one TEXT column
1533 		 */
1534 		tupdesc = CreateTemplateTupleDesc(2, false);
1535 		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1536 						   INT4OID, -1, 0);
1537 		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1538 						   TEXTOID, -1, 0);
1539 
1540 		/*
1541 		 * Generate attribute metadata needed later to produce tuples from raw
1542 		 * C strings
1543 		 */
1544 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
1545 		funcctx->attinmeta = attinmeta;
1546 
1547 		if ((results != NULL) && (indnkeyatts > 0))
1548 		{
1549 			funcctx->max_calls = indnkeyatts;
1550 
1551 			/* got results, keep track of them */
1552 			funcctx->user_fctx = results;
1553 		}
1554 		else
1555 		{
1556 			/* fast track when no results */
1557 			MemoryContextSwitchTo(oldcontext);
1558 			SRF_RETURN_DONE(funcctx);
1559 		}
1560 
1561 		MemoryContextSwitchTo(oldcontext);
1562 	}
1563 
1564 	/* stuff done on every call of the function */
1565 	funcctx = SRF_PERCALL_SETUP();
1566 
1567 	/*
1568 	 * initialize per-call variables
1569 	 */
1570 	call_cntr = funcctx->call_cntr;
1571 	max_calls = funcctx->max_calls;
1572 
1573 	results = (char **) funcctx->user_fctx;
1574 	attinmeta = funcctx->attinmeta;
1575 
1576 	if (call_cntr < max_calls)	/* do when there is more left to send */
1577 	{
1578 		char	  **values;
1579 		HeapTuple	tuple;
1580 		Datum		result;
1581 
1582 		values = (char **) palloc(2 * sizeof(char *));
1583 		values[0] = psprintf("%d", call_cntr + 1);
1584 		values[1] = results[call_cntr];
1585 
1586 		/* build the tuple */
1587 		tuple = BuildTupleFromCStrings(attinmeta, values);
1588 
1589 		/* make the tuple into a datum */
1590 		result = HeapTupleGetDatum(tuple);
1591 
1592 		SRF_RETURN_NEXT(funcctx, result);
1593 	}
1594 	else
1595 	{
1596 		/* do when there is no more left */
1597 		SRF_RETURN_DONE(funcctx);
1598 	}
1599 }
1600 
1601 
1602 /*
1603  * dblink_build_sql_insert
1604  *
1605  * Used to generate an SQL insert statement
1606  * based on an existing tuple in a local relation.
1607  * This is useful for selectively replicating data
1608  * to another server via dblink.
1609  *
1610  * API:
1611  * <relname> - name of local table of interest
1612  * <pkattnums> - an int2vector of attnums which will be used
1613  * to identify the local tuple of interest
1614  * <pknumatts> - number of attnums in pkattnums
1615  * <src_pkattvals_arry> - text array of key values which will be used
1616  * to identify the local tuple of interest
1617  * <tgt_pkattvals_arry> - text array of key values which will be used
1618  * to build the string for execution remotely. These are substituted
1619  * for their counterparts in src_pkattvals_arry
1620  */
1621 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1622 Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)1623 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1624 {
1625 	text	   *relname_text = PG_GETARG_TEXT_PP(0);
1626 	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1627 	int32		pknumatts_arg = PG_GETARG_INT32(2);
1628 	ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1629 	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1630 	Relation	rel;
1631 	int		   *pkattnums;
1632 	int			pknumatts;
1633 	char	  **src_pkattvals;
1634 	char	  **tgt_pkattvals;
1635 	int			src_nitems;
1636 	int			tgt_nitems;
1637 	char	   *sql;
1638 
1639 	/*
1640 	 * Open target relation.
1641 	 */
1642 	rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1643 
1644 	/*
1645 	 * Process pkattnums argument.
1646 	 */
1647 	validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1648 					   &pkattnums, &pknumatts);
1649 
1650 	/*
1651 	 * Source array is made up of key values that will be used to locate the
1652 	 * tuple of interest from the local system.
1653 	 */
1654 	src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1655 
1656 	/*
1657 	 * There should be one source array key value for each key attnum
1658 	 */
1659 	if (src_nitems != pknumatts)
1660 		ereport(ERROR,
1661 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1662 				 errmsg("source key array length must match number of key " \
1663 						"attributes")));
1664 
1665 	/*
1666 	 * Target array is made up of key values that will be used to build the
1667 	 * SQL string for use on the remote system.
1668 	 */
1669 	tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1670 
1671 	/*
1672 	 * There should be one target array key value for each key attnum
1673 	 */
1674 	if (tgt_nitems != pknumatts)
1675 		ereport(ERROR,
1676 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1677 				 errmsg("target key array length must match number of key " \
1678 						"attributes")));
1679 
1680 	/*
1681 	 * Prep work is finally done. Go get the SQL string.
1682 	 */
1683 	sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1684 
1685 	/*
1686 	 * Now we can close the relation.
1687 	 */
1688 	relation_close(rel, AccessShareLock);
1689 
1690 	/*
1691 	 * And send it
1692 	 */
1693 	PG_RETURN_TEXT_P(cstring_to_text(sql));
1694 }
1695 
1696 
1697 /*
1698  * dblink_build_sql_delete
1699  *
1700  * Used to generate an SQL delete statement.
1701  * This is useful for selectively replicating a
1702  * delete to another server via dblink.
1703  *
1704  * API:
1705  * <relname> - name of remote table of interest
1706  * <pkattnums> - an int2vector of attnums which will be used
1707  * to identify the remote tuple of interest
1708  * <pknumatts> - number of attnums in pkattnums
1709  * <tgt_pkattvals_arry> - text array of key values which will be used
1710  * to build the string for execution remotely.
1711  */
1712 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1713 Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)1714 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1715 {
1716 	text	   *relname_text = PG_GETARG_TEXT_PP(0);
1717 	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1718 	int32		pknumatts_arg = PG_GETARG_INT32(2);
1719 	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1720 	Relation	rel;
1721 	int		   *pkattnums;
1722 	int			pknumatts;
1723 	char	  **tgt_pkattvals;
1724 	int			tgt_nitems;
1725 	char	   *sql;
1726 
1727 	/*
1728 	 * Open target relation.
1729 	 */
1730 	rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1731 
1732 	/*
1733 	 * Process pkattnums argument.
1734 	 */
1735 	validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1736 					   &pkattnums, &pknumatts);
1737 
1738 	/*
1739 	 * Target array is made up of key values that will be used to build the
1740 	 * SQL string for use on the remote system.
1741 	 */
1742 	tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1743 
1744 	/*
1745 	 * There should be one target array key value for each key attnum
1746 	 */
1747 	if (tgt_nitems != pknumatts)
1748 		ereport(ERROR,
1749 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1750 				 errmsg("target key array length must match number of key " \
1751 						"attributes")));
1752 
1753 	/*
1754 	 * Prep work is finally done. Go get the SQL string.
1755 	 */
1756 	sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1757 
1758 	/*
1759 	 * Now we can close the relation.
1760 	 */
1761 	relation_close(rel, AccessShareLock);
1762 
1763 	/*
1764 	 * And send it
1765 	 */
1766 	PG_RETURN_TEXT_P(cstring_to_text(sql));
1767 }
1768 
1769 
1770 /*
1771  * dblink_build_sql_update
1772  *
1773  * Used to generate an SQL update statement
1774  * based on an existing tuple in a local relation.
1775  * This is useful for selectively replicating data
1776  * to another server via dblink.
1777  *
1778  * API:
1779  * <relname> - name of local table of interest
1780  * <pkattnums> - an int2vector of attnums which will be used
1781  * to identify the local tuple of interest
1782  * <pknumatts> - number of attnums in pkattnums
1783  * <src_pkattvals_arry> - text array of key values which will be used
1784  * to identify the local tuple of interest
1785  * <tgt_pkattvals_arry> - text array of key values which will be used
1786  * to build the string for execution remotely. These are substituted
1787  * for their counterparts in src_pkattvals_arry
1788  */
1789 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1790 Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)1791 dblink_build_sql_update(PG_FUNCTION_ARGS)
1792 {
1793 	text	   *relname_text = PG_GETARG_TEXT_PP(0);
1794 	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1795 	int32		pknumatts_arg = PG_GETARG_INT32(2);
1796 	ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1797 	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1798 	Relation	rel;
1799 	int		   *pkattnums;
1800 	int			pknumatts;
1801 	char	  **src_pkattvals;
1802 	char	  **tgt_pkattvals;
1803 	int			src_nitems;
1804 	int			tgt_nitems;
1805 	char	   *sql;
1806 
1807 	/*
1808 	 * Open target relation.
1809 	 */
1810 	rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1811 
1812 	/*
1813 	 * Process pkattnums argument.
1814 	 */
1815 	validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1816 					   &pkattnums, &pknumatts);
1817 
1818 	/*
1819 	 * Source array is made up of key values that will be used to locate the
1820 	 * tuple of interest from the local system.
1821 	 */
1822 	src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1823 
1824 	/*
1825 	 * There should be one source array key value for each key attnum
1826 	 */
1827 	if (src_nitems != pknumatts)
1828 		ereport(ERROR,
1829 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1830 				 errmsg("source key array length must match number of key " \
1831 						"attributes")));
1832 
1833 	/*
1834 	 * Target array is made up of key values that will be used to build the
1835 	 * SQL string for use on the remote system.
1836 	 */
1837 	tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1838 
1839 	/*
1840 	 * There should be one target array key value for each key attnum
1841 	 */
1842 	if (tgt_nitems != pknumatts)
1843 		ereport(ERROR,
1844 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1845 				 errmsg("target key array length must match number of key " \
1846 						"attributes")));
1847 
1848 	/*
1849 	 * Prep work is finally done. Go get the SQL string.
1850 	 */
1851 	sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1852 
1853 	/*
1854 	 * Now we can close the relation.
1855 	 */
1856 	relation_close(rel, AccessShareLock);
1857 
1858 	/*
1859 	 * And send it
1860 	 */
1861 	PG_RETURN_TEXT_P(cstring_to_text(sql));
1862 }
1863 
1864 /*
1865  * dblink_current_query
1866  * return the current query string
1867  * to allow its use in (among other things)
1868  * rewrite rules
1869  */
1870 PG_FUNCTION_INFO_V1(dblink_current_query);
1871 Datum
dblink_current_query(PG_FUNCTION_ARGS)1872 dblink_current_query(PG_FUNCTION_ARGS)
1873 {
1874 	/* This is now just an alias for the built-in function current_query() */
1875 	PG_RETURN_DATUM(current_query(fcinfo));
1876 }
1877 
1878 /*
1879  * Retrieve async notifications for a connection.
1880  *
1881  * Returns a setof record of notifications, or an empty set if none received.
1882  * Can optionally take a named connection as parameter, but uses the unnamed
1883  * connection per default.
1884  *
1885  */
1886 #define DBLINK_NOTIFY_COLS		3
1887 
1888 PG_FUNCTION_INFO_V1(dblink_get_notify);
1889 Datum
dblink_get_notify(PG_FUNCTION_ARGS)1890 dblink_get_notify(PG_FUNCTION_ARGS)
1891 {
1892 	PGconn	   *conn;
1893 	PGnotify   *notify;
1894 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1895 	TupleDesc	tupdesc;
1896 	Tuplestorestate *tupstore;
1897 	MemoryContext per_query_ctx;
1898 	MemoryContext oldcontext;
1899 
1900 	prepTuplestoreResult(fcinfo);
1901 
1902 	dblink_init();
1903 	if (PG_NARGS() == 1)
1904 		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1905 	else
1906 		conn = pconn->conn;
1907 
1908 	/* create the tuplestore in per-query memory */
1909 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1910 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
1911 
1912 	tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
1913 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1914 					   TEXTOID, -1, 0);
1915 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1916 					   INT4OID, -1, 0);
1917 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1918 					   TEXTOID, -1, 0);
1919 
1920 	tupstore = tuplestore_begin_heap(true, false, work_mem);
1921 	rsinfo->setResult = tupstore;
1922 	rsinfo->setDesc = tupdesc;
1923 
1924 	MemoryContextSwitchTo(oldcontext);
1925 
1926 	PQconsumeInput(conn);
1927 	while ((notify = PQnotifies(conn)) != NULL)
1928 	{
1929 		Datum		values[DBLINK_NOTIFY_COLS];
1930 		bool		nulls[DBLINK_NOTIFY_COLS];
1931 
1932 		memset(values, 0, sizeof(values));
1933 		memset(nulls, 0, sizeof(nulls));
1934 
1935 		if (notify->relname != NULL)
1936 			values[0] = CStringGetTextDatum(notify->relname);
1937 		else
1938 			nulls[0] = true;
1939 
1940 		values[1] = Int32GetDatum(notify->be_pid);
1941 
1942 		if (notify->extra != NULL)
1943 			values[2] = CStringGetTextDatum(notify->extra);
1944 		else
1945 			nulls[2] = true;
1946 
1947 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1948 
1949 		PQfreemem(notify);
1950 		PQconsumeInput(conn);
1951 	}
1952 
1953 	/* clean up and return the tuplestore */
1954 	tuplestore_donestoring(tupstore);
1955 
1956 	return (Datum) 0;
1957 }
1958 
1959 /*
1960  * Validate the options given to a dblink foreign server or user mapping.
1961  * Raise an error if any option is invalid.
1962  *
1963  * We just check the names of options here, so semantic errors in options,
1964  * such as invalid numeric format, will be detected at the attempt to connect.
1965  */
1966 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1967 Datum
dblink_fdw_validator(PG_FUNCTION_ARGS)1968 dblink_fdw_validator(PG_FUNCTION_ARGS)
1969 {
1970 	List	   *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1971 	Oid			context = PG_GETARG_OID(1);
1972 	ListCell   *cell;
1973 
1974 	static const PQconninfoOption *options = NULL;
1975 
1976 	/*
1977 	 * Get list of valid libpq options.
1978 	 *
1979 	 * To avoid unnecessary work, we get the list once and use it throughout
1980 	 * the lifetime of this backend process.  We don't need to care about
1981 	 * memory context issues, because PQconndefaults allocates with malloc.
1982 	 */
1983 	if (!options)
1984 	{
1985 		options = PQconndefaults();
1986 		if (!options)			/* assume reason for failure is OOM */
1987 			ereport(ERROR,
1988 					(errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1989 					 errmsg("out of memory"),
1990 					 errdetail("Could not get libpq's default connection options.")));
1991 	}
1992 
1993 	/* Validate each supplied option. */
1994 	foreach(cell, options_list)
1995 	{
1996 		DefElem    *def = (DefElem *) lfirst(cell);
1997 
1998 		if (!is_valid_dblink_option(options, def->defname, context))
1999 		{
2000 			/*
2001 			 * Unknown option, or invalid option for the context specified, so
2002 			 * complain about it.  Provide a hint with list of valid options
2003 			 * for the context.
2004 			 */
2005 			StringInfoData buf;
2006 			const PQconninfoOption *opt;
2007 
2008 			initStringInfo(&buf);
2009 			for (opt = options; opt->keyword; opt++)
2010 			{
2011 				if (is_valid_dblink_option(options, opt->keyword, context))
2012 					appendStringInfo(&buf, "%s%s",
2013 									 (buf.len > 0) ? ", " : "",
2014 									 opt->keyword);
2015 			}
2016 			ereport(ERROR,
2017 					(errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2018 					 errmsg("invalid option \"%s\"", def->defname),
2019 					 errhint("Valid options in this context are: %s",
2020 							 buf.data)));
2021 		}
2022 	}
2023 
2024 	PG_RETURN_VOID();
2025 }
2026 
2027 
2028 /*************************************************************
2029  * internal functions
2030  */
2031 
2032 
2033 /*
2034  * get_pkey_attnames
2035  *
2036  * Get the primary key attnames for the given relation.
2037  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2038  */
2039 static char **
get_pkey_attnames(Relation rel,int16 * indnkeyatts)2040 get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2041 {
2042 	Relation	indexRelation;
2043 	ScanKeyData skey;
2044 	SysScanDesc scan;
2045 	HeapTuple	indexTuple;
2046 	int			i;
2047 	char	  **result = NULL;
2048 	TupleDesc	tupdesc;
2049 
2050 	/* initialize indnkeyatts to 0 in case no primary key exists */
2051 	*indnkeyatts = 0;
2052 
2053 	tupdesc = rel->rd_att;
2054 
2055 	/* Prepare to scan pg_index for entries having indrelid = this rel. */
2056 	indexRelation = heap_open(IndexRelationId, AccessShareLock);
2057 	ScanKeyInit(&skey,
2058 				Anum_pg_index_indrelid,
2059 				BTEqualStrategyNumber, F_OIDEQ,
2060 				ObjectIdGetDatum(RelationGetRelid(rel)));
2061 
2062 	scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2063 							  NULL, 1, &skey);
2064 
2065 	while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2066 	{
2067 		Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2068 
2069 		/* we're only interested if it is the primary key */
2070 		if (index->indisprimary)
2071 		{
2072 			*indnkeyatts = index->indnkeyatts;
2073 			if (*indnkeyatts > 0)
2074 			{
2075 				result = (char **) palloc(*indnkeyatts * sizeof(char *));
2076 
2077 				for (i = 0; i < *indnkeyatts; i++)
2078 					result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2079 			}
2080 			break;
2081 		}
2082 	}
2083 
2084 	systable_endscan(scan);
2085 	heap_close(indexRelation, AccessShareLock);
2086 
2087 	return result;
2088 }
2089 
2090 /*
2091  * Deconstruct a text[] into C-strings (note any NULL elements will be
2092  * returned as NULL pointers)
2093  */
2094 static char **
get_text_array_contents(ArrayType * array,int * numitems)2095 get_text_array_contents(ArrayType *array, int *numitems)
2096 {
2097 	int			ndim = ARR_NDIM(array);
2098 	int		   *dims = ARR_DIMS(array);
2099 	int			nitems;
2100 	int16		typlen;
2101 	bool		typbyval;
2102 	char		typalign;
2103 	char	  **values;
2104 	char	   *ptr;
2105 	bits8	   *bitmap;
2106 	int			bitmask;
2107 	int			i;
2108 
2109 	Assert(ARR_ELEMTYPE(array) == TEXTOID);
2110 
2111 	*numitems = nitems = ArrayGetNItems(ndim, dims);
2112 
2113 	get_typlenbyvalalign(ARR_ELEMTYPE(array),
2114 						 &typlen, &typbyval, &typalign);
2115 
2116 	values = (char **) palloc(nitems * sizeof(char *));
2117 
2118 	ptr = ARR_DATA_PTR(array);
2119 	bitmap = ARR_NULLBITMAP(array);
2120 	bitmask = 1;
2121 
2122 	for (i = 0; i < nitems; i++)
2123 	{
2124 		if (bitmap && (*bitmap & bitmask) == 0)
2125 		{
2126 			values[i] = NULL;
2127 		}
2128 		else
2129 		{
2130 			values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2131 			ptr = att_addlength_pointer(ptr, typlen, ptr);
2132 			ptr = (char *) att_align_nominal(ptr, typalign);
2133 		}
2134 
2135 		/* advance bitmap pointer if any */
2136 		if (bitmap)
2137 		{
2138 			bitmask <<= 1;
2139 			if (bitmask == 0x100)
2140 			{
2141 				bitmap++;
2142 				bitmask = 1;
2143 			}
2144 		}
2145 	}
2146 
2147 	return values;
2148 }
2149 
2150 static char *
get_sql_insert(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2151 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2152 {
2153 	char	   *relname;
2154 	HeapTuple	tuple;
2155 	TupleDesc	tupdesc;
2156 	int			natts;
2157 	StringInfoData buf;
2158 	char	   *val;
2159 	int			key;
2160 	int			i;
2161 	bool		needComma;
2162 
2163 	initStringInfo(&buf);
2164 
2165 	/* get relation name including any needed schema prefix and quoting */
2166 	relname = generate_relation_name(rel);
2167 
2168 	tupdesc = rel->rd_att;
2169 	natts = tupdesc->natts;
2170 
2171 	tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2172 	if (!tuple)
2173 		ereport(ERROR,
2174 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
2175 				 errmsg("source row not found")));
2176 
2177 	appendStringInfo(&buf, "INSERT INTO %s(", relname);
2178 
2179 	needComma = false;
2180 	for (i = 0; i < natts; i++)
2181 	{
2182 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2183 
2184 		if (att->attisdropped)
2185 			continue;
2186 
2187 		if (needComma)
2188 			appendStringInfoChar(&buf, ',');
2189 
2190 		appendStringInfoString(&buf,
2191 							   quote_ident_cstr(NameStr(att->attname)));
2192 		needComma = true;
2193 	}
2194 
2195 	appendStringInfoString(&buf, ") VALUES(");
2196 
2197 	/*
2198 	 * Note: i is physical column number (counting from 0).
2199 	 */
2200 	needComma = false;
2201 	for (i = 0; i < natts; i++)
2202 	{
2203 		if (TupleDescAttr(tupdesc, i)->attisdropped)
2204 			continue;
2205 
2206 		if (needComma)
2207 			appendStringInfoChar(&buf, ',');
2208 
2209 		key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2210 
2211 		if (key >= 0)
2212 			val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2213 		else
2214 			val = SPI_getvalue(tuple, tupdesc, i + 1);
2215 
2216 		if (val != NULL)
2217 		{
2218 			appendStringInfoString(&buf, quote_literal_cstr(val));
2219 			pfree(val);
2220 		}
2221 		else
2222 			appendStringInfoString(&buf, "NULL");
2223 		needComma = true;
2224 	}
2225 	appendStringInfoChar(&buf, ')');
2226 
2227 	return buf.data;
2228 }
2229 
2230 static char *
get_sql_delete(Relation rel,int * pkattnums,int pknumatts,char ** tgt_pkattvals)2231 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2232 {
2233 	char	   *relname;
2234 	TupleDesc	tupdesc;
2235 	StringInfoData buf;
2236 	int			i;
2237 
2238 	initStringInfo(&buf);
2239 
2240 	/* get relation name including any needed schema prefix and quoting */
2241 	relname = generate_relation_name(rel);
2242 
2243 	tupdesc = rel->rd_att;
2244 
2245 	appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2246 	for (i = 0; i < pknumatts; i++)
2247 	{
2248 		int			pkattnum = pkattnums[i];
2249 		Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2250 
2251 		if (i > 0)
2252 			appendStringInfoString(&buf, " AND ");
2253 
2254 		appendStringInfoString(&buf,
2255 							   quote_ident_cstr(NameStr(attr->attname)));
2256 
2257 		if (tgt_pkattvals[i] != NULL)
2258 			appendStringInfo(&buf, " = %s",
2259 							 quote_literal_cstr(tgt_pkattvals[i]));
2260 		else
2261 			appendStringInfoString(&buf, " IS NULL");
2262 	}
2263 
2264 	return buf.data;
2265 }
2266 
2267 static char *
get_sql_update(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2268 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2269 {
2270 	char	   *relname;
2271 	HeapTuple	tuple;
2272 	TupleDesc	tupdesc;
2273 	int			natts;
2274 	StringInfoData buf;
2275 	char	   *val;
2276 	int			key;
2277 	int			i;
2278 	bool		needComma;
2279 
2280 	initStringInfo(&buf);
2281 
2282 	/* get relation name including any needed schema prefix and quoting */
2283 	relname = generate_relation_name(rel);
2284 
2285 	tupdesc = rel->rd_att;
2286 	natts = tupdesc->natts;
2287 
2288 	tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2289 	if (!tuple)
2290 		ereport(ERROR,
2291 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
2292 				 errmsg("source row not found")));
2293 
2294 	appendStringInfo(&buf, "UPDATE %s SET ", relname);
2295 
2296 	/*
2297 	 * Note: i is physical column number (counting from 0).
2298 	 */
2299 	needComma = false;
2300 	for (i = 0; i < natts; i++)
2301 	{
2302 		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2303 
2304 		if (attr->attisdropped)
2305 			continue;
2306 
2307 		if (needComma)
2308 			appendStringInfoString(&buf, ", ");
2309 
2310 		appendStringInfo(&buf, "%s = ",
2311 						 quote_ident_cstr(NameStr(attr->attname)));
2312 
2313 		key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2314 
2315 		if (key >= 0)
2316 			val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2317 		else
2318 			val = SPI_getvalue(tuple, tupdesc, i + 1);
2319 
2320 		if (val != NULL)
2321 		{
2322 			appendStringInfoString(&buf, quote_literal_cstr(val));
2323 			pfree(val);
2324 		}
2325 		else
2326 			appendStringInfoString(&buf, "NULL");
2327 		needComma = true;
2328 	}
2329 
2330 	appendStringInfoString(&buf, " WHERE ");
2331 
2332 	for (i = 0; i < pknumatts; i++)
2333 	{
2334 		int			pkattnum = pkattnums[i];
2335 		Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2336 
2337 		if (i > 0)
2338 			appendStringInfoString(&buf, " AND ");
2339 
2340 		appendStringInfoString(&buf,
2341 							   quote_ident_cstr(NameStr(attr->attname)));
2342 
2343 		val = tgt_pkattvals[i];
2344 
2345 		if (val != NULL)
2346 			appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2347 		else
2348 			appendStringInfoString(&buf, " IS NULL");
2349 	}
2350 
2351 	return buf.data;
2352 }
2353 
2354 /*
2355  * Return a properly quoted identifier.
2356  * Uses quote_ident in quote.c
2357  */
2358 static char *
quote_ident_cstr(char * rawstr)2359 quote_ident_cstr(char *rawstr)
2360 {
2361 	text	   *rawstr_text;
2362 	text	   *result_text;
2363 	char	   *result;
2364 
2365 	rawstr_text = cstring_to_text(rawstr);
2366 	result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2367 													 PointerGetDatum(rawstr_text)));
2368 	result = text_to_cstring(result_text);
2369 
2370 	return result;
2371 }
2372 
/*
 * Return the index within pkattnums[0 .. pknumatts-1] of the first entry
 * equal to "key", or -1 if there is none.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	/* the list is expected to be short, so a linear search is enough */
	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
2387 
2388 static HeapTuple
get_tuple_of_interest(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals)2389 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2390 {
2391 	char	   *relname;
2392 	TupleDesc	tupdesc;
2393 	int			natts;
2394 	StringInfoData buf;
2395 	int			ret;
2396 	HeapTuple	tuple;
2397 	int			i;
2398 
2399 	/*
2400 	 * Connect to SPI manager
2401 	 */
2402 	if ((ret = SPI_connect()) < 0)
2403 		/* internal error */
2404 		elog(ERROR, "SPI connect failure - returned %d", ret);
2405 
2406 	initStringInfo(&buf);
2407 
2408 	/* get relation name including any needed schema prefix and quoting */
2409 	relname = generate_relation_name(rel);
2410 
2411 	tupdesc = rel->rd_att;
2412 	natts = tupdesc->natts;
2413 
2414 	/*
2415 	 * Build sql statement to look up tuple of interest, ie, the one matching
2416 	 * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
2417 	 * generate a result tuple that matches the table's physical structure,
2418 	 * with NULLs for any dropped columns.  Otherwise we have to deal with two
2419 	 * different tupdescs and everything's very confusing.
2420 	 */
2421 	appendStringInfoString(&buf, "SELECT ");
2422 
2423 	for (i = 0; i < natts; i++)
2424 	{
2425 		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2426 
2427 		if (i > 0)
2428 			appendStringInfoString(&buf, ", ");
2429 
2430 		if (attr->attisdropped)
2431 			appendStringInfoString(&buf, "NULL");
2432 		else
2433 			appendStringInfoString(&buf,
2434 								   quote_ident_cstr(NameStr(attr->attname)));
2435 	}
2436 
2437 	appendStringInfo(&buf, " FROM %s WHERE ", relname);
2438 
2439 	for (i = 0; i < pknumatts; i++)
2440 	{
2441 		int			pkattnum = pkattnums[i];
2442 		Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2443 
2444 		if (i > 0)
2445 			appendStringInfoString(&buf, " AND ");
2446 
2447 		appendStringInfoString(&buf,
2448 							   quote_ident_cstr(NameStr(attr->attname)));
2449 
2450 		if (src_pkattvals[i] != NULL)
2451 			appendStringInfo(&buf, " = %s",
2452 							 quote_literal_cstr(src_pkattvals[i]));
2453 		else
2454 			appendStringInfoString(&buf, " IS NULL");
2455 	}
2456 
2457 	/*
2458 	 * Retrieve the desired tuple
2459 	 */
2460 	ret = SPI_exec(buf.data, 0);
2461 	pfree(buf.data);
2462 
2463 	/*
2464 	 * Only allow one qualifying tuple
2465 	 */
2466 	if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2467 		ereport(ERROR,
2468 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
2469 				 errmsg("source criteria matched more than one record")));
2470 
2471 	else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2472 	{
2473 		SPITupleTable *tuptable = SPI_tuptable;
2474 
2475 		tuple = SPI_copytuple(tuptable->vals[0]);
2476 		SPI_finish();
2477 
2478 		return tuple;
2479 	}
2480 	else
2481 	{
2482 		/*
2483 		 * no qualifying tuples
2484 		 */
2485 		SPI_finish();
2486 
2487 		return NULL;
2488 	}
2489 
2490 	/*
2491 	 * never reached, but keep compiler quiet
2492 	 */
2493 	return NULL;
2494 }
2495 
2496 /*
2497  * Open the relation named by relname_text, acquire specified type of lock,
2498  * verify we have specified permissions.
2499  * Caller must close rel when done with it.
2500  */
2501 static Relation
get_rel_from_relname(text * relname_text,LOCKMODE lockmode,AclMode aclmode)2502 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2503 {
2504 	RangeVar   *relvar;
2505 	Relation	rel;
2506 	AclResult	aclresult;
2507 
2508 	relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2509 	rel = heap_openrv(relvar, lockmode);
2510 
2511 	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2512 								  aclmode);
2513 	if (aclresult != ACLCHECK_OK)
2514 		aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2515 					   RelationGetRelationName(rel));
2516 
2517 	return rel;
2518 }
2519 
2520 /*
2521  * generate_relation_name - copied from ruleutils.c
2522  *		Compute the name to display for a relation
2523  *
2524  * The result includes all necessary quoting and schema-prefixing.
2525  */
2526 static char *
generate_relation_name(Relation rel)2527 generate_relation_name(Relation rel)
2528 {
2529 	char	   *nspname;
2530 	char	   *result;
2531 
2532 	/* Qualify the name if not visible in search path */
2533 	if (RelationIsVisible(RelationGetRelid(rel)))
2534 		nspname = NULL;
2535 	else
2536 		nspname = get_namespace_name(rel->rd_rel->relnamespace);
2537 
2538 	result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2539 
2540 	return result;
2541 }
2542 
2543 
2544 static remoteConn *
getConnectionByName(const char * name)2545 getConnectionByName(const char *name)
2546 {
2547 	remoteConnHashEnt *hentry;
2548 	char	   *key;
2549 
2550 	if (!remoteConnHash)
2551 		remoteConnHash = createConnHash();
2552 
2553 	key = pstrdup(name);
2554 	truncate_identifier(key, strlen(key), false);
2555 	hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2556 											   key, HASH_FIND, NULL);
2557 
2558 	if (hentry)
2559 		return hentry->rconn;
2560 
2561 	return NULL;
2562 }
2563 
2564 static HTAB *
createConnHash(void)2565 createConnHash(void)
2566 {
2567 	HASHCTL		ctl;
2568 
2569 	ctl.keysize = NAMEDATALEN;
2570 	ctl.entrysize = sizeof(remoteConnHashEnt);
2571 
2572 	return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2573 }
2574 
2575 static void
createNewConnection(const char * name,remoteConn * rconn)2576 createNewConnection(const char *name, remoteConn *rconn)
2577 {
2578 	remoteConnHashEnt *hentry;
2579 	bool		found;
2580 	char	   *key;
2581 
2582 	if (!remoteConnHash)
2583 		remoteConnHash = createConnHash();
2584 
2585 	key = pstrdup(name);
2586 	truncate_identifier(key, strlen(key), true);
2587 	hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2588 											   HASH_ENTER, &found);
2589 
2590 	if (found)
2591 	{
2592 		PQfinish(rconn->conn);
2593 		pfree(rconn);
2594 
2595 		ereport(ERROR,
2596 				(errcode(ERRCODE_DUPLICATE_OBJECT),
2597 				 errmsg("duplicate connection name")));
2598 	}
2599 
2600 	hentry->rconn = rconn;
2601 	strlcpy(hentry->name, name, sizeof(hentry->name));
2602 }
2603 
2604 static void
deleteConnection(const char * name)2605 deleteConnection(const char *name)
2606 {
2607 	remoteConnHashEnt *hentry;
2608 	bool		found;
2609 	char	   *key;
2610 
2611 	if (!remoteConnHash)
2612 		remoteConnHash = createConnHash();
2613 
2614 	key = pstrdup(name);
2615 	truncate_identifier(key, strlen(key), false);
2616 	hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2617 											   key, HASH_REMOVE, &found);
2618 
2619 	if (!hentry)
2620 		ereport(ERROR,
2621 				(errcode(ERRCODE_UNDEFINED_OBJECT),
2622 				 errmsg("undefined connection name")));
2623 
2624 }
2625 
2626 static void
dblink_security_check(PGconn * conn,remoteConn * rconn)2627 dblink_security_check(PGconn *conn, remoteConn *rconn)
2628 {
2629 	if (!superuser())
2630 	{
2631 		if (!PQconnectionUsedPassword(conn))
2632 		{
2633 			PQfinish(conn);
2634 			if (rconn)
2635 				pfree(rconn);
2636 
2637 			ereport(ERROR,
2638 					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2639 					 errmsg("password is required"),
2640 					 errdetail("Non-superuser cannot connect if the server does not request a password."),
2641 					 errhint("Target server's authentication method must be changed.")));
2642 		}
2643 	}
2644 }
2645 
2646 /*
2647  * For non-superusers, insist that the connstr specify a password.  This
2648  * prevents a password from being picked up from .pgpass, a service file,
2649  * the environment, etc.  We don't want the postgres user's passwords
2650  * to be accessible to non-superusers.
2651  */
2652 static void
dblink_connstr_check(const char * connstr)2653 dblink_connstr_check(const char *connstr)
2654 {
2655 	if (!superuser())
2656 	{
2657 		PQconninfoOption *options;
2658 		PQconninfoOption *option;
2659 		bool		connstr_gives_password = false;
2660 
2661 		options = PQconninfoParse(connstr, NULL);
2662 		if (options)
2663 		{
2664 			for (option = options; option->keyword != NULL; option++)
2665 			{
2666 				if (strcmp(option->keyword, "password") == 0)
2667 				{
2668 					if (option->val != NULL && option->val[0] != '\0')
2669 					{
2670 						connstr_gives_password = true;
2671 						break;
2672 					}
2673 				}
2674 			}
2675 			PQconninfoFree(options);
2676 		}
2677 
2678 		if (!connstr_gives_password)
2679 			ereport(ERROR,
2680 					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2681 					 errmsg("password is required"),
2682 					 errdetail("Non-superusers must provide a password in the connection string.")));
2683 	}
2684 }
2685 
2686 /*
2687  * Report an error received from the remote server
2688  *
2689  * res: the received error result (will be freed)
2690  * fail: true for ERROR ereport, false for NOTICE
2691  * fmt and following args: sprintf-style format and values for errcontext;
2692  * the resulting string should be worded like "while <some action>"
2693  */
2694 static void
dblink_res_error(PGconn * conn,const char * conname,PGresult * res,bool fail,const char * fmt,...)2695 dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2696 				 bool fail, const char *fmt,...)
2697 {
2698 	int			level;
2699 	char	   *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2700 	char	   *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2701 	char	   *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2702 	char	   *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2703 	char	   *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2704 	int			sqlstate;
2705 	char	   *message_primary;
2706 	char	   *message_detail;
2707 	char	   *message_hint;
2708 	char	   *message_context;
2709 	va_list		ap;
2710 	char		dblink_context_msg[512];
2711 
2712 	if (fail)
2713 		level = ERROR;
2714 	else
2715 		level = NOTICE;
2716 
2717 	if (pg_diag_sqlstate)
2718 		sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2719 								 pg_diag_sqlstate[1],
2720 								 pg_diag_sqlstate[2],
2721 								 pg_diag_sqlstate[3],
2722 								 pg_diag_sqlstate[4]);
2723 	else
2724 		sqlstate = ERRCODE_CONNECTION_FAILURE;
2725 
2726 	message_primary = xpstrdup(pg_diag_message_primary);
2727 	message_detail = xpstrdup(pg_diag_message_detail);
2728 	message_hint = xpstrdup(pg_diag_message_hint);
2729 	message_context = xpstrdup(pg_diag_context);
2730 
2731 	/*
2732 	 * If we don't get a message from the PGresult, try the PGconn.  This is
2733 	 * needed because for connection-level failures, PQexec may just return
2734 	 * NULL, not a PGresult at all.
2735 	 */
2736 	if (message_primary == NULL)
2737 		message_primary = pchomp(PQerrorMessage(conn));
2738 
2739 	/*
2740 	 * Now that we've copied all the data we need out of the PGresult, it's
2741 	 * safe to free it.  We must do this to avoid PGresult leakage.  We're
2742 	 * leaking all the strings too, but those are in palloc'd memory that will
2743 	 * get cleaned up eventually.
2744 	 */
2745 	if (res)
2746 		PQclear(res);
2747 
2748 	/*
2749 	 * Format the basic errcontext string.  Below, we'll add on something
2750 	 * about the connection name.  That's a violation of the translatability
2751 	 * guidelines about constructing error messages out of parts, but since
2752 	 * there's no translation support for dblink, there's no need to worry
2753 	 * about that (yet).
2754 	 */
2755 	va_start(ap, fmt);
2756 	vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2757 	va_end(ap);
2758 
2759 	ereport(level,
2760 			(errcode(sqlstate),
2761 			 message_primary ? errmsg_internal("%s", message_primary) :
2762 			 errmsg("could not obtain message string for remote error"),
2763 			 message_detail ? errdetail_internal("%s", message_detail) : 0,
2764 			 message_hint ? errhint("%s", message_hint) : 0,
2765 			 message_context ? (errcontext("%s", message_context)) : 0,
2766 			 conname ?
2767 			 (errcontext("%s on dblink connection named \"%s\"",
2768 						 dblink_context_msg, conname)) :
2769 			 (errcontext("%s on unnamed dblink connection",
2770 						 dblink_context_msg))));
2771 }
2772 
2773 /*
2774  * Obtain connection string for a foreign server
2775  */
2776 static char *
get_connect_string(const char * servername)2777 get_connect_string(const char *servername)
2778 {
2779 	ForeignServer *foreign_server = NULL;
2780 	UserMapping *user_mapping;
2781 	ListCell   *cell;
2782 	StringInfoData buf;
2783 	ForeignDataWrapper *fdw;
2784 	AclResult	aclresult;
2785 	char	   *srvname;
2786 
2787 	static const PQconninfoOption *options = NULL;
2788 
2789 	initStringInfo(&buf);
2790 
2791 	/*
2792 	 * Get list of valid libpq options.
2793 	 *
2794 	 * To avoid unnecessary work, we get the list once and use it throughout
2795 	 * the lifetime of this backend process.  We don't need to care about
2796 	 * memory context issues, because PQconndefaults allocates with malloc.
2797 	 */
2798 	if (!options)
2799 	{
2800 		options = PQconndefaults();
2801 		if (!options)			/* assume reason for failure is OOM */
2802 			ereport(ERROR,
2803 					(errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2804 					 errmsg("out of memory"),
2805 					 errdetail("Could not get libpq's default connection options.")));
2806 	}
2807 
2808 	/* first gather the server connstr options */
2809 	srvname = pstrdup(servername);
2810 	truncate_identifier(srvname, strlen(srvname), false);
2811 	foreign_server = GetForeignServerByName(srvname, true);
2812 
2813 	if (foreign_server)
2814 	{
2815 		Oid			serverid = foreign_server->serverid;
2816 		Oid			fdwid = foreign_server->fdwid;
2817 		Oid			userid = GetUserId();
2818 
2819 		user_mapping = GetUserMapping(userid, serverid);
2820 		fdw = GetForeignDataWrapper(fdwid);
2821 
2822 		/* Check permissions, user must have usage on the server. */
2823 		aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2824 		if (aclresult != ACLCHECK_OK)
2825 			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2826 
2827 		foreach(cell, fdw->options)
2828 		{
2829 			DefElem    *def = lfirst(cell);
2830 
2831 			if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2832 				appendStringInfo(&buf, "%s='%s' ", def->defname,
2833 								 escape_param_str(strVal(def->arg)));
2834 		}
2835 
2836 		foreach(cell, foreign_server->options)
2837 		{
2838 			DefElem    *def = lfirst(cell);
2839 
2840 			if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2841 				appendStringInfo(&buf, "%s='%s' ", def->defname,
2842 								 escape_param_str(strVal(def->arg)));
2843 		}
2844 
2845 		foreach(cell, user_mapping->options)
2846 		{
2847 
2848 			DefElem    *def = lfirst(cell);
2849 
2850 			if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2851 				appendStringInfo(&buf, "%s='%s' ", def->defname,
2852 								 escape_param_str(strVal(def->arg)));
2853 		}
2854 
2855 		return buf.data;
2856 	}
2857 	else
2858 		return NULL;
2859 }
2860 
2861 /*
2862  * Escaping libpq connect parameter strings.
2863  *
2864  * Replaces "'" with "\'" and "\" with "\\".
2865  */
2866 static char *
escape_param_str(const char * str)2867 escape_param_str(const char *str)
2868 {
2869 	const char *cp;
2870 	StringInfoData buf;
2871 
2872 	initStringInfo(&buf);
2873 
2874 	for (cp = str; *cp; cp++)
2875 	{
2876 		if (*cp == '\\' || *cp == '\'')
2877 			appendStringInfoChar(&buf, '\\');
2878 		appendStringInfoChar(&buf, *cp);
2879 	}
2880 
2881 	return buf.data;
2882 }
2883 
2884 /*
2885  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2886  * functions, and translate to the internal representation.
2887  *
2888  * The user supplies an int2vector of 1-based logical attnums, plus a count
2889  * argument (the need for the separate count argument is historical, but we
2890  * still check it).  We check that each attnum corresponds to a valid,
2891  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
2892  * listed twice, though the actual use-case for such things is dubious.
2893  * Note that before Postgres 9.0, the user's attnums were interpreted as
2894  * physical not logical column numbers; this was changed for future-proofing.
2895  *
2896  * The internal representation is a palloc'd int array of 0-based physical
2897  * attnums.
2898  */
2899 static void
validate_pkattnums(Relation rel,int2vector * pkattnums_arg,int32 pknumatts_arg,int ** pkattnums,int * pknumatts)2900 validate_pkattnums(Relation rel,
2901 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
2902 				   int **pkattnums, int *pknumatts)
2903 {
2904 	TupleDesc	tupdesc = rel->rd_att;
2905 	int			natts = tupdesc->natts;
2906 	int			i;
2907 
2908 	/* Don't take more array elements than there are */
2909 	pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2910 
2911 	/* Must have at least one pk attnum selected */
2912 	if (pknumatts_arg <= 0)
2913 		ereport(ERROR,
2914 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2915 				 errmsg("number of key attributes must be > 0")));
2916 
2917 	/* Allocate output array */
2918 	*pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2919 	*pknumatts = pknumatts_arg;
2920 
2921 	/* Validate attnums and convert to internal form */
2922 	for (i = 0; i < pknumatts_arg; i++)
2923 	{
2924 		int			pkattnum = pkattnums_arg->values[i];
2925 		int			lnum;
2926 		int			j;
2927 
2928 		/* Can throw error immediately if out of range */
2929 		if (pkattnum <= 0 || pkattnum > natts)
2930 			ereport(ERROR,
2931 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2932 					 errmsg("invalid attribute number %d", pkattnum)));
2933 
2934 		/* Identify which physical column has this logical number */
2935 		lnum = 0;
2936 		for (j = 0; j < natts; j++)
2937 		{
2938 			/* dropped columns don't count */
2939 			if (TupleDescAttr(tupdesc, j)->attisdropped)
2940 				continue;
2941 
2942 			if (++lnum == pkattnum)
2943 				break;
2944 		}
2945 
2946 		if (j < natts)
2947 			(*pkattnums)[i] = j;
2948 		else
2949 			ereport(ERROR,
2950 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2951 					 errmsg("invalid attribute number %d", pkattnum)));
2952 	}
2953 }
2954 
2955 /*
2956  * Check if the specified connection option is valid.
2957  *
2958  * We basically allow whatever libpq thinks is an option, with these
2959  * restrictions:
2960  *		debug options: disallowed
2961  *		"client_encoding": disallowed
2962  *		"user": valid only in USER MAPPING options
2963  *		secure options (eg password): valid only in USER MAPPING options
2964  *		others: valid only in FOREIGN SERVER options
2965  *
2966  * We disallow client_encoding because it would be overridden anyway via
2967  * PQclientEncoding; allowing it to be specified would merely promote
2968  * confusion.
2969  */
2970 static bool
is_valid_dblink_option(const PQconninfoOption * options,const char * option,Oid context)2971 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
2972 					   Oid context)
2973 {
2974 	const PQconninfoOption *opt;
2975 
2976 	/* Look up the option in libpq result */
2977 	for (opt = options; opt->keyword; opt++)
2978 	{
2979 		if (strcmp(opt->keyword, option) == 0)
2980 			break;
2981 	}
2982 	if (opt->keyword == NULL)
2983 		return false;
2984 
2985 	/* Disallow debug options (particularly "replication") */
2986 	if (strchr(opt->dispchar, 'D'))
2987 		return false;
2988 
2989 	/* Disallow "client_encoding" */
2990 	if (strcmp(opt->keyword, "client_encoding") == 0)
2991 		return false;
2992 
2993 	/*
2994 	 * If the option is "user" or marked secure, it should be specified only
2995 	 * in USER MAPPING.  Others should be specified only in SERVER.
2996 	 */
2997 	if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
2998 	{
2999 		if (context != UserMappingRelationId)
3000 			return false;
3001 	}
3002 	else
3003 	{
3004 		if (context != ForeignServerRelationId)
3005 			return false;
3006 	}
3007 
3008 	return true;
3009 }
3010 
3011 /*
3012  * Copy the remote session's values of GUCs that affect datatype I/O
3013  * and apply them locally in a new GUC nesting level.  Returns the new
3014  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3015  * or -1 if no new nestlevel was needed.
3016  *
3017  * We use the equivalent of a function SET option to allow the settings to
3018  * persist only until the caller calls restoreLocalGucs.  If an error is
3019  * thrown in between, guc.c will take care of undoing the settings.
3020  */
3021 static int
applyRemoteGucs(PGconn * conn)3022 applyRemoteGucs(PGconn *conn)
3023 {
3024 	static const char *const GUCsAffectingIO[] = {
3025 		"DateStyle",
3026 		"IntervalStyle"
3027 	};
3028 
3029 	int			nestlevel = -1;
3030 	int			i;
3031 
3032 	for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3033 	{
3034 		const char *gucName = GUCsAffectingIO[i];
3035 		const char *remoteVal = PQparameterStatus(conn, gucName);
3036 		const char *localVal;
3037 
3038 		/*
3039 		 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3040 		 * that's okay because its output format won't be ambiguous.  So just
3041 		 * skip the GUC if we don't get a value for it.  (We might eventually
3042 		 * need more complicated logic with remote-version checks here.)
3043 		 */
3044 		if (remoteVal == NULL)
3045 			continue;
3046 
3047 		/*
3048 		 * Avoid GUC-setting overhead if the remote and local GUCs already
3049 		 * have the same value.
3050 		 */
3051 		localVal = GetConfigOption(gucName, false, false);
3052 		Assert(localVal != NULL);
3053 
3054 		if (strcmp(remoteVal, localVal) == 0)
3055 			continue;
3056 
3057 		/* Create new GUC nest level if we didn't already */
3058 		if (nestlevel < 0)
3059 			nestlevel = NewGUCNestLevel();
3060 
3061 		/* Apply the option (this will throw error on failure) */
3062 		(void) set_config_option(gucName, remoteVal,
3063 								 PGC_USERSET, PGC_S_SESSION,
3064 								 GUC_ACTION_SAVE, true, 0, false);
3065 	}
3066 
3067 	return nestlevel;
3068 }
3069 
3070 /*
3071  * Restore local GUCs after they have been overlaid with remote settings.
3072  */
3073 static void
restoreLocalGucs(int nestlevel)3074 restoreLocalGucs(int nestlevel)
3075 {
3076 	/* Do nothing if no new nestlevel was created */
3077 	if (nestlevel > 0)
3078 		AtEOXact_GUC(true, nestlevel);
3079 }
3080