1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  *		  Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
23 #include "funcapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/pathnode.h"
29 #include "optimizer/paths.h"
30 #include "optimizer/planmain.h"
31 #include "optimizer/restrictinfo.h"
32 #include "optimizer/var.h"
33 #include "optimizer/tlist.h"
34 #include "parser/parsetree.h"
35 #include "utils/builtins.h"
36 #include "utils/guc.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
39 #include "utils/rel.h"
40 #include "utils/sampling.h"
41 
/*
 * Module magic block required of every loadable PostgreSQL extension module,
 * letting the server check that this library matches its build.
 */
PG_MODULE_MAGIC;

/*
 * Default CPU cost to start up a foreign query.  Used as the initial value
 * of fpinfo->fdw_startup_cost, which the server-level "fdw_startup_cost"
 * option may override.
 */
#define DEFAULT_FDW_STARTUP_COST	100.0

/*
 * Default CPU cost to process 1 row (above and beyond cpu_tuple_cost).  Used
 * as the initial value of fpinfo->fdw_tuple_cost, which the server-level
 * "fdw_tuple_cost" option may override.
 */
#define DEFAULT_FDW_TUPLE_COST		0.01

/*
 * If no remote estimates are available, assume a remote sort costs 20%
 * extra on top of the unsorted path's cost.
 */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
52 
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * The numeric value of each member is its position in the list, so the
 * order of the members must not be changed independently of the code that
 * builds the list.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* List of restriction clauses that can be executed remotely */
	FdwScanPrivateRemoteConds,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing the join, i.e. the names of the relations being
	 * joined and the join types; added only when the scan is a join.
	 */
	FdwScanPrivateRelations
};
77 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Boolean flag showing if the remote query has a RETURNING clause
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 *
 * As with FdwScanPrivateIndex, each member's value is its list position.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* has-returning flag (as an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
99 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 *
 * As with FdwScanPrivateIndex, each member's value is its list position.
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
120 
/*
 * Execution state of a foreign scan using postgres_fdw.
 *
 * Covers both scans of a single foreign table and scans of a pushed-down
 * foreign join (in which case rel is NULL).
 */
typedef struct PgFdwScanState
{
	Relation	rel;			/* relcache entry for the foreign table. NULL
								 * for a foreign join scan. */
	TupleDesc	tupdesc;		/* tuple descriptor of scan */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of SELECT command */
	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the scan */
	unsigned int cursor_number; /* quasi-unique ID for my cursor */
	bool		cursor_exists;	/* have we created the cursor? */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
	int			num_tuples;		/* # of tuples in array */
	int			next_tuple;		/* index of next one to return */

	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
	bool		eof_reached;	/* true if last fetch reached EOF */

	/* working memory contexts */
	MemoryContext batch_cxt;	/* context holding current batch of tuples */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	int			fetch_size;		/* number of tuples per fetch */
} PgFdwScanState;
159 
/*
 * Execution state of a foreign insert/update/delete operation performed
 * row-by-row through a ModifyTable node.
 */
typedef struct PgFdwModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the modify operation */
	char	   *p_name;			/* name of prepared statement, if created */

	/* extracted fdw_private data */
	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
	List	   *target_attrs;	/* list of target attribute numbers */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */

	/* info about parameters for prepared statement */
	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
	int			p_nums;			/* number of parameters to transmit */
	FmgrInfo   *p_flinfo;		/* output conversion functions for them */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwModifyState;
186 
/*
 * Execution state of a foreign scan that modifies a foreign table directly,
 * i.e. sends a single UPDATE/DELETE to the remote server instead of
 * modifying rows one at a time.
 */
typedef struct PgFdwDirectModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of UPDATE/DELETE command */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
	bool		set_processed;	/* do we set the command es_processed? */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the remote UPDATE/DELETE */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	PGresult   *result;			/* result for query */
	int			num_tuples;		/* # of result tuples */
	int			next_tuple;		/* index of next one to return */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwDirectModifyState;
216 
/*
 * Workspace for analyzing a foreign table: collects a random sample of
 * remote rows (via reservoir sampling) for ANALYZE.
 */
typedef struct PgFdwAnalyzeState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
	List	   *retrieved_attrs;	/* attr numbers retrieved by query */

	/* collected sample rows */
	HeapTuple  *rows;			/* array of size targrows */
	int			targrows;		/* target # of sample rows */
	int			numrows;		/* # of sample rows collected */

	/* for random sampling */
	double		samplerows;		/* # of rows fetched */
	double		rowstoskip;		/* # of rows to skip before next sample */
	ReservoirStateData rstate;	/* state for reservoir sampling */

	/* working memory contexts */
	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwAnalyzeState;
240 
/*
 * Identify the attribute where data conversion fails, so that an error
 * report can name it.  Presumably passed as the argument of
 * conversion_error_callback(); confirm at the callback's setup site.
 * Either rel (base table scan) or fsstate (join scan) is expected to be set.
 */
typedef struct ConversionLocation
{
	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
	Relation	rel;			/* foreign table being processed, or NULL */
	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
} ConversionLocation;
250 
/*
 * Callback argument for ec_member_matches_foreign, passed through its
 * "void *arg" parameter.
 */
typedef struct
{
	Expr	   *current;		/* current expr, or NULL if not yet found */
	List	   *already_used;	/* expressions already dealt with */
} ec_member_foreign_arg;
257 
/*
 * SQL functions
 *
 * Declare postgres_fdw_handler as a version-1 calling-convention function
 * so it can be referenced from SQL as the FDW's handler.
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
262 
/*
 * FDW callback routines
 *
 * These are installed into the FdwRoutine returned by postgres_fdw_handler().
 */

/* Planning and execution of foreign scans */
static void postgresGetForeignRelSize(PlannerInfo *root,
						  RelOptInfo *baserel,
						  Oid foreigntableid);
static void postgresGetForeignPaths(PlannerInfo *root,
						RelOptInfo *baserel,
						Oid foreigntableid);
static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
					   RelOptInfo *baserel,
					   Oid foreigntableid,
					   ForeignPath *best_path,
					   List *tlist,
					   List *scan_clauses,
					   Plan *outer_plan);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);

/* Row-by-row modification of foreign tables */
static void postgresAddForeignUpdateTargets(Query *parsetree,
								RangeTblEntry *target_rte,
								Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
						  ModifyTable *plan,
						  Index resultRelation,
						  int subplan_index);
static void postgresBeginForeignModify(ModifyTableState *mtstate,
						   ResultRelInfo *resultRelInfo,
						   List *fdw_private,
						   int subplan_index,
						   int eflags);
static TupleTableSlot *postgresExecForeignInsert(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignDelete(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static void postgresEndForeignModify(EState *estate,
						 ResultRelInfo *resultRelInfo);
static int	postgresIsForeignRelUpdatable(Relation rel);

/* Direct (whole-statement) modification of foreign tables */
static bool postgresPlanDirectModify(PlannerInfo *root,
						 ModifyTable *plan,
						 Index resultRelation,
						 int subplan_index);
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
static void postgresEndDirectModify(ForeignScanState *node);

/* EXPLAIN support */
static void postgresExplainForeignScan(ForeignScanState *node,
						   ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
							 ResultRelInfo *rinfo,
							 List *fdw_private,
							 int subplan_index,
							 ExplainState *es);
static void postgresExplainDirectModify(ForeignScanState *node,
							ExplainState *es);

/* ANALYZE and IMPORT FOREIGN SCHEMA support */
static bool postgresAnalyzeForeignTable(Relation relation,
							AcquireSampleRowsFunc *func,
							BlockNumber *totalpages);
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
							Oid serverOid);

/* Join push-down and EvalPlanQual rechecks */
static void postgresGetForeignJoinPaths(PlannerInfo *root,
							RelOptInfo *joinrel,
							RelOptInfo *outerrel,
							RelOptInfo *innerrel,
							JoinType jointype,
							JoinPathExtraData *extra);
static bool postgresRecheckForeignScan(ForeignScanState *node,
						   TupleTableSlot *slot);
339 
/*
 * Helper functions
 *
 * Internal (file-static) routines used by the callbacks above.
 */

/* Cost estimation */
static void estimate_path_cost_size(PlannerInfo *root,
						RelOptInfo *baserel,
						List *join_conds,
						List *pathkeys,
						double *p_rows, int *p_width,
						Cost *p_startup_cost, Cost *p_total_cost);
static void get_remote_estimate(const char *sql,
					PGconn *conn,
					double *rows,
					int *width,
					Cost *startup_cost,
					Cost *total_cost);
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
						  EquivalenceClass *ec, EquivalenceMember *em,
						  void *arg);

/* Remote cursor management for scans */
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);

/* Row-by-row and direct modification */
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
						 ItemPointer tupleid,
						 TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
					   TupleTableSlot *slot, PGresult *res);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);

/* Query parameter handling */
static void prepare_query_params(PlanState *node,
					 List *fdw_exprs,
					 int numParams,
					 FmgrInfo **param_flinfo,
					 List **param_exprs,
					 const char ***param_values);
static void process_query_params(ExprContext *econtext,
					 FmgrInfo *param_flinfo,
					 List *param_exprs,
					 const char **param_values);

/* ANALYZE sampling */
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
							  HeapTuple *rows, int targrows,
							  double *totalrows,
							  double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
					  PgFdwAnalyzeState *astate);

/* Result-row conversion */
static HeapTuple make_tuple_from_result_row(PGresult *res,
						   int row,
						   Relation rel,
						   AttInMetadata *attinmeta,
						   List *retrieved_attrs,
						   ForeignScanState *fsstate,
						   MemoryContext temp_context);
static void conversion_error_callback(void *arg);

/* Join push-down and sorted-path generation */
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
				JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
				JoinPathExtraData *extra);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
								 RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
								Path *epq_path);
