1 /*
2  * dblink.c
3  *
4  * Functions returning results from a remote database
5  *
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Darko Prenosil <Darko.Prenosil@finteh.hr>
9  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10  *
11  * contrib/dblink/dblink.c
12  * Copyright (c) 2001-2021, PostgreSQL Global Development Group
13  * ALL RIGHTS RESERVED;
14  *
15  * Permission to use, copy, modify, and distribute this software and its
16  * documentation for any purpose, without fee, and without a written agreement
17  * is hereby granted, provided that the above copyright notice and this
18  * paragraph and the following two paragraphs appear in all copies.
19  *
20  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
29  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31  *
32  */
33 #include "postgres.h"
34 
35 #include <limits.h>
36 
37 #include "access/htup_details.h"
38 #include "access/relation.h"
39 #include "access/reloptions.h"
40 #include "access/table.h"
41 #include "catalog/namespace.h"
42 #include "catalog/pg_foreign_data_wrapper.h"
43 #include "catalog/pg_foreign_server.h"
44 #include "catalog/pg_type.h"
45 #include "catalog/pg_user_mapping.h"
46 #include "executor/spi.h"
47 #include "foreign/foreign.h"
48 #include "funcapi.h"
49 #include "lib/stringinfo.h"
50 #include "libpq-fe.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "parser/scansup.h"
54 #include "utils/acl.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/guc.h"
58 #include "utils/lsyscache.h"
59 #include "utils/memutils.h"
60 #include "utils/rel.h"
61 #include "utils/varlena.h"
62 
63 PG_MODULE_MAGIC;
64 
65 typedef struct remoteConn
66 {
67 	PGconn	   *conn;			/* Hold the remote connection */
68 	int			openCursorCount;	/* The number of open cursors */
69 	bool		newXactForCursor;	/* Opened a transaction for a cursor */
70 } remoteConn;
71 
72 typedef struct storeInfo
73 {
74 	FunctionCallInfo fcinfo;
75 	Tuplestorestate *tuplestore;
76 	AttInMetadata *attinmeta;
77 	MemoryContext tmpcontext;
78 	char	  **cstrs;
79 	/* temp storage for results to avoid leaks on exception */
80 	PGresult   *last_res;
81 	PGresult   *cur_res;
82 } storeInfo;
83 
84 /*
85  * Internal declarations
86  */
87 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
88 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
89 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
90 							  PGresult *res);
91 static void materializeQueryResult(FunctionCallInfo fcinfo,
92 								   PGconn *conn,
93 								   const char *conname,
94 								   const char *sql,
95 								   bool fail);
96 static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
97 static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
98 static remoteConn *getConnectionByName(const char *name);
99 static HTAB *createConnHash(void);
100 static void createNewConnection(const char *name, remoteConn *rconn);
101 static void deleteConnection(const char *name);
102 static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
103 static char **get_text_array_contents(ArrayType *array, int *numitems);
104 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
105 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
106 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
107 static char *quote_ident_cstr(char *rawstr);
108 static int	get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
109 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
110 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
111 static char *generate_relation_name(Relation rel);
112 static void dblink_connstr_check(const char *connstr);
113 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
114 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
115 							 bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
116 static char *get_connect_string(const char *servername);
117 static char *escape_param_str(const char *from);
118 static void validate_pkattnums(Relation rel,
119 							   int2vector *pkattnums_arg, int32 pknumatts_arg,
120 							   int **pkattnums, int *pknumatts);
121 static bool is_valid_dblink_option(const PQconninfoOption *options,
122 								   const char *option, Oid context);
123 static int	applyRemoteGucs(PGconn *conn);
124 static void restoreLocalGucs(int nestlevel);
125 
126 /* Global */
127 static remoteConn *pconn = NULL;
128 static HTAB *remoteConnHash = NULL;
129 
130 /*
131  *	Following is list that holds multiple remote connections.
132  *	Calling convention of each dblink function changes to accept
133  *	connection name as the first parameter. The connection list is
134  *	much like ecpg e.g. a mapping between a name and a PGconn object.
135  */
136 
137 typedef struct remoteConnHashEnt
138 {
139 	char		name[NAMEDATALEN];
140 	remoteConn *rconn;
141 } remoteConnHashEnt;
142 
143 /* initial number of connection hashes */
144 #define NUMCONN 16
145 
146 static char *
xpstrdup(const char * in)147 xpstrdup(const char *in)
148 {
149 	if (in == NULL)
150 		return NULL;
151 	return pstrdup(in);
152 }
153 
154 static void
pg_attribute_noreturn()155 pg_attribute_noreturn()
156 dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
157 {
158 	char	   *msg = pchomp(PQerrorMessage(conn));
159 
160 	if (res)
161 		PQclear(res);
162 	elog(ERROR, "%s: %s", p2, msg);
163 }
164 
165 static void
pg_attribute_noreturn()166 pg_attribute_noreturn()
167 dblink_conn_not_avail(const char *conname)
168 {
169 	if (conname)
170 		ereport(ERROR,
171 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
172 				 errmsg("connection \"%s\" not available", conname)));
173 	else
174 		ereport(ERROR,
175 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
176 				 errmsg("connection not available")));
177 }
178 
179 static void
dblink_get_conn(char * conname_or_str,PGconn * volatile * conn_p,char ** conname_p,volatile bool * freeconn_p)180 dblink_get_conn(char *conname_or_str,
181 				PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
182 {
183 	remoteConn *rconn = getConnectionByName(conname_or_str);
184 	PGconn	   *conn;
185 	char	   *conname;
186 	bool		freeconn;
187 
188 	if (rconn)
189 	{
190 		conn = rconn->conn;
191 		conname = conname_or_str;
192 		freeconn = false;
193 	}
194 	else
195 	{
196 		const char *connstr;
197 
198 		connstr = get_connect_string(conname_or_str);
199 		if (connstr == NULL)
200 			connstr = conname_or_str;
201 		dblink_connstr_check(connstr);
202 
203 		/*
204 		 * We must obey fd.c's limit on non-virtual file descriptors.  Assume
205 		 * that a PGconn represents one long-lived FD.  (Doing this here also
206 		 * ensures that VFDs are closed if needed to make room.)
207 		 */
208 		if (!AcquireExternalFD())
209 		{
210 #ifndef WIN32					/* can't write #if within ereport() macro */
211 			ereport(ERROR,
212 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
213 					 errmsg("could not establish connection"),
214 					 errdetail("There are too many open files on the local server."),
215 					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
216 #else
217 			ereport(ERROR,
218 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
219 					 errmsg("could not establish connection"),
220 					 errdetail("There are too many open files on the local server."),
221 					 errhint("Raise the server's max_files_per_process setting.")));
222 #endif
223 		}
224 
225 		/* OK to make connection */
226 		conn = PQconnectdb(connstr);
227 
228 		if (PQstatus(conn) == CONNECTION_BAD)
229 		{
230 			char	   *msg = pchomp(PQerrorMessage(conn));
231 
232 			PQfinish(conn);
233 			ReleaseExternalFD();
234 			ereport(ERROR,
235 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
236 					 errmsg("could not establish connection"),
237 					 errdetail_internal("%s", msg)));
238 		}
239 		dblink_security_check(conn, rconn);
240 		if (PQclientEncoding(conn) != GetDatabaseEncoding())
241 			PQsetClientEncoding(conn, GetDatabaseEncodingName());
242 		freeconn = true;
243 		conname = NULL;
244 	}
245 
246 	*conn_p = conn;
247 	*conname_p = conname;
248 	*freeconn_p = freeconn;
249 }
250 
251 static PGconn *
dblink_get_named_conn(const char * conname)252 dblink_get_named_conn(const char *conname)
253 {
254 	remoteConn *rconn = getConnectionByName(conname);
255 
256 	if (rconn)
257 		return rconn->conn;
258 
259 	dblink_conn_not_avail(conname);
260 	return NULL;				/* keep compiler quiet */
261 }
262 
263 static void
dblink_init(void)264 dblink_init(void)
265 {
266 	if (!pconn)
267 	{
268 		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
269 		pconn->conn = NULL;
270 		pconn->openCursorCount = 0;
271 		pconn->newXactForCursor = false;
272 	}
273 }
274 
275 /*
276  * Create a persistent connection to another database
277  */
278 PG_FUNCTION_INFO_V1(dblink_connect);
279 Datum
dblink_connect(PG_FUNCTION_ARGS)280 dblink_connect(PG_FUNCTION_ARGS)
281 {
282 	char	   *conname_or_str = NULL;
283 	char	   *connstr = NULL;
284 	char	   *connname = NULL;
285 	char	   *msg;
286 	PGconn	   *conn = NULL;
287 	remoteConn *rconn = NULL;
288 
289 	dblink_init();
290 
291 	if (PG_NARGS() == 2)
292 	{
293 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
294 		connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
295 	}
296 	else if (PG_NARGS() == 1)
297 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
298 
299 	if (connname)
300 	{
301 		rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
302 												  sizeof(remoteConn));
303 		rconn->conn = NULL;
304 		rconn->openCursorCount = 0;
305 		rconn->newXactForCursor = false;
306 	}
307 
308 	/* first check for valid foreign data server */
309 	connstr = get_connect_string(conname_or_str);
310 	if (connstr == NULL)
311 		connstr = conname_or_str;
312 
313 	/* check password in connection string if not superuser */
314 	dblink_connstr_check(connstr);
315 
316 	/*
317 	 * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
318 	 * a PGconn represents one long-lived FD.  (Doing this here also ensures
319 	 * that VFDs are closed if needed to make room.)
320 	 */
321 	if (!AcquireExternalFD())
322 	{
323 #ifndef WIN32					/* can't write #if within ereport() macro */
324 		ereport(ERROR,
325 				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
326 				 errmsg("could not establish connection"),
327 				 errdetail("There are too many open files on the local server."),
328 				 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
329 #else
330 		ereport(ERROR,
331 				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
332 				 errmsg("could not establish connection"),
333 				 errdetail("There are too many open files on the local server."),
334 				 errhint("Raise the server's max_files_per_process setting.")));
335 #endif
336 	}
337 
338 	/* OK to make connection */
339 	conn = PQconnectdb(connstr);
340 
341 	if (PQstatus(conn) == CONNECTION_BAD)
342 	{
343 		msg = pchomp(PQerrorMessage(conn));
344 		PQfinish(conn);
345 		ReleaseExternalFD();
346 		if (rconn)
347 			pfree(rconn);
348 
349 		ereport(ERROR,
350 				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
351 				 errmsg("could not establish connection"),
352 				 errdetail_internal("%s", msg)));
353 	}
354 
355 	/* check password actually used if not superuser */
356 	dblink_security_check(conn, rconn);
357 
358 	/* attempt to set client encoding to match server encoding, if needed */
359 	if (PQclientEncoding(conn) != GetDatabaseEncoding())
360 		PQsetClientEncoding(conn, GetDatabaseEncodingName());
361 
362 	if (connname)
363 	{
364 		rconn->conn = conn;
365 		createNewConnection(connname, rconn);
366 	}
367 	else
368 	{
369 		if (pconn->conn)
370 		{
371 			PQfinish(pconn->conn);
372 			ReleaseExternalFD();
373 		}
374 		pconn->conn = conn;
375 	}
376 
377 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
378 }
379 
380 /*
381  * Clear a persistent connection to another database
382  */
383 PG_FUNCTION_INFO_V1(dblink_disconnect);
384 Datum
dblink_disconnect(PG_FUNCTION_ARGS)385 dblink_disconnect(PG_FUNCTION_ARGS)
386 {
387 	char	   *conname = NULL;
388 	remoteConn *rconn = NULL;
389 	PGconn	   *conn = NULL;
390 
391 	dblink_init();
392 
393 	if (PG_NARGS() == 1)
394 	{
395 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
396 		rconn = getConnectionByName(conname);
397 		if (rconn)
398 			conn = rconn->conn;
399 	}
400 	else
401 		conn = pconn->conn;
402 
403 	if (!conn)
404 		dblink_conn_not_avail(conname);
405 
406 	PQfinish(conn);
407 	ReleaseExternalFD();
408 	if (rconn)
409 	{
410 		deleteConnection(conname);
411 		pfree(rconn);
412 	}
413 	else
414 		pconn->conn = NULL;
415 
416 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
417 }
418 
419 /*
420  * opens a cursor using a persistent connection
421  */
422 PG_FUNCTION_INFO_V1(dblink_open);
423 Datum
dblink_open(PG_FUNCTION_ARGS)424 dblink_open(PG_FUNCTION_ARGS)
425 {
426 	PGresult   *res = NULL;
427 	PGconn	   *conn;
428 	char	   *curname = NULL;
429 	char	   *sql = NULL;
430 	char	   *conname = NULL;
431 	StringInfoData buf;
432 	remoteConn *rconn = NULL;
433 	bool		fail = true;	/* default to backward compatible behavior */
434 
435 	dblink_init();
436 	initStringInfo(&buf);
437 
438 	if (PG_NARGS() == 2)
439 	{
440 		/* text,text */
441 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
442 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
443 		rconn = pconn;
444 	}
445 	else if (PG_NARGS() == 3)
446 	{
447 		/* might be text,text,text or text,text,bool */
448 		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
449 		{
450 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
451 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
452 			fail = PG_GETARG_BOOL(2);
453 			rconn = pconn;
454 		}
455 		else
456 		{
457 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
458 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
459 			sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
460 			rconn = getConnectionByName(conname);
461 		}
462 	}
463 	else if (PG_NARGS() == 4)
464 	{
465 		/* text,text,text,bool */
466 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
467 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
468 		sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
469 		fail = PG_GETARG_BOOL(3);
470 		rconn = getConnectionByName(conname);
471 	}
472 
473 	if (!rconn || !rconn->conn)
474 		dblink_conn_not_avail(conname);
475 
476 	conn = rconn->conn;
477 
478 	/* If we are not in a transaction, start one */
479 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
480 	{
481 		res = PQexec(conn, "BEGIN");
482 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
483 			dblink_res_internalerror(conn, res, "begin error");
484 		PQclear(res);
485 		rconn->newXactForCursor = true;
486 
487 		/*
488 		 * Since transaction state was IDLE, we force cursor count to
489 		 * initially be 0. This is needed as a previous ABORT might have wiped
490 		 * out our transaction without maintaining the cursor count for us.
491 		 */
492 		rconn->openCursorCount = 0;
493 	}
494 
495 	/* if we started a transaction, increment cursor count */
496 	if (rconn->newXactForCursor)
497 		(rconn->openCursorCount)++;
498 
499 	appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
500 	res = PQexec(conn, buf.data);
501 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
502 	{
503 		dblink_res_error(conn, conname, res, fail,
504 						 "while opening cursor \"%s\"", curname);
505 		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
506 	}
507 
508 	PQclear(res);
509 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
510 }
511 
512 /*
513  * closes a cursor
514  */
515 PG_FUNCTION_INFO_V1(dblink_close);
516 Datum
dblink_close(PG_FUNCTION_ARGS)517 dblink_close(PG_FUNCTION_ARGS)
518 {
519 	PGconn	   *conn;
520 	PGresult   *res = NULL;
521 	char	   *curname = NULL;
522 	char	   *conname = NULL;
523 	StringInfoData buf;
524 	remoteConn *rconn = NULL;
525 	bool		fail = true;	/* default to backward compatible behavior */
526 
527 	dblink_init();
528 	initStringInfo(&buf);
529 
530 	if (PG_NARGS() == 1)
531 	{
532 		/* text */
533 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
534 		rconn = pconn;
535 	}
536 	else if (PG_NARGS() == 2)
537 	{
538 		/* might be text,text or text,bool */
539 		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
540 		{
541 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
542 			fail = PG_GETARG_BOOL(1);
543 			rconn = pconn;
544 		}
545 		else
546 		{
547 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
548 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
549 			rconn = getConnectionByName(conname);
550 		}
551 	}
552 	if (PG_NARGS() == 3)
553 	{
554 		/* text,text,bool */
555 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
556 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
557 		fail = PG_GETARG_BOOL(2);
558 		rconn = getConnectionByName(conname);
559 	}
560 
561 	if (!rconn || !rconn->conn)
562 		dblink_conn_not_avail(conname);
563 
564 	conn = rconn->conn;
565 
566 	appendStringInfo(&buf, "CLOSE %s", curname);
567 
568 	/* close the cursor */
569 	res = PQexec(conn, buf.data);
570 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
571 	{
572 		dblink_res_error(conn, conname, res, fail,
573 						 "while closing cursor \"%s\"", curname);
574 		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
575 	}
576 
577 	PQclear(res);
578 
579 	/* if we started a transaction, decrement cursor count */
580 	if (rconn->newXactForCursor)
581 	{
582 		(rconn->openCursorCount)--;
583 
584 		/* if count is zero, commit the transaction */
585 		if (rconn->openCursorCount == 0)
586 		{
587 			rconn->newXactForCursor = false;
588 
589 			res = PQexec(conn, "COMMIT");
590 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
591 				dblink_res_internalerror(conn, res, "commit error");
592 			PQclear(res);
593 		}
594 	}
595 
596 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
597 }
598 
599 /*
600  * Fetch results from an open cursor
601  */
602 PG_FUNCTION_INFO_V1(dblink_fetch);
603 Datum
dblink_fetch(PG_FUNCTION_ARGS)604 dblink_fetch(PG_FUNCTION_ARGS)
605 {
606 	PGresult   *res = NULL;
607 	char	   *conname = NULL;
608 	remoteConn *rconn = NULL;
609 	PGconn	   *conn = NULL;
610 	StringInfoData buf;
611 	char	   *curname = NULL;
612 	int			howmany = 0;
613 	bool		fail = true;	/* default to backward compatible */
614 
615 	prepTuplestoreResult(fcinfo);
616 
617 	dblink_init();
618 
619 	if (PG_NARGS() == 4)
620 	{
621 		/* text,text,int,bool */
622 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
623 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
624 		howmany = PG_GETARG_INT32(2);
625 		fail = PG_GETARG_BOOL(3);
626 
627 		rconn = getConnectionByName(conname);
628 		if (rconn)
629 			conn = rconn->conn;
630 	}
631 	else if (PG_NARGS() == 3)
632 	{
633 		/* text,text,int or text,int,bool */
634 		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
635 		{
636 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
637 			howmany = PG_GETARG_INT32(1);
638 			fail = PG_GETARG_BOOL(2);
639 			conn = pconn->conn;
640 		}
641 		else
642 		{
643 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
644 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
645 			howmany = PG_GETARG_INT32(2);
646 
647 			rconn = getConnectionByName(conname);
648 			if (rconn)
649 				conn = rconn->conn;
650 		}
651 	}
652 	else if (PG_NARGS() == 2)
653 	{
654 		/* text,int */
655 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
656 		howmany = PG_GETARG_INT32(1);
657 		conn = pconn->conn;
658 	}
659 
660 	if (!conn)
661 		dblink_conn_not_avail(conname);
662 
663 	initStringInfo(&buf);
664 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
665 
666 	/*
667 	 * Try to execute the query.  Note that since libpq uses malloc, the
668 	 * PGresult will be long-lived even though we are still in a short-lived
669 	 * memory context.
670 	 */
671 	res = PQexec(conn, buf.data);
672 	if (!res ||
673 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
674 		 PQresultStatus(res) != PGRES_TUPLES_OK))
675 	{
676 		dblink_res_error(conn, conname, res, fail,
677 						 "while fetching from cursor \"%s\"", curname);
678 		return (Datum) 0;
679 	}
680 	else if (PQresultStatus(res) == PGRES_COMMAND_OK)
681 	{
682 		/* cursor does not exist - closed already or bad name */
683 		PQclear(res);
684 		ereport(ERROR,
685 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
686 				 errmsg("cursor \"%s\" does not exist", curname)));
687 	}
688 
689 	materializeResult(fcinfo, conn, res);
690 	return (Datum) 0;
691 }
692 
693 /*
694  * Note: this is the new preferred version of dblink
695  */
696 PG_FUNCTION_INFO_V1(dblink_record);
697 Datum
dblink_record(PG_FUNCTION_ARGS)698 dblink_record(PG_FUNCTION_ARGS)
699 {
700 	return dblink_record_internal(fcinfo, false);
701 }
702 
703 PG_FUNCTION_INFO_V1(dblink_send_query);
704 Datum
dblink_send_query(PG_FUNCTION_ARGS)705 dblink_send_query(PG_FUNCTION_ARGS)
706 {
707 	PGconn	   *conn;
708 	char	   *sql;
709 	int			retval;
710 
711 	if (PG_NARGS() == 2)
712 	{
713 		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
714 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
715 	}
716 	else
717 		/* shouldn't happen */
718 		elog(ERROR, "wrong number of arguments");
719 
720 	/* async query send */
721 	retval = PQsendQuery(conn, sql);
722 	if (retval != 1)
723 		elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
724 
725 	PG_RETURN_INT32(retval);
726 }
727 
728 PG_FUNCTION_INFO_V1(dblink_get_result);
729 Datum
dblink_get_result(PG_FUNCTION_ARGS)730 dblink_get_result(PG_FUNCTION_ARGS)
731 {
732 	return dblink_record_internal(fcinfo, true);
733 }
734 
735 static Datum
dblink_record_internal(FunctionCallInfo fcinfo,bool is_async)736 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
737 {
738 	PGconn	   *volatile conn = NULL;
739 	volatile bool freeconn = false;
740 
741 	prepTuplestoreResult(fcinfo);
742 
743 	dblink_init();
744 
745 	PG_TRY();
746 	{
747 		char	   *sql = NULL;
748 		char	   *conname = NULL;
749 		bool		fail = true;	/* default to backward compatible */
750 
751 		if (!is_async)
752 		{
753 			if (PG_NARGS() == 3)
754 			{
755 				/* text,text,bool */
756 				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
757 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
758 				fail = PG_GETARG_BOOL(2);
759 				dblink_get_conn(conname, &conn, &conname, &freeconn);
760 			}
761 			else if (PG_NARGS() == 2)
762 			{
763 				/* text,text or text,bool */
764 				if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
765 				{
766 					sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
767 					fail = PG_GETARG_BOOL(1);
768 					conn = pconn->conn;
769 				}
770 				else
771 				{
772 					conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
773 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
774 					dblink_get_conn(conname, &conn, &conname, &freeconn);
775 				}
776 			}
777 			else if (PG_NARGS() == 1)
778 			{
779 				/* text */
780 				conn = pconn->conn;
781 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
782 			}
783 			else
784 				/* shouldn't happen */
785 				elog(ERROR, "wrong number of arguments");
786 		}
787 		else					/* is_async */
788 		{
789 			/* get async result */
790 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
791 
792 			if (PG_NARGS() == 2)
793 			{
794 				/* text,bool */
795 				fail = PG_GETARG_BOOL(1);
796 				conn = dblink_get_named_conn(conname);
797 			}
798 			else if (PG_NARGS() == 1)
799 			{
800 				/* text */
801 				conn = dblink_get_named_conn(conname);
802 			}
803 			else
804 				/* shouldn't happen */
805 				elog(ERROR, "wrong number of arguments");
806 		}
807 
808 		if (!conn)
809 			dblink_conn_not_avail(conname);
810 
811 		if (!is_async)
812 		{
813 			/* synchronous query, use efficient tuple collection method */
814 			materializeQueryResult(fcinfo, conn, conname, sql, fail);
815 		}
816 		else
817 		{
818 			/* async result retrieval, do it the old way */
819 			PGresult   *res = PQgetResult(conn);
820 
821 			/* NULL means we're all done with the async results */
822 			if (res)
823 			{
824 				if (PQresultStatus(res) != PGRES_COMMAND_OK &&
825 					PQresultStatus(res) != PGRES_TUPLES_OK)
826 				{
827 					dblink_res_error(conn, conname, res, fail,
828 									 "while executing query");
829 					/* if fail isn't set, we'll return an empty query result */
830 				}
831 				else
832 				{
833 					materializeResult(fcinfo, conn, res);
834 				}
835 			}
836 		}
837 	}
838 	PG_FINALLY();
839 	{
840 		/* if needed, close the connection to the database */
841 		if (freeconn)
842 		{
843 			PQfinish(conn);
844 			ReleaseExternalFD();
845 		}
846 	}
847 	PG_END_TRY();
848 
849 	return (Datum) 0;
850 }
851 
852 /*
853  * Verify function caller can handle a tuplestore result, and set up for that.
854  *
855  * Note: if the caller returns without actually creating a tuplestore, the
856  * executor will treat the function result as an empty set.
857  */
858 static void
prepTuplestoreResult(FunctionCallInfo fcinfo)859 prepTuplestoreResult(FunctionCallInfo fcinfo)
860 {
861 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
862 
863 	/* check to see if query supports us returning a tuplestore */
864 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
865 		ereport(ERROR,
866 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
867 				 errmsg("set-valued function called in context that cannot accept a set")));
868 	if (!(rsinfo->allowedModes & SFRM_Materialize))
869 		ereport(ERROR,
870 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
871 				 errmsg("materialize mode required, but it is not allowed in this context")));
872 
873 	/* let the executor know we're sending back a tuplestore */
874 	rsinfo->returnMode = SFRM_Materialize;
875 
876 	/* caller must fill these to return a non-empty result */
877 	rsinfo->setResult = NULL;
878 	rsinfo->setDesc = NULL;
879 }
880 
881 /*
882  * Copy the contents of the PGresult into a tuplestore to be returned
883  * as the result of the current function.
884  * The PGresult will be released in this function.
885  */
886 static void
materializeResult(FunctionCallInfo fcinfo,PGconn * conn,PGresult * res)887 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
888 {
889 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
890 
891 	/* prepTuplestoreResult must have been called previously */
892 	Assert(rsinfo->returnMode == SFRM_Materialize);
893 
894 	PG_TRY();
895 	{
896 		TupleDesc	tupdesc;
897 		bool		is_sql_cmd;
898 		int			ntuples;
899 		int			nfields;
900 
901 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
902 		{
903 			is_sql_cmd = true;
904 
905 			/*
906 			 * need a tuple descriptor representing one TEXT column to return
907 			 * the command status string as our result tuple
908 			 */
909 			tupdesc = CreateTemplateTupleDesc(1);
910 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
911 							   TEXTOID, -1, 0);
912 			ntuples = 1;
913 			nfields = 1;
914 		}
915 		else
916 		{
917 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
918 
919 			is_sql_cmd = false;
920 
921 			/* get a tuple descriptor for our result type */
922 			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
923 			{
924 				case TYPEFUNC_COMPOSITE:
925 					/* success */
926 					break;
927 				case TYPEFUNC_RECORD:
928 					/* failed to determine actual type of RECORD */
929 					ereport(ERROR,
930 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
931 							 errmsg("function returning record called in context "
932 									"that cannot accept type record")));
933 					break;
934 				default:
935 					/* result type isn't composite */
936 					elog(ERROR, "return type must be a row type");
937 					break;
938 			}
939 
940 			/* make sure we have a persistent copy of the tupdesc */
941 			tupdesc = CreateTupleDescCopy(tupdesc);
942 			ntuples = PQntuples(res);
943 			nfields = PQnfields(res);
944 		}
945 
946 		/*
947 		 * check result and tuple descriptor have the same number of columns
948 		 */
949 		if (nfields != tupdesc->natts)
950 			ereport(ERROR,
951 					(errcode(ERRCODE_DATATYPE_MISMATCH),
952 					 errmsg("remote query result rowtype does not match "
953 							"the specified FROM clause rowtype")));
954 
955 		if (ntuples > 0)
956 		{
957 			AttInMetadata *attinmeta;
958 			int			nestlevel = -1;
959 			Tuplestorestate *tupstore;
960 			MemoryContext oldcontext;
961 			int			row;
962 			char	  **values;
963 
964 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
965 
966 			/* Set GUCs to ensure we read GUC-sensitive data types correctly */
967 			if (!is_sql_cmd)
968 				nestlevel = applyRemoteGucs(conn);
969 
970 			oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
971 			tupstore = tuplestore_begin_heap(true, false, work_mem);
972 			rsinfo->setResult = tupstore;
973 			rsinfo->setDesc = tupdesc;
974 			MemoryContextSwitchTo(oldcontext);
975 
976 			values = (char **) palloc(nfields * sizeof(char *));
977 
978 			/* put all tuples into the tuplestore */
979 			for (row = 0; row < ntuples; row++)
980 			{
981 				HeapTuple	tuple;
982 
983 				if (!is_sql_cmd)
984 				{
985 					int			i;
986 
987 					for (i = 0; i < nfields; i++)
988 					{
989 						if (PQgetisnull(res, row, i))
990 							values[i] = NULL;
991 						else
992 							values[i] = PQgetvalue(res, row, i);
993 					}
994 				}
995 				else
996 				{
997 					values[0] = PQcmdStatus(res);
998 				}
999 
1000 				/* build the tuple and put it into the tuplestore. */
1001 				tuple = BuildTupleFromCStrings(attinmeta, values);
1002 				tuplestore_puttuple(tupstore, tuple);
1003 			}
1004 
1005 			/* clean up GUC settings, if we changed any */
1006 			restoreLocalGucs(nestlevel);
1007 
1008 			/* clean up and return the tuplestore */
1009 			tuplestore_donestoring(tupstore);
1010 		}
1011 	}
1012 	PG_FINALLY();
1013 	{
1014 		/* be sure to release the libpq result */
1015 		PQclear(res);
1016 	}
1017 	PG_END_TRY();
1018 }
1019 
1020 /*
1021  * Execute the given SQL command and store its results into a tuplestore
1022  * to be returned as the result of the current function.
1023  *
1024  * This is equivalent to PQexec followed by materializeResult, but we make
1025  * use of libpq's single-row mode to avoid accumulating the whole result
1026  * inside libpq before it gets transferred to the tuplestore.
1027  */
1028 static void
materializeQueryResult(FunctionCallInfo fcinfo,PGconn * conn,const char * conname,const char * sql,bool fail)1029 materializeQueryResult(FunctionCallInfo fcinfo,
1030 					   PGconn *conn,
1031 					   const char *conname,
1032 					   const char *sql,
1033 					   bool fail)
1034 {
1035 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1036 	PGresult   *volatile res = NULL;
1037 	volatile storeInfo sinfo = {0};
1038 
1039 	/* prepTuplestoreResult must have been called previously */
1040 	Assert(rsinfo->returnMode == SFRM_Materialize);
1041 
1042 	sinfo.fcinfo = fcinfo;
1043 
1044 	PG_TRY();
1045 	{
1046 		/* Create short-lived memory context for data conversions */
1047 		sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1048 												 "dblink temporary context",
1049 												 ALLOCSET_DEFAULT_SIZES);
1050 
1051 		/* execute query, collecting any tuples into the tuplestore */
1052 		res = storeQueryResult(&sinfo, conn, sql);
1053 
1054 		if (!res ||
1055 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
1056 			 PQresultStatus(res) != PGRES_TUPLES_OK))
1057 		{
1058 			/*
1059 			 * dblink_res_error will clear the passed PGresult, so we need
1060 			 * this ugly dance to avoid doing so twice during error exit
1061 			 */
1062 			PGresult   *res1 = res;
1063 
1064 			res = NULL;
1065 			dblink_res_error(conn, conname, res1, fail,
1066 							 "while executing query");
1067 			/* if fail isn't set, we'll return an empty query result */
1068 		}
1069 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1070 		{
1071 			/*
1072 			 * storeRow didn't get called, so we need to convert the command
1073 			 * status string to a tuple manually
1074 			 */
1075 			TupleDesc	tupdesc;
1076 			AttInMetadata *attinmeta;
1077 			Tuplestorestate *tupstore;
1078 			HeapTuple	tuple;
1079 			char	   *values[1];
1080 			MemoryContext oldcontext;
1081 
1082 			/*
1083 			 * need a tuple descriptor representing one TEXT column to return
1084 			 * the command status string as our result tuple
1085 			 */
1086 			tupdesc = CreateTemplateTupleDesc(1);
1087 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1088 							   TEXTOID, -1, 0);
1089 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
1090 
1091 			oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1092 			tupstore = tuplestore_begin_heap(true, false, work_mem);
1093 			rsinfo->setResult = tupstore;
1094 			rsinfo->setDesc = tupdesc;
1095 			MemoryContextSwitchTo(oldcontext);
1096 
1097 			values[0] = PQcmdStatus(res);
1098 
1099 			/* build the tuple and put it into the tuplestore. */
1100 			tuple = BuildTupleFromCStrings(attinmeta, values);
1101 			tuplestore_puttuple(tupstore, tuple);
1102 
1103 			PQclear(res);
1104 			res = NULL;
1105 		}
1106 		else
1107 		{
1108 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1109 			/* storeRow should have created a tuplestore */
1110 			Assert(rsinfo->setResult != NULL);
1111 
1112 			PQclear(res);
1113 			res = NULL;
1114 		}
1115 
1116 		/* clean up data conversion short-lived memory context */
1117 		if (sinfo.tmpcontext != NULL)
1118 			MemoryContextDelete(sinfo.tmpcontext);
1119 		sinfo.tmpcontext = NULL;
1120 
1121 		PQclear(sinfo.last_res);
1122 		sinfo.last_res = NULL;
1123 		PQclear(sinfo.cur_res);
1124 		sinfo.cur_res = NULL;
1125 	}
1126 	PG_CATCH();
1127 	{
1128 		/* be sure to release any libpq result we collected */
1129 		PQclear(res);
1130 		PQclear(sinfo.last_res);
1131 		PQclear(sinfo.cur_res);
1132 		/* and clear out any pending data in libpq */
1133 		while ((res = PQgetResult(conn)) != NULL)
1134 			PQclear(res);
1135 		PG_RE_THROW();
1136 	}
1137 	PG_END_TRY();
1138 }
1139 
1140 /*
1141  * Execute query, and send any result rows to sinfo->tuplestore.
1142  */
1143 static PGresult *
storeQueryResult(volatile storeInfo * sinfo,PGconn * conn,const char * sql)1144 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1145 {
1146 	bool		first = true;
1147 	int			nestlevel = -1;
1148 	PGresult   *res;
1149 
1150 	if (!PQsendQuery(conn, sql))
1151 		elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1152 
1153 	if (!PQsetSingleRowMode(conn))	/* shouldn't fail */
1154 		elog(ERROR, "failed to set single-row mode for dblink query");
1155 
1156 	for (;;)
1157 	{
1158 		CHECK_FOR_INTERRUPTS();
1159 
1160 		sinfo->cur_res = PQgetResult(conn);
1161 		if (!sinfo->cur_res)
1162 			break;
1163 
1164 		if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1165 		{
1166 			/* got one row from possibly-bigger resultset */
1167 
1168 			/*
1169 			 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1170 			 * We shouldn't do this until we have a row in hand, to ensure
1171 			 * libpq has seen any earlier ParameterStatus protocol messages.
1172 			 */
1173 			if (first && nestlevel < 0)
1174 				nestlevel = applyRemoteGucs(conn);
1175 
1176 			storeRow(sinfo, sinfo->cur_res, first);
1177 
1178 			PQclear(sinfo->cur_res);
1179 			sinfo->cur_res = NULL;
1180 			first = false;
1181 		}
1182 		else
1183 		{
1184 			/* if empty resultset, fill tuplestore header */
1185 			if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1186 				storeRow(sinfo, sinfo->cur_res, first);
1187 
1188 			/* store completed result at last_res */
1189 			PQclear(sinfo->last_res);
1190 			sinfo->last_res = sinfo->cur_res;
1191 			sinfo->cur_res = NULL;
1192 			first = true;
1193 		}
1194 	}
1195 
1196 	/* clean up GUC settings, if we changed any */
1197 	restoreLocalGucs(nestlevel);
1198 
1199 	/* return last_res */
1200 	res = sinfo->last_res;
1201 	sinfo->last_res = NULL;
1202 	return res;
1203 }
1204 
1205 /*
1206  * Send single row to sinfo->tuplestore.
1207  *
1208  * If "first" is true, create the tuplestore using PGresult's metadata
1209  * (in this case the PGresult might contain either zero or one row).
1210  */
1211 static void
storeRow(volatile storeInfo * sinfo,PGresult * res,bool first)1212 storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1213 {
1214 	int			nfields = PQnfields(res);
1215 	HeapTuple	tuple;
1216 	int			i;
1217 	MemoryContext oldcontext;
1218 
1219 	if (first)
1220 	{
1221 		/* Prepare for new result set */
1222 		ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1223 		TupleDesc	tupdesc;
1224 
1225 		/*
1226 		 * It's possible to get more than one result set if the query string
1227 		 * contained multiple SQL commands.  In that case, we follow PQexec's
1228 		 * traditional behavior of throwing away all but the last result.
1229 		 */
1230 		if (sinfo->tuplestore)
1231 			tuplestore_end(sinfo->tuplestore);
1232 		sinfo->tuplestore = NULL;
1233 
1234 		/* get a tuple descriptor for our result type */
1235 		switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1236 		{
1237 			case TYPEFUNC_COMPOSITE:
1238 				/* success */
1239 				break;
1240 			case TYPEFUNC_RECORD:
1241 				/* failed to determine actual type of RECORD */
1242 				ereport(ERROR,
1243 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1244 						 errmsg("function returning record called in context "
1245 								"that cannot accept type record")));
1246 				break;
1247 			default:
1248 				/* result type isn't composite */
1249 				elog(ERROR, "return type must be a row type");
1250 				break;
1251 		}
1252 
1253 		/* make sure we have a persistent copy of the tupdesc */
1254 		tupdesc = CreateTupleDescCopy(tupdesc);
1255 
1256 		/* check result and tuple descriptor have the same number of columns */
1257 		if (nfields != tupdesc->natts)
1258 			ereport(ERROR,
1259 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1260 					 errmsg("remote query result rowtype does not match "
1261 							"the specified FROM clause rowtype")));
1262 
1263 		/* Prepare attinmeta for later data conversions */
1264 		sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1265 
1266 		/* Create a new, empty tuplestore */
1267 		oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1268 		sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1269 		rsinfo->setResult = sinfo->tuplestore;
1270 		rsinfo->setDesc = tupdesc;
1271 		MemoryContextSwitchTo(oldcontext);
1272 
1273 		/* Done if empty resultset */
1274 		if (PQntuples(res) == 0)
1275 			return;
1276 
1277 		/*
1278 		 * Set up sufficiently-wide string pointers array; this won't change
1279 		 * in size so it's easy to preallocate.
1280 		 */
1281 		if (sinfo->cstrs)
1282 			pfree(sinfo->cstrs);
1283 		sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
1284 	}
1285 
1286 	/* Should have a single-row result if we get here */
1287 	Assert(PQntuples(res) == 1);
1288 
1289 	/*
1290 	 * Do the following work in a temp context that we reset after each tuple.
1291 	 * This cleans up not only the data we have direct access to, but any
1292 	 * cruft the I/O functions might leak.
1293 	 */
1294 	oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1295 
1296 	/*
1297 	 * Fill cstrs with null-terminated strings of column values.
1298 	 */
1299 	for (i = 0; i < nfields; i++)
1300 	{
1301 		if (PQgetisnull(res, 0, i))
1302 			sinfo->cstrs[i] = NULL;
1303 		else
1304 			sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1305 	}
1306 
1307 	/* Convert row to a tuple, and add it to the tuplestore */
1308 	tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1309 
1310 	tuplestore_puttuple(sinfo->tuplestore, tuple);
1311 
1312 	/* Clean up */
1313 	MemoryContextSwitchTo(oldcontext);
1314 	MemoryContextReset(sinfo->tmpcontext);
1315 }
1316 
1317 /*
1318  * List all open dblink connections by name.
1319  * Returns an array of all connection names.
1320  * Takes no params
1321  */
1322 PG_FUNCTION_INFO_V1(dblink_get_connections);
1323 Datum
dblink_get_connections(PG_FUNCTION_ARGS)1324 dblink_get_connections(PG_FUNCTION_ARGS)
1325 {
1326 	HASH_SEQ_STATUS status;
1327 	remoteConnHashEnt *hentry;
1328 	ArrayBuildState *astate = NULL;
1329 
1330 	if (remoteConnHash)
1331 	{
1332 		hash_seq_init(&status, remoteConnHash);
1333 		while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1334 		{
1335 			/* stash away current value */
1336 			astate = accumArrayResult(astate,
1337 									  CStringGetTextDatum(hentry->name),
1338 									  false, TEXTOID, CurrentMemoryContext);
1339 		}
1340 	}
1341 
1342 	if (astate)
1343 		PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1344 											  CurrentMemoryContext));
1345 	else
1346 		PG_RETURN_NULL();
1347 }
1348 
1349 /*
1350  * Checks if a given remote connection is busy
1351  *
1352  * Returns 1 if the connection is busy, 0 otherwise
1353  * Params:
1354  *	text connection_name - name of the connection to check
1355  *
1356  */
1357 PG_FUNCTION_INFO_V1(dblink_is_busy);
1358 Datum
dblink_is_busy(PG_FUNCTION_ARGS)1359 dblink_is_busy(PG_FUNCTION_ARGS)
1360 {
1361 	PGconn	   *conn;
1362 
1363 	dblink_init();
1364 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1365 
1366 	PQconsumeInput(conn);
1367 	PG_RETURN_INT32(PQisBusy(conn));
1368 }
1369 
1370 /*
1371  * Cancels a running request on a connection
1372  *
1373  * Returns text:
1374  *	"OK" if the cancel request has been sent correctly,
1375  *		an error message otherwise
1376  *
1377  * Params:
1378  *	text connection_name - name of the connection to check
1379  *
1380  */
1381 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1382 Datum
dblink_cancel_query(PG_FUNCTION_ARGS)1383 dblink_cancel_query(PG_FUNCTION_ARGS)
1384 {
1385 	int			res;
1386 	PGconn	   *conn;
1387 	PGcancel   *cancel;
1388 	char		errbuf[256];
1389 
1390 	dblink_init();
1391 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1392 	cancel = PQgetCancel(conn);
1393 
1394 	res = PQcancel(cancel, errbuf, 256);
1395 	PQfreeCancel(cancel);
1396 
1397 	if (res == 1)
1398 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
1399 	else
1400 		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1401 }
1402 
1403 
1404 /*
1405  * Get error message from a connection
1406  *
1407  * Returns text:
1408  *	"OK" if no error, an error message otherwise
1409  *
1410  * Params:
1411  *	text connection_name - name of the connection to check
1412  *
1413  */
1414 PG_FUNCTION_INFO_V1(dblink_error_message);
1415 Datum
dblink_error_message(PG_FUNCTION_ARGS)1416 dblink_error_message(PG_FUNCTION_ARGS)
1417 {
1418 	char	   *msg;
1419 	PGconn	   *conn;
1420 
1421 	dblink_init();
1422 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1423 
1424 	msg = PQerrorMessage(conn);
1425 	if (msg == NULL || msg[0] == '\0')
1426 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
1427 	else
1428 		PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1429 }
1430 
1431 /*
1432  * Execute an SQL non-SELECT command
1433  */
1434 PG_FUNCTION_INFO_V1(dblink_exec);
1435 Datum
dblink_exec(PG_FUNCTION_ARGS)1436 dblink_exec(PG_FUNCTION_ARGS)
1437 {
1438 	text	   *volatile sql_cmd_status = NULL;
1439 	PGconn	   *volatile conn = NULL;
1440 	volatile bool freeconn = false;
1441 
1442 	dblink_init();
1443 
1444 	PG_TRY();
1445 	{
1446 		PGresult   *res = NULL;
1447 		char	   *sql = NULL;
1448 		char	   *conname = NULL;
1449 		bool		fail = true;	/* default to backward compatible behavior */
1450 
1451 		if (PG_NARGS() == 3)
1452 		{
1453 			/* must be text,text,bool */
1454 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1455 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1456 			fail = PG_GETARG_BOOL(2);
1457 			dblink_get_conn(conname, &conn, &conname, &freeconn);
1458 		}
1459 		else if (PG_NARGS() == 2)
1460 		{
1461 			/* might be text,text or text,bool */
1462 			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1463 			{
1464 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1465 				fail = PG_GETARG_BOOL(1);
1466 				conn = pconn->conn;
1467 			}
1468 			else
1469 			{
1470 				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1471 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1472 				dblink_get_conn(conname, &conn, &conname, &freeconn);
1473 			}
1474 		}
1475 		else if (PG_NARGS() == 1)
1476 		{
1477 			/* must be single text argument */
1478 			conn = pconn->conn;
1479 			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1480 		}
1481 		else
1482 			/* shouldn't happen */
1483 			elog(ERROR, "wrong number of arguments");
1484 
1485 		if (!conn)
1486 			dblink_conn_not_avail(conname);
1487 
1488 		res = PQexec(conn, sql);
1489 		if (!res ||
1490 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
1491 			 PQresultStatus(res) != PGRES_TUPLES_OK))
1492 		{
1493 			dblink_res_error(conn, conname, res, fail,
1494 							 "while executing command");
1495 
1496 			/*
1497 			 * and save a copy of the command status string to return as our
1498 			 * result tuple
1499 			 */
1500 			sql_cmd_status = cstring_to_text("ERROR");
1501 		}
1502 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1503 		{
1504 			/*
1505 			 * and save a copy of the command status string to return as our
1506 			 * result tuple
1507 			 */
1508 			sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1509 			PQclear(res);
1510 		}
1511 		else
1512 		{
1513 			PQclear(res);
1514 			ereport(ERROR,
1515 					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1516 					 errmsg("statement returning results not allowed")));
1517 		}
1518 	}
1519 	PG_FINALLY();
1520 	{
1521 		/* if needed, close the connection to the database */
1522 		if (freeconn)
1523 		{
1524 			PQfinish(conn);
1525 			ReleaseExternalFD();
1526 		}
1527 	}
1528 	PG_END_TRY();
1529 
1530 	PG_RETURN_TEXT_P(sql_cmd_status);
1531 }
1532 
1533 
1534 /*
1535  * dblink_get_pkey
1536  *
1537  * Return list of primary key fields for the supplied relation,
1538  * or NULL if none exists.
1539  */
1540 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1541 Datum
dblink_get_pkey(PG_FUNCTION_ARGS)1542 dblink_get_pkey(PG_FUNCTION_ARGS)
1543 {
1544 	int16		indnkeyatts;
1545 	char	  **results;
1546 	FuncCallContext *funcctx;
1547 	int32		call_cntr;
1548 	int32		max_calls;
1549 	AttInMetadata *attinmeta;
1550 	MemoryContext oldcontext;
1551 
1552 	/* stuff done only on the first call of the function */
1553 	if (SRF_IS_FIRSTCALL())
1554 	{
1555 		Relation	rel;
1556 		TupleDesc	tupdesc;
1557 
1558 		/* create a function context for cross-call persistence */
1559 		funcctx = SRF_FIRSTCALL_INIT();
1560 
1561 		/*
1562 		 * switch to memory context appropriate for multiple function calls
1563 		 */
1564 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1565 
1566 		/* open target relation */
1567 		rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1568 
1569 		/* get the array of attnums */
1570 		results = get_pkey_attnames(rel, &indnkeyatts);
1571 
1572 		relation_close(rel, AccessShareLock);
1573 
1574 		/*
1575 		 * need a tuple descriptor representing one INT and one TEXT column
1576 		 */
1577 		tupdesc = CreateTemplateTupleDesc(2);
1578 		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1579 						   INT4OID, -1, 0);
1580 		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1581 						   TEXTOID, -1, 0);
1582 
1583 		/*
1584 		 * Generate attribute metadata needed later to produce tuples from raw
1585 		 * C strings
1586 		 */
1587 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
1588 		funcctx->attinmeta = attinmeta;
1589 
1590 		if ((results != NULL) && (indnkeyatts > 0))
1591 		{
1592 			funcctx->max_calls = indnkeyatts;
1593 
1594 			/* got results, keep track of them */
1595 			funcctx->user_fctx = results;
1596 		}
1597 		else
1598 		{
1599 			/* fast track when no results */
1600 			MemoryContextSwitchTo(oldcontext);
1601 			SRF_RETURN_DONE(funcctx);
1602 		}
1603 
1604 		MemoryContextSwitchTo(oldcontext);
1605 	}
1606 
1607 	/* stuff done on every call of the function */
1608 	funcctx = SRF_PERCALL_SETUP();
1609 
1610 	/*
1611 	 * initialize per-call variables
1612 	 */
1613 	call_cntr = funcctx->call_cntr;
1614 	max_calls = funcctx->max_calls;
1615 
1616 	results = (char **) funcctx->user_fctx;
1617 	attinmeta = funcctx->attinmeta;
1618 
1619 	if (call_cntr < max_calls)	/* do when there is more left to send */
1620 	{
1621 		char	  **values;
1622 		HeapTuple	tuple;
1623 		Datum		result;
1624 
1625 		values = (char **) palloc(2 * sizeof(char *));
1626 		values[0] = psprintf("%d", call_cntr + 1);
1627 		values[1] = results[call_cntr];
1628 
1629 		/* build the tuple */
1630 		tuple = BuildTupleFromCStrings(attinmeta, values);
1631 
1632 		/* make the tuple into a datum */
1633 		result = HeapTupleGetDatum(tuple);
1634 
1635 		SRF_RETURN_NEXT(funcctx, result);
1636 	}
1637 	else
1638 	{
1639 		/* do when there is no more left */
1640 		SRF_RETURN_DONE(funcctx);
1641 	}
1642 }
1643 
1644 
1645 /*
1646  * dblink_build_sql_insert
1647  *
1648  * Used to generate an SQL insert statement
1649  * based on an existing tuple in a local relation.
1650  * This is useful for selectively replicating data
1651  * to another server via dblink.
1652  *
1653  * API:
1654  * <relname> - name of local table of interest
1655  * <pkattnums> - an int2vector of attnums which will be used
1656  * to identify the local tuple of interest
1657  * <pknumatts> - number of attnums in pkattnums
1658  * <src_pkattvals_arry> - text array of key values which will be used
1659  * to identify the local tuple of interest
1660  * <tgt_pkattvals_arry> - text array of key values which will be used
1661  * to build the string for execution remotely. These are substituted
1662  * for their counterparts in src_pkattvals_arry
1663  */
1664 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1665 Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)1666 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1667 {
1668 	text	   *relname_text = PG_GETARG_TEXT_PP(0);
1669 	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1670 	int32		pknumatts_arg = PG_GETARG_INT32(2);
1671 	ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1672 	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1673 	Relation	rel;
1674 	int		   *pkattnums;
1675 	int			pknumatts;
1676 	char	  **src_pkattvals;
1677 	char	  **tgt_pkattvals;
1678 	int			src_nitems;
1679 	int			tgt_nitems;
1680 	char	   *sql;
1681 
1682 	/*
1683 	 * Open target relation.
1684 	 */
1685 	rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1686 
1687 	/*
1688 	 * Process pkattnums argument.
1689 	 */
1690 	validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1691 					   &pkattnums, &pknumatts);
1692 
1693 	/*
1694 	 * Source array is made up of key values that will be used to locate the
1695 	 * tuple of interest from the local system.
1696 	 */
1697 	src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1698 
1699 	/*
1700 	 * There should be one source array key value for each key attnum
1701 	 */
1702 	if (src_nitems != pknumatts)
1703 		ereport(ERROR,
1704 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1705 				 errmsg("source key array length must match number of key attributes")));
1706 
1707 	/*
1708 	 * Target array is made up of key values that will be used to build the
1709 	 * SQL string for use on the remote system.
1710 	 */
1711 	tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1712 
1713 	/*
1714 	 * There should be one target array key value for each key attnum
1715 	 */
1716 	if (tgt_nitems != pknumatts)
1717 		ereport(ERROR,
1718 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1719 				 errmsg("target key array length must match number of key attributes")));
1720 
1721 	/*
1722 	 * Prep work is finally done. Go get the SQL string.
1723 	 */
1724 	sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1725 
1726 	/*
1727 	 * Now we can close the relation.
1728 	 */
1729 	relation_close(rel, AccessShareLock);
1730 
1731 	/*
1732 	 * And send it
1733 	 */
1734 	PG_RETURN_TEXT_P(cstring_to_text(sql));
1735 }
1736 
1737 
1738 /*
1739  * dblink_build_sql_delete
1740  *
1741  * Used to generate an SQL delete statement.
1742  * This is useful for selectively replicating a
1743  * delete to another server via dblink.
1744  *
1745  * API:
1746  * <relname> - name of remote table of interest
1747  * <pkattnums> - an int2vector of attnums which will be used
1748  * to identify the remote tuple of interest
1749  * <pknumatts> - number of attnums in pkattnums
1750  * <tgt_pkattvals_arry> - text array of key values which will be used
1751  * to build the string for execution remotely.
1752  */
1753 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1754 Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)1755 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1756 {
1757 	text	   *relname_text = PG_GETARG_TEXT_PP(0);
1758 	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1759 	int32		pknumatts_arg = PG_GETARG_INT32(2);
1760 	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1761 	Relation	rel;
1762 	int		   *pkattnums;
1763 	int			pknumatts;
1764 	char	  **tgt_pkattvals;
1765 	int			tgt_nitems;
1766 	char	   *sql;
1767 
1768 	/*
1769 	 * Open target relation.
1770 	 */
1771 	rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1772 
1773 	/*
1774 	 * Process pkattnums argument.
1775 	 */
1776 	validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1777 					   &pkattnums, &pknumatts);
1778 
1779 	/*
1780 	 * Target array is made up of key values that will be used to build the
1781 	 * SQL string for use on the remote system.
1782 	 */
1783 	tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1784 
1785 	/*
1786 	 * There should be one target array key value for each key attnum
1787 	 */
1788 	if (tgt_nitems != pknumatts)
1789 		ereport(ERROR,
1790 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1791 				 errmsg("target key array length must match number of key attributes")));
1792 
1793 	/*
1794 	 * Prep work is finally done. Go get the SQL string.
1795 	 */
1796 	sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1797 
1798 	/*
1799 	 * Now we can close the relation.
1800 	 */
1801 	relation_close(rel, AccessShareLock);
1802 
1803 	/*
1804 	 * And send it
1805 	 */
1806 	PG_RETURN_TEXT_P(cstring_to_text(sql));
1807 }
1808 
1809 
1810 /*
1811  * dblink_build_sql_update
1812  *
1813  * Used to generate an SQL update statement
1814  * based on an existing tuple in a local relation.
1815  * This is useful for selectively replicating data
1816  * to another server via dblink.
1817  *
1818  * API:
1819  * <relname> - name of local table of interest
1820  * <pkattnums> - an int2vector of attnums which will be used
1821  * to identify the local tuple of interest
1822  * <pknumatts> - number of attnums in pkattnums
1823  * <src_pkattvals_arry> - text array of key values which will be used
1824  * to identify the local tuple of interest
1825  * <tgt_pkattvals_arry> - text array of key values which will be used
1826  * to build the string for execution remotely. These are substituted
1827  * for their counterparts in src_pkattvals_arry
1828  */
1829 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1830 Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)1831 dblink_build_sql_update(PG_FUNCTION_ARGS)
1832 {
1833 	text	   *relname_text = PG_GETARG_TEXT_PP(0);
1834 	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1835 	int32		pknumatts_arg = PG_GETARG_INT32(2);
1836 	ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1837 	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1838 	Relation	rel;
1839 	int		   *pkattnums;
1840 	int			pknumatts;
1841 	char	  **src_pkattvals;
1842 	char	  **tgt_pkattvals;
1843 	int			src_nitems;
1844 	int			tgt_nitems;
1845 	char	   *sql;
1846 
1847 	/*
1848 	 * Open target relation.
1849 	 */
1850 	rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1851 
1852 	/*
1853 	 * Process pkattnums argument.
1854 	 */
1855 	validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1856 					   &pkattnums, &pknumatts);
1857 
1858 	/*
1859 	 * Source array is made up of key values that will be used to locate the
1860 	 * tuple of interest from the local system.
1861 	 */
1862 	src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1863 
1864 	/*
1865 	 * There should be one source array key value for each key attnum
1866 	 */
1867 	if (src_nitems != pknumatts)
1868 		ereport(ERROR,
1869 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1870 				 errmsg("source key array length must match number of key attributes")));
1871 
1872 	/*
1873 	 * Target array is made up of key values that will be used to build the
1874 	 * SQL string for use on the remote system.
1875 	 */
1876 	tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1877 
1878 	/*
1879 	 * There should be one target array key value for each key attnum
1880 	 */
1881 	if (tgt_nitems != pknumatts)
1882 		ereport(ERROR,
1883 				(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1884 				 errmsg("target key array length must match number of key attributes")));
1885 
1886 	/*
1887 	 * Prep work is finally done. Go get the SQL string.
1888 	 */
1889 	sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1890 
1891 	/*
1892 	 * Now we can close the relation.
1893 	 */
1894 	relation_close(rel, AccessShareLock);
1895 
1896 	/*
1897 	 * And send it
1898 	 */
1899 	PG_RETURN_TEXT_P(cstring_to_text(sql));
1900 }
1901 
1902 /*
1903  * dblink_current_query
1904  * return the current query string
1905  * to allow its use in (among other things)
1906  * rewrite rules
1907  */
1908 PG_FUNCTION_INFO_V1(dblink_current_query);
1909 Datum
dblink_current_query(PG_FUNCTION_ARGS)1910 dblink_current_query(PG_FUNCTION_ARGS)
1911 {
1912 	/* This is now just an alias for the built-in function current_query() */
1913 	PG_RETURN_DATUM(current_query(fcinfo));
1914 }
1915 
1916 /*
1917  * Retrieve async notifications for a connection.
1918  *
1919  * Returns a setof record of notifications, or an empty set if none received.
1920  * Can optionally take a named connection as parameter, but uses the unnamed
1921  * connection per default.
1922  *
1923  */
1924 #define DBLINK_NOTIFY_COLS		3
1925 
1926 PG_FUNCTION_INFO_V1(dblink_get_notify);
1927 Datum
dblink_get_notify(PG_FUNCTION_ARGS)1928 dblink_get_notify(PG_FUNCTION_ARGS)
1929 {
1930 	PGconn	   *conn;
1931 	PGnotify   *notify;
1932 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1933 	TupleDesc	tupdesc;
1934 	Tuplestorestate *tupstore;
1935 	MemoryContext per_query_ctx;
1936 	MemoryContext oldcontext;
1937 
1938 	prepTuplestoreResult(fcinfo);
1939 
1940 	dblink_init();
1941 	if (PG_NARGS() == 1)
1942 		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1943 	else
1944 		conn = pconn->conn;
1945 
1946 	/* create the tuplestore in per-query memory */
1947 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1948 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
1949 
1950 	tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS);
1951 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1952 					   TEXTOID, -1, 0);
1953 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1954 					   INT4OID, -1, 0);
1955 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1956 					   TEXTOID, -1, 0);
1957 
1958 	tupstore = tuplestore_begin_heap(true, false, work_mem);
1959 	rsinfo->setResult = tupstore;
1960 	rsinfo->setDesc = tupdesc;
1961 
1962 	MemoryContextSwitchTo(oldcontext);
1963 
1964 	PQconsumeInput(conn);
1965 	while ((notify = PQnotifies(conn)) != NULL)
1966 	{
1967 		Datum		values[DBLINK_NOTIFY_COLS];
1968 		bool		nulls[DBLINK_NOTIFY_COLS];
1969 
1970 		memset(values, 0, sizeof(values));
1971 		memset(nulls, 0, sizeof(nulls));
1972 
1973 		if (notify->relname != NULL)
1974 			values[0] = CStringGetTextDatum(notify->relname);
1975 		else
1976 			nulls[0] = true;
1977 
1978 		values[1] = Int32GetDatum(notify->be_pid);
1979 
1980 		if (notify->extra != NULL)
1981 			values[2] = CStringGetTextDatum(notify->extra);
1982 		else
1983 			nulls[2] = true;
1984 
1985 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1986 
1987 		PQfreemem(notify);
1988 		PQconsumeInput(conn);
1989 	}
1990 
1991 	/* clean up and return the tuplestore */
1992 	tuplestore_donestoring(tupstore);
1993 
1994 	return (Datum) 0;
1995 }
1996 
1997 /*
1998  * Validate the options given to a dblink foreign server or user mapping.
1999  * Raise an error if any option is invalid.
2000  *
2001  * We just check the names of options here, so semantic errors in options,
2002  * such as invalid numeric format, will be detected at the attempt to connect.
2003  */
2004 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
2005 Datum
dblink_fdw_validator(PG_FUNCTION_ARGS)2006 dblink_fdw_validator(PG_FUNCTION_ARGS)
2007 {
2008 	List	   *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
2009 	Oid			context = PG_GETARG_OID(1);
2010 	ListCell   *cell;
2011 
2012 	static const PQconninfoOption *options = NULL;
2013 
2014 	/*
2015 	 * Get list of valid libpq options.
2016 	 *
2017 	 * To avoid unnecessary work, we get the list once and use it throughout
2018 	 * the lifetime of this backend process.  We don't need to care about
2019 	 * memory context issues, because PQconndefaults allocates with malloc.
2020 	 */
2021 	if (!options)
2022 	{
2023 		options = PQconndefaults();
2024 		if (!options)			/* assume reason for failure is OOM */
2025 			ereport(ERROR,
2026 					(errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2027 					 errmsg("out of memory"),
2028 					 errdetail("Could not get libpq's default connection options.")));
2029 	}
2030 
2031 	/* Validate each supplied option. */
2032 	foreach(cell, options_list)
2033 	{
2034 		DefElem    *def = (DefElem *) lfirst(cell);
2035 
2036 		if (!is_valid_dblink_option(options, def->defname, context))
2037 		{
2038 			/*
2039 			 * Unknown option, or invalid option for the context specified, so
2040 			 * complain about it.  Provide a hint with list of valid options
2041 			 * for the context.
2042 			 */
2043 			StringInfoData buf;
2044 			const PQconninfoOption *opt;
2045 
2046 			initStringInfo(&buf);
2047 			for (opt = options; opt->keyword; opt++)
2048 			{
2049 				if (is_valid_dblink_option(options, opt->keyword, context))
2050 					appendStringInfo(&buf, "%s%s",
2051 									 (buf.len > 0) ? ", " : "",
2052 									 opt->keyword);
2053 			}
2054 			ereport(ERROR,
2055 					(errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2056 					 errmsg("invalid option \"%s\"", def->defname),
2057 					 errhint("Valid options in this context are: %s",
2058 							 buf.data)));
2059 		}
2060 	}
2061 
2062 	PG_RETURN_VOID();
2063 }
2064 
2065 
2066 /*************************************************************
2067  * internal functions
2068  */
2069 
2070 
2071 /*
2072  * get_pkey_attnames
2073  *
2074  * Get the primary key attnames for the given relation.
2075  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2076  */
2077 static char **
get_pkey_attnames(Relation rel,int16 * indnkeyatts)2078 get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2079 {
2080 	Relation	indexRelation;
2081 	ScanKeyData skey;
2082 	SysScanDesc scan;
2083 	HeapTuple	indexTuple;
2084 	int			i;
2085 	char	  **result = NULL;
2086 	TupleDesc	tupdesc;
2087 
2088 	/* initialize indnkeyatts to 0 in case no primary key exists */
2089 	*indnkeyatts = 0;
2090 
2091 	tupdesc = rel->rd_att;
2092 
2093 	/* Prepare to scan pg_index for entries having indrelid = this rel. */
2094 	indexRelation = table_open(IndexRelationId, AccessShareLock);
2095 	ScanKeyInit(&skey,
2096 				Anum_pg_index_indrelid,
2097 				BTEqualStrategyNumber, F_OIDEQ,
2098 				ObjectIdGetDatum(RelationGetRelid(rel)));
2099 
2100 	scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2101 							  NULL, 1, &skey);
2102 
2103 	while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2104 	{
2105 		Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2106 
2107 		/* we're only interested if it is the primary key */
2108 		if (index->indisprimary)
2109 		{
2110 			*indnkeyatts = index->indnkeyatts;
2111 			if (*indnkeyatts > 0)
2112 			{
2113 				result = (char **) palloc(*indnkeyatts * sizeof(char *));
2114 
2115 				for (i = 0; i < *indnkeyatts; i++)
2116 					result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2117 			}
2118 			break;
2119 		}
2120 	}
2121 
2122 	systable_endscan(scan);
2123 	table_close(indexRelation, AccessShareLock);
2124 
2125 	return result;
2126 }
2127 
2128 /*
2129  * Deconstruct a text[] into C-strings (note any NULL elements will be
2130  * returned as NULL pointers)
2131  */
2132 static char **
get_text_array_contents(ArrayType * array,int * numitems)2133 get_text_array_contents(ArrayType *array, int *numitems)
2134 {
2135 	int			ndim = ARR_NDIM(array);
2136 	int		   *dims = ARR_DIMS(array);
2137 	int			nitems;
2138 	int16		typlen;
2139 	bool		typbyval;
2140 	char		typalign;
2141 	char	  **values;
2142 	char	   *ptr;
2143 	bits8	   *bitmap;
2144 	int			bitmask;
2145 	int			i;
2146 
2147 	Assert(ARR_ELEMTYPE(array) == TEXTOID);
2148 
2149 	*numitems = nitems = ArrayGetNItems(ndim, dims);
2150 
2151 	get_typlenbyvalalign(ARR_ELEMTYPE(array),
2152 						 &typlen, &typbyval, &typalign);
2153 
2154 	values = (char **) palloc(nitems * sizeof(char *));
2155 
2156 	ptr = ARR_DATA_PTR(array);
2157 	bitmap = ARR_NULLBITMAP(array);
2158 	bitmask = 1;
2159 
2160 	for (i = 0; i < nitems; i++)
2161 	{
2162 		if (bitmap && (*bitmap & bitmask) == 0)
2163 		{
2164 			values[i] = NULL;
2165 		}
2166 		else
2167 		{
2168 			values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2169 			ptr = att_addlength_pointer(ptr, typlen, ptr);
2170 			ptr = (char *) att_align_nominal(ptr, typalign);
2171 		}
2172 
2173 		/* advance bitmap pointer if any */
2174 		if (bitmap)
2175 		{
2176 			bitmask <<= 1;
2177 			if (bitmask == 0x100)
2178 			{
2179 				bitmap++;
2180 				bitmask = 1;
2181 			}
2182 		}
2183 	}
2184 
2185 	return values;
2186 }
2187 
2188 static char *
get_sql_insert(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2189 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2190 {
2191 	char	   *relname;
2192 	HeapTuple	tuple;
2193 	TupleDesc	tupdesc;
2194 	int			natts;
2195 	StringInfoData buf;
2196 	char	   *val;
2197 	int			key;
2198 	int			i;
2199 	bool		needComma;
2200 
2201 	initStringInfo(&buf);
2202 
2203 	/* get relation name including any needed schema prefix and quoting */
2204 	relname = generate_relation_name(rel);
2205 
2206 	tupdesc = rel->rd_att;
2207 	natts = tupdesc->natts;
2208 
2209 	tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2210 	if (!tuple)
2211 		ereport(ERROR,
2212 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
2213 				 errmsg("source row not found")));
2214 
2215 	appendStringInfo(&buf, "INSERT INTO %s(", relname);
2216 
2217 	needComma = false;
2218 	for (i = 0; i < natts; i++)
2219 	{
2220 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2221 
2222 		if (att->attisdropped)
2223 			continue;
2224 
2225 		if (needComma)
2226 			appendStringInfoChar(&buf, ',');
2227 
2228 		appendStringInfoString(&buf,
2229 							   quote_ident_cstr(NameStr(att->attname)));
2230 		needComma = true;
2231 	}
2232 
2233 	appendStringInfoString(&buf, ") VALUES(");
2234 
2235 	/*
2236 	 * Note: i is physical column number (counting from 0).
2237 	 */
2238 	needComma = false;
2239 	for (i = 0; i < natts; i++)
2240 	{
2241 		if (TupleDescAttr(tupdesc, i)->attisdropped)
2242 			continue;
2243 
2244 		if (needComma)
2245 			appendStringInfoChar(&buf, ',');
2246 
2247 		key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2248 
2249 		if (key >= 0)
2250 			val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2251 		else
2252 			val = SPI_getvalue(tuple, tupdesc, i + 1);
2253 
2254 		if (val != NULL)
2255 		{
2256 			appendStringInfoString(&buf, quote_literal_cstr(val));
2257 			pfree(val);
2258 		}
2259 		else
2260 			appendStringInfoString(&buf, "NULL");
2261 		needComma = true;
2262 	}
2263 	appendStringInfoChar(&buf, ')');
2264 
2265 	return buf.data;
2266 }
2267 
2268 static char *
get_sql_delete(Relation rel,int * pkattnums,int pknumatts,char ** tgt_pkattvals)2269 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2270 {
2271 	char	   *relname;
2272 	TupleDesc	tupdesc;
2273 	StringInfoData buf;
2274 	int			i;
2275 
2276 	initStringInfo(&buf);
2277 
2278 	/* get relation name including any needed schema prefix and quoting */
2279 	relname = generate_relation_name(rel);
2280 
2281 	tupdesc = rel->rd_att;
2282 
2283 	appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2284 	for (i = 0; i < pknumatts; i++)
2285 	{
2286 		int			pkattnum = pkattnums[i];
2287 		Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2288 
2289 		if (i > 0)
2290 			appendStringInfoString(&buf, " AND ");
2291 
2292 		appendStringInfoString(&buf,
2293 							   quote_ident_cstr(NameStr(attr->attname)));
2294 
2295 		if (tgt_pkattvals[i] != NULL)
2296 			appendStringInfo(&buf, " = %s",
2297 							 quote_literal_cstr(tgt_pkattvals[i]));
2298 		else
2299 			appendStringInfoString(&buf, " IS NULL");
2300 	}
2301 
2302 	return buf.data;
2303 }
2304 
2305 static char *
get_sql_update(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals,char ** tgt_pkattvals)2306 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2307 {
2308 	char	   *relname;
2309 	HeapTuple	tuple;
2310 	TupleDesc	tupdesc;
2311 	int			natts;
2312 	StringInfoData buf;
2313 	char	   *val;
2314 	int			key;
2315 	int			i;
2316 	bool		needComma;
2317 
2318 	initStringInfo(&buf);
2319 
2320 	/* get relation name including any needed schema prefix and quoting */
2321 	relname = generate_relation_name(rel);
2322 
2323 	tupdesc = rel->rd_att;
2324 	natts = tupdesc->natts;
2325 
2326 	tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2327 	if (!tuple)
2328 		ereport(ERROR,
2329 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
2330 				 errmsg("source row not found")));
2331 
2332 	appendStringInfo(&buf, "UPDATE %s SET ", relname);
2333 
2334 	/*
2335 	 * Note: i is physical column number (counting from 0).
2336 	 */
2337 	needComma = false;
2338 	for (i = 0; i < natts; i++)
2339 	{
2340 		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2341 
2342 		if (attr->attisdropped)
2343 			continue;
2344 
2345 		if (needComma)
2346 			appendStringInfoString(&buf, ", ");
2347 
2348 		appendStringInfo(&buf, "%s = ",
2349 						 quote_ident_cstr(NameStr(attr->attname)));
2350 
2351 		key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2352 
2353 		if (key >= 0)
2354 			val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2355 		else
2356 			val = SPI_getvalue(tuple, tupdesc, i + 1);
2357 
2358 		if (val != NULL)
2359 		{
2360 			appendStringInfoString(&buf, quote_literal_cstr(val));
2361 			pfree(val);
2362 		}
2363 		else
2364 			appendStringInfoString(&buf, "NULL");
2365 		needComma = true;
2366 	}
2367 
2368 	appendStringInfoString(&buf, " WHERE ");
2369 
2370 	for (i = 0; i < pknumatts; i++)
2371 	{
2372 		int			pkattnum = pkattnums[i];
2373 		Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2374 
2375 		if (i > 0)
2376 			appendStringInfoString(&buf, " AND ");
2377 
2378 		appendStringInfoString(&buf,
2379 							   quote_ident_cstr(NameStr(attr->attname)));
2380 
2381 		val = tgt_pkattvals[i];
2382 
2383 		if (val != NULL)
2384 			appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2385 		else
2386 			appendStringInfoString(&buf, " IS NULL");
2387 	}
2388 
2389 	return buf.data;
2390 }
2391 
2392 /*
2393  * Return a properly quoted identifier.
2394  * Uses quote_ident in quote.c
2395  */
2396 static char *
quote_ident_cstr(char * rawstr)2397 quote_ident_cstr(char *rawstr)
2398 {
2399 	text	   *rawstr_text;
2400 	text	   *result_text;
2401 	char	   *result;
2402 
2403 	rawstr_text = cstring_to_text(rawstr);
2404 	result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2405 													 PointerGetDatum(rawstr_text)));
2406 	result = text_to_cstring(result_text);
2407 
2408 	return result;
2409 }
2410 
2411 static int
get_attnum_pk_pos(int * pkattnums,int pknumatts,int key)2412 get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2413 {
2414 	int			i;
2415 
2416 	/*
2417 	 * Not likely a long list anyway, so just scan for the value
2418 	 */
2419 	for (i = 0; i < pknumatts; i++)
2420 		if (key == pkattnums[i])
2421 			return i;
2422 
2423 	return -1;
2424 }
2425 
2426 static HeapTuple
get_tuple_of_interest(Relation rel,int * pkattnums,int pknumatts,char ** src_pkattvals)2427 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2428 {
2429 	char	   *relname;
2430 	TupleDesc	tupdesc;
2431 	int			natts;
2432 	StringInfoData buf;
2433 	int			ret;
2434 	HeapTuple	tuple;
2435 	int			i;
2436 
2437 	/*
2438 	 * Connect to SPI manager
2439 	 */
2440 	if ((ret = SPI_connect()) < 0)
2441 		/* internal error */
2442 		elog(ERROR, "SPI connect failure - returned %d", ret);
2443 
2444 	initStringInfo(&buf);
2445 
2446 	/* get relation name including any needed schema prefix and quoting */
2447 	relname = generate_relation_name(rel);
2448 
2449 	tupdesc = rel->rd_att;
2450 	natts = tupdesc->natts;
2451 
2452 	/*
2453 	 * Build sql statement to look up tuple of interest, ie, the one matching
2454 	 * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
2455 	 * generate a result tuple that matches the table's physical structure,
2456 	 * with NULLs for any dropped columns.  Otherwise we have to deal with two
2457 	 * different tupdescs and everything's very confusing.
2458 	 */
2459 	appendStringInfoString(&buf, "SELECT ");
2460 
2461 	for (i = 0; i < natts; i++)
2462 	{
2463 		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2464 
2465 		if (i > 0)
2466 			appendStringInfoString(&buf, ", ");
2467 
2468 		if (attr->attisdropped)
2469 			appendStringInfoString(&buf, "NULL");
2470 		else
2471 			appendStringInfoString(&buf,
2472 								   quote_ident_cstr(NameStr(attr->attname)));
2473 	}
2474 
2475 	appendStringInfo(&buf, " FROM %s WHERE ", relname);
2476 
2477 	for (i = 0; i < pknumatts; i++)
2478 	{
2479 		int			pkattnum = pkattnums[i];
2480 		Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2481 
2482 		if (i > 0)
2483 			appendStringInfoString(&buf, " AND ");
2484 
2485 		appendStringInfoString(&buf,
2486 							   quote_ident_cstr(NameStr(attr->attname)));
2487 
2488 		if (src_pkattvals[i] != NULL)
2489 			appendStringInfo(&buf, " = %s",
2490 							 quote_literal_cstr(src_pkattvals[i]));
2491 		else
2492 			appendStringInfoString(&buf, " IS NULL");
2493 	}
2494 
2495 	/*
2496 	 * Retrieve the desired tuple
2497 	 */
2498 	ret = SPI_exec(buf.data, 0);
2499 	pfree(buf.data);
2500 
2501 	/*
2502 	 * Only allow one qualifying tuple
2503 	 */
2504 	if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2505 		ereport(ERROR,
2506 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
2507 				 errmsg("source criteria matched more than one record")));
2508 
2509 	else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2510 	{
2511 		SPITupleTable *tuptable = SPI_tuptable;
2512 
2513 		tuple = SPI_copytuple(tuptable->vals[0]);
2514 		SPI_finish();
2515 
2516 		return tuple;
2517 	}
2518 	else
2519 	{
2520 		/*
2521 		 * no qualifying tuples
2522 		 */
2523 		SPI_finish();
2524 
2525 		return NULL;
2526 	}
2527 
2528 	/*
2529 	 * never reached, but keep compiler quiet
2530 	 */
2531 	return NULL;
2532 }
2533 
2534 /*
2535  * Open the relation named by relname_text, acquire specified type of lock,
2536  * verify we have specified permissions.
2537  * Caller must close rel when done with it.
2538  */
2539 static Relation
get_rel_from_relname(text * relname_text,LOCKMODE lockmode,AclMode aclmode)2540 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2541 {
2542 	RangeVar   *relvar;
2543 	Relation	rel;
2544 	AclResult	aclresult;
2545 
2546 	relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2547 	rel = table_openrv(relvar, lockmode);
2548 
2549 	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2550 								  aclmode);
2551 	if (aclresult != ACLCHECK_OK)
2552 		aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2553 					   RelationGetRelationName(rel));
2554 
2555 	return rel;
2556 }
2557 
2558 /*
2559  * generate_relation_name - copied from ruleutils.c
2560  *		Compute the name to display for a relation
2561  *
2562  * The result includes all necessary quoting and schema-prefixing.
2563  */
2564 static char *
generate_relation_name(Relation rel)2565 generate_relation_name(Relation rel)
2566 {
2567 	char	   *nspname;
2568 	char	   *result;
2569 
2570 	/* Qualify the name if not visible in search path */
2571 	if (RelationIsVisible(RelationGetRelid(rel)))
2572 		nspname = NULL;
2573 	else
2574 		nspname = get_namespace_name(rel->rd_rel->relnamespace);
2575 
2576 	result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2577 
2578 	return result;
2579 }
2580 
2581 
2582 static remoteConn *
getConnectionByName(const char * name)2583 getConnectionByName(const char *name)
2584 {
2585 	remoteConnHashEnt *hentry;
2586 	char	   *key;
2587 
2588 	if (!remoteConnHash)
2589 		remoteConnHash = createConnHash();
2590 
2591 	key = pstrdup(name);
2592 	truncate_identifier(key, strlen(key), false);
2593 	hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2594 											   key, HASH_FIND, NULL);
2595 
2596 	if (hentry)
2597 		return hentry->rconn;
2598 
2599 	return NULL;
2600 }
2601 
2602 static HTAB *
createConnHash(void)2603 createConnHash(void)
2604 {
2605 	HASHCTL		ctl;
2606 
2607 	ctl.keysize = NAMEDATALEN;
2608 	ctl.entrysize = sizeof(remoteConnHashEnt);
2609 
2610 	return hash_create("Remote Con hash", NUMCONN, &ctl,
2611 					   HASH_ELEM | HASH_STRINGS);
2612 }
2613 
2614 static void
createNewConnection(const char * name,remoteConn * rconn)2615 createNewConnection(const char *name, remoteConn *rconn)
2616 {
2617 	remoteConnHashEnt *hentry;
2618 	bool		found;
2619 	char	   *key;
2620 
2621 	if (!remoteConnHash)
2622 		remoteConnHash = createConnHash();
2623 
2624 	key = pstrdup(name);
2625 	truncate_identifier(key, strlen(key), true);
2626 	hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2627 											   HASH_ENTER, &found);
2628 
2629 	if (found)
2630 	{
2631 		PQfinish(rconn->conn);
2632 		ReleaseExternalFD();
2633 		pfree(rconn);
2634 
2635 		ereport(ERROR,
2636 				(errcode(ERRCODE_DUPLICATE_OBJECT),
2637 				 errmsg("duplicate connection name")));
2638 	}
2639 
2640 	hentry->rconn = rconn;
2641 	strlcpy(hentry->name, name, sizeof(hentry->name));
2642 }
2643 
2644 static void
deleteConnection(const char * name)2645 deleteConnection(const char *name)
2646 {
2647 	remoteConnHashEnt *hentry;
2648 	bool		found;
2649 	char	   *key;
2650 
2651 	if (!remoteConnHash)
2652 		remoteConnHash = createConnHash();
2653 
2654 	key = pstrdup(name);
2655 	truncate_identifier(key, strlen(key), false);
2656 	hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2657 											   key, HASH_REMOVE, &found);
2658 
2659 	if (!hentry)
2660 		ereport(ERROR,
2661 				(errcode(ERRCODE_UNDEFINED_OBJECT),
2662 				 errmsg("undefined connection name")));
2663 
2664 }
2665 
2666 static void
dblink_security_check(PGconn * conn,remoteConn * rconn)2667 dblink_security_check(PGconn *conn, remoteConn *rconn)
2668 {
2669 	if (!superuser())
2670 	{
2671 		if (!PQconnectionUsedPassword(conn))
2672 		{
2673 			PQfinish(conn);
2674 			ReleaseExternalFD();
2675 			if (rconn)
2676 				pfree(rconn);
2677 
2678 			ereport(ERROR,
2679 					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2680 					 errmsg("password is required"),
2681 					 errdetail("Non-superuser cannot connect if the server does not request a password."),
2682 					 errhint("Target server's authentication method must be changed.")));
2683 		}
2684 	}
2685 }
2686 
2687 /*
2688  * For non-superusers, insist that the connstr specify a password.  This
2689  * prevents a password from being picked up from .pgpass, a service file,
2690  * the environment, etc.  We don't want the postgres user's passwords
2691  * to be accessible to non-superusers.
2692  */
2693 static void
dblink_connstr_check(const char * connstr)2694 dblink_connstr_check(const char *connstr)
2695 {
2696 	if (!superuser())
2697 	{
2698 		PQconninfoOption *options;
2699 		PQconninfoOption *option;
2700 		bool		connstr_gives_password = false;
2701 
2702 		options = PQconninfoParse(connstr, NULL);
2703 		if (options)
2704 		{
2705 			for (option = options; option->keyword != NULL; option++)
2706 			{
2707 				if (strcmp(option->keyword, "password") == 0)
2708 				{
2709 					if (option->val != NULL && option->val[0] != '\0')
2710 					{
2711 						connstr_gives_password = true;
2712 						break;
2713 					}
2714 				}
2715 			}
2716 			PQconninfoFree(options);
2717 		}
2718 
2719 		if (!connstr_gives_password)
2720 			ereport(ERROR,
2721 					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2722 					 errmsg("password is required"),
2723 					 errdetail("Non-superusers must provide a password in the connection string.")));
2724 	}
2725 }
2726 
2727 /*
2728  * Report an error received from the remote server
2729  *
2730  * res: the received error result (will be freed)
2731  * fail: true for ERROR ereport, false for NOTICE
2732  * fmt and following args: sprintf-style format and values for errcontext;
2733  * the resulting string should be worded like "while <some action>"
2734  */
2735 static void
dblink_res_error(PGconn * conn,const char * conname,PGresult * res,bool fail,const char * fmt,...)2736 dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2737 				 bool fail, const char *fmt,...)
2738 {
2739 	int			level;
2740 	char	   *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2741 	char	   *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2742 	char	   *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2743 	char	   *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2744 	char	   *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2745 	int			sqlstate;
2746 	char	   *message_primary;
2747 	char	   *message_detail;
2748 	char	   *message_hint;
2749 	char	   *message_context;
2750 	va_list		ap;
2751 	char		dblink_context_msg[512];
2752 
2753 	if (fail)
2754 		level = ERROR;
2755 	else
2756 		level = NOTICE;
2757 
2758 	if (pg_diag_sqlstate)
2759 		sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2760 								 pg_diag_sqlstate[1],
2761 								 pg_diag_sqlstate[2],
2762 								 pg_diag_sqlstate[3],
2763 								 pg_diag_sqlstate[4]);
2764 	else
2765 		sqlstate = ERRCODE_CONNECTION_FAILURE;
2766 
2767 	message_primary = xpstrdup(pg_diag_message_primary);
2768 	message_detail = xpstrdup(pg_diag_message_detail);
2769 	message_hint = xpstrdup(pg_diag_message_hint);
2770 	message_context = xpstrdup(pg_diag_context);
2771 
2772 	/*
2773 	 * If we don't get a message from the PGresult, try the PGconn.  This is
2774 	 * needed because for connection-level failures, PQexec may just return
2775 	 * NULL, not a PGresult at all.
2776 	 */
2777 	if (message_primary == NULL)
2778 		message_primary = pchomp(PQerrorMessage(conn));
2779 
2780 	/*
2781 	 * Now that we've copied all the data we need out of the PGresult, it's
2782 	 * safe to free it.  We must do this to avoid PGresult leakage.  We're
2783 	 * leaking all the strings too, but those are in palloc'd memory that will
2784 	 * get cleaned up eventually.
2785 	 */
2786 	if (res)
2787 		PQclear(res);
2788 
2789 	/*
2790 	 * Format the basic errcontext string.  Below, we'll add on something
2791 	 * about the connection name.  That's a violation of the translatability
2792 	 * guidelines about constructing error messages out of parts, but since
2793 	 * there's no translation support for dblink, there's no need to worry
2794 	 * about that (yet).
2795 	 */
2796 	va_start(ap, fmt);
2797 	vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2798 	va_end(ap);
2799 
2800 	ereport(level,
2801 			(errcode(sqlstate),
2802 			 message_primary ? errmsg_internal("%s", message_primary) :
2803 			 errmsg("could not obtain message string for remote error"),
2804 			 message_detail ? errdetail_internal("%s", message_detail) : 0,
2805 			 message_hint ? errhint("%s", message_hint) : 0,
2806 			 message_context ? (errcontext("%s", message_context)) : 0,
2807 			 conname ?
2808 			 (errcontext("%s on dblink connection named \"%s\"",
2809 						 dblink_context_msg, conname)) :
2810 			 (errcontext("%s on unnamed dblink connection",
2811 						 dblink_context_msg))));
2812 }
2813 
2814 /*
2815  * Obtain connection string for a foreign server
2816  */
2817 static char *
get_connect_string(const char * servername)2818 get_connect_string(const char *servername)
2819 {
2820 	ForeignServer *foreign_server = NULL;
2821 	UserMapping *user_mapping;
2822 	ListCell   *cell;
2823 	StringInfoData buf;
2824 	ForeignDataWrapper *fdw;
2825 	AclResult	aclresult;
2826 	char	   *srvname;
2827 
2828 	static const PQconninfoOption *options = NULL;
2829 
2830 	initStringInfo(&buf);
2831 
2832 	/*
2833 	 * Get list of valid libpq options.
2834 	 *
2835 	 * To avoid unnecessary work, we get the list once and use it throughout
2836 	 * the lifetime of this backend process.  We don't need to care about
2837 	 * memory context issues, because PQconndefaults allocates with malloc.
2838 	 */
2839 	if (!options)
2840 	{
2841 		options = PQconndefaults();
2842 		if (!options)			/* assume reason for failure is OOM */
2843 			ereport(ERROR,
2844 					(errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2845 					 errmsg("out of memory"),
2846 					 errdetail("Could not get libpq's default connection options.")));
2847 	}
2848 
2849 	/* first gather the server connstr options */
2850 	srvname = pstrdup(servername);
2851 	truncate_identifier(srvname, strlen(srvname), false);
2852 	foreign_server = GetForeignServerByName(srvname, true);
2853 
2854 	if (foreign_server)
2855 	{
2856 		Oid			serverid = foreign_server->serverid;
2857 		Oid			fdwid = foreign_server->fdwid;
2858 		Oid			userid = GetUserId();
2859 
2860 		user_mapping = GetUserMapping(userid, serverid);
2861 		fdw = GetForeignDataWrapper(fdwid);
2862 
2863 		/* Check permissions, user must have usage on the server. */
2864 		aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2865 		if (aclresult != ACLCHECK_OK)
2866 			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2867 
2868 		foreach(cell, fdw->options)
2869 		{
2870 			DefElem    *def = lfirst(cell);
2871 
2872 			if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2873 				appendStringInfo(&buf, "%s='%s' ", def->defname,
2874 								 escape_param_str(strVal(def->arg)));
2875 		}
2876 
2877 		foreach(cell, foreign_server->options)
2878 		{
2879 			DefElem    *def = lfirst(cell);
2880 
2881 			if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2882 				appendStringInfo(&buf, "%s='%s' ", def->defname,
2883 								 escape_param_str(strVal(def->arg)));
2884 		}
2885 
2886 		foreach(cell, user_mapping->options)
2887 		{
2888 
2889 			DefElem    *def = lfirst(cell);
2890 
2891 			if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2892 				appendStringInfo(&buf, "%s='%s' ", def->defname,
2893 								 escape_param_str(strVal(def->arg)));
2894 		}
2895 
2896 		return buf.data;
2897 	}
2898 	else
2899 		return NULL;
2900 }
2901 
2902 /*
2903  * Escaping libpq connect parameter strings.
2904  *
2905  * Replaces "'" with "\'" and "\" with "\\".
2906  */
2907 static char *
escape_param_str(const char * str)2908 escape_param_str(const char *str)
2909 {
2910 	const char *cp;
2911 	StringInfoData buf;
2912 
2913 	initStringInfo(&buf);
2914 
2915 	for (cp = str; *cp; cp++)
2916 	{
2917 		if (*cp == '\\' || *cp == '\'')
2918 			appendStringInfoChar(&buf, '\\');
2919 		appendStringInfoChar(&buf, *cp);
2920 	}
2921 
2922 	return buf.data;
2923 }
2924 
2925 /*
2926  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2927  * functions, and translate to the internal representation.
2928  *
2929  * The user supplies an int2vector of 1-based logical attnums, plus a count
2930  * argument (the need for the separate count argument is historical, but we
2931  * still check it).  We check that each attnum corresponds to a valid,
2932  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
2933  * listed twice, though the actual use-case for such things is dubious.
2934  * Note that before Postgres 9.0, the user's attnums were interpreted as
2935  * physical not logical column numbers; this was changed for future-proofing.
2936  *
2937  * The internal representation is a palloc'd int array of 0-based physical
2938  * attnums.
2939  */
2940 static void
validate_pkattnums(Relation rel,int2vector * pkattnums_arg,int32 pknumatts_arg,int ** pkattnums,int * pknumatts)2941 validate_pkattnums(Relation rel,
2942 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
2943 				   int **pkattnums, int *pknumatts)
2944 {
2945 	TupleDesc	tupdesc = rel->rd_att;
2946 	int			natts = tupdesc->natts;
2947 	int			i;
2948 
2949 	/* Don't take more array elements than there are */
2950 	pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2951 
2952 	/* Must have at least one pk attnum selected */
2953 	if (pknumatts_arg <= 0)
2954 		ereport(ERROR,
2955 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2956 				 errmsg("number of key attributes must be > 0")));
2957 
2958 	/* Allocate output array */
2959 	*pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2960 	*pknumatts = pknumatts_arg;
2961 
2962 	/* Validate attnums and convert to internal form */
2963 	for (i = 0; i < pknumatts_arg; i++)
2964 	{
2965 		int			pkattnum = pkattnums_arg->values[i];
2966 		int			lnum;
2967 		int			j;
2968 
2969 		/* Can throw error immediately if out of range */
2970 		if (pkattnum <= 0 || pkattnum > natts)
2971 			ereport(ERROR,
2972 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2973 					 errmsg("invalid attribute number %d", pkattnum)));
2974 
2975 		/* Identify which physical column has this logical number */
2976 		lnum = 0;
2977 		for (j = 0; j < natts; j++)
2978 		{
2979 			/* dropped columns don't count */
2980 			if (TupleDescAttr(tupdesc, j)->attisdropped)
2981 				continue;
2982 
2983 			if (++lnum == pkattnum)
2984 				break;
2985 		}
2986 
2987 		if (j < natts)
2988 			(*pkattnums)[i] = j;
2989 		else
2990 			ereport(ERROR,
2991 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2992 					 errmsg("invalid attribute number %d", pkattnum)));
2993 	}
2994 }
2995 
2996 /*
2997  * Check if the specified connection option is valid.
2998  *
2999  * We basically allow whatever libpq thinks is an option, with these
3000  * restrictions:
3001  *		debug options: disallowed
3002  *		"client_encoding": disallowed
3003  *		"user": valid only in USER MAPPING options
3004  *		secure options (eg password): valid only in USER MAPPING options
3005  *		others: valid only in FOREIGN SERVER options
3006  *
3007  * We disallow client_encoding because it would be overridden anyway via
3008  * PQclientEncoding; allowing it to be specified would merely promote
3009  * confusion.
3010  */
3011 static bool
is_valid_dblink_option(const PQconninfoOption * options,const char * option,Oid context)3012 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3013 					   Oid context)
3014 {
3015 	const PQconninfoOption *opt;
3016 
3017 	/* Look up the option in libpq result */
3018 	for (opt = options; opt->keyword; opt++)
3019 	{
3020 		if (strcmp(opt->keyword, option) == 0)
3021 			break;
3022 	}
3023 	if (opt->keyword == NULL)
3024 		return false;
3025 
3026 	/* Disallow debug options (particularly "replication") */
3027 	if (strchr(opt->dispchar, 'D'))
3028 		return false;
3029 
3030 	/* Disallow "client_encoding" */
3031 	if (strcmp(opt->keyword, "client_encoding") == 0)
3032 		return false;
3033 
3034 	/*
3035 	 * If the option is "user" or marked secure, it should be specified only
3036 	 * in USER MAPPING.  Others should be specified only in SERVER.
3037 	 */
3038 	if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3039 	{
3040 		if (context != UserMappingRelationId)
3041 			return false;
3042 	}
3043 	else
3044 	{
3045 		if (context != ForeignServerRelationId)
3046 			return false;
3047 	}
3048 
3049 	return true;
3050 }
3051 
3052 /*
3053  * Copy the remote session's values of GUCs that affect datatype I/O
3054  * and apply them locally in a new GUC nesting level.  Returns the new
3055  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3056  * or -1 if no new nestlevel was needed.
3057  *
3058  * We use the equivalent of a function SET option to allow the settings to
3059  * persist only until the caller calls restoreLocalGucs.  If an error is
3060  * thrown in between, guc.c will take care of undoing the settings.
3061  */
3062 static int
applyRemoteGucs(PGconn * conn)3063 applyRemoteGucs(PGconn *conn)
3064 {
3065 	static const char *const GUCsAffectingIO[] = {
3066 		"DateStyle",
3067 		"IntervalStyle"
3068 	};
3069 
3070 	int			nestlevel = -1;
3071 	int			i;
3072 
3073 	for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3074 	{
3075 		const char *gucName = GUCsAffectingIO[i];
3076 		const char *remoteVal = PQparameterStatus(conn, gucName);
3077 		const char *localVal;
3078 
3079 		/*
3080 		 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3081 		 * that's okay because its output format won't be ambiguous.  So just
3082 		 * skip the GUC if we don't get a value for it.  (We might eventually
3083 		 * need more complicated logic with remote-version checks here.)
3084 		 */
3085 		if (remoteVal == NULL)
3086 			continue;
3087 
3088 		/*
3089 		 * Avoid GUC-setting overhead if the remote and local GUCs already
3090 		 * have the same value.
3091 		 */
3092 		localVal = GetConfigOption(gucName, false, false);
3093 		Assert(localVal != NULL);
3094 
3095 		if (strcmp(remoteVal, localVal) == 0)
3096 			continue;
3097 
3098 		/* Create new GUC nest level if we didn't already */
3099 		if (nestlevel < 0)
3100 			nestlevel = NewGUCNestLevel();
3101 
3102 		/* Apply the option (this will throw error on failure) */
3103 		(void) set_config_option(gucName, remoteVal,
3104 								 PGC_USERSET, PGC_S_SESSION,
3105 								 GUC_ACTION_SAVE, true, 0, false);
3106 	}
3107 
3108 	return nestlevel;
3109 }
3110 
3111 /*
3112  * Restore local GUCs after they have been overlaid with remote settings.
3113  */
3114 static void
restoreLocalGucs(int nestlevel)3115 restoreLocalGucs(int nestlevel)
3116 {
3117 	/* Do nothing if no new nestlevel was created */
3118 	if (nestlevel > 0)
3119 		AtEOXact_GUC(true, nestlevel);
3120 }
3121