401 
402 
403 /*
404  * Foreign-data wrapper handler function: return a struct with pointers
405  * to my callback routines.
406  */
407 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)408 postgres_fdw_handler(PG_FUNCTION_ARGS)
409 {
410 	FdwRoutine *routine = makeNode(FdwRoutine);
411 
412 	/* Functions for scanning foreign tables */
413 	routine->GetForeignRelSize = postgresGetForeignRelSize;
414 	routine->GetForeignPaths = postgresGetForeignPaths;
415 	routine->GetForeignPlan = postgresGetForeignPlan;
416 	routine->BeginForeignScan = postgresBeginForeignScan;
417 	routine->IterateForeignScan = postgresIterateForeignScan;
418 	routine->ReScanForeignScan = postgresReScanForeignScan;
419 	routine->EndForeignScan = postgresEndForeignScan;
420 
421 	/* Functions for updating foreign tables */
422 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
423 	routine->PlanForeignModify = postgresPlanForeignModify;
424 	routine->BeginForeignModify = postgresBeginForeignModify;
425 	routine->ExecForeignInsert = postgresExecForeignInsert;
426 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
427 	routine->ExecForeignDelete = postgresExecForeignDelete;
428 	routine->EndForeignModify = postgresEndForeignModify;
429 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
430 	routine->PlanDirectModify = postgresPlanDirectModify;
431 	routine->BeginDirectModify = postgresBeginDirectModify;
432 	routine->IterateDirectModify = postgresIterateDirectModify;
433 	routine->EndDirectModify = postgresEndDirectModify;
434 
435 	/* Function for EvalPlanQual rechecks */
436 	routine->RecheckForeignScan = postgresRecheckForeignScan;
437 	/* Support functions for EXPLAIN */
438 	routine->ExplainForeignScan = postgresExplainForeignScan;
439 	routine->ExplainForeignModify = postgresExplainForeignModify;
440 	routine->ExplainDirectModify = postgresExplainDirectModify;
441 
442 	/* Support functions for ANALYZE */
443 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
444 
445 	/* Support functions for IMPORT FOREIGN SCHEMA */
446 	routine->ImportForeignSchema = postgresImportForeignSchema;
447 
448 	/* Support functions for join push-down */
449 	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
450 
451 	PG_RETURN_POINTER(routine);
452 }
453 
454 /*
455  * postgresGetForeignRelSize
456  *		Estimate # of rows and width of the result of the scan
457  *
458  * We should consider the effect of all baserestrictinfo clauses here, but
459  * not any join clauses.
460  */
461 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)462 postgresGetForeignRelSize(PlannerInfo *root,
463 						  RelOptInfo *baserel,
464 						  Oid foreigntableid)
465 {
466 	PgFdwRelationInfo *fpinfo;
467 	ListCell   *lc;
468 	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
469 	const char *namespace;
470 	const char *relname;
471 	const char *refname;
472 
473 	/*
474 	 * We use PgFdwRelationInfo to pass various information to subsequent
475 	 * functions.
476 	 */
477 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
478 	baserel->fdw_private = (void *) fpinfo;
479 
480 	/* Base foreign tables need to be push down always. */
481 	fpinfo->pushdown_safe = true;
482 
483 	/* Look up foreign-table catalog info. */
484 	fpinfo->table = GetForeignTable(foreigntableid);
485 	fpinfo->server = GetForeignServer(fpinfo->table->serverid);
486 
487 	/*
488 	 * Extract user-settable option values.  Note that per-table settings of
489 	 * use_remote_estimate and fetch_size override per-server settings of
490 	 * them, respectively.
491 	 */
492 	fpinfo->use_remote_estimate = false;
493 	fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
494 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
495 	fpinfo->shippable_extensions = NIL;
496 	fpinfo->fetch_size = 100;
497 
498 	foreach(lc, fpinfo->server->options)
499 	{
500 		DefElem    *def = (DefElem *) lfirst(lc);
501 
502 		if (strcmp(def->defname, "use_remote_estimate") == 0)
503 			fpinfo->use_remote_estimate = defGetBoolean(def);
504 		else if (strcmp(def->defname, "fdw_startup_cost") == 0)
505 			fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
506 		else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
507 			fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
508 		else if (strcmp(def->defname, "extensions") == 0)
509 			fpinfo->shippable_extensions =
510 				ExtractExtensionList(defGetString(def), false);
511 		else if (strcmp(def->defname, "fetch_size") == 0)
512 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
513 	}
514 	foreach(lc, fpinfo->table->options)
515 	{
516 		DefElem    *def = (DefElem *) lfirst(lc);
517 
518 		if (strcmp(def->defname, "use_remote_estimate") == 0)
519 			fpinfo->use_remote_estimate = defGetBoolean(def);
520 		else if (strcmp(def->defname, "fetch_size") == 0)
521 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
522 	}
523 
524 	/*
525 	 * If the table or the server is configured to use remote estimates,
526 	 * identify which user to do remote access as during planning.  This
527 	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
528 	 * permissions, the query would have failed at runtime anyway.
529 	 */
530 	if (fpinfo->use_remote_estimate)
531 	{
532 		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
533 
534 		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
535 	}
536 	else
537 		fpinfo->user = NULL;
538 
539 	/*
540 	 * Identify which baserestrictinfo clauses can be sent to the remote
541 	 * server and which can't.
542 	 */
543 	classifyConditions(root, baserel, baserel->baserestrictinfo,
544 					   &fpinfo->remote_conds, &fpinfo->local_conds);
545 
546 	/*
547 	 * Identify which attributes will need to be retrieved from the remote
548 	 * server.  These include all attrs needed for joins or final output, plus
549 	 * all attrs used in the local_conds.  (Note: if we end up using a
550 	 * parameterized scan, it's possible that some of the join clauses will be
551 	 * sent to the remote and thus we wouldn't really need to retrieve the
552 	 * columns used in them.  Doesn't seem worth detecting that case though.)
553 	 */
554 	fpinfo->attrs_used = NULL;
555 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
556 				   &fpinfo->attrs_used);
557 	foreach(lc, fpinfo->local_conds)
558 	{
559 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
560 
561 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
562 					   &fpinfo->attrs_used);
563 	}
564 
565 	/*
566 	 * Compute the selectivity and cost of the local_conds, so we don't have
567 	 * to do it over again for each path.  The best we can do for these
568 	 * conditions is to estimate selectivity on the basis of local statistics.
569 	 */
570 	fpinfo->local_conds_sel = clauselist_selectivity(root,
571 													 fpinfo->local_conds,
572 													 baserel->relid,
573 													 JOIN_INNER,
574 													 NULL);
575 
576 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
577 
578 	/*
579 	 * Set cached relation costs to some negative value, so that we can detect
580 	 * when they are set to some sensible costs during one (usually the first)
581 	 * of the calls to estimate_path_cost_size().
582 	 */
583 	fpinfo->rel_startup_cost = -1;
584 	fpinfo->rel_total_cost = -1;
585 
586 	/*
587 	 * If the table or the server is configured to use remote estimates,
588 	 * connect to the foreign server and execute EXPLAIN to estimate the
589 	 * number of rows selected by the restriction clauses, as well as the
590 	 * average row width.  Otherwise, estimate using whatever statistics we
591 	 * have locally, in a way similar to ordinary tables.
592 	 */
593 	if (fpinfo->use_remote_estimate)
594 	{
595 		/*
596 		 * Get cost/size estimates with help of remote server.  Save the
597 		 * values in fpinfo so we don't need to do it again to generate the
598 		 * basic foreign path.
599 		 */
600 		estimate_path_cost_size(root, baserel, NIL, NIL,
601 								&fpinfo->rows, &fpinfo->width,
602 								&fpinfo->startup_cost, &fpinfo->total_cost);
603 
604 		/* Report estimated baserel size to planner. */
605 		baserel->rows = fpinfo->rows;
606 		baserel->reltarget->width = fpinfo->width;
607 	}
608 	else
609 	{
610 		/*
611 		 * If the foreign table has never been ANALYZEd, it will have relpages
612 		 * and reltuples equal to zero, which most likely has nothing to do
613 		 * with reality.  We can't do a whole lot about that if we're not
614 		 * allowed to consult the remote server, but we can use a hack similar
615 		 * to plancat.c's treatment of empty relations: use a minimum size
616 		 * estimate of 10 pages, and divide by the column-datatype-based width
617 		 * estimate to get the corresponding number of tuples.
618 		 */
619 		if (baserel->pages == 0 && baserel->tuples == 0)
620 		{
621 			baserel->pages = 10;
622 			baserel->tuples =
623 				(10 * BLCKSZ) / (baserel->reltarget->width +
624 								 MAXALIGN(SizeofHeapTupleHeader));
625 		}
626 
627 		/* Estimate baserel size as best we can with local statistics. */
628 		set_baserel_size_estimates(root, baserel);
629 
630 		/* Fill in basically-bogus cost estimates for use later. */
631 		estimate_path_cost_size(root, baserel, NIL, NIL,
632 								&fpinfo->rows, &fpinfo->width,
633 								&fpinfo->startup_cost, &fpinfo->total_cost);
634 	}
635 
636 	/*
637 	 * Set the name of relation in fpinfo, while we are constructing it here.
638 	 * It will be used to build the string describing the join relation in
639 	 * EXPLAIN output. We can't know whether VERBOSE option is specified or
640 	 * not, so always schema-qualify the foreign table name.
641 	 */
642 	fpinfo->relation_name = makeStringInfo();
643 	namespace = get_namespace_name(get_rel_namespace(foreigntableid));
644 	relname = get_rel_name(foreigntableid);
645 	refname = rte->eref->aliasname;
646 	appendStringInfo(fpinfo->relation_name, "%s.%s",
647 					 quote_identifier(namespace),
648 					 quote_identifier(relname));
649 	if (*refname && strcmp(refname, relname) != 0)
650 		appendStringInfo(fpinfo->relation_name, " %s",
651 						 quote_identifier(rte->eref->aliasname));
652 }
653 
654 /*
655  * get_useful_ecs_for_relation
656  *		Determine which EquivalenceClasses might be involved in useful
657  *		orderings of this relation.
658  *
659  * This function is in some respects a mirror image of the core function
660  * pathkeys_useful_for_merging: for a regular table, we know what indexes
661  * we have and want to test whether any of them are useful.  For a foreign
662  * table, we don't know what indexes are present on the remote side but
663  * want to speculate about which ones we'd like to use if they existed.
664  *
665  * This function returns a list of potentially-useful equivalence classes,
666  * but it does not guarantee that an EquivalenceMember exists which contains
667  * Vars only from the given relation.  For example, given ft1 JOIN t1 ON
668  * ft1.x + t1.x = 0, this function will say that the equivalence class
669  * containing ft1.x + t1.x is potentially useful.  Supposing ft1 is remote and
670  * t1 is local (or on a different server), it will turn out that no useful
671  * ORDER BY clause can be generated.  It's not our job to figure that out
672  * here; we're only interested in identifying relevant ECs.
673  */
674 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)675 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
676 {
677 	List	   *useful_eclass_list = NIL;
678 	ListCell   *lc;
679 	Relids		relids;
680 
681 	/*
682 	 * First, consider whether any active EC is potentially useful for a merge
683 	 * join against this relation.
684 	 */
685 	if (rel->has_eclass_joins)
686 	{
687 		foreach(lc, root->eq_classes)
688 		{
689 			EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
690 
691 			if (eclass_useful_for_merging(root, cur_ec, rel))
692 				useful_eclass_list = lappend(useful_eclass_list, cur_ec);
693 		}
694 	}
695 
696 	/*
697 	 * Next, consider whether there are any non-EC derivable join clauses that
698 	 * are merge-joinable.  If the joininfo list is empty, we can exit
699 	 * quickly.
700 	 */
701 	if (rel->joininfo == NIL)
702 		return useful_eclass_list;
703 
704 	/* If this is a child rel, we must use the topmost parent rel to search. */
705 	if (rel->reloptkind == RELOPT_OTHER_MEMBER_REL)
706 		relids = find_childrel_top_parent(root, rel)->relids;
707 	else
708 		relids = rel->relids;
709 
710 	/* Check each join clause in turn. */
711 	foreach(lc, rel->joininfo)
712 	{
713 		RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
714 
715 		/* Consider only mergejoinable clauses */
716 		if (restrictinfo->mergeopfamilies == NIL)
717 			continue;
718 
719 		/* Make sure we've got canonical ECs. */
720 		update_mergeclause_eclasses(root, restrictinfo);
721 
722 		/*
723 		 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
724 		 * that left_ec and right_ec will be initialized, per comments in
725 		 * distribute_qual_to_rels.
726 		 *
727 		 * We want to identify which side of this merge-joinable clause
728 		 * contains columns from the relation produced by this RelOptInfo. We
729 		 * test for overlap, not containment, because there could be extra
730 		 * relations on either side.  For example, suppose we've got something
731 		 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
732 		 * A.y = D.y.  The input rel might be the joinrel between A and B, and
733 		 * we'll consider the join clause A.y = D.y. relids contains a
734 		 * relation not involved in the join class (B) and the equivalence
735 		 * class for the left-hand side of the clause contains a relation not
736 		 * involved in the input rel (C).  Despite the fact that we have only
737 		 * overlap and not containment in either direction, A.y is potentially
738 		 * useful as a sort column.
739 		 *
740 		 * Note that it's even possible that relids overlaps neither side of
741 		 * the join clause.  For example, consider A LEFT JOIN B ON A.x = B.x
742 		 * AND A.x = 1.  The clause A.x = 1 will appear in B's joininfo list,
743 		 * but overlaps neither side of B.  In that case, we just skip this
744 		 * join clause, since it doesn't suggest a useful sort order for this
745 		 * relation.
746 		 */
747 		if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
748 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
749 													 restrictinfo->right_ec);
750 		else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
751 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
752 													  restrictinfo->left_ec);
753 	}
754 
755 	return useful_eclass_list;
756 }
757 
758 /*
759  * get_useful_pathkeys_for_relation
760  *		Determine which orderings of a relation might be useful.
761  *
762  * Getting data in sorted order can be useful either because the requested
763  * order matches the final output ordering for the overall query we're
764  * planning, or because it enables an efficient merge join.  Here, we try
765  * to figure out which pathkeys to consider.
766  */
767 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)768 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
769 {
770 	List	   *useful_pathkeys_list = NIL;
771 	List	   *useful_eclass_list;
772 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
773 	EquivalenceClass *query_ec = NULL;
774 	ListCell   *lc;
775 
776 	/*
777 	 * Pushing the query_pathkeys to the remote server is always worth
778 	 * considering, because it might let us avoid a local sort.
779 	 */
780 	if (root->query_pathkeys)
781 	{
782 		bool		query_pathkeys_ok = true;
783 
784 		foreach(lc, root->query_pathkeys)
785 		{
786 			PathKey    *pathkey = (PathKey *) lfirst(lc);
787 			EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
788 			Expr	   *em_expr;
789 
790 			/*
791 			 * The planner and executor don't have any clever strategy for
792 			 * taking data sorted by a prefix of the query's pathkeys and
793 			 * getting it to be sorted by all of those pathkeys. We'll just
794 			 * end up resorting the entire data set.  So, unless we can push
795 			 * down all of the query pathkeys, forget it.
796 			 *
797 			 * is_foreign_expr would detect volatile expressions as well, but
798 			 * checking ec_has_volatile here saves some cycles.
799 			 */
800 			if (pathkey_ec->ec_has_volatile ||
801 				!(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
802 				!is_foreign_expr(root, rel, em_expr))
803 			{
804 				query_pathkeys_ok = false;
805 				break;
806 			}
807 		}
808 
809 		if (query_pathkeys_ok)
810 			useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
811 	}
812 
813 	/*
814 	 * Even if we're not using remote estimates, having the remote side do the
815 	 * sort generally won't be any worse than doing it locally, and it might
816 	 * be much better if the remote side can generate data in the right order
817 	 * without needing a sort at all.  However, what we're going to do next is
818 	 * try to generate pathkeys that seem promising for possible merge joins,
819 	 * and that's more speculative.  A wrong choice might hurt quite a bit, so
820 	 * bail out if we can't use remote estimates.
821 	 */
822 	if (!fpinfo->use_remote_estimate)
823 		return useful_pathkeys_list;
824 
825 	/* Get the list of interesting EquivalenceClasses. */
826 	useful_eclass_list = get_useful_ecs_for_relation(root, rel);
827 
828 	/* Extract unique EC for query, if any, so we don't consider it again. */
829 	if (list_length(root->query_pathkeys) == 1)
830 	{
831 		PathKey    *query_pathkey = linitial(root->query_pathkeys);
832 
833 		query_ec = query_pathkey->pk_eclass;
834 	}
835 
836 	/*
837 	 * As a heuristic, the only pathkeys we consider here are those of length
838 	 * one.  It's surely possible to consider more, but since each one we
839 	 * choose to consider will generate a round-trip to the remote side, we
840 	 * need to be a bit cautious here.  It would sure be nice to have a local
841 	 * cache of information about remote index definitions...
842 	 */
843 	foreach(lc, useful_eclass_list)
844 	{
845 		EquivalenceClass *cur_ec = lfirst(lc);
846 		Expr	   *em_expr;
847 		PathKey    *pathkey;
848 
849 		/* If redundant with what we did above, skip it. */
850 		if (cur_ec == query_ec)
851 			continue;
852 
853 		/* If no pushable expression for this rel, skip it. */
854 		em_expr = find_em_expr_for_rel(cur_ec, rel);
855 		if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
856 			continue;
857 
858 		/* Looks like we can generate a pathkey, so let's do it. */
859 		pathkey = make_canonical_pathkey(root, cur_ec,
860 										 linitial_oid(cur_ec->ec_opfamilies),
861 										 BTLessStrategyNumber,
862 										 false);
863 		useful_pathkeys_list = lappend(useful_pathkeys_list,
864 									   list_make1(pathkey));
865 	}
866 
867 	return useful_pathkeys_list;
868 }
869 
870 /*
871  * postgresGetForeignPaths
872  *		Create possible scan paths for a scan on the foreign table
873  */
874 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)875 postgresGetForeignPaths(PlannerInfo *root,
876 						RelOptInfo *baserel,
877 						Oid foreigntableid)
878 {
879 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
880 	ForeignPath *path;
881 	List	   *ppi_list;
882 	ListCell   *lc;
883 
884 	/*
885 	 * Create simplest ForeignScan path node and add it to baserel.  This path
886 	 * corresponds to SeqScan path of regular tables (though depending on what
887 	 * baserestrict conditions we were able to send to remote, there might
888 	 * actually be an indexscan happening there).  We already did all the work
889 	 * to estimate cost and size of this path.
890 	 *
891 	 * Although this path uses no join clauses, it could still have required
892 	 * parameterization due to LATERAL refs in its tlist.
893 	 */
894 	path = create_foreignscan_path(root, baserel,
895 								   NULL,		/* default pathtarget */
896 								   fpinfo->rows,
897 								   fpinfo->startup_cost,
898 								   fpinfo->total_cost,
899 								   NIL, /* no pathkeys */
900 								   baserel->lateral_relids,
901 								   NULL,		/* no extra plan */
902 								   NIL);		/* no fdw_private list */
903 	add_path(baserel, (Path *) path);
904 
905 	/* Add paths with pathkeys */
906 	add_paths_with_pathkeys_for_rel(root, baserel, NULL);
907 
908 	/*
909 	 * If we're not using remote estimates, stop here.  We have no way to
910 	 * estimate whether any join clauses would be worth sending across, so
911 	 * don't bother building parameterized paths.
912 	 */
913 	if (!fpinfo->use_remote_estimate)
914 		return;
915 
916 	/*
917 	 * Thumb through all join clauses for the rel to identify which outer
918 	 * relations could supply one or more safe-to-send-to-remote join clauses.
919 	 * We'll build a parameterized path for each such outer relation.
920 	 *
921 	 * It's convenient to manage this by representing each candidate outer
922 	 * relation by the ParamPathInfo node for it.  We can then use the
923 	 * ppi_clauses list in the ParamPathInfo node directly as a list of the
924 	 * interesting join clauses for that rel.  This takes care of the
925 	 * possibility that there are multiple safe join clauses for such a rel,
926 	 * and also ensures that we account for unsafe join clauses that we'll
927 	 * still have to enforce locally (since the parameterized-path machinery
928 	 * insists that we handle all movable clauses).
929 	 */
930 	ppi_list = NIL;
931 	foreach(lc, baserel->joininfo)
932 	{
933 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
934 		Relids		required_outer;
935 		ParamPathInfo *param_info;
936 
937 		/* Check if clause can be moved to this rel */
938 		if (!join_clause_is_movable_to(rinfo, baserel))
939 			continue;
940 
941 		/* See if it is safe to send to remote */
942 		if (!is_foreign_expr(root, baserel, rinfo->clause))
943 			continue;
944 
945 		/* Calculate required outer rels for the resulting path */
946 		required_outer = bms_union(rinfo->clause_relids,
947 								   baserel->lateral_relids);
948 		/* We do not want the foreign rel itself listed in required_outer */
949 		required_outer = bms_del_member(required_outer, baserel->relid);
950 
951 		/*
952 		 * required_outer probably can't be empty here, but if it were, we
953 		 * couldn't make a parameterized path.
954 		 */
955 		if (bms_is_empty(required_outer))
956 			continue;
957 
958 		/* Get the ParamPathInfo */
959 		param_info = get_baserel_parampathinfo(root, baserel,
960 											   required_outer);
961 		Assert(param_info != NULL);
962 
963 		/*
964 		 * Add it to list unless we already have it.  Testing pointer equality
965 		 * is OK since get_baserel_parampathinfo won't make duplicates.
966 		 */
967 		ppi_list = list_append_unique_ptr(ppi_list, param_info);
968 	}
969 
970 	/*
971 	 * The above scan examined only "generic" join clauses, not those that
972 	 * were absorbed into EquivalenceClauses.  See if we can make anything out
973 	 * of EquivalenceClauses.
974 	 */
975 	if (baserel->has_eclass_joins)
976 	{
977 		/*
978 		 * We repeatedly scan the eclass list looking for column references
979 		 * (or expressions) belonging to the foreign rel.  Each time we find
980 		 * one, we generate a list of equivalence joinclauses for it, and then
981 		 * see if any are safe to send to the remote.  Repeat till there are
982 		 * no more candidate EC members.
983 		 */
984 		ec_member_foreign_arg arg;
985 
986 		arg.already_used = NIL;
987 		for (;;)
988 		{
989 			List	   *clauses;
990 
991 			/* Make clauses, skipping any that join to lateral_referencers */
992 			arg.current = NULL;
993 			clauses = generate_implied_equalities_for_column(root,
994 															 baserel,
995 												   ec_member_matches_foreign,
996 															 (void *) &arg,
997 											   baserel->lateral_referencers);
998 
999 			/* Done if there are no more expressions in the foreign rel */
1000 			if (arg.current == NULL)
1001 			{
1002 				Assert(clauses == NIL);
1003 				break;
1004 			}
1005 
1006 			/* Scan the extracted join clauses */
1007 			foreach(lc, clauses)
1008 			{
1009 				RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1010 				Relids		required_outer;
1011 				ParamPathInfo *param_info;
1012 
1013 				/* Check if clause can be moved to this rel */
1014 				if (!join_clause_is_movable_to(rinfo, baserel))
1015 					continue;
1016 
1017 				/* See if it is safe to send to remote */
1018 				if (!is_foreign_expr(root, baserel, rinfo->clause))
1019 					continue;
1020 
1021 				/* Calculate required outer rels for the resulting path */
1022 				required_outer = bms_union(rinfo->clause_relids,
1023 										   baserel->lateral_relids);
1024 				required_outer = bms_del_member(required_outer, baserel->relid);
1025 				if (bms_is_empty(required_outer))
1026 					continue;
1027 
1028 				/* Get the ParamPathInfo */
1029 				param_info = get_baserel_parampathinfo(root, baserel,
1030 													   required_outer);
1031 				Assert(param_info != NULL);
1032 
1033 				/* Add it to list unless we already have it */
1034 				ppi_list = list_append_unique_ptr(ppi_list, param_info);
1035 			}
1036 
1037 			/* Try again, now ignoring the expression we found this time */
1038 			arg.already_used = lappend(arg.already_used, arg.current);
1039 		}
1040 	}
1041 
1042 	/*
1043 	 * Now build a path for each useful outer relation.
1044 	 */
1045 	foreach(lc, ppi_list)
1046 	{
1047 		ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1048 		double		rows;
1049 		int			width;
1050 		Cost		startup_cost;
1051 		Cost		total_cost;
1052 
1053 		/* Get a cost estimate from the remote */
1054 		estimate_path_cost_size(root, baserel,
1055 								param_info->ppi_clauses, NIL,
1056 								&rows, &width,
1057 								&startup_cost, &total_cost);
1058 
1059 		/*
1060 		 * ppi_rows currently won't get looked at by anything, but still we
1061 		 * may as well ensure that it matches our idea of the rowcount.
1062 		 */
1063 		param_info->ppi_rows = rows;
1064 
1065 		/* Make the path */
1066 		path = create_foreignscan_path(root, baserel,
1067 									   NULL,	/* default pathtarget */
1068 									   rows,
1069 									   startup_cost,
1070 									   total_cost,
1071 									   NIL,		/* no pathkeys */
1072 									   param_info->ppi_req_outer,
1073 									   NULL,
1074 									   NIL);	/* no fdw_private list */
1075 		add_path(baserel, (Path *) path);
1076 	}
1077 }
1078 
1079 /*
1080  * postgresGetForeignPlan
1081  *		Create ForeignScan plan node which implements selected best path
1082  */
1083 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1084 postgresGetForeignPlan(PlannerInfo *root,
1085 					   RelOptInfo *foreignrel,
1086 					   Oid foreigntableid,
1087 					   ForeignPath *best_path,
1088 					   List *tlist,
1089 					   List *scan_clauses,
1090 					   Plan *outer_plan)
1091 {
1092 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1093 	Index		scan_relid;
1094 	List	   *fdw_private;
1095 	List	   *remote_conds = NIL;
1096 	List	   *remote_exprs = NIL;
1097 	List	   *local_exprs = NIL;
1098 	List	   *params_list = NIL;
1099 	List	   *retrieved_attrs;
1100 	StringInfoData sql;
1101 	ListCell   *lc;
1102 	List	   *fdw_scan_tlist = NIL;
1103 
1104 	/*
1105 	 * For base relations, set scan_relid as the relid of the relation. For
1106 	 * other kinds of relations set it to 0.
1107 	 */
1108 	if (foreignrel->reloptkind == RELOPT_BASEREL ||
1109 		foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
1110 		scan_relid = foreignrel->relid;
1111 	else
1112 	{
1113 		scan_relid = 0;
1114 
1115 		/*
1116 		 * create_scan_plan() and create_foreignscan_plan() pass
1117 		 * rel->baserestrictinfo + parameterization clauses through
1118 		 * scan_clauses. For a join rel->baserestrictinfo is NIL and we are
1119 		 * not considering parameterization right now, so there should be no
1120 		 * scan_clauses for a joinrel.
1121 		 */
1122 		Assert(!scan_clauses);
1123 	}
1124 
1125 	/*
1126 	 * Separate the scan_clauses into those that can be executed remotely and
1127 	 * those that can't.  baserestrictinfo clauses that were previously
1128 	 * determined to be safe or unsafe by classifyConditions are shown in
1129 	 * fpinfo->remote_conds and fpinfo->local_conds.  Anything else in the
1130 	 * scan_clauses list will be a join clause, which we have to check for
1131 	 * remote-safety.
1132 	 *
1133 	 * Note: the join clauses we see here should be the exact same ones
1134 	 * previously examined by postgresGetForeignPaths.  Possibly it'd be worth
1135 	 * passing forward the classification work done then, rather than
1136 	 * repeating it here.
1137 	 *
1138 	 * This code must match "extract_actual_clauses(scan_clauses, false)"
1139 	 * except for the additional decision about remote versus local execution.
1140 	 * Note however that we don't strip the RestrictInfo nodes from the
1141 	 * remote_conds list, since appendWhereClause expects a list of
1142 	 * RestrictInfos.
1143 	 */
1144 	foreach(lc, scan_clauses)
1145 	{
1146 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1147 
1148 		Assert(IsA(rinfo, RestrictInfo));
1149 
1150 		/* Ignore any pseudoconstants, they're dealt with elsewhere */
1151 		if (rinfo->pseudoconstant)
1152 			continue;
1153 
1154 		if (list_member_ptr(fpinfo->remote_conds, rinfo))
1155 		{
1156 			remote_conds = lappend(remote_conds, rinfo);
1157 			remote_exprs = lappend(remote_exprs, rinfo->clause);
1158 		}
1159 		else if (list_member_ptr(fpinfo->local_conds, rinfo))
1160 			local_exprs = lappend(local_exprs, rinfo->clause);
1161 		else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1162 		{
1163 			remote_conds = lappend(remote_conds, rinfo);
1164 			remote_exprs = lappend(remote_exprs, rinfo->clause);
1165 		}
1166 		else
1167 			local_exprs = lappend(local_exprs, rinfo->clause);
1168 	}
1169 
1170 	if (foreignrel->reloptkind == RELOPT_JOINREL)
1171 	{
1172 		/* For a join relation, get the conditions from fdw_private structure */
1173 		remote_conds = fpinfo->remote_conds;
1174 		local_exprs = fpinfo->local_conds;
1175 
1176 		/* Build the list of columns to be fetched from the foreign server. */
1177 		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1178 
1179 		/*
1180 		 * Ensure that the outer plan produces a tuple whose descriptor
1181 		 * matches our scan tuple slot.  Also, remove the local conditions
1182 		 * from outer plan's quals, lest they be evaluated twice, once by the
1183 		 * local plan and once by the scan.
1184 		 */
1185 		if (outer_plan)
1186 		{
1187 			ListCell   *lc;
1188 
1189 			/*
1190 			 * First, update the plan's qual list if possible.  In some cases
1191 			 * the quals might be enforced below the topmost plan level, in
1192 			 * which case we'll fail to remove them; it's not worth working
1193 			 * harder than this.
1194 			 */
1195 			foreach(lc, local_exprs)
1196 			{
1197 				Node	   *qual = lfirst(lc);
1198 
1199 				outer_plan->qual = list_delete(outer_plan->qual, qual);
1200 
1201 				/*
1202 				 * For an inner join the local conditions of foreign scan plan
1203 				 * can be part of the joinquals as well.  (They might also be
1204 				 * in the mergequals or hashquals, but we can't touch those
1205 				 * without breaking the plan.)
1206 				 */
1207 				if (IsA(outer_plan, NestLoop) ||
1208 					IsA(outer_plan, MergeJoin) ||
1209 					IsA(outer_plan, HashJoin))
1210 				{
1211 					Join	   *join_plan = (Join *) outer_plan;
1212 
1213 					if (join_plan->jointype == JOIN_INNER)
1214 						join_plan->joinqual = list_delete(join_plan->joinqual,
1215 														  qual);
1216 				}
1217 			}
1218 
1219 			/*
1220 			 * Now fix the subplan's tlist --- this might result in inserting
1221 			 * a Result node atop the plan tree.
1222 			 */
1223 			outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1224 												best_path->path.parallel_safe);
1225 		}
1226 	}
1227 
1228 	/*
1229 	 * Build the query string to be sent for execution, and identify
1230 	 * expressions to be sent as parameters.
1231 	 */
1232 	initStringInfo(&sql);
1233 	deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1234 							remote_conds, best_path->path.pathkeys,
1235 							&retrieved_attrs, &params_list);
1236 
1237 	/*
1238 	 * Build the fdw_private list that will be available to the executor.
1239 	 * Items in the list must match order in enum FdwScanPrivateIndex.
1240 	 */
1241 	fdw_private = list_make4(makeString(sql.data),
1242 							 remote_conds,
1243 							 retrieved_attrs,
1244 							 makeInteger(fpinfo->fetch_size));
1245 	if (foreignrel->reloptkind == RELOPT_JOINREL)
1246 		fdw_private = lappend(fdw_private,
1247 							  makeString(fpinfo->relation_name->data));
1248 
1249 	/*
1250 	 * Create the ForeignScan node for the given relation.
1251 	 *
1252 	 * Note that the remote parameter expressions are stored in the fdw_exprs
1253 	 * field of the finished plan node; we can't keep them in private state
1254 	 * because then they wouldn't be subject to later planner processing.
1255 	 */
1256 	return make_foreignscan(tlist,
1257 							local_exprs,
1258 							scan_relid,
1259 							params_list,
1260 							fdw_private,
1261 							fdw_scan_tlist,
1262 							remote_exprs,
1263 							outer_plan);
1264 }
1265 
1266 /*
1267  * postgresBeginForeignScan
1268  *		Initiate an executor scan of a foreign PostgreSQL table.
1269  */
1270 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1271 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1272 {
1273 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1274 	EState	   *estate = node->ss.ps.state;
1275 	PgFdwScanState *fsstate;
1276 	RangeTblEntry *rte;
1277 	Oid			userid;
1278 	ForeignTable *table;
1279 	UserMapping *user;
1280 	int			rtindex;
1281 	int			numParams;
1282 
1283 	/*
1284 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
1285 	 */
1286 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1287 		return;
1288 
1289 	/*
1290 	 * We'll save private state in node->fdw_state.
1291 	 */
1292 	fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1293 	node->fdw_state = (void *) fsstate;
1294 
1295 	/*
1296 	 * Identify which user to do the remote access as.  This should match what
1297 	 * ExecCheckRTEPerms() does.  In case of a join, use the lowest-numbered
1298 	 * member RTE as a representative; we would get the same result from any.
1299 	 */
1300 	if (fsplan->scan.scanrelid > 0)
1301 		rtindex = fsplan->scan.scanrelid;
1302 	else
1303 		rtindex = bms_next_member(fsplan->fs_relids, -1);
1304 	rte = rt_fetch(rtindex, estate->es_range_table);
1305 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1306 
1307 	/* Get info about foreign table. */
1308 	table = GetForeignTable(rte->relid);
1309 	user = GetUserMapping(userid, table->serverid);
1310 
1311 	/*
1312 	 * Get connection to the foreign server.  Connection manager will
1313 	 * establish new connection if necessary.
1314 	 */
1315 	fsstate->conn = GetConnection(user, false);
1316 
1317 	/* Assign a unique ID for my cursor */
1318 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1319 	fsstate->cursor_exists = false;
1320 
1321 	/* Get private info created by planner functions. */
1322 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
1323 									 FdwScanPrivateSelectSql));
1324 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1325 											   FdwScanPrivateRetrievedAttrs);
1326 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1327 										  FdwScanPrivateFetchSize));
1328 
1329 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
1330 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1331 											   "postgres_fdw tuple data",
1332 											   ALLOCSET_DEFAULT_SIZES);
1333 	fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1334 											  "postgres_fdw temporary data",
1335 											  ALLOCSET_SMALL_SIZES);
1336 
1337 	/*
1338 	 * Get info we'll need for converting data fetched from the foreign server
1339 	 * into local representation and error reporting during that process.
1340 	 */
1341 	if (fsplan->scan.scanrelid > 0)
1342 	{
1343 		fsstate->rel = node->ss.ss_currentRelation;
1344 		fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1345 	}
1346 	else
1347 	{
1348 		fsstate->rel = NULL;
1349 		fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1350 	}
1351 
1352 	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1353 
1354 	/*
1355 	 * Prepare for processing of parameters used in remote query, if any.
1356 	 */
1357 	numParams = list_length(fsplan->fdw_exprs);
1358 	fsstate->numParams = numParams;
1359 	if (numParams > 0)
1360 		prepare_query_params((PlanState *) node,
1361 							 fsplan->fdw_exprs,
1362 							 numParams,
1363 							 &fsstate->param_flinfo,
1364 							 &fsstate->param_exprs,
1365 							 &fsstate->param_values);
1366 }
1367 
1368 /*
1369  * postgresIterateForeignScan
1370  *		Retrieve next row from the result set, or clear tuple slot to indicate
1371  *		EOF.
1372  */
1373 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1374 postgresIterateForeignScan(ForeignScanState *node)
1375 {
1376 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1377 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1378 
1379 	/*
1380 	 * If this is the first call after Begin or ReScan, we need to create the
1381 	 * cursor on the remote side.
1382 	 */
1383 	if (!fsstate->cursor_exists)
1384 		create_cursor(node);
1385 
1386 	/*
1387 	 * Get some more tuples, if we've run out.
1388 	 */
1389 	if (fsstate->next_tuple >= fsstate->num_tuples)
1390 	{
1391 		/* No point in another fetch if we already detected EOF, though. */
1392 		if (!fsstate->eof_reached)
1393 			fetch_more_data(node);
1394 		/* If we didn't get any tuples, must be end of data. */
1395 		if (fsstate->next_tuple >= fsstate->num_tuples)
1396 			return ExecClearTuple(slot);
1397 	}
1398 
1399 	/*
1400 	 * Return the next tuple.
1401 	 */
1402 	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1403 				   slot,
1404 				   InvalidBuffer,
1405 				   false);
1406 
1407 	return slot;
1408 }
1409 
1410 /*
1411  * postgresReScanForeignScan
1412  *		Restart the scan.
1413  */
1414 static void
postgresReScanForeignScan(ForeignScanState * node)1415 postgresReScanForeignScan(ForeignScanState *node)
1416 {
1417 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1418 	char		sql[64];
1419 	PGresult   *res;
1420 
1421 	/* If we haven't created the cursor yet, nothing to do. */
1422 	if (!fsstate->cursor_exists)
1423 		return;
1424 
1425 	/*
1426 	 * If any internal parameters affecting this node have changed, we'd
1427 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
1428 	 * be good enough.  If we've only fetched zero or one batch, we needn't
1429 	 * even rewind the cursor, just rescan what we have.
1430 	 */
1431 	if (node->ss.ps.chgParam != NULL)
1432 	{
1433 		fsstate->cursor_exists = false;
1434 		snprintf(sql, sizeof(sql), "CLOSE c%u",
1435 				 fsstate->cursor_number);
1436 	}
1437 	else if (fsstate->fetch_ct_2 > 1)
1438 	{
1439 		snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1440 				 fsstate->cursor_number);
1441 	}
1442 	else
1443 	{
1444 		/* Easy: just rescan what we already have in memory, if anything */
1445 		fsstate->next_tuple = 0;
1446 		return;
1447 	}
1448 
1449 	/*
1450 	 * We don't use a PG_TRY block here, so be careful not to throw error
1451 	 * without releasing the PGresult.
1452 	 */
1453 	res = pgfdw_exec_query(fsstate->conn, sql);
1454 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1455 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1456 	PQclear(res);
1457 
1458 	/* Now force a fresh FETCH. */
1459 	fsstate->tuples = NULL;
1460 	fsstate->num_tuples = 0;
1461 	fsstate->next_tuple = 0;
1462 	fsstate->fetch_ct_2 = 0;
1463 	fsstate->eof_reached = false;
1464 }
1465 
1466 /*
1467  * postgresEndForeignScan
1468  *		Finish scanning foreign table and dispose objects used for this scan
1469  */
1470 static void
postgresEndForeignScan(ForeignScanState * node)1471 postgresEndForeignScan(ForeignScanState *node)
1472 {
1473 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1474 
1475 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1476 	if (fsstate == NULL)
1477 		return;
1478 
1479 	/* Close the cursor if open, to prevent accumulation of cursors */
1480 	if (fsstate->cursor_exists)
1481 		close_cursor(fsstate->conn, fsstate->cursor_number);
1482 
1483 	/* Release remote connection */
1484 	ReleaseConnection(fsstate->conn);
1485 	fsstate->conn = NULL;
1486 
1487 	/* MemoryContexts will be deleted automatically. */
1488 }
1489 
1490 /*
1491  * postgresAddForeignUpdateTargets
1492  *		Add resjunk column(s) needed for update/delete on a foreign table
1493  */
1494 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1495 postgresAddForeignUpdateTargets(Query *parsetree,
1496 								RangeTblEntry *target_rte,
1497 								Relation target_relation)
1498 {
1499 	Var		   *var;
1500 	const char *attrname;
1501 	TargetEntry *tle;
1502 
1503 	/*
1504 	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1505 	 */
1506 
1507 	/* Make a Var representing the desired value */
1508 	var = makeVar(parsetree->resultRelation,
1509 				  SelfItemPointerAttributeNumber,
1510 				  TIDOID,
1511 				  -1,
1512 				  InvalidOid,
1513 				  0);
1514 
1515 	/* Wrap it in a resjunk TLE with the right name ... */
1516 	attrname = "ctid";
1517 
1518 	tle = makeTargetEntry((Expr *) var,
1519 						  list_length(parsetree->targetList) + 1,
1520 						  pstrdup(attrname),
1521 						  true);
1522 
1523 	/* ... and add it to the query's targetlist */
1524 	parsetree->targetList = lappend(parsetree->targetList, tle);
1525 }
1526 
1527 /*
1528  * postgresPlanForeignModify
1529  *		Plan an insert/update/delete operation on a foreign table
1530  */
1531 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1532 postgresPlanForeignModify(PlannerInfo *root,
1533 						  ModifyTable *plan,
1534 						  Index resultRelation,
1535 						  int subplan_index)
1536 {
1537 	CmdType		operation = plan->operation;
1538 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1539 	Relation	rel;
1540 	StringInfoData sql;
1541 	List	   *targetAttrs = NIL;
1542 	List	   *returningList = NIL;
1543 	List	   *retrieved_attrs = NIL;
1544 	bool		doNothing = false;
1545 
1546 	initStringInfo(&sql);
1547 
1548 	/*
1549 	 * Core code already has some lock on each rel being planned, so we can
1550 	 * use NoLock here.
1551 	 */
1552 	rel = heap_open(rte->relid, NoLock);
1553 
1554 	/*
1555 	 * In an INSERT, we transmit all columns that are defined in the foreign
1556 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1557 	 * foreign table, we transmit all columns like INSERT; else we transmit
1558 	 * only columns that were explicitly targets of the UPDATE, so as to avoid
1559 	 * unnecessary data transmission.  (We can't do that for INSERT since we
1560 	 * would miss sending default values for columns not listed in the source
1561 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1562 	 * those triggers might change values for non-target columns, in which
1563 	 * case we would miss sending changed values for those columns.)
1564 	 */
1565 	if (operation == CMD_INSERT ||
1566 		(operation == CMD_UPDATE &&
1567 		 rel->trigdesc &&
1568 		 rel->trigdesc->trig_update_before_row))
1569 	{
1570 		TupleDesc	tupdesc = RelationGetDescr(rel);
1571 		int			attnum;
1572 
1573 		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1574 		{
1575 			Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1576 
1577 			if (!attr->attisdropped)
1578 				targetAttrs = lappend_int(targetAttrs, attnum);
1579 		}
1580 	}
1581 	else if (operation == CMD_UPDATE)
1582 	{
1583 		int			col;
1584 
1585 		col = -1;
1586 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1587 		{
1588 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1589 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
1590 
1591 			if (attno <= InvalidAttrNumber)		/* shouldn't happen */
1592 				elog(ERROR, "system-column update is not supported");
1593 			targetAttrs = lappend_int(targetAttrs, attno);
1594 		}
1595 	}
1596 
1597 	/*
1598 	 * Extract the relevant RETURNING list if any.
1599 	 */
1600 	if (plan->returningLists)
1601 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
1602 
1603 	/*
1604 	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1605 	 * should have already been rejected in the optimizer, as presently there
1606 	 * is no way to recognize an arbiter index on a foreign table.  Only DO
1607 	 * NOTHING is supported without an inference specification.
1608 	 */
1609 	if (plan->onConflictAction == ONCONFLICT_NOTHING)
1610 		doNothing = true;
1611 	else if (plan->onConflictAction != ONCONFLICT_NONE)
1612 		elog(ERROR, "unexpected ON CONFLICT specification: %d",
1613 			 (int) plan->onConflictAction);
1614 
1615 	/*
1616 	 * Construct the SQL command string.
1617 	 */
1618 	switch (operation)
1619 	{
1620 		case CMD_INSERT:
1621 			deparseInsertSql(&sql, root, resultRelation, rel,
1622 							 targetAttrs, doNothing, returningList,
1623 							 &retrieved_attrs);
1624 			break;
1625 		case CMD_UPDATE:
1626 			deparseUpdateSql(&sql, root, resultRelation, rel,
1627 							 targetAttrs, returningList,
1628 							 &retrieved_attrs);
1629 			break;
1630 		case CMD_DELETE:
1631 			deparseDeleteSql(&sql, root, resultRelation, rel,
1632 							 returningList,
1633 							 &retrieved_attrs);
1634 			break;
1635 		default:
1636 			elog(ERROR, "unexpected operation: %d", (int) operation);
1637 			break;
1638 	}
1639 
1640 	heap_close(rel, NoLock);
1641 
1642 	/*
1643 	 * Build the fdw_private list that will be available to the executor.
1644 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
1645 	 */
1646 	return list_make4(makeString(sql.data),
1647 					  targetAttrs,
1648 					  makeInteger((retrieved_attrs != NIL)),
1649 					  retrieved_attrs);
1650 }
1651 
1652 /*
1653  * postgresBeginForeignModify
1654  *		Begin an insert/update/delete operation on a foreign table
1655  */
1656 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1657 postgresBeginForeignModify(ModifyTableState *mtstate,
1658 						   ResultRelInfo *resultRelInfo,
1659 						   List *fdw_private,
1660 						   int subplan_index,
1661 						   int eflags)
1662 {
1663 	PgFdwModifyState *fmstate;
1664 	EState	   *estate = mtstate->ps.state;
1665 	CmdType		operation = mtstate->operation;
1666 	Relation	rel = resultRelInfo->ri_RelationDesc;
1667 	RangeTblEntry *rte;
1668 	Oid			userid;
1669 	ForeignTable *table;
1670 	UserMapping *user;
1671 	AttrNumber	n_params;
1672 	Oid			typefnoid;
1673 	bool		isvarlena;
1674 	ListCell   *lc;
1675 
1676 	/*
1677 	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
1678 	 * stays NULL.
1679 	 */
1680 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1681 		return;
1682 
1683 	/* Begin constructing PgFdwModifyState. */
1684 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1685 	fmstate->rel = rel;
1686 
1687 	/*
1688 	 * Identify which user to do the remote access as.  This should match what
1689 	 * ExecCheckRTEPerms() does.
1690 	 */
1691 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1692 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1693 
1694 	/* Get info about foreign table. */
1695 	table = GetForeignTable(RelationGetRelid(rel));
1696 	user = GetUserMapping(userid, table->serverid);
1697 
1698 	/* Open connection; report that we'll create a prepared statement. */
1699 	fmstate->conn = GetConnection(user, true);
1700 	fmstate->p_name = NULL;		/* prepared statement not made yet */
1701 
1702 	/* Deconstruct fdw_private data. */
1703 	fmstate->query = strVal(list_nth(fdw_private,
1704 									 FdwModifyPrivateUpdateSql));
1705 	fmstate->target_attrs = (List *) list_nth(fdw_private,
1706 											  FdwModifyPrivateTargetAttnums);
1707 	fmstate->has_returning = intVal(list_nth(fdw_private,
1708 											 FdwModifyPrivateHasReturning));
1709 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1710 											 FdwModifyPrivateRetrievedAttrs);
1711 
1712 	/* Create context for per-tuple temp workspace. */
1713 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1714 											  "postgres_fdw temporary data",
1715 											  ALLOCSET_SMALL_SIZES);
1716 
1717 	/* Prepare for input conversion of RETURNING results. */
1718 	if (fmstate->has_returning)
1719 		fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
1720 
1721 	/* Prepare for output conversion of parameters used in prepared stmt. */
1722 	n_params = list_length(fmstate->target_attrs) + 1;
1723 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1724 	fmstate->p_nums = 0;
1725 
1726 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
1727 	{
1728 		/* Find the ctid resjunk column in the subplan's result */
1729 		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;
1730 
1731 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1732 														  "ctid");
1733 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
1734 			elog(ERROR, "could not find junk ctid column");
1735 
1736 		/* First transmittable parameter will be ctid */
1737 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1738 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1739 		fmstate->p_nums++;
1740 	}
1741 
1742 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
1743 	{
1744 		/* Set up for remaining transmittable parameters */
1745 		foreach(lc, fmstate->target_attrs)
1746 		{
1747 			int			attnum = lfirst_int(lc);
1748 			Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1749 
1750 			Assert(!attr->attisdropped);
1751 
1752 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1753 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1754 			fmstate->p_nums++;
1755 		}
1756 	}
1757 
1758 	Assert(fmstate->p_nums <= n_params);
1759 
1760 	resultRelInfo->ri_FdwState = fmstate;
1761 }
1762 
1763 /*
1764  * postgresExecForeignInsert
1765  *		Insert one row into a foreign table
1766  */
1767 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1768 postgresExecForeignInsert(EState *estate,
1769 						  ResultRelInfo *resultRelInfo,
1770 						  TupleTableSlot *slot,
1771 						  TupleTableSlot *planSlot)
1772 {
1773 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1774 	const char **p_values;
1775 	PGresult   *res;
1776 	int			n_rows;
1777 
1778 	/* Set up the prepared statement on the remote server, if we didn't yet */
1779 	if (!fmstate->p_name)
1780 		prepare_foreign_modify(fmstate);
1781 
1782 	/* Convert parameters needed by prepared statement to text form */
1783 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1784 
1785 	/*
1786 	 * Execute the prepared statement.
1787 	 */
1788 	if (!PQsendQueryPrepared(fmstate->conn,
1789 							 fmstate->p_name,
1790 							 fmstate->p_nums,
1791 							 p_values,
1792 							 NULL,
1793 							 NULL,
1794 							 0))
1795 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1796 
1797 	/*
1798 	 * Get the result, and check for success.
1799 	 *
1800 	 * We don't use a PG_TRY block here, so be careful not to throw error
1801 	 * without releasing the PGresult.
1802 	 */
1803 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1804 	if (PQresultStatus(res) !=
1805 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1806 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1807 
1808 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1809 	if (fmstate->has_returning)
1810 	{
1811 		n_rows = PQntuples(res);
1812 		if (n_rows > 0)
1813 			store_returning_result(fmstate, slot, res);
1814 	}
1815 	else
1816 		n_rows = atoi(PQcmdTuples(res));
1817 
1818 	/* And clean up */
1819 	PQclear(res);
1820 
1821 	MemoryContextReset(fmstate->temp_cxt);
1822 
1823 	/* Return NULL if nothing was inserted on the remote end */
1824 	return (n_rows > 0) ? slot : NULL;
1825 }
1826 
1827 /*
1828  * postgresExecForeignUpdate
1829  *		Update one row in a foreign table
1830  */
1831 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1832 postgresExecForeignUpdate(EState *estate,
1833 						  ResultRelInfo *resultRelInfo,
1834 						  TupleTableSlot *slot,
1835 						  TupleTableSlot *planSlot)
1836 {
1837 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1838 	Datum		datum;
1839 	bool		isNull;
1840 	const char **p_values;
1841 	PGresult   *res;
1842 	int			n_rows;
1843 
1844 	/* Set up the prepared statement on the remote server, if we didn't yet */
1845 	if (!fmstate->p_name)
1846 		prepare_foreign_modify(fmstate);
1847 
1848 	/* Get the ctid that was passed up as a resjunk column */
1849 	datum = ExecGetJunkAttribute(planSlot,
1850 								 fmstate->ctidAttno,
1851 								 &isNull);
1852 	/* shouldn't ever get a null result... */
1853 	if (isNull)
1854 		elog(ERROR, "ctid is NULL");
1855 
1856 	/* Convert parameters needed by prepared statement to text form */
1857 	p_values = convert_prep_stmt_params(fmstate,
1858 										(ItemPointer) DatumGetPointer(datum),
1859 										slot);
1860 
1861 	/*
1862 	 * Execute the prepared statement.
1863 	 */
1864 	if (!PQsendQueryPrepared(fmstate->conn,
1865 							 fmstate->p_name,
1866 							 fmstate->p_nums,
1867 							 p_values,
1868 							 NULL,
1869 							 NULL,
1870 							 0))
1871 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1872 
1873 	/*
1874 	 * Get the result, and check for success.
1875 	 *
1876 	 * We don't use a PG_TRY block here, so be careful not to throw error
1877 	 * without releasing the PGresult.
1878 	 */
1879 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1880 	if (PQresultStatus(res) !=
1881 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1882 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1883 
1884 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1885 	if (fmstate->has_returning)
1886 	{
1887 		n_rows = PQntuples(res);
1888 		if (n_rows > 0)
1889 			store_returning_result(fmstate, slot, res);
1890 	}
1891 	else
1892 		n_rows = atoi(PQcmdTuples(res));
1893 
1894 	/* And clean up */
1895 	PQclear(res);
1896 
1897 	MemoryContextReset(fmstate->temp_cxt);
1898 
1899 	/* Return NULL if nothing was updated on the remote end */
1900 	return (n_rows > 0) ? slot : NULL;
1901 }
1902 
1903 /*
1904  * postgresExecForeignDelete
1905  *		Delete one row from a foreign table
1906  */
1907 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1908 postgresExecForeignDelete(EState *estate,
1909 						  ResultRelInfo *resultRelInfo,
1910 						  TupleTableSlot *slot,
1911 						  TupleTableSlot *planSlot)
1912 {
1913 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1914 	Datum		datum;
1915 	bool		isNull;
1916 	const char **p_values;
1917 	PGresult   *res;
1918 	int			n_rows;
1919 
1920 	/* Set up the prepared statement on the remote server, if we didn't yet */
1921 	if (!fmstate->p_name)
1922 		prepare_foreign_modify(fmstate);
1923 
1924 	/* Get the ctid that was passed up as a resjunk column */
1925 	datum = ExecGetJunkAttribute(planSlot,
1926 								 fmstate->ctidAttno,
1927 								 &isNull);
1928 	/* shouldn't ever get a null result... */
1929 	if (isNull)
1930 		elog(ERROR, "ctid is NULL");
1931 
1932 	/* Convert parameters needed by prepared statement to text form */
1933 	p_values = convert_prep_stmt_params(fmstate,
1934 										(ItemPointer) DatumGetPointer(datum),
1935 										NULL);
1936 
1937 	/*
1938 	 * Execute the prepared statement.
1939 	 */
1940 	if (!PQsendQueryPrepared(fmstate->conn,
1941 							 fmstate->p_name,
1942 							 fmstate->p_nums,
1943 							 p_values,
1944 							 NULL,
1945 							 NULL,
1946 							 0))
1947 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1948 
1949 	/*
1950 	 * Get the result, and check for success.
1951 	 *
1952 	 * We don't use a PG_TRY block here, so be careful not to throw error
1953 	 * without releasing the PGresult.
1954 	 */
1955 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1956 	if (PQresultStatus(res) !=
1957 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1958 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1959 
1960 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1961 	if (fmstate->has_returning)
1962 	{
1963 		n_rows = PQntuples(res);
1964 		if (n_rows > 0)
1965 			store_returning_result(fmstate, slot, res);
1966 	}
1967 	else
1968 		n_rows = atoi(PQcmdTuples(res));
1969 
1970 	/* And clean up */
1971 	PQclear(res);
1972 
1973 	MemoryContextReset(fmstate->temp_cxt);
1974 
1975 	/* Return NULL if nothing was deleted on the remote end */
1976 	return (n_rows > 0) ? slot : NULL;
1977 }
1978 
1979 /*
1980  * postgresEndForeignModify
1981  *		Finish an insert/update/delete operation on a foreign table
1982  */
1983 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1984 postgresEndForeignModify(EState *estate,
1985 						 ResultRelInfo *resultRelInfo)
1986 {
1987 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1988 
1989 	/* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1990 	if (fmstate == NULL)
1991 		return;
1992 
1993 	/* If we created a prepared statement, destroy it */
1994 	if (fmstate->p_name)
1995 	{
1996 		char		sql[64];
1997 		PGresult   *res;
1998 
1999 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2000 
2001 		/*
2002 		 * We don't use a PG_TRY block here, so be careful not to throw error
2003 		 * without releasing the PGresult.
2004 		 */
2005 		res = pgfdw_exec_query(fmstate->conn, sql);
2006 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
2007 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2008 		PQclear(res);
2009 		fmstate->p_name = NULL;
2010 	}
2011 
2012 	/* Release remote connection */
2013 	ReleaseConnection(fmstate->conn);
2014 	fmstate->conn = NULL;
2015 }
2016 
2017 /*
2018  * postgresIsForeignRelUpdatable
2019  *		Determine whether a foreign table supports INSERT, UPDATE and/or
2020  *		DELETE.
2021  */
2022 static int
postgresIsForeignRelUpdatable(Relation rel)2023 postgresIsForeignRelUpdatable(Relation rel)
2024 {
2025 	bool		updatable;
2026 	ForeignTable *table;
2027 	ForeignServer *server;
2028 	ListCell   *lc;
2029 
2030 	/*
2031 	 * By default, all postgres_fdw foreign tables are assumed updatable. This
2032 	 * can be overridden by a per-server setting, which in turn can be
2033 	 * overridden by a per-table setting.
2034 	 */
2035 	updatable = true;
2036 
2037 	table = GetForeignTable(RelationGetRelid(rel));
2038 	server = GetForeignServer(table->serverid);
2039 
2040 	foreach(lc, server->options)
2041 	{
2042 		DefElem    *def = (DefElem *) lfirst(lc);
2043 
2044 		if (strcmp(def->defname, "updatable") == 0)
2045 			updatable = defGetBoolean(def);
2046 	}
2047 	foreach(lc, table->options)
2048 	{
2049 		DefElem    *def = (DefElem *) lfirst(lc);
2050 
2051 		if (strcmp(def->defname, "updatable") == 0)
2052 			updatable = defGetBoolean(def);
2053 	}
2054 
2055 	/*
2056 	 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2057 	 */
2058 	return updatable ?
2059 		(1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2060 }
2061 
2062 /*
2063  * postgresRecheckForeignScan
2064  *		Execute a local join execution plan for a foreign join
2065  */
2066 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2067 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2068 {
2069 	Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2070 	PlanState  *outerPlan = outerPlanState(node);
2071 	TupleTableSlot *result;
2072 
2073 	/* For base foreign relations, it suffices to set fdw_recheck_quals */
2074 	if (scanrelid > 0)
2075 		return true;
2076 
2077 	Assert(outerPlan != NULL);
2078 
2079 	/* Execute a local join execution plan */
2080 	result = ExecProcNode(outerPlan);
2081 	if (TupIsNull(result))
2082 		return false;
2083 
2084 	/* Store result in the given slot */
2085 	ExecCopySlot(slot, result);
2086 
2087 	return true;
2088 }
2089 
2090 /*
2091  * postgresPlanDirectModify
2092  *		Consider a direct foreign table modification
2093  *
2094  * Decide whether it is safe to modify a foreign table directly, and if so,
2095  * rewrite subplan accordingly.
2096  */
2097 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2098 postgresPlanDirectModify(PlannerInfo *root,
2099 						 ModifyTable *plan,
2100 						 Index resultRelation,
2101 						 int subplan_index)
2102 {
2103 	CmdType		operation = plan->operation;
2104 	Plan	   *subplan = (Plan *) list_nth(plan->plans, subplan_index);
2105 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
2106 	Relation	rel;
2107 	StringInfoData sql;
2108 	ForeignScan *fscan;
2109 	List	   *targetAttrs = NIL;
2110 	List	   *remote_conds;
2111 	List	   *params_list = NIL;
2112 	List	   *returningList = NIL;
2113 	List	   *retrieved_attrs = NIL;
2114 
2115 	/*
2116 	 * Decide whether it is safe to modify a foreign table directly.
2117 	 */
2118 
2119 	/*
2120 	 * The table modification must be an UPDATE or DELETE.
2121 	 */
2122 	if (operation != CMD_UPDATE && operation != CMD_DELETE)
2123 		return false;
2124 
2125 	/*
2126 	 * It's unsafe to modify a foreign table directly if there are any local
2127 	 * joins needed.
2128 	 */
2129 	if (!IsA(subplan, ForeignScan))
2130 		return false;
2131 
2132 	/*
2133 	 * It's unsafe to modify a foreign table directly if there are any quals
2134 	 * that should be evaluated locally.
2135 	 */
2136 	if (subplan->qual != NIL)
2137 		return false;
2138 
2139 	/*
2140 	 * We can't handle an UPDATE or DELETE on a foreign join for now.
2141 	 */
2142 	fscan = (ForeignScan *) subplan;
2143 	if (fscan->scan.scanrelid == 0)
2144 		return false;
2145 
2146 	/*
2147 	 * It's unsafe to update a foreign table directly, if any expressions to
2148 	 * assign to the target columns are unsafe to evaluate remotely.
2149 	 */
2150 	if (operation == CMD_UPDATE)
2151 	{
2152 		RelOptInfo *baserel = root->simple_rel_array[resultRelation];
2153 		int			col;
2154 
2155 		/*
2156 		 * We transmit only columns that were explicitly targets of the
2157 		 * UPDATE, so as to avoid unnecessary data transmission.
2158 		 */
2159 		col = -1;
2160 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2161 		{
2162 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2163 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
2164 			TargetEntry *tle;
2165 
2166 			if (attno <= InvalidAttrNumber)		/* shouldn't happen */
2167 				elog(ERROR, "system-column update is not supported");
2168 
2169 			tle = get_tle_by_resno(subplan->targetlist, attno);
2170 
2171 			if (!tle)
2172 				elog(ERROR, "attribute number %d not found in subplan targetlist",
2173 					 attno);
2174 
2175 			if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
2176 				return false;
2177 
2178 			targetAttrs = lappend_int(targetAttrs, attno);
2179 		}
2180 	}
2181 
2182 	/*
2183 	 * Ok, rewrite subplan so as to modify the foreign table directly.
2184 	 */
2185 	initStringInfo(&sql);
2186 
2187 	/*
2188 	 * Core code already has some lock on each rel being planned, so we can
2189 	 * use NoLock here.
2190 	 */
2191 	rel = heap_open(rte->relid, NoLock);
2192 
2193 	/*
2194 	 * Extract the baserestrictinfo clauses that can be evaluated remotely.
2195 	 */
2196 	remote_conds = (List *) list_nth(fscan->fdw_private,
2197 									 FdwScanPrivateRemoteConds);
2198 
2199 	/*
2200 	 * Extract the relevant RETURNING list if any.
2201 	 */
2202 	if (plan->returningLists)
2203 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
2204 
2205 	/*
2206 	 * Construct the SQL command string.
2207 	 */
2208 	switch (operation)
2209 	{
2210 		case CMD_UPDATE:
2211 			deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2212 								   ((Plan *) fscan)->targetlist,
2213 								   targetAttrs,
2214 								   remote_conds, &params_list,
2215 								   returningList, &retrieved_attrs);
2216 			break;
2217 		case CMD_DELETE:
2218 			deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2219 								   remote_conds, &params_list,
2220 								   returningList, &retrieved_attrs);
2221 			break;
2222 		default:
2223 			elog(ERROR, "unexpected operation: %d", (int) operation);
2224 			break;
2225 	}
2226 
2227 	/*
2228 	 * Update the operation info.
2229 	 */
2230 	fscan->operation = operation;
2231 
2232 	/*
2233 	 * Update the fdw_exprs list that will be available to the executor.
2234 	 */
2235 	fscan->fdw_exprs = params_list;
2236 
2237 	/*
2238 	 * Update the fdw_private list that will be available to the executor.
2239 	 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2240 	 */
2241 	fscan->fdw_private = list_make4(makeString(sql.data),
2242 									makeInteger((retrieved_attrs != NIL)),
2243 									retrieved_attrs,
2244 									makeInteger(plan->canSetTag));
2245 
2246 	heap_close(rel, NoLock);
2247 	return true;
2248 }
2249 
2250 /*
2251  * postgresBeginDirectModify
2252  *		Prepare a direct foreign table modification
2253  */
2254 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2255 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2256 {
2257 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2258 	EState	   *estate = node->ss.ps.state;
2259 	PgFdwDirectModifyState *dmstate;
2260 	RangeTblEntry *rte;
2261 	Oid			userid;
2262 	ForeignTable *table;
2263 	UserMapping *user;
2264 	int			numParams;
2265 
2266 	/*
2267 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
2268 	 */
2269 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2270 		return;
2271 
2272 	/*
2273 	 * We'll save private state in node->fdw_state.
2274 	 */
2275 	dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2276 	node->fdw_state = (void *) dmstate;
2277 
2278 	/*
2279 	 * Identify which user to do the remote access as.  This should match what
2280 	 * ExecCheckRTEPerms() does.
2281 	 */
2282 	rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2283 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2284 
2285 	/* Get info about foreign table. */
2286 	dmstate->rel = node->ss.ss_currentRelation;
2287 	table = GetForeignTable(RelationGetRelid(dmstate->rel));
2288 	user = GetUserMapping(userid, table->serverid);
2289 
2290 	/*
2291 	 * Get connection to the foreign server.  Connection manager will
2292 	 * establish new connection if necessary.
2293 	 */
2294 	dmstate->conn = GetConnection(user, false);
2295 
2296 	/* Initialize state variable */
2297 	dmstate->num_tuples = -1;	/* -1 means not set yet */
2298 
2299 	/* Get private info created by planner functions. */
2300 	dmstate->query = strVal(list_nth(fsplan->fdw_private,
2301 									 FdwDirectModifyPrivateUpdateSql));
2302 	dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2303 										FdwDirectModifyPrivateHasReturning));
2304 	dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2305 									   FdwDirectModifyPrivateRetrievedAttrs);
2306 	dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2307 										FdwDirectModifyPrivateSetProcessed));
2308 
2309 	/* Create context for per-tuple temp workspace. */
2310 	dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2311 											  "postgres_fdw temporary data",
2312 											  ALLOCSET_SMALL_SIZES);
2313 
2314 	/* Prepare for input conversion of RETURNING results. */
2315 	if (dmstate->has_returning)
2316 		dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2317 
2318 	/*
2319 	 * Prepare for processing of parameters used in remote query, if any.
2320 	 */
2321 	numParams = list_length(fsplan->fdw_exprs);
2322 	dmstate->numParams = numParams;
2323 	if (numParams > 0)
2324 		prepare_query_params((PlanState *) node,
2325 							 fsplan->fdw_exprs,
2326 							 numParams,
2327 							 &dmstate->param_flinfo,
2328 							 &dmstate->param_exprs,
2329 							 &dmstate->param_values);
2330 }
2331 
2332 /*
2333  * postgresIterateDirectModify
2334  *		Execute a direct foreign table modification
2335  */
2336 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2337 postgresIterateDirectModify(ForeignScanState *node)
2338 {
2339 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2340 	EState	   *estate = node->ss.ps.state;
2341 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2342 
2343 	/*
2344 	 * If this is the first call after Begin, execute the statement.
2345 	 */
2346 	if (dmstate->num_tuples == -1)
2347 		execute_dml_stmt(node);
2348 
2349 	/*
2350 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
2351 	 */
2352 	if (!resultRelInfo->ri_projectReturning)
2353 	{
2354 		TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2355 		Instrumentation *instr = node->ss.ps.instrument;
2356 
2357 		Assert(!dmstate->has_returning);
2358 
2359 		/* Increment the command es_processed count if necessary. */
2360 		if (dmstate->set_processed)
2361 			estate->es_processed += dmstate->num_tuples;
2362 
2363 		/* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2364 		if (instr)
2365 			instr->tuplecount += dmstate->num_tuples;
2366 
2367 		return ExecClearTuple(slot);
2368 	}
2369 
2370 	/*
2371 	 * Get the next RETURNING tuple.
2372 	 */
2373 	return get_returning_data(node);
2374 }
2375 
2376 /*
2377  * postgresEndDirectModify
2378  *		Finish a direct foreign table modification
2379  */
2380 static void
postgresEndDirectModify(ForeignScanState * node)2381 postgresEndDirectModify(ForeignScanState *node)
2382 {
2383 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2384 
2385 	/* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2386 	if (dmstate == NULL)
2387 		return;
2388 
2389 	/* Release PGresult */
2390 	if (dmstate->result)
2391 		PQclear(dmstate->result);
2392 
2393 	/* Release remote connection */
2394 	ReleaseConnection(dmstate->conn);
2395 	dmstate->conn = NULL;
2396 
2397 	/* MemoryContext will be deleted automatically. */
2398 }
2399 
2400 /*
2401  * postgresExplainForeignScan
2402  *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2403  */
2404 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2405 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2406 {
2407 	List	   *fdw_private;
2408 	char	   *sql;
2409 	char	   *relations;
2410 
2411 	fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2412 
2413 	/*
2414 	 * Add names of relation handled by the foreign scan when the scan is a
2415 	 * join
2416 	 */
2417 	if (list_length(fdw_private) > FdwScanPrivateRelations)
2418 	{
2419 		relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2420 		ExplainPropertyText("Relations", relations, es);
2421 	}
2422 
2423 	/*
2424 	 * Add remote query, when VERBOSE option is specified.
2425 	 */
2426 	if (es->verbose)
2427 	{
2428 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2429 		ExplainPropertyText("Remote SQL", sql, es);
2430 	}
2431 }
2432 
2433 /*
2434  * postgresExplainForeignModify
2435  *		Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2436  */
2437 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2438 postgresExplainForeignModify(ModifyTableState *mtstate,
2439 							 ResultRelInfo *rinfo,
2440 							 List *fdw_private,
2441 							 int subplan_index,
2442 							 ExplainState *es)
2443 {
2444 	if (es->verbose)
2445 	{
2446 		char	   *sql = strVal(list_nth(fdw_private,
2447 										  FdwModifyPrivateUpdateSql));
2448 
2449 		ExplainPropertyText("Remote SQL", sql, es);
2450 	}
2451 }
2452 
2453 /*
2454  * postgresExplainDirectModify
2455  *		Produce extra output for EXPLAIN of a ForeignScan that modifies a
2456  *		foreign table directly
2457  */
2458 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2459 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2460 {
2461 	List	   *fdw_private;
2462 	char	   *sql;
2463 
2464 	if (es->verbose)
2465 	{
2466 		fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2467 		sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2468 		ExplainPropertyText("Remote SQL", sql, es);
2469 	}
2470 }
2471 
2472 
2473 /*
2474  * estimate_path_cost_size
2475  *		Get cost and size estimates for a foreign scan on given foreign relation
2476  *		either a base relation or a join between foreign relations.
2477  *
2478  * param_join_conds are the parameterization clauses with outer relations.
2479  * pathkeys specify the expected sort order if any for given path being costed.
2480  *
2481  * The function returns the cost and size estimates in p_rows, p_width,
2482  * p_startup_cost and p_total_cost variables.
2483  */
2484 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)2485 estimate_path_cost_size(PlannerInfo *root,
2486 						RelOptInfo *foreignrel,
2487 						List *param_join_conds,
2488 						List *pathkeys,
2489 						double *p_rows, int *p_width,
2490 						Cost *p_startup_cost, Cost *p_total_cost)
2491 {
2492 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2493 	double		rows;
2494 	double		retrieved_rows;
2495 	int			width;
2496 	Cost		startup_cost;
2497 	Cost		total_cost;
2498 	Cost		cpu_per_tuple;
2499 
2500 	/*
2501 	 * If the table or the server is configured to use remote estimates,
2502 	 * connect to the foreign server and execute EXPLAIN to estimate the
2503 	 * number of rows selected by the restriction+join clauses.  Otherwise,
2504 	 * estimate rows using whatever statistics we have locally, in a way
2505 	 * similar to ordinary tables.
2506 	 */
2507 	if (fpinfo->use_remote_estimate)
2508 	{
2509 		List	   *remote_param_join_conds;
2510 		List	   *local_param_join_conds;
2511 		StringInfoData sql;
2512 		PGconn	   *conn;
2513 		Selectivity local_sel;
2514 		QualCost	local_cost;
2515 		List	   *fdw_scan_tlist = NIL;
2516 		List	   *remote_conds;
2517 
2518 		/* Required only to be passed to deparseSelectStmtForRel */
2519 		List	   *retrieved_attrs;
2520 
2521 		/*
2522 		 * param_join_conds might contain both clauses that are safe to send
2523 		 * across, and clauses that aren't.
2524 		 */
2525 		classifyConditions(root, foreignrel, param_join_conds,
2526 						   &remote_param_join_conds, &local_param_join_conds);
2527 
2528 		/* Build the list of columns to be fetched from the foreign server. */
2529 		if (foreignrel->reloptkind == RELOPT_JOINREL)
2530 			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2531 		else
2532 			fdw_scan_tlist = NIL;
2533 
2534 		/*
2535 		 * The complete list of remote conditions includes everything from
2536 		 * baserestrictinfo plus any extra join_conds relevant to this
2537 		 * particular path.
2538 		 */
2539 		remote_conds = list_concat(list_copy(remote_param_join_conds),
2540 								   fpinfo->remote_conds);
2541 
2542 		/*
2543 		 * Construct EXPLAIN query including the desired SELECT, FROM, and
2544 		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2545 		 * values, so don't request params_list.
2546 		 */
2547 		initStringInfo(&sql);
2548 		appendStringInfoString(&sql, "EXPLAIN ");
2549 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2550 								remote_conds, pathkeys, &retrieved_attrs,
2551 								NULL);
2552 
2553 		/* Get the remote estimate */
2554 		conn = GetConnection(fpinfo->user, false);
2555 		get_remote_estimate(sql.data, conn, &rows, &width,
2556 							&startup_cost, &total_cost);
2557 		ReleaseConnection(conn);
2558 
2559 		retrieved_rows = rows;
2560 
2561 		/* Factor in the selectivity of the locally-checked quals */
2562 		local_sel = clauselist_selectivity(root,
2563 										   local_param_join_conds,
2564 										   foreignrel->relid,
2565 										   JOIN_INNER,
2566 										   NULL);
2567 		local_sel *= fpinfo->local_conds_sel;
2568 
2569 		rows = clamp_row_est(rows * local_sel);
2570 
2571 		/* Add in the eval cost of the locally-checked quals */
2572 		startup_cost += fpinfo->local_conds_cost.startup;
2573 		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2574 		cost_qual_eval(&local_cost, local_param_join_conds, root);
2575 		startup_cost += local_cost.startup;
2576 		total_cost += local_cost.per_tuple * retrieved_rows;
2577 	}
2578 	else
2579 	{
2580 		Cost		run_cost = 0;
2581 
2582 		/*
2583 		 * We don't support join conditions in this mode (hence, no
2584 		 * parameterized paths can be made).
2585 		 */
2586 		Assert(param_join_conds == NIL);
2587 
2588 		/*
2589 		 * Use rows/width estimates made by set_baserel_size_estimates() for
2590 		 * base foreign relations and set_joinrel_size_estimates() for join
2591 		 * between foreign relations.
2592 		 */
2593 		rows = foreignrel->rows;
2594 		width = foreignrel->reltarget->width;
2595 
2596 		/* Back into an estimate of the number of retrieved rows. */
2597 		retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2598 
2599 		/*
2600 		 * We will come here again and again with different set of pathkeys
2601 		 * that caller wants to cost. We don't need to calculate the cost of
2602 		 * bare scan each time. Instead, use the costs if we have cached them
2603 		 * already.
2604 		 */
2605 		if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2606 		{
2607 			startup_cost = fpinfo->rel_startup_cost;
2608 			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2609 		}
2610 		else if (foreignrel->reloptkind != RELOPT_JOINREL)
2611 		{
2612 			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2613 			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2614 
2615 			/*
2616 			 * Cost as though this were a seqscan, which is pessimistic.  We
2617 			 * effectively imagine the local_conds are being evaluated
2618 			 * remotely, too.
2619 			 */
2620 			startup_cost = 0;
2621 			run_cost = 0;
2622 			run_cost += seq_page_cost * foreignrel->pages;
2623 
2624 			startup_cost += foreignrel->baserestrictcost.startup;
2625 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2626 			run_cost += cpu_per_tuple * foreignrel->tuples;
2627 		}
2628 		else
2629 		{
2630 			PgFdwRelationInfo *fpinfo_i;
2631 			PgFdwRelationInfo *fpinfo_o;
2632 			QualCost	join_cost;
2633 			QualCost	remote_conds_cost;
2634 			double		nrows;
2635 
2636 			/* For join we expect inner and outer relations set */
2637 			Assert(fpinfo->innerrel && fpinfo->outerrel);
2638 
2639 			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2640 			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2641 
2642 			/* Estimate of number of rows in cross product */
2643 			nrows = fpinfo_i->rows * fpinfo_o->rows;
2644 			/* Clamp retrieved rows estimate to at most size of cross product */
2645 			retrieved_rows = Min(retrieved_rows, nrows);
2646 
2647 			/*
2648 			 * The cost of foreign join is estimated as cost of generating
2649 			 * rows for the joining relations + cost for applying quals on the
2650 			 * rows.
2651 			 */
2652 
2653 			/* Calculate the cost of clauses pushed down the foreign server */
2654 			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2655 			/* Calculate the cost of applying join clauses */
2656 			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2657 
2658 			/*
2659 			 * Startup cost includes startup cost of joining relations and the
2660 			 * startup cost for join and other clauses. We do not include the
2661 			 * startup cost specific to join strategy (e.g. setting up hash
2662 			 * tables) since we do not know what strategy the foreign server
2663 			 * is going to use.
2664 			 */
2665 			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2666 			startup_cost += join_cost.startup;
2667 			startup_cost += remote_conds_cost.startup;
2668 			startup_cost += fpinfo->local_conds_cost.startup;
2669 
2670 			/*
2671 			 * Run time cost includes:
2672 			 *
2673 			 * 1. Run time cost (total_cost - startup_cost) of relations being
2674 			 * joined
2675 			 *
2676 			 * 2. Run time cost of applying join clauses on the cross product
2677 			 * of the joining relations.
2678 			 *
2679 			 * 3. Run time cost of applying pushed down other clauses on the
2680 			 * result of join
2681 			 *
2682 			 * 4. Run time cost of applying nonpushable other clauses locally
2683 			 * on the result fetched from the foreign server.
2684 			 */
2685 			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2686 			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2687 			run_cost += nrows * join_cost.per_tuple;
2688 			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2689 			run_cost += nrows * remote_conds_cost.per_tuple;
2690 			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2691 		}
2692 
2693 		/*
2694 		 * Without remote estimates, we have no real way to estimate the cost
2695 		 * of generating sorted output.  It could be free if the query plan
2696 		 * the remote side would have chosen generates properly-sorted output
2697 		 * anyway, but in most cases it will cost something.  Estimate a value
2698 		 * high enough that we won't pick the sorted path when the ordering
2699 		 * isn't locally useful, but low enough that we'll err on the side of
2700 		 * pushing down the ORDER BY clause when it's useful to do so.
2701 		 */
2702 		if (pathkeys != NIL)
2703 		{
2704 			startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2705 			run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2706 		}
2707 
2708 		total_cost = startup_cost + run_cost;
2709 	}
2710 
2711 	/*
2712 	 * Cache the costs for scans without any pathkeys or parameterization
2713 	 * before adding the costs for transferring data from the foreign server.
2714 	 * These costs are useful for costing the join between this relation and
2715 	 * another foreign relation or to calculate the costs of paths with
2716 	 * pathkeys for this relation, when the costs can not be obtained from the
2717 	 * foreign server. This function will be called at least once for every
2718 	 * foreign relation without pathkeys and parameterization.
2719 	 */
2720 	if (pathkeys == NIL && param_join_conds == NIL)
2721 	{
2722 		fpinfo->rel_startup_cost = startup_cost;
2723 		fpinfo->rel_total_cost = total_cost;
2724 	}
2725 
2726 	/*
2727 	 * Add some additional cost factors to account for connection overhead
2728 	 * (fdw_startup_cost), transferring data across the network
2729 	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2730 	 * (cpu_tuple_cost per retrieved row).
2731 	 */
2732 	startup_cost += fpinfo->fdw_startup_cost;
2733 	total_cost += fpinfo->fdw_startup_cost;
2734 	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2735 	total_cost += cpu_tuple_cost * retrieved_rows;
2736 
2737 	/* Return results. */
2738 	*p_rows = rows;
2739 	*p_width = width;
2740 	*p_startup_cost = startup_cost;
2741 	*p_total_cost = total_cost;
2742 }
2743 
2744 /*
2745  * Estimate costs of executing a SQL statement remotely.
2746  * The given "sql" must be an EXPLAIN command.
2747  */
2748 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)2749 get_remote_estimate(const char *sql, PGconn *conn,
2750 					double *rows, int *width,
2751 					Cost *startup_cost, Cost *total_cost)
2752 {
2753 	PGresult   *volatile res = NULL;
2754 
2755 	/* PGresult must be released before leaving this function. */
2756 	PG_TRY();
2757 	{
2758 		char	   *line;
2759 		char	   *p;
2760 		int			n;
2761 
2762 		/*
2763 		 * Execute EXPLAIN remotely.
2764 		 */
2765 		res = pgfdw_exec_query(conn, sql);
2766 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
2767 			pgfdw_report_error(ERROR, res, conn, false, sql);
2768 
2769 		/*
2770 		 * Extract cost numbers for topmost plan node.  Note we search for a
2771 		 * left paren from the end of the line to avoid being confused by
2772 		 * other uses of parentheses.
2773 		 */
2774 		line = PQgetvalue(res, 0, 0);
2775 		p = strrchr(line, '(');
2776 		if (p == NULL)
2777 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2778 		n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2779 				   startup_cost, total_cost, rows, width);
2780 		if (n != 4)
2781 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2782 
2783 		PQclear(res);
2784 		res = NULL;
2785 	}
2786 	PG_CATCH();
2787 	{
2788 		if (res)
2789 			PQclear(res);
2790 		PG_RE_THROW();
2791 	}
2792 	PG_END_TRY();
2793 }
2794 
2795 /*
2796  * Detect whether we want to process an EquivalenceClass member.
2797  *
2798  * This is a callback for use by generate_implied_equalities_for_column.
2799  */
2800 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)2801 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
2802 						  EquivalenceClass *ec, EquivalenceMember *em,
2803 						  void *arg)
2804 {
2805 	ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
2806 	Expr	   *expr = em->em_expr;
2807 
2808 	/*
2809 	 * If we've identified what we're processing in the current scan, we only
2810 	 * want to match that expression.
2811 	 */
2812 	if (state->current != NULL)
2813 		return equal(expr, state->current);
2814 
2815 	/*
2816 	 * Otherwise, ignore anything we've already processed.
2817 	 */
2818 	if (list_member(state->already_used, expr))
2819 		return false;
2820 
2821 	/* This is the new target to process. */
2822 	state->current = expr;
2823 	return true;
2824 }
2825 
2826 /*
2827  * Create cursor for node's query with current parameter values.
2828  */
2829 static void
create_cursor(ForeignScanState * node)2830 create_cursor(ForeignScanState *node)
2831 {
2832 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2833 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
2834 	int			numParams = fsstate->numParams;
2835 	const char **values = fsstate->param_values;
2836 	PGconn	   *conn = fsstate->conn;
2837 	StringInfoData buf;
2838 	PGresult   *res;
2839 
2840 	/*
2841 	 * Construct array of query parameter values in text format.  We do the
2842 	 * conversions in the short-lived per-tuple context, so as not to cause a
2843 	 * memory leak over repeated scans.
2844 	 */
2845 	if (numParams > 0)
2846 	{
2847 		MemoryContext oldcontext;
2848 
2849 		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2850 
2851 		process_query_params(econtext,
2852 							 fsstate->param_flinfo,
2853 							 fsstate->param_exprs,
2854 							 values);
2855 
2856 		MemoryContextSwitchTo(oldcontext);
2857 	}
2858 
2859 	/* Construct the DECLARE CURSOR command */
2860 	initStringInfo(&buf);
2861 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2862 					 fsstate->cursor_number, fsstate->query);
2863 
2864 	/*
2865 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
2866 	 * to infer types for all parameters.  Since we explicitly cast every
2867 	 * parameter (see deparse.c), the "inference" is trivial and will produce
2868 	 * the desired result.  This allows us to avoid assuming that the remote
2869 	 * server has the same OIDs we do for the parameters' types.
2870 	 */
2871 	if (!PQsendQueryParams(conn, buf.data, numParams,
2872 						   NULL, values, NULL, NULL, 0))
2873 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2874 
2875 	/*
2876 	 * Get the result, and check for success.
2877 	 *
2878 	 * We don't use a PG_TRY block here, so be careful not to throw error
2879 	 * without releasing the PGresult.
2880 	 */
2881 	res = pgfdw_get_result(conn, buf.data);
2882 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
2883 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2884 	PQclear(res);
2885 
2886 	/* Mark the cursor as created, and show no tuples have been retrieved */
2887 	fsstate->cursor_exists = true;
2888 	fsstate->tuples = NULL;
2889 	fsstate->num_tuples = 0;
2890 	fsstate->next_tuple = 0;
2891 	fsstate->fetch_ct_2 = 0;
2892 	fsstate->eof_reached = false;
2893 
2894 	/* Clean up */
2895 	pfree(buf.data);
2896 }
2897 
2898 /*
2899  * Fetch some more rows from the node's cursor.
2900  */
2901 static void
fetch_more_data(ForeignScanState * node)2902 fetch_more_data(ForeignScanState *node)
2903 {
2904 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2905 	PGresult   *volatile res = NULL;
2906 	MemoryContext oldcontext;
2907 
2908 	/*
2909 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
2910 	 * batch.
2911 	 */
2912 	fsstate->tuples = NULL;
2913 	MemoryContextReset(fsstate->batch_cxt);
2914 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
2915 
2916 	/* PGresult must be released before leaving this function. */
2917 	PG_TRY();
2918 	{
2919 		PGconn	   *conn = fsstate->conn;
2920 		char		sql[64];
2921 		int			numrows;
2922 		int			i;
2923 
2924 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
2925 				 fsstate->fetch_size, fsstate->cursor_number);
2926 
2927 		res = pgfdw_exec_query(conn, sql);
2928 		/* On error, report the original query, not the FETCH. */
2929 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
2930 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
2931 
2932 		/* Convert the data into HeapTuples */
2933 		numrows = PQntuples(res);
2934 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
2935 		fsstate->num_tuples = numrows;
2936 		fsstate->next_tuple = 0;
2937 
2938 		for (i = 0; i < numrows; i++)
2939 		{
2940 			Assert(IsA(node->ss.ps.plan, ForeignScan));
2941 
2942 			fsstate->tuples[i] =
2943 				make_tuple_from_result_row(res, i,
2944 										   fsstate->rel,
2945 										   fsstate->attinmeta,
2946 										   fsstate->retrieved_attrs,
2947 										   node,
2948 										   fsstate->temp_cxt);
2949 		}
2950 
2951 		/* Update fetch_ct_2 */
2952 		if (fsstate->fetch_ct_2 < 2)
2953 			fsstate->fetch_ct_2++;
2954 
2955 		/* Must be EOF if we didn't get as many tuples as we asked for. */
2956 		fsstate->eof_reached = (numrows < fsstate->fetch_size);
2957 
2958 		PQclear(res);
2959 		res = NULL;
2960 	}
2961 	PG_CATCH();
2962 	{
2963 		if (res)
2964 			PQclear(res);
2965 		PG_RE_THROW();
2966 	}
2967 	PG_END_TRY();
2968 
2969 	MemoryContextSwitchTo(oldcontext);
2970 }
2971 
2972 /*
2973  * Force assorted GUC parameters to settings that ensure that we'll output
2974  * data values in a form that is unambiguous to the remote server.
2975  *
2976  * This is rather expensive and annoying to do once per row, but there's
2977  * little choice if we want to be sure values are transmitted accurately;
2978  * we can't leave the settings in place between rows for fear of affecting
2979  * user-visible computations.
2980  *
2981  * We use the equivalent of a function SET option to allow the settings to
2982  * persist only until the caller calls reset_transmission_modes().  If an
2983  * error is thrown in between, guc.c will take care of undoing the settings.
2984  *
2985  * The return value is the nestlevel that must be passed to
2986  * reset_transmission_modes() to undo things.
2987  */
2988 int
set_transmission_modes(void)2989 set_transmission_modes(void)
2990 {
2991 	int			nestlevel = NewGUCNestLevel();
2992 
2993 	/*
2994 	 * The values set here should match what pg_dump does.  See also
2995 	 * configure_remote_session in connection.c.
2996 	 */
2997 	if (DateStyle != USE_ISO_DATES)
2998 		(void) set_config_option("datestyle", "ISO",
2999 								 PGC_USERSET, PGC_S_SESSION,
3000 								 GUC_ACTION_SAVE, true, 0, false);
3001 	if (IntervalStyle != INTSTYLE_POSTGRES)
3002 		(void) set_config_option("intervalstyle", "postgres",
3003 								 PGC_USERSET, PGC_S_SESSION,
3004 								 GUC_ACTION_SAVE, true, 0, false);
3005 	if (extra_float_digits < 3)
3006 		(void) set_config_option("extra_float_digits", "3",
3007 								 PGC_USERSET, PGC_S_SESSION,
3008 								 GUC_ACTION_SAVE, true, 0, false);
3009 
3010 	return nestlevel;
3011 }
3012 
3013 /*
3014  * Undo the effects of set_transmission_modes().
3015  */
3016 void
reset_transmission_modes(int nestlevel)3017 reset_transmission_modes(int nestlevel)
3018 {
3019 	AtEOXact_GUC(true, nestlevel);
3020 }
3021 
3022 /*
3023  * Utility routine to close a cursor.
3024  */
3025 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3026 close_cursor(PGconn *conn, unsigned int cursor_number)
3027 {
3028 	char		sql[64];
3029 	PGresult   *res;
3030 
3031 	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3032 
3033 	/*
3034 	 * We don't use a PG_TRY block here, so be careful not to throw error
3035 	 * without releasing the PGresult.
3036 	 */
3037 	res = pgfdw_exec_query(conn, sql);
3038 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3039 		pgfdw_report_error(ERROR, res, conn, true, sql);
3040 	PQclear(res);
3041 }
3042 
3043 /*
3044  * prepare_foreign_modify
3045  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3046  */
3047 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3048 prepare_foreign_modify(PgFdwModifyState *fmstate)
3049 {
3050 	char		prep_name[NAMEDATALEN];
3051 	char	   *p_name;
3052 	PGresult   *res;
3053 
3054 	/* Construct name we'll use for the prepared statement. */
3055 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3056 			 GetPrepStmtNumber(fmstate->conn));
3057 	p_name = pstrdup(prep_name);
3058 
3059 	/*
3060 	 * We intentionally do not specify parameter types here, but leave the
3061 	 * remote server to derive them by default.  This avoids possible problems
3062 	 * with the remote server using different type OIDs than we do.  All of
3063 	 * the prepared statements we use in this module are simple enough that
3064 	 * the remote server will make the right choices.
3065 	 */
3066 	if (!PQsendPrepare(fmstate->conn,
3067 					   p_name,
3068 					   fmstate->query,
3069 					   0,
3070 					   NULL))
3071 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3072 
3073 	/*
3074 	 * Get the result, and check for success.
3075 	 *
3076 	 * We don't use a PG_TRY block here, so be careful not to throw error
3077 	 * without releasing the PGresult.
3078 	 */
3079 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3080 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3081 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3082 	PQclear(res);
3083 
3084 	/* This action shows that the prepare has been done. */
3085 	fmstate->p_name = p_name;
3086 }
3087 
3088 /*
3089  * convert_prep_stmt_params
3090  *		Create array of text strings representing parameter values
3091  *
3092  * tupleid is ctid to send, or NULL if none
3093  * slot is slot to get remaining parameters from, or NULL if none
3094  *
3095  * Data is constructed in temp_cxt; caller should reset that after use.
3096  */
3097 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3098 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3099 						 ItemPointer tupleid,
3100 						 TupleTableSlot *slot)
3101 {
3102 	const char **p_values;
3103 	int			pindex = 0;
3104 	MemoryContext oldcontext;
3105 
3106 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3107 
3108 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3109 
3110 	/* 1st parameter should be ctid, if it's in use */
3111 	if (tupleid != NULL)
3112 	{
3113 		/* don't need set_transmission_modes for TID output */
3114 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3115 											  PointerGetDatum(tupleid));
3116 		pindex++;
3117 	}
3118 
3119 	/* get following parameters from slot */
3120 	if (slot != NULL && fmstate->target_attrs != NIL)
3121 	{
3122 		int			nestlevel;
3123 		ListCell   *lc;
3124 
3125 		nestlevel = set_transmission_modes();
3126 
3127 		foreach(lc, fmstate->target_attrs)
3128 		{
3129 			int			attnum = lfirst_int(lc);
3130 			Datum		value;
3131 			bool		isnull;
3132 
3133 			value = slot_getattr(slot, attnum, &isnull);
3134 			if (isnull)
3135 				p_values[pindex] = NULL;
3136 			else
3137 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3138 													  value);
3139 			pindex++;
3140 		}
3141 
3142 		reset_transmission_modes(nestlevel);
3143 	}
3144 
3145 	Assert(pindex == fmstate->p_nums);
3146 
3147 	MemoryContextSwitchTo(oldcontext);
3148 
3149 	return p_values;
3150 }
3151 
3152 /*
3153  * store_returning_result
3154  *		Store the result of a RETURNING clause
3155  *
3156  * On error, be sure to release the PGresult on the way out.  Callers do not
3157  * have PG_TRY blocks to ensure this happens.
3158  */
3159 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3160 store_returning_result(PgFdwModifyState *fmstate,
3161 					   TupleTableSlot *slot, PGresult *res)
3162 {
3163 	PG_TRY();
3164 	{
3165 		HeapTuple	newtup;
3166 
3167 		newtup = make_tuple_from_result_row(res, 0,
3168 											fmstate->rel,
3169 											fmstate->attinmeta,
3170 											fmstate->retrieved_attrs,
3171 											NULL,
3172 											fmstate->temp_cxt);
3173 		/* tuple will be deleted when it is cleared from the slot */
3174 		ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3175 	}
3176 	PG_CATCH();
3177 	{
3178 		if (res)
3179 			PQclear(res);
3180 		PG_RE_THROW();
3181 	}
3182 	PG_END_TRY();
3183 }
3184 
3185 /*
3186  * Execute a direct UPDATE/DELETE statement.
3187  */
3188 static void
execute_dml_stmt(ForeignScanState * node)3189 execute_dml_stmt(ForeignScanState *node)
3190 {
3191 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3192 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3193 	int			numParams = dmstate->numParams;
3194 	const char **values = dmstate->param_values;
3195 
3196 	/*
3197 	 * Construct array of query parameter values in text format.
3198 	 */
3199 	if (numParams > 0)
3200 		process_query_params(econtext,
3201 							 dmstate->param_flinfo,
3202 							 dmstate->param_exprs,
3203 							 values);
3204 
3205 	/*
3206 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3207 	 * to infer types for all parameters.  Since we explicitly cast every
3208 	 * parameter (see deparse.c), the "inference" is trivial and will produce
3209 	 * the desired result.  This allows us to avoid assuming that the remote
3210 	 * server has the same OIDs we do for the parameters' types.
3211 	 */
3212 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3213 						   NULL, values, NULL, NULL, 0))
3214 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3215 
3216 	/*
3217 	 * Get the result, and check for success.
3218 	 *
3219 	 * We don't use a PG_TRY block here, so be careful not to throw error
3220 	 * without releasing the PGresult.
3221 	 */
3222 	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3223 	if (PQresultStatus(dmstate->result) !=
3224 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3225 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3226 						   dmstate->query);
3227 
3228 	/* Get the number of rows affected. */
3229 	if (dmstate->has_returning)
3230 		dmstate->num_tuples = PQntuples(dmstate->result);
3231 	else
3232 		dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3233 }
3234 
3235 /*
3236  * Get the result of a RETURNING clause.
3237  */
3238 static TupleTableSlot *
get_returning_data(ForeignScanState * node)3239 get_returning_data(ForeignScanState *node)
3240 {
3241 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3242 	EState	   *estate = node->ss.ps.state;
3243 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3244 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3245 
3246 	Assert(resultRelInfo->ri_projectReturning);
3247 
3248 	/* If we didn't get any tuples, must be end of data. */
3249 	if (dmstate->next_tuple >= dmstate->num_tuples)
3250 		return ExecClearTuple(slot);
3251 
3252 	/* Increment the command es_processed count if necessary. */
3253 	if (dmstate->set_processed)
3254 		estate->es_processed += 1;
3255 
3256 	/*
3257 	 * Store a RETURNING tuple.  If has_returning is false, just emit a dummy
3258 	 * tuple.  (has_returning is false when the local query is of the form
3259 	 * "UPDATE/DELETE .. RETURNING 1" for example.)
3260 	 */
3261 	if (!dmstate->has_returning)
3262 		ExecStoreAllNullTuple(slot);
3263 	else
3264 	{
3265 		/*
3266 		 * On error, be sure to release the PGresult on the way out.  Callers
3267 		 * do not have PG_TRY blocks to ensure this happens.
3268 		 */
3269 		PG_TRY();
3270 		{
3271 			HeapTuple	newtup;
3272 
3273 			newtup = make_tuple_from_result_row(dmstate->result,
3274 												dmstate->next_tuple,
3275 												dmstate->rel,
3276 												dmstate->attinmeta,
3277 												dmstate->retrieved_attrs,
3278 												NULL,
3279 												dmstate->temp_cxt);
3280 			ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3281 		}
3282 		PG_CATCH();
3283 		{
3284 			if (dmstate->result)
3285 				PQclear(dmstate->result);
3286 			PG_RE_THROW();
3287 		}
3288 		PG_END_TRY();
3289 	}
3290 	dmstate->next_tuple++;
3291 
3292 	/* Make slot available for evaluation of the local query RETURNING list. */
3293 	resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3294 
3295 	return slot;
3296 }
3297 
3298 /*
3299  * Prepare for processing of parameters used in remote query.
3300  */
3301 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)3302 prepare_query_params(PlanState *node,
3303 					 List *fdw_exprs,
3304 					 int numParams,
3305 					 FmgrInfo **param_flinfo,
3306 					 List **param_exprs,
3307 					 const char ***param_values)
3308 {
3309 	int			i;
3310 	ListCell   *lc;
3311 
3312 	Assert(numParams > 0);
3313 
3314 	/* Prepare for output conversion of parameters used in remote query. */
3315 	*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3316 
3317 	i = 0;
3318 	foreach(lc, fdw_exprs)
3319 	{
3320 		Node	   *param_expr = (Node *) lfirst(lc);
3321 		Oid			typefnoid;
3322 		bool		isvarlena;
3323 
3324 		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3325 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
3326 		i++;
3327 	}
3328 
3329 	/*
3330 	 * Prepare remote-parameter expressions for evaluation.  (Note: in
3331 	 * practice, we expect that all these expressions will be just Params, so
3332 	 * we could possibly do something more efficient than using the full
3333 	 * expression-eval machinery for this.  But probably there would be little
3334 	 * benefit, and it'd require postgres_fdw to know more than is desirable
3335 	 * about Param evaluation.)
3336 	 */
3337 	*param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
3338 
3339 	/* Allocate buffer for text form of query parameters. */
3340 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
3341 }
3342 
3343 /*
3344  * Construct array of query parameter values in text format.
3345  */
3346 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)3347 process_query_params(ExprContext *econtext,
3348 					 FmgrInfo *param_flinfo,
3349 					 List *param_exprs,
3350 					 const char **param_values)
3351 {
3352 	int			nestlevel;
3353 	int			i;
3354 	ListCell   *lc;
3355 
3356 	nestlevel = set_transmission_modes();
3357 
3358 	i = 0;
3359 	foreach(lc, param_exprs)
3360 	{
3361 		ExprState  *expr_state = (ExprState *) lfirst(lc);
3362 		Datum		expr_value;
3363 		bool		isNull;
3364 
3365 		/* Evaluate the parameter expression */
3366 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
3367 
3368 		/*
3369 		 * Get string representation of each parameter value by invoking
3370 		 * type-specific output function, unless the value is null.
3371 		 */
3372 		if (isNull)
3373 			param_values[i] = NULL;
3374 		else
3375 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3376 
3377 		i++;
3378 	}
3379 
3380 	reset_transmission_modes(nestlevel);
3381 }
3382 
3383 /*
3384  * postgresAnalyzeForeignTable
3385  *		Test whether analyzing this foreign table is supported
3386  */
3387 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)3388 postgresAnalyzeForeignTable(Relation relation,
3389 							AcquireSampleRowsFunc *func,
3390 							BlockNumber *totalpages)
3391 {
3392 	ForeignTable *table;
3393 	UserMapping *user;
3394 	PGconn	   *conn;
3395 	StringInfoData sql;
3396 	PGresult   *volatile res = NULL;
3397 
3398 	/* Return the row-analysis function pointer */
3399 	*func = postgresAcquireSampleRowsFunc;
3400 
3401 	/*
3402 	 * Now we have to get the number of pages.  It's annoying that the ANALYZE
3403 	 * API requires us to return that now, because it forces some duplication
3404 	 * of effort between this routine and postgresAcquireSampleRowsFunc.  But
3405 	 * it's probably not worth redefining that API at this point.
3406 	 */
3407 
3408 	/*
3409 	 * Get the connection to use.  We do the remote access as the table's
3410 	 * owner, even if the ANALYZE was started by some other user.
3411 	 */
3412 	table = GetForeignTable(RelationGetRelid(relation));
3413 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3414 	conn = GetConnection(user, false);
3415 
3416 	/*
3417 	 * Construct command to get page count for relation.
3418 	 */
3419 	initStringInfo(&sql);
3420 	deparseAnalyzeSizeSql(&sql, relation);
3421 
3422 	/* In what follows, do not risk leaking any PGresults. */
3423 	PG_TRY();
3424 	{
3425 		res = pgfdw_exec_query(conn, sql.data);
3426 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3427 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
3428 
3429 		if (PQntuples(res) != 1 || PQnfields(res) != 1)
3430 			elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3431 		*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3432 
3433 		PQclear(res);
3434 		res = NULL;
3435 	}
3436 	PG_CATCH();
3437 	{
3438 		if (res)
3439 			PQclear(res);
3440 		PG_RE_THROW();
3441 	}
3442 	PG_END_TRY();
3443 
3444 	ReleaseConnection(conn);
3445 
3446 	return true;
3447 }
3448 
3449 /*
3450  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3451  *
3452  * We fetch the whole table from the remote side and pick out some sample rows.
3453  *
3454  * Selected rows are returned in the caller-allocated array rows[],
3455  * which must have at least targrows entries.
3456  * The actual number of rows selected is returned as the function result.
3457  * We also count the total number of rows in the table and return it into
3458  * *totalrows.  Note that *totaldeadrows is always set to 0.
3459  *
3460  * Note that the returned list of rows is not always in order by physical
3461  * position in the table.  Therefore, correlation estimates derived later
3462  * may be meaningless, but it's OK because we don't use the estimates
3463  * currently (the planner only pays attention to correlation for indexscans).
3464  */
3465 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)3466 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
3467 							  HeapTuple *rows, int targrows,
3468 							  double *totalrows,
3469 							  double *totaldeadrows)
3470 {
3471 	PgFdwAnalyzeState astate;
3472 	ForeignTable *table;
3473 	ForeignServer *server;
3474 	UserMapping *user;
3475 	PGconn	   *conn;
3476 	unsigned int cursor_number;
3477 	StringInfoData sql;
3478 	PGresult   *volatile res = NULL;
3479 
3480 	/* Initialize workspace state */
3481 	astate.rel = relation;
3482 	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
3483 
3484 	astate.rows = rows;
3485 	astate.targrows = targrows;
3486 	astate.numrows = 0;
3487 	astate.samplerows = 0;
3488 	astate.rowstoskip = -1;		/* -1 means not set yet */
3489 	reservoir_init_selection_state(&astate.rstate, targrows);
3490 
3491 	/* Remember ANALYZE context, and create a per-tuple temp context */
3492 	astate.anl_cxt = CurrentMemoryContext;
3493 	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
3494 											"postgres_fdw temporary data",
3495 											ALLOCSET_SMALL_SIZES);
3496 
3497 	/*
3498 	 * Get the connection to use.  We do the remote access as the table's
3499 	 * owner, even if the ANALYZE was started by some other user.
3500 	 */
3501 	table = GetForeignTable(RelationGetRelid(relation));
3502 	server = GetForeignServer(table->serverid);
3503 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3504 	conn = GetConnection(user, false);
3505 
3506 	/*
3507 	 * Construct cursor that retrieves whole rows from remote.
3508 	 */
3509 	cursor_number = GetCursorNumber(conn);
3510 	initStringInfo(&sql);
3511 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3512 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3513 
3514 	/* In what follows, do not risk leaking any PGresults. */
3515 	PG_TRY();
3516 	{
3517 		res = pgfdw_exec_query(conn, sql.data);
3518 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
3519 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
3520 		PQclear(res);
3521 		res = NULL;
3522 
3523 		/* Retrieve and process rows a batch at a time. */
3524 		for (;;)
3525 		{
3526 			char		fetch_sql[64];
3527 			int			fetch_size;
3528 			int			numrows;
3529 			int			i;
3530 			ListCell   *lc;
3531 
3532 			/* Allow users to cancel long query */
3533 			CHECK_FOR_INTERRUPTS();
3534 
3535 			/*
3536 			 * XXX possible future improvement: if rowstoskip is large, we
3537 			 * could issue a MOVE rather than physically fetching the rows,
3538 			 * then just adjust rowstoskip and samplerows appropriately.
3539 			 */
3540 
3541 			/* The fetch size is arbitrary, but shouldn't be enormous. */
3542 			fetch_size = 100;
3543 			foreach(lc, server->options)
3544 			{
3545 				DefElem    *def = (DefElem *) lfirst(lc);
3546 
3547 				if (strcmp(def->defname, "fetch_size") == 0)
3548 				{
3549 					fetch_size = strtol(defGetString(def), NULL, 10);
3550 					break;
3551 				}
3552 			}
3553 			foreach(lc, table->options)
3554 			{
3555 				DefElem    *def = (DefElem *) lfirst(lc);
3556 
3557 				if (strcmp(def->defname, "fetch_size") == 0)
3558 				{
3559 					fetch_size = strtol(defGetString(def), NULL, 10);
3560 					break;
3561 				}
3562 			}
3563 
3564 			/* Fetch some rows */
3565 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3566 					 fetch_size, cursor_number);
3567 
3568 			res = pgfdw_exec_query(conn, fetch_sql);
3569 			/* On error, report the original query, not the FETCH. */
3570 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
3571 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
3572 
3573 			/* Process whatever we got. */
3574 			numrows = PQntuples(res);
3575 			for (i = 0; i < numrows; i++)
3576 				analyze_row_processor(res, i, &astate);
3577 
3578 			PQclear(res);
3579 			res = NULL;
3580 
3581 			/* Must be EOF if we didn't get all the rows requested. */
3582 			if (numrows < fetch_size)
3583 				break;
3584 		}
3585 
3586 		/* Close the cursor, just to be tidy. */
3587 		close_cursor(conn, cursor_number);
3588 	}
3589 	PG_CATCH();
3590 	{
3591 		if (res)
3592 			PQclear(res);
3593 		PG_RE_THROW();
3594 	}
3595 	PG_END_TRY();
3596 
3597 	ReleaseConnection(conn);
3598 
3599 	/* We assume that we have no dead tuple. */
3600 	*totaldeadrows = 0.0;
3601 
3602 	/* We've retrieved all living tuples from foreign server. */
3603 	*totalrows = astate.samplerows;
3604 
3605 	/*
3606 	 * Emit some interesting relation info
3607 	 */
3608 	ereport(elevel,
3609 			(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3610 					RelationGetRelationName(relation),
3611 					astate.samplerows, astate.numrows)));
3612 
3613 	return astate.numrows;
3614 }
3615 
3616 /*
3617  * Collect sample rows from the result of query.
3618  *	 - Use all tuples in sample until target # of samples are collected.
3619  *	 - Subsequently, replace already-sampled tuples randomly.
3620  */
3621 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)3622 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
3623 {
3624 	int			targrows = astate->targrows;
3625 	int			pos;			/* array index to store tuple in */
3626 	MemoryContext oldcontext;
3627 
3628 	/* Always increment sample row counter. */
3629 	astate->samplerows += 1;
3630 
3631 	/*
3632 	 * Determine the slot where this sample row should be stored.  Set pos to
3633 	 * negative value to indicate the row should be skipped.
3634 	 */
3635 	if (astate->numrows < targrows)
3636 	{
3637 		/* First targrows rows are always included into the sample */
3638 		pos = astate->numrows++;
3639 	}
3640 	else
3641 	{
3642 		/*
3643 		 * Now we start replacing tuples in the sample until we reach the end
3644 		 * of the relation.  Same algorithm as in acquire_sample_rows in
3645 		 * analyze.c; see Jeff Vitter's paper.
3646 		 */
3647 		if (astate->rowstoskip < 0)
3648 			astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3649 
3650 		if (astate->rowstoskip <= 0)
3651 		{
3652 			/* Choose a random reservoir element to replace. */
3653 			pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3654 			Assert(pos >= 0 && pos < targrows);
3655 			heap_freetuple(astate->rows[pos]);
3656 		}
3657 		else
3658 		{
3659 			/* Skip this tuple. */
3660 			pos = -1;
3661 		}
3662 
3663 		astate->rowstoskip -= 1;
3664 	}
3665 
3666 	if (pos >= 0)
3667 	{
3668 		/*
3669 		 * Create sample tuple from current result row, and store it in the
3670 		 * position determined above.  The tuple has to be created in anl_cxt.
3671 		 */
3672 		oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3673 
3674 		astate->rows[pos] = make_tuple_from_result_row(res, row,
3675 													   astate->rel,
3676 													   astate->attinmeta,
3677 													 astate->retrieved_attrs,
3678 													   NULL,
3679 													   astate->temp_cxt);
3680 
3681 		MemoryContextSwitchTo(oldcontext);
3682 	}
3683 }
3684 
3685 /*
3686  * Import a foreign schema
3687  */
3688 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)3689 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
3690 {
3691 	List	   *commands = NIL;
3692 	bool		import_collate = true;
3693 	bool		import_default = false;
3694 	bool		import_not_null = true;
3695 	ForeignServer *server;
3696 	UserMapping *mapping;
3697 	PGconn	   *conn;
3698 	StringInfoData buf;
3699 	PGresult   *volatile res = NULL;
3700 	int			numrows,
3701 				i;
3702 	ListCell   *lc;
3703 
3704 	/* Parse statement options */
3705 	foreach(lc, stmt->options)
3706 	{
3707 		DefElem    *def = (DefElem *) lfirst(lc);
3708 
3709 		if (strcmp(def->defname, "import_collate") == 0)
3710 			import_collate = defGetBoolean(def);
3711 		else if (strcmp(def->defname, "import_default") == 0)
3712 			import_default = defGetBoolean(def);
3713 		else if (strcmp(def->defname, "import_not_null") == 0)
3714 			import_not_null = defGetBoolean(def);
3715 		else
3716 			ereport(ERROR,
3717 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3718 					 errmsg("invalid option \"%s\"", def->defname)));
3719 	}
3720 
3721 	/*
3722 	 * Get connection to the foreign server.  Connection manager will
3723 	 * establish new connection if necessary.
3724 	 */
3725 	server = GetForeignServer(serverOid);
3726 	mapping = GetUserMapping(GetUserId(), server->serverid);
3727 	conn = GetConnection(mapping, false);
3728 
3729 	/* Don't attempt to import collation if remote server hasn't got it */
3730 	if (PQserverVersion(conn) < 90100)
3731 		import_collate = false;
3732 
3733 	/* Create workspace for strings */
3734 	initStringInfo(&buf);
3735 
3736 	/* In what follows, do not risk leaking any PGresults. */
3737 	PG_TRY();
3738 	{
3739 		/* Check that the schema really exists */
3740 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3741 		deparseStringLiteral(&buf, stmt->remote_schema);
3742 
3743 		res = pgfdw_exec_query(conn, buf.data);
3744 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3745 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
3746 
3747 		if (PQntuples(res) != 1)
3748 			ereport(ERROR,
3749 					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3750 			  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3751 					 stmt->remote_schema, server->servername)));
3752 
3753 		PQclear(res);
3754 		res = NULL;
3755 		resetStringInfo(&buf);
3756 
3757 		/*
3758 		 * Fetch all table data from this schema, possibly restricted by
3759 		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
3760 		 * to EXCEPT/LIMIT TO here, because the core code will filter the
3761 		 * statements we return according to those lists anyway.  But it
3762 		 * should save a few cycles to not process excluded tables in the
3763 		 * first place.)
3764 		 *
3765 		 * Note: because we run the connection with search_path restricted to
3766 		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
3767 		 * include a schema name for types/functions in other schemas, which
3768 		 * is what we want.
3769 		 */
3770 		if (import_collate)
3771 			appendStringInfoString(&buf,
3772 								   "SELECT relname, "
3773 								   "  attname, "
3774 								   "  format_type(atttypid, atttypmod), "
3775 								   "  attnotnull, "
3776 								   "  pg_get_expr(adbin, adrelid), "
3777 								   "  collname, "
3778 								   "  collnsp.nspname "
3779 								   "FROM pg_class c "
3780 								   "  JOIN pg_namespace n ON "
3781 								   "    relnamespace = n.oid "
3782 								   "  LEFT JOIN pg_attribute a ON "
3783 								   "    attrelid = c.oid AND attnum > 0 "
3784 								   "      AND NOT attisdropped "
3785 								   "  LEFT JOIN pg_attrdef ad ON "
3786 								   "    adrelid = c.oid AND adnum = attnum "
3787 								   "  LEFT JOIN pg_collation coll ON "
3788 								   "    coll.oid = attcollation "
3789 								   "  LEFT JOIN pg_namespace collnsp ON "
3790 								   "    collnsp.oid = collnamespace ");
3791 		else
3792 			appendStringInfoString(&buf,
3793 								   "SELECT relname, "
3794 								   "  attname, "
3795 								   "  format_type(atttypid, atttypmod), "
3796 								   "  attnotnull, "
3797 								   "  pg_get_expr(adbin, adrelid), "
3798 								   "  NULL, NULL "
3799 								   "FROM pg_class c "
3800 								   "  JOIN pg_namespace n ON "
3801 								   "    relnamespace = n.oid "
3802 								   "  LEFT JOIN pg_attribute a ON "
3803 								   "    attrelid = c.oid AND attnum > 0 "
3804 								   "      AND NOT attisdropped "
3805 								   "  LEFT JOIN pg_attrdef ad ON "
3806 								   "    adrelid = c.oid AND adnum = attnum ");
3807 
3808 		appendStringInfoString(&buf,
3809 							   "WHERE c.relkind IN ('r', 'v', 'f', 'm') "
3810 							   "  AND n.nspname = ");
3811 		deparseStringLiteral(&buf, stmt->remote_schema);
3812 
3813 		/* Apply restrictions for LIMIT TO and EXCEPT */
3814 		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3815 			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3816 		{
3817 			bool		first_item = true;
3818 
3819 			appendStringInfoString(&buf, " AND c.relname ");
3820 			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3821 				appendStringInfoString(&buf, "NOT ");
3822 			appendStringInfoString(&buf, "IN (");
3823 
3824 			/* Append list of table names within IN clause */
3825 			foreach(lc, stmt->table_list)
3826 			{
3827 				RangeVar   *rv = (RangeVar *) lfirst(lc);
3828 
3829 				if (first_item)
3830 					first_item = false;
3831 				else
3832 					appendStringInfoString(&buf, ", ");
3833 				deparseStringLiteral(&buf, rv->relname);
3834 			}
3835 			appendStringInfoChar(&buf, ')');
3836 		}
3837 
3838 		/* Append ORDER BY at the end of query to ensure output ordering */
3839 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3840 
3841 		/* Fetch the data */
3842 		res = pgfdw_exec_query(conn, buf.data);
3843 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3844 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
3845 
3846 		/* Process results */
3847 		numrows = PQntuples(res);
3848 		/* note: incrementation of i happens in inner loop's while() test */
3849 		for (i = 0; i < numrows;)
3850 		{
3851 			char	   *tablename = PQgetvalue(res, i, 0);
3852 			bool		first_item = true;
3853 
3854 			resetStringInfo(&buf);
3855 			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3856 							 quote_identifier(tablename));
3857 
3858 			/* Scan all rows for this table */
3859 			do
3860 			{
3861 				char	   *attname;
3862 				char	   *typename;
3863 				char	   *attnotnull;
3864 				char	   *attdefault;
3865 				char	   *collname;
3866 				char	   *collnamespace;
3867 
3868 				/* If table has no columns, we'll see nulls here */
3869 				if (PQgetisnull(res, i, 1))
3870 					continue;
3871 
3872 				attname = PQgetvalue(res, i, 1);
3873 				typename = PQgetvalue(res, i, 2);
3874 				attnotnull = PQgetvalue(res, i, 3);
3875 				attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3876 					PQgetvalue(res, i, 4);
3877 				collname = PQgetisnull(res, i, 5) ? (char *) NULL :
3878 					PQgetvalue(res, i, 5);
3879 				collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
3880 					PQgetvalue(res, i, 6);
3881 
3882 				if (first_item)
3883 					first_item = false;
3884 				else
3885 					appendStringInfoString(&buf, ",\n");
3886 
3887 				/* Print column name and type */
3888 				appendStringInfo(&buf, "  %s %s",
3889 								 quote_identifier(attname),
3890 								 typename);
3891 
3892 				/*
3893 				 * Add column_name option so that renaming the foreign table's
3894 				 * column doesn't break the association to the underlying
3895 				 * column.
3896 				 */
3897 				appendStringInfoString(&buf, " OPTIONS (column_name ");
3898 				deparseStringLiteral(&buf, attname);
3899 				appendStringInfoChar(&buf, ')');
3900 
3901 				/* Add COLLATE if needed */
3902 				if (import_collate && collname != NULL && collnamespace != NULL)
3903 					appendStringInfo(&buf, " COLLATE %s.%s",
3904 									 quote_identifier(collnamespace),
3905 									 quote_identifier(collname));
3906 
3907 				/* Add DEFAULT if needed */
3908 				if (import_default && attdefault != NULL)
3909 					appendStringInfo(&buf, " DEFAULT %s", attdefault);
3910 
3911 				/* Add NOT NULL if needed */
3912 				if (import_not_null && attnotnull[0] == 't')
3913 					appendStringInfoString(&buf, " NOT NULL");
3914 			}
3915 			while (++i < numrows &&
3916 				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);
3917 
3918 			/*
3919 			 * Add server name and table-level options.  We specify remote
3920 			 * schema and table name as options (the latter to ensure that
3921 			 * renaming the foreign table doesn't break the association).
3922 			 */
3923 			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3924 							 quote_identifier(server->servername));
3925 
3926 			appendStringInfoString(&buf, "schema_name ");
3927 			deparseStringLiteral(&buf, stmt->remote_schema);
3928 			appendStringInfoString(&buf, ", table_name ");
3929 			deparseStringLiteral(&buf, tablename);
3930 
3931 			appendStringInfoString(&buf, ");");
3932 
3933 			commands = lappend(commands, pstrdup(buf.data));
3934 		}
3935 
3936 		/* Clean up */
3937 		PQclear(res);
3938 		res = NULL;
3939 	}
3940 	PG_CATCH();
3941 	{
3942 		if (res)
3943 			PQclear(res);
3944 		PG_RE_THROW();
3945 	}
3946 	PG_END_TRY();
3947 
3948 	ReleaseConnection(conn);
3949 
3950 	return commands;
3951 }
3952 
3953 /*
3954  * Assess whether the join between inner and outer relations can be pushed down
3955  * to the foreign server. As a side effect, save information we obtain in this
3956  * function to PgFdwRelationInfo passed in.
3957  */
3958 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)3959 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
3960 				RelOptInfo *outerrel, RelOptInfo *innerrel,
3961 				JoinPathExtraData *extra)
3962 {
3963 	PgFdwRelationInfo *fpinfo;
3964 	PgFdwRelationInfo *fpinfo_o;
3965 	PgFdwRelationInfo *fpinfo_i;
3966 	ListCell   *lc;
3967 	List	   *joinclauses;
3968 	List	   *otherclauses;
3969 
3970 	/*
3971 	 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
3972 	 * Constructing queries representing SEMI and ANTI joins is hard, hence
3973 	 * not considered right now.
3974 	 */
3975 	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
3976 		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
3977 		return false;
3978 
3979 	/*
3980 	 * If either of the joining relations is marked as unsafe to pushdown, the
3981 	 * join can not be pushed down.
3982 	 */
3983 	fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
3984 	fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
3985 	fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
3986 	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
3987 		!fpinfo_i || !fpinfo_i->pushdown_safe)
3988 		return false;
3989 
3990 	/*
3991 	 * If joining relations have local conditions, those conditions are
3992 	 * required to be applied before joining the relations. Hence the join can
3993 	 * not be pushed down.
3994 	 */
3995 	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
3996 		return false;
3997 
3998 	/* Separate restrict list into join quals and quals on join relation */
3999 	if (IS_OUTER_JOIN(jointype))
4000 		extract_actual_join_clauses(extra->restrictlist,
4001 									joinrel->relids,
4002 									&joinclauses, &otherclauses);
4003 	else
4004 	{
4005 		/*
4006 		 * Unlike an outer join, for inner join, the join result contains only
4007 		 * the rows which satisfy join clauses, similar to the other clause.
4008 		 * Hence all clauses can be treated as other quals. This helps to push
4009 		 * a join down to the foreign server even if some of its join quals
4010 		 * are not safe to pushdown.
4011 		 */
4012 		otherclauses = extract_actual_clauses(extra->restrictlist, false);
4013 		joinclauses = NIL;
4014 	}
4015 
4016 	/* Get foreign server */
4017 	fpinfo->server = fpinfo_o->server;
4018 
4019 	/*
4020 	 * Copy shippable_extensions before checking whether the foreign join is
4021 	 * OK, so that we know which quals can be evaluated on the foreign server.
4022 	 */
4023 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
4024 
4025 	/* Join quals must be safe to push down. */
4026 	foreach(lc, joinclauses)
4027 	{
4028 		Expr	   *expr = (Expr *) lfirst(lc);
4029 
4030 		if (!is_foreign_expr(root, joinrel, expr))
4031 			return false;
4032 	}
4033 
4034 	/*
4035 	 * deparseExplicitTargetList() isn't smart enough to handle anything other
4036 	 * than a Var.  In particular, if there's some PlaceHolderVar that would
4037 	 * need to be evaluated within this join tree (because there's an upper
4038 	 * reference to a quantity that may go to NULL as a result of an outer
4039 	 * join), then we can't try to push the join down because we'll fail when
4040 	 * we get to deparseExplicitTargetList().  However, a PlaceHolderVar that
4041 	 * needs to be evaluated *at the top* of this join tree is OK, because we
4042 	 * can do that locally after fetching the results from the remote side.
4043 	 */
4044 	foreach(lc, root->placeholder_list)
4045 	{
4046 		PlaceHolderInfo *phinfo = lfirst(lc);
4047 		Relids		relids = joinrel->relids;
4048 
4049 		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4050 			bms_nonempty_difference(relids, phinfo->ph_eval_at))
4051 			return false;
4052 	}
4053 
4054 	/* Save the join clauses, for later use. */
4055 	fpinfo->joinclauses = joinclauses;
4056 
4057 	/*
4058 	 * Other clauses are applied after the join has been performed and thus
4059 	 * need not be all pushable. We will push those which can be pushed to
4060 	 * reduce the number of rows fetched from the foreign server. Rest of them
4061 	 * will be applied locally after fetching join result. Add them to fpinfo
4062 	 * so that other joins involving this joinrel will know that this joinrel
4063 	 * has local clauses.
4064 	 */
4065 	foreach(lc, otherclauses)
4066 	{
4067 		Expr	   *expr = (Expr *) lfirst(lc);
4068 
4069 		if (!is_foreign_expr(root, joinrel, expr))
4070 			fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4071 		else
4072 			fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4073 	}
4074 
4075 	fpinfo->outerrel = outerrel;
4076 	fpinfo->innerrel = innerrel;
4077 	fpinfo->jointype = jointype;
4078 
4079 	/*
4080 	 * Pull the other remote conditions from the joining relations into join
4081 	 * clauses or other remote clauses (remote_conds) of this relation
4082 	 * wherever possible. This avoids building subqueries at every join step,
4083 	 * which is not currently supported by the deparser logic.
4084 	 *
4085 	 * For an inner join, clauses from both the relations are added to the
4086 	 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4087 	 * the outer side are added to remote_conds since those can be evaluated
4088 	 * after the join is evaluated. The clauses from inner side are added to
4089 	 * the joinclauses, since they need to evaluated while constructing the
4090 	 * join.
4091 	 *
4092 	 * For a FULL OUTER JOIN, the other clauses from either relation can not
4093 	 * be added to the joinclauses or remote_conds, since each relation acts
4094 	 * as an outer relation for the other. Consider such full outer join as
4095 	 * unshippable because of the reasons mentioned above in this comment.
4096 	 *
4097 	 * The joining sides can not have local conditions, thus no need to test
4098 	 * shippability of the clauses being pulled up.
4099 	 */
4100 	switch (jointype)
4101 	{
4102 		case JOIN_INNER:
4103 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4104 										  list_copy(fpinfo_i->remote_conds));
4105 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4106 										  list_copy(fpinfo_o->remote_conds));
4107 			break;
4108 
4109 		case JOIN_LEFT:
4110 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4111 										  list_copy(fpinfo_i->remote_conds));
4112 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4113 										  list_copy(fpinfo_o->remote_conds));
4114 			break;
4115 
4116 		case JOIN_RIGHT:
4117 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4118 										  list_copy(fpinfo_o->remote_conds));
4119 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4120 										  list_copy(fpinfo_i->remote_conds));
4121 			break;
4122 
4123 		case JOIN_FULL:
4124 			if (fpinfo_i->remote_conds || fpinfo_o->remote_conds)
4125 				return false;
4126 			break;
4127 
4128 		default:
4129 			/* Should not happen, we have just check this above */
4130 			elog(ERROR, "unsupported join type %d", jointype);
4131 	}
4132 
4133 	/*
4134 	 * For an inner join, all restrictions can be treated alike. Treating the
4135 	 * pushed down conditions as join conditions allows a top level full outer
4136 	 * join to be deparsed without requiring subqueries.
4137 	 */
4138 	if (jointype == JOIN_INNER)
4139 	{
4140 		Assert(!fpinfo->joinclauses);
4141 		fpinfo->joinclauses = fpinfo->remote_conds;
4142 		fpinfo->remote_conds = NIL;
4143 	}
4144 
4145 	/* Mark that this join can be pushed down safely */
4146 	fpinfo->pushdown_safe = true;
4147 
4148 	/*
4149 	 * If user is willing to estimate cost for a scan of either of the joining
4150 	 * relations using EXPLAIN, he intends to estimate scans on that relation
4151 	 * more accurately. Then, it makes sense to estimate the cost the join
4152 	 * with that relation more accurately using EXPLAIN.
4153 	 */
4154 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4155 		fpinfo_i->use_remote_estimate;
4156 
4157 	/* Get user mapping */
4158 	if (fpinfo->use_remote_estimate)
4159 	{
4160 		if (fpinfo_o->use_remote_estimate)
4161 			fpinfo->user = fpinfo_o->user;
4162 		else
4163 			fpinfo->user = fpinfo_i->user;
4164 	}
4165 	else
4166 		fpinfo->user = NULL;
4167 
4168 	/*
4169 	 * Since both the joining relations come from the same server, the server
4170 	 * level options should have same value for both the relations. Pick from
4171 	 * any side.
4172 	 */
4173 	fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4174 	fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4175 
4176 	/*
4177 	 * Set cached relation costs to some negative value, so that we can detect
4178 	 * when they are set to some sensible costs, during one (usually the
4179 	 * first) of the calls to estimate_path_cost_size().
4180 	 */
4181 	fpinfo->rel_startup_cost = -1;
4182 	fpinfo->rel_total_cost = -1;
4183 
4184 	/*
4185 	 * Set fetch size to maximum of the joining sides, since we are expecting
4186 	 * the rows returned by the join to be proportional to the relation sizes.
4187 	 */
4188 	if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
4189 		fpinfo->fetch_size = fpinfo_o->fetch_size;
4190 	else
4191 		fpinfo->fetch_size = fpinfo_i->fetch_size;
4192 
4193 	/*
4194 	 * Set the string describing this join relation to be used in EXPLAIN
4195 	 * output of corresponding ForeignScan.
4196 	 */
4197 	fpinfo->relation_name = makeStringInfo();
4198 	appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4199 					 fpinfo_o->relation_name->data,
4200 					 get_jointype_name(fpinfo->jointype),
4201 					 fpinfo_i->relation_name->data);
4202 
4203 	return true;
4204 }
4205 
4206 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)4207 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
4208 								Path *epq_path)
4209 {
4210 	List	   *useful_pathkeys_list = NIL;		/* List of all pathkeys */
4211 	ListCell   *lc;
4212 
4213 	useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4214 
4215 	/* Create one path for each set of pathkeys we found above. */
4216 	foreach(lc, useful_pathkeys_list)
4217 	{
4218 		double		rows;
4219 		int			width;
4220 		Cost		startup_cost;
4221 		Cost		total_cost;
4222 		List	   *useful_pathkeys = lfirst(lc);
4223 		Path	   *sorted_epq_path;
4224 
4225 		estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4226 								&rows, &width, &startup_cost, &total_cost);
4227 
4228 		/*
4229 		 * The EPQ path must be at least as well sorted as the path itself,
4230 		 * in case it gets used as input to a mergejoin.
4231 		 */
4232 		sorted_epq_path = epq_path;
4233 		if (sorted_epq_path != NULL &&
4234 			!pathkeys_contained_in(useful_pathkeys,
4235 								   sorted_epq_path->pathkeys))
4236 			sorted_epq_path = (Path *)
4237 				create_sort_path(root,
4238 								 rel,
4239 								 sorted_epq_path,
4240 								 useful_pathkeys,
4241 								 -1.0);
4242 
4243 		add_path(rel, (Path *)
4244 				 create_foreignscan_path(root, rel,
4245 										 NULL,
4246 										 rows,
4247 										 startup_cost,
4248 										 total_cost,
4249 										 useful_pathkeys,
4250 										 rel->lateral_relids,
4251 										 sorted_epq_path,
4252 										 NIL));
4253 	}
4254 }
4255 
4256 /*
4257  * postgresGetForeignJoinPaths
4258  *		Add possible ForeignPath to joinrel, if join is safe to push down.
4259  */
4260 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)4261 postgresGetForeignJoinPaths(PlannerInfo *root,
4262 							RelOptInfo *joinrel,
4263 							RelOptInfo *outerrel,
4264 							RelOptInfo *innerrel,
4265 							JoinType jointype,
4266 							JoinPathExtraData *extra)
4267 {
4268 	PgFdwRelationInfo *fpinfo;
4269 	ForeignPath *joinpath;
4270 	double		rows;
4271 	int			width;
4272 	Cost		startup_cost;
4273 	Cost		total_cost;
4274 	Path	   *epq_path;		/* Path to create plan to be executed when
4275 								 * EvalPlanQual gets triggered. */
4276 
4277 	/*
4278 	 * Skip if this join combination has been considered already.
4279 	 */
4280 	if (joinrel->fdw_private)
4281 		return;
4282 
4283 	/*
4284 	 * This code does not work for joins with lateral references, since those
4285 	 * must have parameterized paths, which we don't generate yet.
4286 	 */
4287 	if (!bms_is_empty(joinrel->lateral_relids))
4288 		return;
4289 
4290 	/*
4291 	 * Create unfinished PgFdwRelationInfo entry which is used to indicate
4292 	 * that the join relation is already considered, so that we won't waste
4293 	 * time in judging safety of join pushdown and adding the same paths again
4294 	 * if found safe. Once we know that this join can be pushed down, we fill
4295 	 * the entry.
4296 	 */
4297 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4298 	fpinfo->pushdown_safe = false;
4299 	joinrel->fdw_private = fpinfo;
4300 	/* attrs_used is only for base relations. */
4301 	fpinfo->attrs_used = NULL;
4302 
4303 	/*
4304 	 * If there is a possibility that EvalPlanQual will be executed, we need
4305 	 * to be able to reconstruct the row using scans of the base relations.
4306 	 * GetExistingLocalJoinPath will find a suitable path for this purpose in
4307 	 * the path list of the joinrel, if one exists.  We must be careful to
4308 	 * call it before adding any ForeignPath, since the ForeignPath might
4309 	 * dominate the only suitable local path available.  We also do it before
4310 	 * calling foreign_join_ok(), since that function updates fpinfo and marks
4311 	 * it as pushable if the join is found to be pushable.
4312 	 */
4313 	if (root->parse->commandType == CMD_DELETE ||
4314 		root->parse->commandType == CMD_UPDATE ||
4315 		root->rowMarks)
4316 	{
4317 		epq_path = GetExistingLocalJoinPath(joinrel);
4318 		if (!epq_path)
4319 		{
4320 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4321 			return;
4322 		}
4323 	}
4324 	else
4325 		epq_path = NULL;
4326 
4327 	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4328 	{
4329 		/* Free path required for EPQ if we copied one; we don't need it now */
4330 		if (epq_path)
4331 			pfree(epq_path);
4332 		return;
4333 	}
4334 
4335 	/*
4336 	 * Compute the selectivity and cost of the local_conds, so we don't have
4337 	 * to do it over again for each path. The best we can do for these
4338 	 * conditions is to estimate selectivity on the basis of local statistics.
4339 	 * The local conditions are applied after the join has been computed on
4340 	 * the remote side like quals in WHERE clause, so pass jointype as
4341 	 * JOIN_INNER.
4342 	 */
4343 	fpinfo->local_conds_sel = clauselist_selectivity(root,
4344 													 fpinfo->local_conds,
4345 													 0,
4346 													 JOIN_INNER,
4347 													 NULL);
4348 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4349 
4350 	/*
4351 	 * If we are going to estimate costs locally, estimate the join clause
4352 	 * selectivity here while we have special join info.
4353 	 */
4354 	if (!fpinfo->use_remote_estimate)
4355 		fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4356 														0, fpinfo->jointype,
4357 														extra->sjinfo);
4358 
4359 	/* Estimate costs for bare join relation */
4360 	estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4361 							&width, &startup_cost, &total_cost);
4362 	/* Now update this information in the joinrel */
4363 	joinrel->rows = rows;
4364 	joinrel->reltarget->width = width;
4365 	fpinfo->rows = rows;
4366 	fpinfo->width = width;
4367 	fpinfo->startup_cost = startup_cost;
4368 	fpinfo->total_cost = total_cost;
4369 
4370 	/*
4371 	 * Create a new join path and add it to the joinrel which represents a
4372 	 * join between foreign tables.
4373 	 */
4374 	joinpath = create_foreignscan_path(root,
4375 									   joinrel,
4376 									   NULL,	/* default pathtarget */
4377 									   rows,
4378 									   startup_cost,
4379 									   total_cost,
4380 									   NIL,		/* no pathkeys */
4381 									   joinrel->lateral_relids,
4382 									   epq_path,
4383 									   NULL);	/* no fdw_private */
4384 
4385 	/* Add generated path into joinrel by add_path(). */
4386 	add_path(joinrel, (Path *) joinpath);
4387 
4388 	/* Consider pathkeys for the join relation */
4389 	add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4390 
4391 	/* XXX Consider parameterized paths for the join relation */
4392 }
4393 
4394 /*
4395  * Create a tuple from the specified row of the PGresult.
4396  *
4397  * rel is the local representation of the foreign table, attinmeta is
4398  * conversion data for the rel's tupdesc, and retrieved_attrs is an
4399  * integer list of the table column numbers present in the PGresult.
4400  * fsstate is the ForeignScan plan node's execution state.
4401  * temp_context is a working context that can be reset after each tuple.
4402  *
4403  * Note: either rel or fsstate, but not both, can be NULL.  rel is NULL
4404  * if we're processing a remote join, while fsstate is NULL in a non-query
4405  * context such as ANALYZE, or if we're processing a non-scan query node.
4406  */
4407 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)4408 make_tuple_from_result_row(PGresult *res,
4409 						   int row,
4410 						   Relation rel,
4411 						   AttInMetadata *attinmeta,
4412 						   List *retrieved_attrs,
4413 						   ForeignScanState *fsstate,
4414 						   MemoryContext temp_context)
4415 {
4416 	HeapTuple	tuple;
4417 	TupleDesc	tupdesc;
4418 	Datum	   *values;
4419 	bool	   *nulls;
4420 	ItemPointer ctid = NULL;
4421 	ConversionLocation errpos;
4422 	ErrorContextCallback errcallback;
4423 	MemoryContext oldcontext;
4424 	ListCell   *lc;
4425 	int			j;
4426 
4427 	Assert(row < PQntuples(res));
4428 
4429 	/*
4430 	 * Do the following work in a temp context that we reset after each tuple.
4431 	 * This cleans up not only the data we have direct access to, but any
4432 	 * cruft the I/O functions might leak.
4433 	 */
4434 	oldcontext = MemoryContextSwitchTo(temp_context);
4435 
4436 	/*
4437 	 * Get the tuple descriptor for the row.  Use the rel's tupdesc if rel is
4438 	 * provided, otherwise look to the scan node's ScanTupleSlot.
4439 	 */
4440 	if (rel)
4441 		tupdesc = RelationGetDescr(rel);
4442 	else
4443 	{
4444 		PgFdwScanState *fdw_sstate;
4445 
4446 		Assert(fsstate);
4447 		fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4448 		tupdesc = fdw_sstate->tupdesc;
4449 	}
4450 
4451 	values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4452 	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4453 	/* Initialize to nulls for any columns not present in result */
4454 	memset(nulls, true, tupdesc->natts * sizeof(bool));
4455 
4456 	/*
4457 	 * Set up and install callback to report where conversion error occurs.
4458 	 */
4459 	errpos.cur_attno = 0;
4460 	errpos.rel = rel;
4461 	errpos.fsstate = fsstate;
4462 	errcallback.callback = conversion_error_callback;
4463 	errcallback.arg = (void *) &errpos;
4464 	errcallback.previous = error_context_stack;
4465 	error_context_stack = &errcallback;
4466 
4467 	/*
4468 	 * i indexes columns in the relation, j indexes columns in the PGresult.
4469 	 */
4470 	j = 0;
4471 	foreach(lc, retrieved_attrs)
4472 	{
4473 		int			i = lfirst_int(lc);
4474 		char	   *valstr;
4475 
4476 		/* fetch next column's textual value */
4477 		if (PQgetisnull(res, row, j))
4478 			valstr = NULL;
4479 		else
4480 			valstr = PQgetvalue(res, row, j);
4481 
4482 		/* convert value to internal representation */
4483 		errpos.cur_attno = i;
4484 		if (i > 0)
4485 		{
4486 			/* ordinary column */
4487 			Assert(i <= tupdesc->natts);
4488 			nulls[i - 1] = (valstr == NULL);
4489 			/* Apply the input function even to nulls, to support domains */
4490 			values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4491 											  valstr,
4492 											  attinmeta->attioparams[i - 1],
4493 											  attinmeta->atttypmods[i - 1]);
4494 		}
4495 		else if (i == SelfItemPointerAttributeNumber)
4496 		{
4497 			/* ctid --- note we ignore any other system column in result */
4498 			if (valstr != NULL)
4499 			{
4500 				Datum		datum;
4501 
4502 				datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
4503 				ctid = (ItemPointer) DatumGetPointer(datum);
4504 			}
4505 		}
4506 		errpos.cur_attno = 0;
4507 
4508 		j++;
4509 	}
4510 
4511 	/* Uninstall error context callback. */
4512 	error_context_stack = errcallback.previous;
4513 
4514 	/*
4515 	 * Check we got the expected number of columns.  Note: j == 0 and
4516 	 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
4517 	 */
4518 	if (j > 0 && j != PQnfields(res))
4519 		elog(ERROR, "remote query result does not match the foreign table");
4520 
4521 	/*
4522 	 * Build the result tuple in caller's memory context.
4523 	 */
4524 	MemoryContextSwitchTo(oldcontext);
4525 
4526 	tuple = heap_form_tuple(tupdesc, values, nulls);
4527 
4528 	/*
4529 	 * If we have a CTID to return, install it in both t_self and t_ctid.
4530 	 * t_self is the normal place, but if the tuple is converted to a
4531 	 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
4532 	 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
4533 	 */
4534 	if (ctid)
4535 		tuple->t_self = tuple->t_data->t_ctid = *ctid;
4536 
4537 	/*
4538 	 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
4539 	 * heap_form_tuple.  heap_form_tuple actually creates the tuple with
4540 	 * DatumTupleFields, not HeapTupleFields, but the executor expects
4541 	 * HeapTupleFields and will happily extract system columns on that
4542 	 * assumption.  If we don't do this then, for example, the tuple length
4543 	 * ends up in the xmin field, which isn't what we want.
4544 	 */
4545 	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
4546 	HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
4547 	HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
4548 
4549 	/* Clean up */
4550 	MemoryContextReset(temp_context);
4551 
4552 	return tuple;
4553 }
4554 
4555 /*
4556  * Callback function which is called when error occurs during column value
4557  * conversion.  Print names of column and relation.
4558  *
4559  * Note that this function mustn't do any catalog lookups, since we are in
4560  * an already-failed transaction.  Fortunately, we can get the needed info
4561  * from the relation or the query's rangetable instead.
4562  */
4563 static void
conversion_error_callback(void * arg)4564 conversion_error_callback(void *arg)
4565 {
4566 	ConversionLocation *errpos = (ConversionLocation *) arg;
4567 	Relation	rel = errpos->rel;
4568 	ForeignScanState *fsstate = errpos->fsstate;
4569 	const char *attname = NULL;
4570 	const char *relname = NULL;
4571 	bool		is_wholerow = false;
4572 
4573 	/*
4574 	 * If we're in a scan node, always use aliases from the rangetable, for
4575 	 * consistency between the simple-relation and remote-join cases.  Look at
4576 	 * the relation's tupdesc only if we're not in a scan node.
4577 	 */
4578 	if (fsstate)
4579 	{
4580 		/* ForeignScan case */
4581 		ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
4582 		int			varno = 0;
4583 		AttrNumber	colno = 0;
4584 
4585 		if (fsplan->scan.scanrelid > 0)
4586 		{
4587 			/* error occurred in a scan against a foreign table */
4588 			varno = fsplan->scan.scanrelid;
4589 			colno = errpos->cur_attno;
4590 		}
4591 		else
4592 		{
4593 			/* error occurred in a scan against a foreign join */
4594 			TargetEntry *tle;
4595 
4596 			Assert(IsA(fsplan, ForeignScan));
4597 			tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
4598 										   errpos->cur_attno - 1);
4599 			Assert(IsA(tle, TargetEntry));
4600 
4601 			/*
4602 			 * Target list can have Vars and expressions.  For Vars, we can
4603 			 * get some information, however for expressions we can't.  Thus
4604 			 * for expressions, just show generic context message.
4605 			 */
4606 			if (IsA(tle->expr, Var))
4607 			{
4608 				Var		   *var = (Var *) tle->expr;
4609 
4610 				varno = var->varno;
4611 				colno = var->varattno;
4612 			}
4613 		}
4614 
4615 		if (varno > 0)
4616 		{
4617 			EState	   *estate = fsstate->ss.ps.state;
4618 			RangeTblEntry *rte = rt_fetch(varno, estate->es_range_table);
4619 
4620 			relname = rte->eref->aliasname;
4621 
4622 			if (colno == 0)
4623 				is_wholerow = true;
4624 			else if (colno > 0 && colno <= list_length(rte->eref->colnames))
4625 				attname = strVal(list_nth(rte->eref->colnames, colno - 1));
4626 			else if (colno == SelfItemPointerAttributeNumber)
4627 				attname = "ctid";
4628 			else if (colno == ObjectIdAttributeNumber)
4629 				attname = "oid";
4630 		}
4631 	}
4632 	else if (rel)
4633 	{
4634 		/* Non-ForeignScan case (we should always have a rel here) */
4635 		TupleDesc	tupdesc = RelationGetDescr(rel);
4636 
4637 		relname = RelationGetRelationName(rel);
4638 		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
4639 		{
4640 			Form_pg_attribute attr = TupleDescAttr(tupdesc,
4641 												   errpos->cur_attno - 1);
4642 
4643 			attname = NameStr(attr->attname);
4644 		}
4645 		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
4646 			attname = "ctid";
4647 	}
4648 
4649 	if (relname && is_wholerow)
4650 		errcontext("whole-row reference to foreign table \"%s\"", relname);
4651 	else if (relname && attname)
4652 		errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
4653 	else
4654 		errcontext("processing expression at position %d in select list",
4655 				   errpos->cur_attno);
4656 }
4657 
4658 /*
4659  * Find an equivalence class member expression, all of whose Vars, come from
4660  * the indicated relation.
4661  */
4662 extern Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)4663 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
4664 {
4665 	ListCell   *lc_em;
4666 
4667 	foreach(lc_em, ec->ec_members)
4668 	{
4669 		EquivalenceMember *em = lfirst(lc_em);
4670 
4671 		if (bms_is_subset(em->em_relids, rel->relids) &&
4672 			!bms_is_empty(em->em_relids))
4673 		{
4674 			/*
4675 			 * If there is more than one equivalence member whose Vars are
4676 			 * taken entirely from this relation, we'll be content to choose
4677 			 * any one of those.
4678 			 */
4679 			return em->em_expr;
4680 		}
4681 	}
4682 
4683 	/* We didn't find any suitable equivalence class expression */
4684 	return NULL;
4685 }
4686