1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  *		  Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
45 PG_MODULE_MAGIC;
46 
47 /* Default CPU cost to start up a foreign query (initial value of fdw_startup_cost). */
48 #define DEFAULT_FDW_STARTUP_COST	100.0
49 
50 /* Default CPU cost to process 1 row, above and beyond cpu_tuple_cost (initial value of fdw_tuple_cost). */
51 #define DEFAULT_FDW_TUPLE_COST		0.01
52 
53 /* If no remote estimates are available, assume a sort costs 20% extra. */
54 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
56 /*
57  * Indexes of FDW-private information stored in fdw_private lists.
58  *
59  * List positions are given by the enum FdwScanPrivateIndex, so an item
60  * can be fetched with list_nth().  For example, to get the SELECT statement:
61  *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
62  */
63 enum FdwScanPrivateIndex
64 {
65 	/* SQL statement to execute remotely (as a String node) */
66 	FdwScanPrivateSelectSql,
67 	/* Integer list of attribute numbers retrieved by the SELECT */
68 	FdwScanPrivateRetrievedAttrs,
69 	/* Integer representing the desired fetch_size */
70 	FdwScanPrivateFetchSize,
71 
72 	/*
73 	 * String describing the join, i.e. the names of the relations being
74 	 * joined and the types of join; added only when the scan is a join
75 	 */
76 	FdwScanPrivateRelations
77 };
78 
79 /*
80  * Similarly, this enum gives the positions of what's kept in the fdw_private
81  * list for a ModifyTable node referencing a postgres_fdw foreign table:
82  *
83  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
84  * 2) Integer list of target attribute numbers for INSERT/UPDATE
85  *	  (NIL for a DELETE)
86  * 3) Boolean flag showing if the remote query has a RETURNING clause
87  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
88  */
89 enum FdwModifyPrivateIndex
90 {
91 	/* SQL statement to execute remotely (as a String node) */
92 	FdwModifyPrivateUpdateSql,
93 	/* Integer list of target attribute numbers for INSERT/UPDATE */
94 	FdwModifyPrivateTargetAttnums,
95 	/* has-returning flag (as an integer Value node) */
96 	FdwModifyPrivateHasReturning,
97 	/* Integer list of attribute numbers retrieved by RETURNING, if any */
98 	FdwModifyPrivateRetrievedAttrs
99 };
100 
101 /*
102  * Similarly, this enum gives the positions of what's kept in the fdw_private
103  * list for a ForeignScan node that modifies a foreign table directly:
104  *
105  * 1) UPDATE/DELETE statement text to be sent to the remote server
106  * 2) Boolean flag showing if the remote query has a RETURNING clause
107  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
108  * 4) Boolean flag showing if we set the command es_processed
109  */
110 enum FdwDirectModifyPrivateIndex
111 {
112 	/* SQL statement to execute remotely (as a String node) */
113 	FdwDirectModifyPrivateUpdateSql,
114 	/* has-returning flag (as an integer Value node) */
115 	FdwDirectModifyPrivateHasReturning,
116 	/* Integer list of attribute numbers retrieved by RETURNING, if any */
117 	FdwDirectModifyPrivateRetrievedAttrs,
118 	/* set-processed flag (as an integer Value node) */
119 	FdwDirectModifyPrivateSetProcessed
120 };
121 
122 /*
123  * Execution state of a foreign scan using postgres_fdw.
124  */
125 typedef struct PgFdwScanState
126 {
127 	Relation	rel;			/* relcache entry for the foreign table; NULL
128 								 * for a foreign join scan */
129 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
130 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
131 
132 	/* extracted fdw_private data */
133 	char	   *query;			/* text of SELECT command */
134 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
135 
136 	/* for remote query execution */
137 	PGconn	   *conn;			/* connection for the scan */
138 	unsigned int cursor_number; /* quasi-unique ID for this scan's cursor */
139 	bool		cursor_exists;	/* have we created the cursor? */
140 	int			numParams;		/* number of parameters passed to query */
141 	FmgrInfo   *param_flinfo;	/* output conversion functions for the params */
142 	List	   *param_exprs;	/* executable expressions for param values */
143 	const char **param_values;	/* textual values of query parameters */
144 
145 	/* for storing result tuples */
146 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
147 	int			num_tuples;		/* # of tuples in the tuples array */
148 	int			next_tuple;		/* index of next one to return */
149 
150 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
151 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
152 	bool		eof_reached;	/* true if last fetch reached EOF */
153 
154 	/* working memory contexts */
155 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
156 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
157 
158 	int			fetch_size;		/* number of tuples per fetch */
159 } PgFdwScanState;
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
164 typedef struct PgFdwModifyState
165 {
166 	Relation	rel;			/* relcache entry for the foreign table */
167 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
168 
169 	/* for remote query execution */
170 	PGconn	   *conn;			/* connection for the modify operation */
171 	char	   *p_name;			/* name of prepared statement, if created */
172 
173 	/* extracted fdw_private data */
174 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
175 	List	   *target_attrs;	/* list of target attribute numbers */
176 	bool		has_returning;	/* is there a RETURNING clause? */
177 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
178 
179 	/* info about parameters for prepared statement */
180 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
181 	int			p_nums;			/* number of parameters to transmit */
182 	FmgrInfo   *p_flinfo;		/* output conversion functions for the params */
183 
184 	/* working memory context */
185 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
186 } PgFdwModifyState;
187 
188 /*
189  * Execution state of a foreign scan that modifies a foreign table directly.
190  */
191 typedef struct PgFdwDirectModifyState
192 {
193 	Relation	rel;			/* relcache entry for the foreign table */
194 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
195 
196 	/* extracted fdw_private data */
197 	char	   *query;			/* text of UPDATE/DELETE command */
198 	bool		has_returning;	/* is there a RETURNING clause? */
199 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
200 	bool		set_processed;	/* should we set the command's es_processed? */
201 
202 	/* for remote query execution */
203 	PGconn	   *conn;			/* connection for the update */
204 	int			numParams;		/* number of parameters passed to query */
205 	FmgrInfo   *param_flinfo;	/* output conversion functions for the params */
206 	List	   *param_exprs;	/* executable expressions for param values */
207 	const char **param_values;	/* textual values of query parameters */
208 
209 	/* for storing result tuples */
210 	PGresult   *result;			/* result of the query */
211 	int			num_tuples;		/* # of result tuples */
212 	int			next_tuple;		/* index of next one to return */
213 
214 	/* working memory context */
215 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
216 } PgFdwDirectModifyState;
217 
218 /*
219  * Workspace for analyzing a foreign table (collecting sample rows).
220  */
221 typedef struct PgFdwAnalyzeState
222 {
223 	Relation	rel;			/* relcache entry for the foreign table */
224 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
225 	List	   *retrieved_attrs;	/* attr numbers retrieved by query */
226 
227 	/* collected sample rows */
228 	HeapTuple  *rows;			/* sample-row array of size targrows */
229 	int			targrows;		/* target # of sample rows */
230 	int			numrows;		/* # of sample rows collected so far */
231 
232 	/* for random sampling */
233 	double		samplerows;		/* # of rows fetched */
234 	double		rowstoskip;		/* # of rows to skip before next sample */
235 	ReservoirStateData rstate;	/* state for reservoir sampling */
236 
237 	/* working memory contexts */
238 	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
239 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
240 } PgFdwAnalyzeState;
241 
242 /*
243  * Identifies the attribute at which a data conversion failure occurred.
244  */
245 typedef struct ConversionLocation
246 {
247 	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
248 	Relation	rel;			/* foreign table being processed, or NULL */
249 	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
250 } ConversionLocation;
251 
252 /* Callback argument for ec_member_matches_foreign (passed as its void *arg) */
253 typedef struct
254 {
255 	Expr	   *current;		/* current expr, or NULL if not yet found */
256 	List	   *already_used;	/* expressions already dealt with */
257 } ec_member_foreign_arg;
258 
259 /*
260  * SQL-callable functions
261  */
262 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
263 
264 /*
265  * FDW callback routines (installed into the FdwRoutine by postgres_fdw_handler)
266  */
267 static void postgresGetForeignRelSize(PlannerInfo *root,
268 						  RelOptInfo *baserel,
269 						  Oid foreigntableid);
270 static void postgresGetForeignPaths(PlannerInfo *root,
271 						RelOptInfo *baserel,
272 						Oid foreigntableid);
273 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
274 					   RelOptInfo *baserel,
275 					   Oid foreigntableid,
276 					   ForeignPath *best_path,
277 					   List *tlist,
278 					   List *scan_clauses,
279 					   Plan *outer_plan);
280 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
281 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
282 static void postgresReScanForeignScan(ForeignScanState *node);
283 static void postgresEndForeignScan(ForeignScanState *node);
284 static void postgresAddForeignUpdateTargets(Query *parsetree,
285 								RangeTblEntry *target_rte,
286 								Relation target_relation);
287 static List *postgresPlanForeignModify(PlannerInfo *root,
288 						  ModifyTable *plan,
289 						  Index resultRelation,
290 						  int subplan_index);
291 static void postgresBeginForeignModify(ModifyTableState *mtstate,
292 						   ResultRelInfo *resultRelInfo,
293 						   List *fdw_private,
294 						   int subplan_index,
295 						   int eflags);
296 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
297 						  ResultRelInfo *resultRelInfo,
298 						  TupleTableSlot *slot,
299 						  TupleTableSlot *planSlot);
300 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
301 						  ResultRelInfo *resultRelInfo,
302 						  TupleTableSlot *slot,
303 						  TupleTableSlot *planSlot);
304 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
305 						  ResultRelInfo *resultRelInfo,
306 						  TupleTableSlot *slot,
307 						  TupleTableSlot *planSlot);
308 static void postgresEndForeignModify(EState *estate,
309 						 ResultRelInfo *resultRelInfo);
310 static int	postgresIsForeignRelUpdatable(Relation rel);
311 static bool postgresPlanDirectModify(PlannerInfo *root,
312 						 ModifyTable *plan,
313 						 Index resultRelation,
314 						 int subplan_index);
315 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
316 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
317 static void postgresEndDirectModify(ForeignScanState *node);
318 static void postgresExplainForeignScan(ForeignScanState *node,
319 						   ExplainState *es);
320 static void postgresExplainForeignModify(ModifyTableState *mtstate,
321 							 ResultRelInfo *rinfo,
322 							 List *fdw_private,
323 							 int subplan_index,
324 							 ExplainState *es);
325 static void postgresExplainDirectModify(ForeignScanState *node,
326 							ExplainState *es);
327 static bool postgresAnalyzeForeignTable(Relation relation,
328 							AcquireSampleRowsFunc *func,
329 							BlockNumber *totalpages);
330 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
331 							Oid serverOid);
332 static void postgresGetForeignJoinPaths(PlannerInfo *root,
333 							RelOptInfo *joinrel,
334 							RelOptInfo *outerrel,
335 							RelOptInfo *innerrel,
336 							JoinType jointype,
337 							JoinPathExtraData *extra);
338 static bool postgresRecheckForeignScan(ForeignScanState *node,
339 						   TupleTableSlot *slot);
340 static void postgresGetForeignUpperPaths(PlannerInfo *root,
341 							 UpperRelationKind stage,
342 							 RelOptInfo *input_rel,
343 							 RelOptInfo *output_rel);
344 
345 /*
346  * Helper functions (all file-local)
347  */
348 static void estimate_path_cost_size(PlannerInfo *root,
349 						RelOptInfo *baserel,
350 						List *join_conds,
351 						List *pathkeys,
352 						double *p_rows, int *p_width,
353 						Cost *p_startup_cost, Cost *p_total_cost);
354 static void get_remote_estimate(const char *sql,
355 					PGconn *conn,
356 					double *rows,
357 					int *width,
358 					Cost *startup_cost,
359 					Cost *total_cost);
360 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
361 						  EquivalenceClass *ec, EquivalenceMember *em,
362 						  void *arg);
363 static void create_cursor(ForeignScanState *node);
364 static void fetch_more_data(ForeignScanState *node);
365 static void close_cursor(PGconn *conn, unsigned int cursor_number);
366 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
367 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
368 						 ItemPointer tupleid,
369 						 TupleTableSlot *slot);
370 static void store_returning_result(PgFdwModifyState *fmstate,
371 					   TupleTableSlot *slot, PGresult *res);
372 static void execute_dml_stmt(ForeignScanState *node);
373 static TupleTableSlot *get_returning_data(ForeignScanState *node);
374 static void prepare_query_params(PlanState *node,
375 					 List *fdw_exprs,
376 					 int numParams,
377 					 FmgrInfo **param_flinfo,
378 					 List **param_exprs,
379 					 const char ***param_values);
380 static void process_query_params(ExprContext *econtext,
381 					 FmgrInfo *param_flinfo,
382 					 List *param_exprs,
383 					 const char **param_values);
384 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
385 							  HeapTuple *rows, int targrows,
386 							  double *totalrows,
387 							  double *totaldeadrows);
388 static void analyze_row_processor(PGresult *res, int row,
389 					  PgFdwAnalyzeState *astate);
390 static HeapTuple make_tuple_from_result_row(PGresult *res,
391 						   int row,
392 						   Relation rel,
393 						   AttInMetadata *attinmeta,
394 						   List *retrieved_attrs,
395 						   ForeignScanState *fsstate,
396 						   MemoryContext temp_context);
397 static void conversion_error_callback(void *arg);
398 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
399 				JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
400 				JoinPathExtraData *extra);
401 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
402 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
403 								 RelOptInfo *rel);
404 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
405 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
406 								Path *epq_path);
407 static void add_foreign_grouping_paths(PlannerInfo *root,
408 						   RelOptInfo *input_rel,
409 						   RelOptInfo *grouped_rel);
410 static void apply_server_options(PgFdwRelationInfo *fpinfo);
411 static void apply_table_options(PgFdwRelationInfo *fpinfo);
412 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
413 				  const PgFdwRelationInfo *fpinfo_o,
414 				  const PgFdwRelationInfo *fpinfo_i);
415 
416 
417 /*
418  * Foreign-data wrapper handler function: return a struct with pointers
419  * to my callback routines.
420  */
421 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)422 postgres_fdw_handler(PG_FUNCTION_ARGS)
423 {
424 	FdwRoutine *routine = makeNode(FdwRoutine);
425 
426 	/* Functions for scanning foreign tables */
427 	routine->GetForeignRelSize = postgresGetForeignRelSize;
428 	routine->GetForeignPaths = postgresGetForeignPaths;
429 	routine->GetForeignPlan = postgresGetForeignPlan;
430 	routine->BeginForeignScan = postgresBeginForeignScan;
431 	routine->IterateForeignScan = postgresIterateForeignScan;
432 	routine->ReScanForeignScan = postgresReScanForeignScan;
433 	routine->EndForeignScan = postgresEndForeignScan;
434 
435 	/* Functions for updating foreign tables */
436 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
437 	routine->PlanForeignModify = postgresPlanForeignModify;
438 	routine->BeginForeignModify = postgresBeginForeignModify;
439 	routine->ExecForeignInsert = postgresExecForeignInsert;
440 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
441 	routine->ExecForeignDelete = postgresExecForeignDelete;
442 	routine->EndForeignModify = postgresEndForeignModify;
443 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
444 	routine->PlanDirectModify = postgresPlanDirectModify;
445 	routine->BeginDirectModify = postgresBeginDirectModify;
446 	routine->IterateDirectModify = postgresIterateDirectModify;
447 	routine->EndDirectModify = postgresEndDirectModify;
448 
449 	/* Function for EvalPlanQual rechecks */
450 	routine->RecheckForeignScan = postgresRecheckForeignScan;
451 	/* Support functions for EXPLAIN */
452 	routine->ExplainForeignScan = postgresExplainForeignScan;
453 	routine->ExplainForeignModify = postgresExplainForeignModify;
454 	routine->ExplainDirectModify = postgresExplainDirectModify;
455 
456 	/* Support functions for ANALYZE */
457 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
458 
459 	/* Support functions for IMPORT FOREIGN SCHEMA */
460 	routine->ImportForeignSchema = postgresImportForeignSchema;
461 
462 	/* Support functions for join push-down */
463 	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
464 
465 	/* Support functions for upper relation push-down */
466 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
467 
468 	PG_RETURN_POINTER(routine);
469 }
470 
471 /*
472  * postgresGetForeignRelSize
473  *		Estimate # of rows and width of the result of the scan
474  *
475  * We should consider the effect of all baserestrictinfo clauses here, but
476  * not any join clauses.
477  */
478 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)479 postgresGetForeignRelSize(PlannerInfo *root,
480 						  RelOptInfo *baserel,
481 						  Oid foreigntableid)
482 {
483 	PgFdwRelationInfo *fpinfo;
484 	ListCell   *lc;
485 	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
486 	const char *namespace;
487 	const char *relname;
488 	const char *refname;
489 
490 	/*
491 	 * We use PgFdwRelationInfo to pass various information to subsequent
492 	 * functions.
493 	 */
494 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
495 	baserel->fdw_private = (void *) fpinfo;
496 
497 	/* Base foreign tables need to be pushed down always. */
498 	fpinfo->pushdown_safe = true;
499 
500 	/* Look up foreign-table catalog info. */
501 	fpinfo->table = GetForeignTable(foreigntableid);
502 	fpinfo->server = GetForeignServer(fpinfo->table->serverid);
503 
504 	/*
505 	 * Extract user-settable option values.  Note that per-table settings of
506 	 * use_remote_estimate and fetch_size override per-server settings of
507 	 * them, respectively.
508 	 */
509 	fpinfo->use_remote_estimate = false;
510 	fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
511 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
512 	fpinfo->shippable_extensions = NIL;
513 	fpinfo->fetch_size = 100;
514 
515 	apply_server_options(fpinfo);
516 	apply_table_options(fpinfo);
517 
518 	/*
519 	 * If the table or the server is configured to use remote estimates,
520 	 * identify which user to do remote access as during planning.  This
521 	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
522 	 * permissions, the query would have failed at runtime anyway.
523 	 */
524 	if (fpinfo->use_remote_estimate)
525 	{
526 		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
527 
528 		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
529 	}
530 	else
531 		fpinfo->user = NULL;
532 
533 	/*
534 	 * Identify which baserestrictinfo clauses can be sent to the remote
535 	 * server and which can't.
536 	 */
537 	classifyConditions(root, baserel, baserel->baserestrictinfo,
538 					   &fpinfo->remote_conds, &fpinfo->local_conds);
539 
540 	/*
541 	 * Identify which attributes will need to be retrieved from the remote
542 	 * server.  These include all attrs needed for joins or final output, plus
543 	 * all attrs used in the local_conds.  (Note: if we end up using a
544 	 * parameterized scan, it's possible that some of the join clauses will be
545 	 * sent to the remote and thus we wouldn't really need to retrieve the
546 	 * columns used in them.  Doesn't seem worth detecting that case though.)
547 	 */
548 	fpinfo->attrs_used = NULL;
549 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
550 				   &fpinfo->attrs_used);
551 	foreach(lc, fpinfo->local_conds)
552 	{
553 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
554 
555 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
556 					   &fpinfo->attrs_used);
557 	}
558 
559 	/*
560 	 * Compute the selectivity and cost of the local_conds, so we don't have
561 	 * to do it over again for each path.  The best we can do for these
562 	 * conditions is to estimate selectivity on the basis of local statistics.
563 	 */
564 	fpinfo->local_conds_sel = clauselist_selectivity(root,
565 													 fpinfo->local_conds,
566 													 baserel->relid,
567 													 JOIN_INNER,
568 													 NULL);
569 
570 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
571 
572 	/*
573 	 * Set cached relation costs to some negative value, so that we can detect
574 	 * when they are set to some sensible costs during one (usually the first)
575 	 * of the calls to estimate_path_cost_size().
576 	 */
577 	fpinfo->rel_startup_cost = -1;
578 	fpinfo->rel_total_cost = -1;
579 
580 	/*
581 	 * If the table or the server is configured to use remote estimates,
582 	 * connect to the foreign server and execute EXPLAIN to estimate the
583 	 * number of rows selected by the restriction clauses, as well as the
584 	 * average row width.  Otherwise, estimate using whatever statistics we
585 	 * have locally, in a way similar to ordinary tables.
586 	 */
587 	if (fpinfo->use_remote_estimate)
588 	{
589 		/*
590 		 * Get cost/size estimates with help of remote server.  Save the
591 		 * values in fpinfo so we don't need to do it again to generate the
592 		 * basic foreign path.
593 		 */
594 		estimate_path_cost_size(root, baserel, NIL, NIL,
595 								&fpinfo->rows, &fpinfo->width,
596 								&fpinfo->startup_cost, &fpinfo->total_cost);
597 
598 		/* Report estimated baserel size to planner. */
599 		baserel->rows = fpinfo->rows;
600 		baserel->reltarget->width = fpinfo->width;
601 	}
602 	else
603 	{
604 		/*
605 		 * If the foreign table has never been ANALYZEd, it will have relpages
606 		 * and reltuples equal to zero, which most likely has nothing to do
607 		 * with reality.  We can't do a whole lot about that if we're not
608 		 * allowed to consult the remote server, but we can use a hack similar
609 		 * to plancat.c's treatment of empty relations: use a minimum size
610 		 * estimate of 10 pages, and divide by the column-datatype-based width
611 		 * estimate to get the corresponding number of tuples.
612 		 */
613 		if (baserel->pages == 0 && baserel->tuples == 0)
614 		{
615 			baserel->pages = 10;
616 			baserel->tuples =
617 				(10 * BLCKSZ) / (baserel->reltarget->width +
618 								 MAXALIGN(SizeofHeapTupleHeader));
619 		}
620 
621 		/* Estimate baserel size as best we can with local statistics. */
622 		set_baserel_size_estimates(root, baserel);
623 
624 		/* Fill in basically-bogus cost estimates for use later. */
625 		estimate_path_cost_size(root, baserel, NIL, NIL,
626 								&fpinfo->rows, &fpinfo->width,
627 								&fpinfo->startup_cost, &fpinfo->total_cost);
628 	}
629 
630 	/*
631 	 * Set the name of relation in fpinfo, while we are constructing it here.
632 	 * It will be used to build the string describing the join relation in
633 	 * EXPLAIN output. We can't know whether VERBOSE option is specified or
634 	 * not, so always schema-qualify the foreign table name.
635 	 */
636 	fpinfo->relation_name = makeStringInfo();
637 	namespace = get_namespace_name(get_rel_namespace(foreigntableid));
638 	relname = get_rel_name(foreigntableid);
639 	refname = rte->eref->aliasname;
640 	appendStringInfo(fpinfo->relation_name, "%s.%s",
641 					 quote_identifier(namespace),
642 					 quote_identifier(relname));
643 	if (*refname && strcmp(refname, relname) != 0)
644 		appendStringInfo(fpinfo->relation_name, " %s",
645 						 quote_identifier(rte->eref->aliasname));
646 
647 	/* No outer and inner relations. */
648 	fpinfo->make_outerrel_subquery = false;
649 	fpinfo->make_innerrel_subquery = false;
650 	fpinfo->lower_subquery_rels = NULL;
651 	/* Set the relation index. */
652 	fpinfo->relation_index = baserel->relid;
653 }
654 
655 /*
656  * get_useful_ecs_for_relation
657  *		Determine which EquivalenceClasses might be involved in useful
658  *		orderings of this relation.
659  *
660  * This function is in some respects a mirror image of the core function
661  * pathkeys_useful_for_merging: for a regular table, we know what indexes
662  * we have and want to test whether any of them are useful.  For a foreign
663  * table, we don't know what indexes are present on the remote side but
664  * want to speculate about which ones we'd like to use if they existed.
665  *
666  * This function returns a list of potentially-useful equivalence classes,
667  * but it does not guarantee that an EquivalenceMember exists which contains
668  * Vars only from the given relation.  For example, given ft1 JOIN t1 ON
669  * ft1.x + t1.x = 0, this function will say that the equivalence class
670  * containing ft1.x + t1.x is potentially useful.  Supposing ft1 is remote and
671  * t1 is local (or on a different server), it will turn out that no useful
672  * ORDER BY clause can be generated.  It's not our job to figure that out
673  * here; we're only interested in identifying relevant ECs.
674  */
675 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)676 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
677 {
678 	List	   *useful_eclass_list = NIL;
679 	ListCell   *lc;
680 	Relids		relids;
681 
682 	/*
683 	 * First, consider whether any active EC is potentially useful for a merge
684 	 * join against this relation.
685 	 */
686 	if (rel->has_eclass_joins)
687 	{
688 		foreach(lc, root->eq_classes)
689 		{
690 			EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
691 
692 			if (eclass_useful_for_merging(root, cur_ec, rel))
693 				useful_eclass_list = lappend(useful_eclass_list, cur_ec);
694 		}
695 	}
696 
697 	/*
698 	 * Next, consider whether there are any non-EC derivable join clauses that
699 	 * are merge-joinable.  If the joininfo list is empty, we can exit
700 	 * quickly.
701 	 */
702 	if (rel->joininfo == NIL)
703 		return useful_eclass_list;
704 
705 	/* If this is a child rel, we must use the topmost parent rel to search. */
706 	if (IS_OTHER_REL(rel))
707 	{
708 		Assert(!bms_is_empty(rel->top_parent_relids));
709 		relids = rel->top_parent_relids;
710 	}
711 	else
712 		relids = rel->relids;
713 
714 	/* Check each join clause in turn. */
715 	foreach(lc, rel->joininfo)
716 	{
717 		RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
718 
719 		/* Consider only mergejoinable clauses */
720 		if (restrictinfo->mergeopfamilies == NIL)
721 			continue;
722 
723 		/* Make sure we've got canonical ECs. */
724 		update_mergeclause_eclasses(root, restrictinfo);
725 
726 		/*
727 		 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
728 		 * that left_ec and right_ec will be initialized, per comments in
729 		 * distribute_qual_to_rels.
730 		 *
731 		 * We want to identify which side of this merge-joinable clause
732 		 * contains columns from the relation produced by this RelOptInfo. We
733 		 * test for overlap, not containment, because there could be extra
734 		 * relations on either side.  For example, suppose we've got something
735 		 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
736 		 * A.y = D.y.  The input rel might be the joinrel between A and B, and
737 		 * we'll consider the join clause A.y = D.y. relids contains a
738 		 * relation not involved in the join class (B) and the equivalence
739 		 * class for the left-hand side of the clause contains a relation not
740 		 * involved in the input rel (C).  Despite the fact that we have only
741 		 * overlap and not containment in either direction, A.y is potentially
742 		 * useful as a sort column.
743 		 *
744 		 * Note that it's even possible that relids overlaps neither side of
745 		 * the join clause.  For example, consider A LEFT JOIN B ON A.x = B.x
746 		 * AND A.x = 1.  The clause A.x = 1 will appear in B's joininfo list,
747 		 * but overlaps neither side of B.  In that case, we just skip this
748 		 * join clause, since it doesn't suggest a useful sort order for this
749 		 * relation.
750 		 */
751 		if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
752 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
753 														restrictinfo->right_ec);
754 		else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
755 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
756 														restrictinfo->left_ec);
757 	}
758 
759 	return useful_eclass_list;
760 }
761 
762 /*
763  * get_useful_pathkeys_for_relation
764  *		Determine which orderings of a relation might be useful.
765  *
766  * Getting data in sorted order can be useful either because the requested
767  * order matches the final output ordering for the overall query we're
768  * planning, or because it enables an efficient merge join.  Here, we try
769  * to figure out which pathkeys to consider.
770  */
771 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)772 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
773 {
774 	List	   *useful_pathkeys_list = NIL;
775 	List	   *useful_eclass_list;
776 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
777 	EquivalenceClass *query_ec = NULL;
778 	ListCell   *lc;
779 
780 	/*
781 	 * Pushing the query_pathkeys to the remote server is always worth
782 	 * considering, because it might let us avoid a local sort.
783 	 */
784 	if (root->query_pathkeys)
785 	{
786 		bool		query_pathkeys_ok = true;
787 
788 		foreach(lc, root->query_pathkeys)
789 		{
790 			PathKey    *pathkey = (PathKey *) lfirst(lc);
791 			EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
792 			Expr	   *em_expr;
793 
794 			/*
795 			 * The planner and executor don't have any clever strategy for
796 			 * taking data sorted by a prefix of the query's pathkeys and
797 			 * getting it to be sorted by all of those pathkeys. We'll just
798 			 * end up resorting the entire data set.  So, unless we can push
799 			 * down all of the query pathkeys, forget it.
800 			 *
801 			 * is_foreign_expr would detect volatile expressions as well, but
802 			 * checking ec_has_volatile here saves some cycles.
803 			 */
804 			if (pathkey_ec->ec_has_volatile ||
805 				!(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
806 				!is_foreign_expr(root, rel, em_expr))
807 			{
808 				query_pathkeys_ok = false;
809 				break;
810 			}
811 		}
812 
813 		if (query_pathkeys_ok)
814 			useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
815 	}
816 
817 	/*
818 	 * Even if we're not using remote estimates, having the remote side do the
819 	 * sort generally won't be any worse than doing it locally, and it might
820 	 * be much better if the remote side can generate data in the right order
821 	 * without needing a sort at all.  However, what we're going to do next is
822 	 * try to generate pathkeys that seem promising for possible merge joins,
823 	 * and that's more speculative.  A wrong choice might hurt quite a bit, so
824 	 * bail out if we can't use remote estimates.
825 	 */
826 	if (!fpinfo->use_remote_estimate)
827 		return useful_pathkeys_list;
828 
829 	/* Get the list of interesting EquivalenceClasses. */
830 	useful_eclass_list = get_useful_ecs_for_relation(root, rel);
831 
832 	/* Extract unique EC for query, if any, so we don't consider it again. */
833 	if (list_length(root->query_pathkeys) == 1)
834 	{
835 		PathKey    *query_pathkey = linitial(root->query_pathkeys);
836 
837 		query_ec = query_pathkey->pk_eclass;
838 	}
839 
840 	/*
841 	 * As a heuristic, the only pathkeys we consider here are those of length
842 	 * one.  It's surely possible to consider more, but since each one we
843 	 * choose to consider will generate a round-trip to the remote side, we
844 	 * need to be a bit cautious here.  It would sure be nice to have a local
845 	 * cache of information about remote index definitions...
846 	 */
847 	foreach(lc, useful_eclass_list)
848 	{
849 		EquivalenceClass *cur_ec = lfirst(lc);
850 		Expr	   *em_expr;
851 		PathKey    *pathkey;
852 
853 		/* If redundant with what we did above, skip it. */
854 		if (cur_ec == query_ec)
855 			continue;
856 
857 		/* If no pushable expression for this rel, skip it. */
858 		em_expr = find_em_expr_for_rel(cur_ec, rel);
859 		if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
860 			continue;
861 
862 		/* Looks like we can generate a pathkey, so let's do it. */
863 		pathkey = make_canonical_pathkey(root, cur_ec,
864 										 linitial_oid(cur_ec->ec_opfamilies),
865 										 BTLessStrategyNumber,
866 										 false);
867 		useful_pathkeys_list = lappend(useful_pathkeys_list,
868 									   list_make1(pathkey));
869 	}
870 
871 	return useful_pathkeys_list;
872 }
873 
874 /*
875  * postgresGetForeignPaths
876  *		Create possible scan paths for a scan on the foreign table
877  */
878 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)879 postgresGetForeignPaths(PlannerInfo *root,
880 						RelOptInfo *baserel,
881 						Oid foreigntableid)
882 {
883 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
884 	ForeignPath *path;
885 	List	   *ppi_list;
886 	ListCell   *lc;
887 
888 	/*
889 	 * Create simplest ForeignScan path node and add it to baserel.  This path
890 	 * corresponds to SeqScan path of regular tables (though depending on what
891 	 * baserestrict conditions we were able to send to remote, there might
892 	 * actually be an indexscan happening there).  We already did all the work
893 	 * to estimate cost and size of this path.
894 	 *
895 	 * Although this path uses no join clauses, it could still have required
896 	 * parameterization due to LATERAL refs in its tlist.
897 	 */
898 	path = create_foreignscan_path(root, baserel,
899 								   NULL,	/* default pathtarget */
900 								   fpinfo->rows,
901 								   fpinfo->startup_cost,
902 								   fpinfo->total_cost,
903 								   NIL, /* no pathkeys */
904 								   baserel->lateral_relids,
905 								   NULL,	/* no extra plan */
906 								   NIL);	/* no fdw_private list */
907 	add_path(baserel, (Path *) path);
908 
909 	/* Add paths with pathkeys */
910 	add_paths_with_pathkeys_for_rel(root, baserel, NULL);
911 
912 	/*
913 	 * If we're not using remote estimates, stop here.  We have no way to
914 	 * estimate whether any join clauses would be worth sending across, so
915 	 * don't bother building parameterized paths.
916 	 */
917 	if (!fpinfo->use_remote_estimate)
918 		return;
919 
920 	/*
921 	 * Thumb through all join clauses for the rel to identify which outer
922 	 * relations could supply one or more safe-to-send-to-remote join clauses.
923 	 * We'll build a parameterized path for each such outer relation.
924 	 *
925 	 * It's convenient to manage this by representing each candidate outer
926 	 * relation by the ParamPathInfo node for it.  We can then use the
927 	 * ppi_clauses list in the ParamPathInfo node directly as a list of the
928 	 * interesting join clauses for that rel.  This takes care of the
929 	 * possibility that there are multiple safe join clauses for such a rel,
930 	 * and also ensures that we account for unsafe join clauses that we'll
931 	 * still have to enforce locally (since the parameterized-path machinery
932 	 * insists that we handle all movable clauses).
933 	 */
934 	ppi_list = NIL;
935 	foreach(lc, baserel->joininfo)
936 	{
937 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
938 		Relids		required_outer;
939 		ParamPathInfo *param_info;
940 
941 		/* Check if clause can be moved to this rel */
942 		if (!join_clause_is_movable_to(rinfo, baserel))
943 			continue;
944 
945 		/* See if it is safe to send to remote */
946 		if (!is_foreign_expr(root, baserel, rinfo->clause))
947 			continue;
948 
949 		/* Calculate required outer rels for the resulting path */
950 		required_outer = bms_union(rinfo->clause_relids,
951 								   baserel->lateral_relids);
952 		/* We do not want the foreign rel itself listed in required_outer */
953 		required_outer = bms_del_member(required_outer, baserel->relid);
954 
955 		/*
956 		 * required_outer probably can't be empty here, but if it were, we
957 		 * couldn't make a parameterized path.
958 		 */
959 		if (bms_is_empty(required_outer))
960 			continue;
961 
962 		/* Get the ParamPathInfo */
963 		param_info = get_baserel_parampathinfo(root, baserel,
964 											   required_outer);
965 		Assert(param_info != NULL);
966 
967 		/*
968 		 * Add it to list unless we already have it.  Testing pointer equality
969 		 * is OK since get_baserel_parampathinfo won't make duplicates.
970 		 */
971 		ppi_list = list_append_unique_ptr(ppi_list, param_info);
972 	}
973 
974 	/*
975 	 * The above scan examined only "generic" join clauses, not those that
976 	 * were absorbed into EquivalenceClauses.  See if we can make anything out
977 	 * of EquivalenceClauses.
978 	 */
979 	if (baserel->has_eclass_joins)
980 	{
981 		/*
982 		 * We repeatedly scan the eclass list looking for column references
983 		 * (or expressions) belonging to the foreign rel.  Each time we find
984 		 * one, we generate a list of equivalence joinclauses for it, and then
985 		 * see if any are safe to send to the remote.  Repeat till there are
986 		 * no more candidate EC members.
987 		 */
988 		ec_member_foreign_arg arg;
989 
990 		arg.already_used = NIL;
991 		for (;;)
992 		{
993 			List	   *clauses;
994 
995 			/* Make clauses, skipping any that join to lateral_referencers */
996 			arg.current = NULL;
997 			clauses = generate_implied_equalities_for_column(root,
998 															 baserel,
999 															 ec_member_matches_foreign,
1000 															 (void *) &arg,
1001 															 baserel->lateral_referencers);
1002 
1003 			/* Done if there are no more expressions in the foreign rel */
1004 			if (arg.current == NULL)
1005 			{
1006 				Assert(clauses == NIL);
1007 				break;
1008 			}
1009 
1010 			/* Scan the extracted join clauses */
1011 			foreach(lc, clauses)
1012 			{
1013 				RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1014 				Relids		required_outer;
1015 				ParamPathInfo *param_info;
1016 
1017 				/* Check if clause can be moved to this rel */
1018 				if (!join_clause_is_movable_to(rinfo, baserel))
1019 					continue;
1020 
1021 				/* See if it is safe to send to remote */
1022 				if (!is_foreign_expr(root, baserel, rinfo->clause))
1023 					continue;
1024 
1025 				/* Calculate required outer rels for the resulting path */
1026 				required_outer = bms_union(rinfo->clause_relids,
1027 										   baserel->lateral_relids);
1028 				required_outer = bms_del_member(required_outer, baserel->relid);
1029 				if (bms_is_empty(required_outer))
1030 					continue;
1031 
1032 				/* Get the ParamPathInfo */
1033 				param_info = get_baserel_parampathinfo(root, baserel,
1034 													   required_outer);
1035 				Assert(param_info != NULL);
1036 
1037 				/* Add it to list unless we already have it */
1038 				ppi_list = list_append_unique_ptr(ppi_list, param_info);
1039 			}
1040 
1041 			/* Try again, now ignoring the expression we found this time */
1042 			arg.already_used = lappend(arg.already_used, arg.current);
1043 		}
1044 	}
1045 
1046 	/*
1047 	 * Now build a path for each useful outer relation.
1048 	 */
1049 	foreach(lc, ppi_list)
1050 	{
1051 		ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1052 		double		rows;
1053 		int			width;
1054 		Cost		startup_cost;
1055 		Cost		total_cost;
1056 
1057 		/* Get a cost estimate from the remote */
1058 		estimate_path_cost_size(root, baserel,
1059 								param_info->ppi_clauses, NIL,
1060 								&rows, &width,
1061 								&startup_cost, &total_cost);
1062 
1063 		/*
1064 		 * ppi_rows currently won't get looked at by anything, but still we
1065 		 * may as well ensure that it matches our idea of the rowcount.
1066 		 */
1067 		param_info->ppi_rows = rows;
1068 
1069 		/* Make the path */
1070 		path = create_foreignscan_path(root, baserel,
1071 									   NULL,	/* default pathtarget */
1072 									   rows,
1073 									   startup_cost,
1074 									   total_cost,
1075 									   NIL, /* no pathkeys */
1076 									   param_info->ppi_req_outer,
1077 									   NULL,
1078 									   NIL);	/* no fdw_private list */
1079 		add_path(baserel, (Path *) path);
1080 	}
1081 }
1082 
1083 /*
1084  * postgresGetForeignPlan
1085  *		Create ForeignScan plan node which implements selected best path
1086  */
1087 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1088 postgresGetForeignPlan(PlannerInfo *root,
1089 					   RelOptInfo *foreignrel,
1090 					   Oid foreigntableid,
1091 					   ForeignPath *best_path,
1092 					   List *tlist,
1093 					   List *scan_clauses,
1094 					   Plan *outer_plan)
1095 {
1096 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1097 	Index		scan_relid;
1098 	List	   *fdw_private;
1099 	List	   *remote_exprs = NIL;
1100 	List	   *local_exprs = NIL;
1101 	List	   *params_list = NIL;
1102 	List	   *fdw_scan_tlist = NIL;
1103 	List	   *fdw_recheck_quals = NIL;
1104 	List	   *retrieved_attrs;
1105 	StringInfoData sql;
1106 	ListCell   *lc;
1107 
1108 	if (IS_SIMPLE_REL(foreignrel))
1109 	{
1110 		/*
1111 		 * For base relations, set scan_relid as the relid of the relation.
1112 		 */
1113 		scan_relid = foreignrel->relid;
1114 
1115 		/*
1116 		 * In a base-relation scan, we must apply the given scan_clauses.
1117 		 *
1118 		 * Separate the scan_clauses into those that can be executed remotely
1119 		 * and those that can't.  baserestrictinfo clauses that were
1120 		 * previously determined to be safe or unsafe by classifyConditions
1121 		 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1122 		 * else in the scan_clauses list will be a join clause, which we have
1123 		 * to check for remote-safety.
1124 		 *
1125 		 * Note: the join clauses we see here should be the exact same ones
1126 		 * previously examined by postgresGetForeignPaths.  Possibly it'd be
1127 		 * worth passing forward the classification work done then, rather
1128 		 * than repeating it here.
1129 		 *
1130 		 * This code must match "extract_actual_clauses(scan_clauses, false)"
1131 		 * except for the additional decision about remote versus local
1132 		 * execution.
1133 		 */
1134 		foreach(lc, scan_clauses)
1135 		{
1136 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1137 
1138 			/* Ignore any pseudoconstants, they're dealt with elsewhere */
1139 			if (rinfo->pseudoconstant)
1140 				continue;
1141 
1142 			if (list_member_ptr(fpinfo->remote_conds, rinfo))
1143 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1144 			else if (list_member_ptr(fpinfo->local_conds, rinfo))
1145 				local_exprs = lappend(local_exprs, rinfo->clause);
1146 			else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1147 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1148 			else
1149 				local_exprs = lappend(local_exprs, rinfo->clause);
1150 		}
1151 
1152 		/*
1153 		 * For a base-relation scan, we have to support EPQ recheck, which
1154 		 * should recheck all the remote quals.
1155 		 */
1156 		fdw_recheck_quals = remote_exprs;
1157 	}
1158 	else
1159 	{
1160 		/*
1161 		 * Join relation or upper relation - set scan_relid to 0.
1162 		 */
1163 		scan_relid = 0;
1164 
1165 		/*
1166 		 * For a join rel, baserestrictinfo is NIL and we are not considering
1167 		 * parameterization right now, so there should be no scan_clauses for
1168 		 * a joinrel or an upper rel either.
1169 		 */
1170 		Assert(!scan_clauses);
1171 
1172 		/*
1173 		 * Instead we get the conditions to apply from the fdw_private
1174 		 * structure.
1175 		 */
1176 		remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1177 		local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1178 
1179 		/*
1180 		 * We leave fdw_recheck_quals empty in this case, since we never need
1181 		 * to apply EPQ recheck clauses.  In the case of a joinrel, EPQ
1182 		 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1183 		 * If we're planning an upperrel (ie, remote grouping or aggregation)
1184 		 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1185 		 * allowed, and indeed we *can't* put the remote clauses into
1186 		 * fdw_recheck_quals because the unaggregated Vars won't be available
1187 		 * locally.
1188 		 */
1189 
1190 		/* Build the list of columns to be fetched from the foreign server. */
1191 		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1192 
1193 		/*
1194 		 * Ensure that the outer plan produces a tuple whose descriptor
1195 		 * matches our scan tuple slot.  Also, remove the local conditions
1196 		 * from outer plan's quals, lest they be evaluated twice, once by the
1197 		 * local plan and once by the scan.
1198 		 */
1199 		if (outer_plan)
1200 		{
1201 			ListCell   *lc;
1202 
1203 			/*
1204 			 * Right now, we only consider grouping and aggregation beyond
1205 			 * joins. Queries involving aggregates or grouping do not require
1206 			 * EPQ mechanism, hence should not have an outer plan here.
1207 			 */
1208 			Assert(!IS_UPPER_REL(foreignrel));
1209 
1210 			/*
1211 			 * First, update the plan's qual list if possible.  In some cases
1212 			 * the quals might be enforced below the topmost plan level, in
1213 			 * which case we'll fail to remove them; it's not worth working
1214 			 * harder than this.
1215 			 */
1216 			foreach(lc, local_exprs)
1217 			{
1218 				Node	   *qual = lfirst(lc);
1219 
1220 				outer_plan->qual = list_delete(outer_plan->qual, qual);
1221 
1222 				/*
1223 				 * For an inner join the local conditions of foreign scan plan
1224 				 * can be part of the joinquals as well.  (They might also be
1225 				 * in the mergequals or hashquals, but we can't touch those
1226 				 * without breaking the plan.)
1227 				 */
1228 				if (IsA(outer_plan, NestLoop) ||
1229 					IsA(outer_plan, MergeJoin) ||
1230 					IsA(outer_plan, HashJoin))
1231 				{
1232 					Join	   *join_plan = (Join *) outer_plan;
1233 
1234 					if (join_plan->jointype == JOIN_INNER)
1235 						join_plan->joinqual = list_delete(join_plan->joinqual,
1236 														  qual);
1237 				}
1238 			}
1239 
1240 			/*
1241 			 * Now fix the subplan's tlist --- this might result in inserting
1242 			 * a Result node atop the plan tree.
1243 			 */
1244 			outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1245 												best_path->path.parallel_safe);
1246 		}
1247 	}
1248 
1249 	/*
1250 	 * Build the query string to be sent for execution, and identify
1251 	 * expressions to be sent as parameters.
1252 	 */
1253 	initStringInfo(&sql);
1254 	deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1255 							remote_exprs, best_path->path.pathkeys,
1256 							false, &retrieved_attrs, &params_list);
1257 
1258 	/* Remember remote_exprs for possible use by postgresPlanDirectModify */
1259 	fpinfo->final_remote_exprs = remote_exprs;
1260 
1261 	/*
1262 	 * Build the fdw_private list that will be available to the executor.
1263 	 * Items in the list must match order in enum FdwScanPrivateIndex.
1264 	 */
1265 	fdw_private = list_make3(makeString(sql.data),
1266 							 retrieved_attrs,
1267 							 makeInteger(fpinfo->fetch_size));
1268 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1269 		fdw_private = lappend(fdw_private,
1270 							  makeString(fpinfo->relation_name->data));
1271 
1272 	/*
1273 	 * Create the ForeignScan node for the given relation.
1274 	 *
1275 	 * Note that the remote parameter expressions are stored in the fdw_exprs
1276 	 * field of the finished plan node; we can't keep them in private state
1277 	 * because then they wouldn't be subject to later planner processing.
1278 	 */
1279 	return make_foreignscan(tlist,
1280 							local_exprs,
1281 							scan_relid,
1282 							params_list,
1283 							fdw_private,
1284 							fdw_scan_tlist,
1285 							fdw_recheck_quals,
1286 							outer_plan);
1287 }
1288 
1289 /*
1290  * postgresBeginForeignScan
1291  *		Initiate an executor scan of a foreign PostgreSQL table.
1292  */
1293 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1294 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1295 {
1296 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1297 	EState	   *estate = node->ss.ps.state;
1298 	PgFdwScanState *fsstate;
1299 	RangeTblEntry *rte;
1300 	Oid			userid;
1301 	ForeignTable *table;
1302 	UserMapping *user;
1303 	int			rtindex;
1304 	int			numParams;
1305 
1306 	/*
1307 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
1308 	 */
1309 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1310 		return;
1311 
1312 	/*
1313 	 * We'll save private state in node->fdw_state.
1314 	 */
1315 	fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1316 	node->fdw_state = (void *) fsstate;
1317 
1318 	/*
1319 	 * Identify which user to do the remote access as.  This should match what
1320 	 * ExecCheckRTEPerms() does.  In case of a join or aggregate, use the
1321 	 * lowest-numbered member RTE as a representative; we would get the same
1322 	 * result from any.
1323 	 */
1324 	if (fsplan->scan.scanrelid > 0)
1325 		rtindex = fsplan->scan.scanrelid;
1326 	else
1327 		rtindex = bms_next_member(fsplan->fs_relids, -1);
1328 	rte = rt_fetch(rtindex, estate->es_range_table);
1329 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1330 
1331 	/* Get info about foreign table. */
1332 	table = GetForeignTable(rte->relid);
1333 	user = GetUserMapping(userid, table->serverid);
1334 
1335 	/*
1336 	 * Get connection to the foreign server.  Connection manager will
1337 	 * establish new connection if necessary.
1338 	 */
1339 	fsstate->conn = GetConnection(user, false);
1340 
1341 	/* Assign a unique ID for my cursor */
1342 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1343 	fsstate->cursor_exists = false;
1344 
1345 	/* Get private info created by planner functions. */
1346 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
1347 									 FdwScanPrivateSelectSql));
1348 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1349 												 FdwScanPrivateRetrievedAttrs);
1350 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1351 										  FdwScanPrivateFetchSize));
1352 
1353 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
1354 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1355 											   "postgres_fdw tuple data",
1356 											   ALLOCSET_DEFAULT_SIZES);
1357 	fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1358 											  "postgres_fdw temporary data",
1359 											  ALLOCSET_SMALL_SIZES);
1360 
1361 	/*
1362 	 * Get info we'll need for converting data fetched from the foreign server
1363 	 * into local representation and error reporting during that process.
1364 	 */
1365 	if (fsplan->scan.scanrelid > 0)
1366 	{
1367 		fsstate->rel = node->ss.ss_currentRelation;
1368 		fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1369 	}
1370 	else
1371 	{
1372 		fsstate->rel = NULL;
1373 		fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1374 	}
1375 
1376 	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1377 
1378 	/*
1379 	 * Prepare for processing of parameters used in remote query, if any.
1380 	 */
1381 	numParams = list_length(fsplan->fdw_exprs);
1382 	fsstate->numParams = numParams;
1383 	if (numParams > 0)
1384 		prepare_query_params((PlanState *) node,
1385 							 fsplan->fdw_exprs,
1386 							 numParams,
1387 							 &fsstate->param_flinfo,
1388 							 &fsstate->param_exprs,
1389 							 &fsstate->param_values);
1390 }
1391 
1392 /*
1393  * postgresIterateForeignScan
1394  *		Retrieve next row from the result set, or clear tuple slot to indicate
1395  *		EOF.
1396  */
1397 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1398 postgresIterateForeignScan(ForeignScanState *node)
1399 {
1400 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1401 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1402 
1403 	/*
1404 	 * If this is the first call after Begin or ReScan, we need to create the
1405 	 * cursor on the remote side.
1406 	 */
1407 	if (!fsstate->cursor_exists)
1408 		create_cursor(node);
1409 
1410 	/*
1411 	 * Get some more tuples, if we've run out.
1412 	 */
1413 	if (fsstate->next_tuple >= fsstate->num_tuples)
1414 	{
1415 		/* No point in another fetch if we already detected EOF, though. */
1416 		if (!fsstate->eof_reached)
1417 			fetch_more_data(node);
1418 		/* If we didn't get any tuples, must be end of data. */
1419 		if (fsstate->next_tuple >= fsstate->num_tuples)
1420 			return ExecClearTuple(slot);
1421 	}
1422 
1423 	/*
1424 	 * Return the next tuple.
1425 	 */
1426 	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1427 				   slot,
1428 				   InvalidBuffer,
1429 				   false);
1430 
1431 	return slot;
1432 }
1433 
1434 /*
1435  * postgresReScanForeignScan
1436  *		Restart the scan.
1437  */
1438 static void
postgresReScanForeignScan(ForeignScanState * node)1439 postgresReScanForeignScan(ForeignScanState *node)
1440 {
1441 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1442 	char		sql[64];
1443 	PGresult   *res;
1444 
1445 	/* If we haven't created the cursor yet, nothing to do. */
1446 	if (!fsstate->cursor_exists)
1447 		return;
1448 
1449 	/*
1450 	 * If any internal parameters affecting this node have changed, we'd
1451 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
1452 	 * be good enough.  If we've only fetched zero or one batch, we needn't
1453 	 * even rewind the cursor, just rescan what we have.
1454 	 */
1455 	if (node->ss.ps.chgParam != NULL)
1456 	{
1457 		fsstate->cursor_exists = false;
1458 		snprintf(sql, sizeof(sql), "CLOSE c%u",
1459 				 fsstate->cursor_number);
1460 	}
1461 	else if (fsstate->fetch_ct_2 > 1)
1462 	{
1463 		snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1464 				 fsstate->cursor_number);
1465 	}
1466 	else
1467 	{
1468 		/* Easy: just rescan what we already have in memory, if anything */
1469 		fsstate->next_tuple = 0;
1470 		return;
1471 	}
1472 
1473 	/*
1474 	 * We don't use a PG_TRY block here, so be careful not to throw error
1475 	 * without releasing the PGresult.
1476 	 */
1477 	res = pgfdw_exec_query(fsstate->conn, sql);
1478 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1479 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1480 	PQclear(res);
1481 
1482 	/* Now force a fresh FETCH. */
1483 	fsstate->tuples = NULL;
1484 	fsstate->num_tuples = 0;
1485 	fsstate->next_tuple = 0;
1486 	fsstate->fetch_ct_2 = 0;
1487 	fsstate->eof_reached = false;
1488 }
1489 
1490 /*
1491  * postgresEndForeignScan
1492  *		Finish scanning foreign table and dispose objects used for this scan
1493  */
1494 static void
postgresEndForeignScan(ForeignScanState * node)1495 postgresEndForeignScan(ForeignScanState *node)
1496 {
1497 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1498 
1499 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1500 	if (fsstate == NULL)
1501 		return;
1502 
1503 	/* Close the cursor if open, to prevent accumulation of cursors */
1504 	if (fsstate->cursor_exists)
1505 		close_cursor(fsstate->conn, fsstate->cursor_number);
1506 
1507 	/* Release remote connection */
1508 	ReleaseConnection(fsstate->conn);
1509 	fsstate->conn = NULL;
1510 
1511 	/* MemoryContexts will be deleted automatically. */
1512 }
1513 
1514 /*
1515  * postgresAddForeignUpdateTargets
1516  *		Add resjunk column(s) needed for update/delete on a foreign table
1517  */
1518 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1519 postgresAddForeignUpdateTargets(Query *parsetree,
1520 								RangeTblEntry *target_rte,
1521 								Relation target_relation)
1522 {
1523 	Var		   *var;
1524 	const char *attrname;
1525 	TargetEntry *tle;
1526 
1527 	/*
1528 	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1529 	 */
1530 
1531 	/* Make a Var representing the desired value */
1532 	var = makeVar(parsetree->resultRelation,
1533 				  SelfItemPointerAttributeNumber,
1534 				  TIDOID,
1535 				  -1,
1536 				  InvalidOid,
1537 				  0);
1538 
1539 	/* Wrap it in a resjunk TLE with the right name ... */
1540 	attrname = "ctid";
1541 
1542 	tle = makeTargetEntry((Expr *) var,
1543 						  list_length(parsetree->targetList) + 1,
1544 						  pstrdup(attrname),
1545 						  true);
1546 
1547 	/* ... and add it to the query's targetlist */
1548 	parsetree->targetList = lappend(parsetree->targetList, tle);
1549 }
1550 
1551 /*
1552  * postgresPlanForeignModify
1553  *		Plan an insert/update/delete operation on a foreign table
1554  */
1555 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1556 postgresPlanForeignModify(PlannerInfo *root,
1557 						  ModifyTable *plan,
1558 						  Index resultRelation,
1559 						  int subplan_index)
1560 {
1561 	CmdType		operation = plan->operation;
1562 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1563 	Relation	rel;
1564 	StringInfoData sql;
1565 	List	   *targetAttrs = NIL;
1566 	List	   *returningList = NIL;
1567 	List	   *retrieved_attrs = NIL;
1568 	bool		doNothing = false;
1569 
1570 	initStringInfo(&sql);
1571 
1572 	/*
1573 	 * Core code already has some lock on each rel being planned, so we can
1574 	 * use NoLock here.
1575 	 */
1576 	rel = heap_open(rte->relid, NoLock);
1577 
1578 	/*
1579 	 * In an INSERT, we transmit all columns that are defined in the foreign
1580 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1581 	 * foreign table, we transmit all columns like INSERT; else we transmit
1582 	 * only columns that were explicitly targets of the UPDATE, so as to avoid
1583 	 * unnecessary data transmission.  (We can't do that for INSERT since we
1584 	 * would miss sending default values for columns not listed in the source
1585 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1586 	 * those triggers might change values for non-target columns, in which
1587 	 * case we would miss sending changed values for those columns.)
1588 	 */
1589 	if (operation == CMD_INSERT ||
1590 		(operation == CMD_UPDATE &&
1591 		 rel->trigdesc &&
1592 		 rel->trigdesc->trig_update_before_row))
1593 	{
1594 		TupleDesc	tupdesc = RelationGetDescr(rel);
1595 		int			attnum;
1596 
1597 		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1598 		{
1599 			Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1600 
1601 			if (!attr->attisdropped)
1602 				targetAttrs = lappend_int(targetAttrs, attnum);
1603 		}
1604 	}
1605 	else if (operation == CMD_UPDATE)
1606 	{
1607 		int			col;
1608 
1609 		col = -1;
1610 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1611 		{
1612 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1613 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
1614 
1615 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
1616 				elog(ERROR, "system-column update is not supported");
1617 			targetAttrs = lappend_int(targetAttrs, attno);
1618 		}
1619 	}
1620 
1621 	/*
1622 	 * Extract the relevant RETURNING list if any.
1623 	 */
1624 	if (plan->returningLists)
1625 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
1626 
1627 	/*
1628 	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1629 	 * should have already been rejected in the optimizer, as presently there
1630 	 * is no way to recognize an arbiter index on a foreign table.  Only DO
1631 	 * NOTHING is supported without an inference specification.
1632 	 */
1633 	if (plan->onConflictAction == ONCONFLICT_NOTHING)
1634 		doNothing = true;
1635 	else if (plan->onConflictAction != ONCONFLICT_NONE)
1636 		elog(ERROR, "unexpected ON CONFLICT specification: %d",
1637 			 (int) plan->onConflictAction);
1638 
1639 	/*
1640 	 * Construct the SQL command string.
1641 	 */
1642 	switch (operation)
1643 	{
1644 		case CMD_INSERT:
1645 			deparseInsertSql(&sql, root, resultRelation, rel,
1646 							 targetAttrs, doNothing, returningList,
1647 							 &retrieved_attrs);
1648 			break;
1649 		case CMD_UPDATE:
1650 			deparseUpdateSql(&sql, root, resultRelation, rel,
1651 							 targetAttrs, returningList,
1652 							 &retrieved_attrs);
1653 			break;
1654 		case CMD_DELETE:
1655 			deparseDeleteSql(&sql, root, resultRelation, rel,
1656 							 returningList,
1657 							 &retrieved_attrs);
1658 			break;
1659 		default:
1660 			elog(ERROR, "unexpected operation: %d", (int) operation);
1661 			break;
1662 	}
1663 
1664 	heap_close(rel, NoLock);
1665 
1666 	/*
1667 	 * Build the fdw_private list that will be available to the executor.
1668 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
1669 	 */
1670 	return list_make4(makeString(sql.data),
1671 					  targetAttrs,
1672 					  makeInteger((retrieved_attrs != NIL)),
1673 					  retrieved_attrs);
1674 }
1675 
1676 /*
1677  * postgresBeginForeignModify
1678  *		Begin an insert/update/delete operation on a foreign table
1679  */
1680 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1681 postgresBeginForeignModify(ModifyTableState *mtstate,
1682 						   ResultRelInfo *resultRelInfo,
1683 						   List *fdw_private,
1684 						   int subplan_index,
1685 						   int eflags)
1686 {
1687 	PgFdwModifyState *fmstate;
1688 	EState	   *estate = mtstate->ps.state;
1689 	CmdType		operation = mtstate->operation;
1690 	Relation	rel = resultRelInfo->ri_RelationDesc;
1691 	RangeTblEntry *rte;
1692 	Oid			userid;
1693 	ForeignTable *table;
1694 	UserMapping *user;
1695 	AttrNumber	n_params;
1696 	Oid			typefnoid;
1697 	bool		isvarlena;
1698 	ListCell   *lc;
1699 
1700 	/*
1701 	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
1702 	 * stays NULL.
1703 	 */
1704 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1705 		return;
1706 
1707 	/* Begin constructing PgFdwModifyState. */
1708 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1709 	fmstate->rel = rel;
1710 
1711 	/*
1712 	 * Identify which user to do the remote access as.  This should match what
1713 	 * ExecCheckRTEPerms() does.
1714 	 */
1715 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1716 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1717 
1718 	/* Get info about foreign table. */
1719 	table = GetForeignTable(RelationGetRelid(rel));
1720 	user = GetUserMapping(userid, table->serverid);
1721 
1722 	/* Open connection; report that we'll create a prepared statement. */
1723 	fmstate->conn = GetConnection(user, true);
1724 	fmstate->p_name = NULL;		/* prepared statement not made yet */
1725 
1726 	/* Deconstruct fdw_private data. */
1727 	fmstate->query = strVal(list_nth(fdw_private,
1728 									 FdwModifyPrivateUpdateSql));
1729 	fmstate->target_attrs = (List *) list_nth(fdw_private,
1730 											  FdwModifyPrivateTargetAttnums);
1731 	fmstate->has_returning = intVal(list_nth(fdw_private,
1732 											 FdwModifyPrivateHasReturning));
1733 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1734 												 FdwModifyPrivateRetrievedAttrs);
1735 
1736 	/* Create context for per-tuple temp workspace. */
1737 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1738 											  "postgres_fdw temporary data",
1739 											  ALLOCSET_SMALL_SIZES);
1740 
1741 	/* Prepare for input conversion of RETURNING results. */
1742 	if (fmstate->has_returning)
1743 		fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
1744 
1745 	/* Prepare for output conversion of parameters used in prepared stmt. */
1746 	n_params = list_length(fmstate->target_attrs) + 1;
1747 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1748 	fmstate->p_nums = 0;
1749 
1750 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
1751 	{
1752 		/* Find the ctid resjunk column in the subplan's result */
1753 		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;
1754 
1755 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1756 														  "ctid");
1757 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
1758 			elog(ERROR, "could not find junk ctid column");
1759 
1760 		/* First transmittable parameter will be ctid */
1761 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1762 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1763 		fmstate->p_nums++;
1764 	}
1765 
1766 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
1767 	{
1768 		/* Set up for remaining transmittable parameters */
1769 		foreach(lc, fmstate->target_attrs)
1770 		{
1771 			int			attnum = lfirst_int(lc);
1772 			Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1773 
1774 			Assert(!attr->attisdropped);
1775 
1776 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1777 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1778 			fmstate->p_nums++;
1779 		}
1780 	}
1781 
1782 	Assert(fmstate->p_nums <= n_params);
1783 
1784 	resultRelInfo->ri_FdwState = fmstate;
1785 }
1786 
1787 /*
1788  * postgresExecForeignInsert
1789  *		Insert one row into a foreign table
1790  */
1791 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1792 postgresExecForeignInsert(EState *estate,
1793 						  ResultRelInfo *resultRelInfo,
1794 						  TupleTableSlot *slot,
1795 						  TupleTableSlot *planSlot)
1796 {
1797 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1798 	const char **p_values;
1799 	PGresult   *res;
1800 	int			n_rows;
1801 
1802 	/* Set up the prepared statement on the remote server, if we didn't yet */
1803 	if (!fmstate->p_name)
1804 		prepare_foreign_modify(fmstate);
1805 
1806 	/* Convert parameters needed by prepared statement to text form */
1807 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1808 
1809 	/*
1810 	 * Execute the prepared statement.
1811 	 */
1812 	if (!PQsendQueryPrepared(fmstate->conn,
1813 							 fmstate->p_name,
1814 							 fmstate->p_nums,
1815 							 p_values,
1816 							 NULL,
1817 							 NULL,
1818 							 0))
1819 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1820 
1821 	/*
1822 	 * Get the result, and check for success.
1823 	 *
1824 	 * We don't use a PG_TRY block here, so be careful not to throw error
1825 	 * without releasing the PGresult.
1826 	 */
1827 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1828 	if (PQresultStatus(res) !=
1829 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1830 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1831 
1832 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1833 	if (fmstate->has_returning)
1834 	{
1835 		n_rows = PQntuples(res);
1836 		if (n_rows > 0)
1837 			store_returning_result(fmstate, slot, res);
1838 	}
1839 	else
1840 		n_rows = atoi(PQcmdTuples(res));
1841 
1842 	/* And clean up */
1843 	PQclear(res);
1844 
1845 	MemoryContextReset(fmstate->temp_cxt);
1846 
1847 	/* Return NULL if nothing was inserted on the remote end */
1848 	return (n_rows > 0) ? slot : NULL;
1849 }
1850 
1851 /*
1852  * postgresExecForeignUpdate
1853  *		Update one row in a foreign table
1854  */
1855 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1856 postgresExecForeignUpdate(EState *estate,
1857 						  ResultRelInfo *resultRelInfo,
1858 						  TupleTableSlot *slot,
1859 						  TupleTableSlot *planSlot)
1860 {
1861 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1862 	Datum		datum;
1863 	bool		isNull;
1864 	const char **p_values;
1865 	PGresult   *res;
1866 	int			n_rows;
1867 
1868 	/* Set up the prepared statement on the remote server, if we didn't yet */
1869 	if (!fmstate->p_name)
1870 		prepare_foreign_modify(fmstate);
1871 
1872 	/* Get the ctid that was passed up as a resjunk column */
1873 	datum = ExecGetJunkAttribute(planSlot,
1874 								 fmstate->ctidAttno,
1875 								 &isNull);
1876 	/* shouldn't ever get a null result... */
1877 	if (isNull)
1878 		elog(ERROR, "ctid is NULL");
1879 
1880 	/* Convert parameters needed by prepared statement to text form */
1881 	p_values = convert_prep_stmt_params(fmstate,
1882 										(ItemPointer) DatumGetPointer(datum),
1883 										slot);
1884 
1885 	/*
1886 	 * Execute the prepared statement.
1887 	 */
1888 	if (!PQsendQueryPrepared(fmstate->conn,
1889 							 fmstate->p_name,
1890 							 fmstate->p_nums,
1891 							 p_values,
1892 							 NULL,
1893 							 NULL,
1894 							 0))
1895 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1896 
1897 	/*
1898 	 * Get the result, and check for success.
1899 	 *
1900 	 * We don't use a PG_TRY block here, so be careful not to throw error
1901 	 * without releasing the PGresult.
1902 	 */
1903 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1904 	if (PQresultStatus(res) !=
1905 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1906 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1907 
1908 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1909 	if (fmstate->has_returning)
1910 	{
1911 		n_rows = PQntuples(res);
1912 		if (n_rows > 0)
1913 			store_returning_result(fmstate, slot, res);
1914 	}
1915 	else
1916 		n_rows = atoi(PQcmdTuples(res));
1917 
1918 	/* And clean up */
1919 	PQclear(res);
1920 
1921 	MemoryContextReset(fmstate->temp_cxt);
1922 
1923 	/* Return NULL if nothing was updated on the remote end */
1924 	return (n_rows > 0) ? slot : NULL;
1925 }
1926 
1927 /*
1928  * postgresExecForeignDelete
1929  *		Delete one row from a foreign table
1930  */
1931 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1932 postgresExecForeignDelete(EState *estate,
1933 						  ResultRelInfo *resultRelInfo,
1934 						  TupleTableSlot *slot,
1935 						  TupleTableSlot *planSlot)
1936 {
1937 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1938 	Datum		datum;
1939 	bool		isNull;
1940 	const char **p_values;
1941 	PGresult   *res;
1942 	int			n_rows;
1943 
1944 	/* Set up the prepared statement on the remote server, if we didn't yet */
1945 	if (!fmstate->p_name)
1946 		prepare_foreign_modify(fmstate);
1947 
1948 	/* Get the ctid that was passed up as a resjunk column */
1949 	datum = ExecGetJunkAttribute(planSlot,
1950 								 fmstate->ctidAttno,
1951 								 &isNull);
1952 	/* shouldn't ever get a null result... */
1953 	if (isNull)
1954 		elog(ERROR, "ctid is NULL");
1955 
1956 	/* Convert parameters needed by prepared statement to text form */
1957 	p_values = convert_prep_stmt_params(fmstate,
1958 										(ItemPointer) DatumGetPointer(datum),
1959 										NULL);
1960 
1961 	/*
1962 	 * Execute the prepared statement.
1963 	 */
1964 	if (!PQsendQueryPrepared(fmstate->conn,
1965 							 fmstate->p_name,
1966 							 fmstate->p_nums,
1967 							 p_values,
1968 							 NULL,
1969 							 NULL,
1970 							 0))
1971 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1972 
1973 	/*
1974 	 * Get the result, and check for success.
1975 	 *
1976 	 * We don't use a PG_TRY block here, so be careful not to throw error
1977 	 * without releasing the PGresult.
1978 	 */
1979 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1980 	if (PQresultStatus(res) !=
1981 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1982 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1983 
1984 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1985 	if (fmstate->has_returning)
1986 	{
1987 		n_rows = PQntuples(res);
1988 		if (n_rows > 0)
1989 			store_returning_result(fmstate, slot, res);
1990 	}
1991 	else
1992 		n_rows = atoi(PQcmdTuples(res));
1993 
1994 	/* And clean up */
1995 	PQclear(res);
1996 
1997 	MemoryContextReset(fmstate->temp_cxt);
1998 
1999 	/* Return NULL if nothing was deleted on the remote end */
2000 	return (n_rows > 0) ? slot : NULL;
2001 }
2002 
2003 /*
2004  * postgresEndForeignModify
2005  *		Finish an insert/update/delete operation on a foreign table
2006  */
2007 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)2008 postgresEndForeignModify(EState *estate,
2009 						 ResultRelInfo *resultRelInfo)
2010 {
2011 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2012 
2013 	/* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2014 	if (fmstate == NULL)
2015 		return;
2016 
2017 	/* If we created a prepared statement, destroy it */
2018 	if (fmstate->p_name)
2019 	{
2020 		char		sql[64];
2021 		PGresult   *res;
2022 
2023 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2024 
2025 		/*
2026 		 * We don't use a PG_TRY block here, so be careful not to throw error
2027 		 * without releasing the PGresult.
2028 		 */
2029 		res = pgfdw_exec_query(fmstate->conn, sql);
2030 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
2031 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2032 		PQclear(res);
2033 		fmstate->p_name = NULL;
2034 	}
2035 
2036 	/* Release remote connection */
2037 	ReleaseConnection(fmstate->conn);
2038 	fmstate->conn = NULL;
2039 }
2040 
2041 /*
2042  * postgresIsForeignRelUpdatable
2043  *		Determine whether a foreign table supports INSERT, UPDATE and/or
2044  *		DELETE.
2045  */
2046 static int
postgresIsForeignRelUpdatable(Relation rel)2047 postgresIsForeignRelUpdatable(Relation rel)
2048 {
2049 	bool		updatable;
2050 	ForeignTable *table;
2051 	ForeignServer *server;
2052 	ListCell   *lc;
2053 
2054 	/*
2055 	 * By default, all postgres_fdw foreign tables are assumed updatable. This
2056 	 * can be overridden by a per-server setting, which in turn can be
2057 	 * overridden by a per-table setting.
2058 	 */
2059 	updatable = true;
2060 
2061 	table = GetForeignTable(RelationGetRelid(rel));
2062 	server = GetForeignServer(table->serverid);
2063 
2064 	foreach(lc, server->options)
2065 	{
2066 		DefElem    *def = (DefElem *) lfirst(lc);
2067 
2068 		if (strcmp(def->defname, "updatable") == 0)
2069 			updatable = defGetBoolean(def);
2070 	}
2071 	foreach(lc, table->options)
2072 	{
2073 		DefElem    *def = (DefElem *) lfirst(lc);
2074 
2075 		if (strcmp(def->defname, "updatable") == 0)
2076 			updatable = defGetBoolean(def);
2077 	}
2078 
2079 	/*
2080 	 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2081 	 */
2082 	return updatable ?
2083 		(1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2084 }
2085 
2086 /*
2087  * postgresRecheckForeignScan
2088  *		Execute a local join execution plan for a foreign join
2089  */
2090 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2091 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2092 {
2093 	Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2094 	PlanState  *outerPlan = outerPlanState(node);
2095 	TupleTableSlot *result;
2096 
2097 	/* For base foreign relations, it suffices to set fdw_recheck_quals */
2098 	if (scanrelid > 0)
2099 		return true;
2100 
2101 	Assert(outerPlan != NULL);
2102 
2103 	/* Execute a local join execution plan */
2104 	result = ExecProcNode(outerPlan);
2105 	if (TupIsNull(result))
2106 		return false;
2107 
2108 	/* Store result in the given slot */
2109 	ExecCopySlot(slot, result);
2110 
2111 	return true;
2112 }
2113 
2114 /*
2115  * postgresPlanDirectModify
2116  *		Consider a direct foreign table modification
2117  *
2118  * Decide whether it is safe to modify a foreign table directly, and if so,
2119  * rewrite subplan accordingly.
2120  */
2121 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2122 postgresPlanDirectModify(PlannerInfo *root,
2123 						 ModifyTable *plan,
2124 						 Index resultRelation,
2125 						 int subplan_index)
2126 {
2127 	CmdType		operation = plan->operation;
2128 	Plan	   *subplan;
2129 	RelOptInfo *foreignrel;
2130 	RangeTblEntry *rte;
2131 	PgFdwRelationInfo *fpinfo;
2132 	Relation	rel;
2133 	StringInfoData sql;
2134 	ForeignScan *fscan;
2135 	List	   *targetAttrs = NIL;
2136 	List	   *remote_exprs;
2137 	List	   *params_list = NIL;
2138 	List	   *returningList = NIL;
2139 	List	   *retrieved_attrs = NIL;
2140 
2141 	/*
2142 	 * Decide whether it is safe to modify a foreign table directly.
2143 	 */
2144 
2145 	/*
2146 	 * The table modification must be an UPDATE or DELETE.
2147 	 */
2148 	if (operation != CMD_UPDATE && operation != CMD_DELETE)
2149 		return false;
2150 
2151 	/*
2152 	 * It's unsafe to modify a foreign table directly if there are any local
2153 	 * joins needed.
2154 	 */
2155 	subplan = (Plan *) list_nth(plan->plans, subplan_index);
2156 	if (!IsA(subplan, ForeignScan))
2157 		return false;
2158 	fscan = (ForeignScan *) subplan;
2159 
2160 	/*
2161 	 * It's unsafe to modify a foreign table directly if there are any quals
2162 	 * that should be evaluated locally.
2163 	 */
2164 	if (subplan->qual != NIL)
2165 		return false;
2166 
2167 	/*
2168 	 * We can't handle an UPDATE or DELETE on a foreign join for now.
2169 	 */
2170 	if (fscan->scan.scanrelid == 0)
2171 		return false;
2172 
2173 	/* Safe to fetch data about the target foreign rel */
2174 	foreignrel = root->simple_rel_array[resultRelation];
2175 	rte = root->simple_rte_array[resultRelation];
2176 	fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2177 
2178 	/*
2179 	 * It's unsafe to update a foreign table directly, if any expressions to
2180 	 * assign to the target columns are unsafe to evaluate remotely.
2181 	 */
2182 	if (operation == CMD_UPDATE)
2183 	{
2184 		int			col;
2185 
2186 		/*
2187 		 * We transmit only columns that were explicitly targets of the
2188 		 * UPDATE, so as to avoid unnecessary data transmission.
2189 		 */
2190 		col = -1;
2191 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2192 		{
2193 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2194 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
2195 			TargetEntry *tle;
2196 
2197 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
2198 				elog(ERROR, "system-column update is not supported");
2199 
2200 			tle = get_tle_by_resno(subplan->targetlist, attno);
2201 
2202 			if (!tle)
2203 				elog(ERROR, "attribute number %d not found in subplan targetlist",
2204 					 attno);
2205 
2206 			if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2207 				return false;
2208 
2209 			targetAttrs = lappend_int(targetAttrs, attno);
2210 		}
2211 	}
2212 
2213 	/*
2214 	 * Ok, rewrite subplan so as to modify the foreign table directly.
2215 	 */
2216 	initStringInfo(&sql);
2217 
2218 	/*
2219 	 * Core code already has some lock on each rel being planned, so we can
2220 	 * use NoLock here.
2221 	 */
2222 	rel = heap_open(rte->relid, NoLock);
2223 
2224 	/*
2225 	 * Recall the qual clauses that must be evaluated remotely.  (These are
2226 	 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2227 	 * doesn't care.)
2228 	 */
2229 	remote_exprs = fpinfo->final_remote_exprs;
2230 
2231 	/*
2232 	 * Extract the relevant RETURNING list if any.
2233 	 */
2234 	if (plan->returningLists)
2235 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
2236 
2237 	/*
2238 	 * Construct the SQL command string.
2239 	 */
2240 	switch (operation)
2241 	{
2242 		case CMD_UPDATE:
2243 			deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2244 								   ((Plan *) fscan)->targetlist,
2245 								   targetAttrs,
2246 								   remote_exprs, &params_list,
2247 								   returningList, &retrieved_attrs);
2248 			break;
2249 		case CMD_DELETE:
2250 			deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2251 								   remote_exprs, &params_list,
2252 								   returningList, &retrieved_attrs);
2253 			break;
2254 		default:
2255 			elog(ERROR, "unexpected operation: %d", (int) operation);
2256 			break;
2257 	}
2258 
2259 	/*
2260 	 * Update the operation info.
2261 	 */
2262 	fscan->operation = operation;
2263 
2264 	/*
2265 	 * Update the fdw_exprs list that will be available to the executor.
2266 	 */
2267 	fscan->fdw_exprs = params_list;
2268 
2269 	/*
2270 	 * Update the fdw_private list that will be available to the executor.
2271 	 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2272 	 */
2273 	fscan->fdw_private = list_make4(makeString(sql.data),
2274 									makeInteger((retrieved_attrs != NIL)),
2275 									retrieved_attrs,
2276 									makeInteger(plan->canSetTag));
2277 
2278 	heap_close(rel, NoLock);
2279 	return true;
2280 }
2281 
2282 /*
2283  * postgresBeginDirectModify
2284  *		Prepare a direct foreign table modification
2285  */
2286 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2287 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2288 {
2289 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2290 	EState	   *estate = node->ss.ps.state;
2291 	PgFdwDirectModifyState *dmstate;
2292 	RangeTblEntry *rte;
2293 	Oid			userid;
2294 	ForeignTable *table;
2295 	UserMapping *user;
2296 	int			numParams;
2297 
2298 	/*
2299 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
2300 	 */
2301 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2302 		return;
2303 
2304 	/*
2305 	 * We'll save private state in node->fdw_state.
2306 	 */
2307 	dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2308 	node->fdw_state = (void *) dmstate;
2309 
2310 	/*
2311 	 * Identify which user to do the remote access as.  This should match what
2312 	 * ExecCheckRTEPerms() does.
2313 	 */
2314 	rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2315 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2316 
2317 	/* Get info about foreign table. */
2318 	dmstate->rel = node->ss.ss_currentRelation;
2319 	table = GetForeignTable(RelationGetRelid(dmstate->rel));
2320 	user = GetUserMapping(userid, table->serverid);
2321 
2322 	/*
2323 	 * Get connection to the foreign server.  Connection manager will
2324 	 * establish new connection if necessary.
2325 	 */
2326 	dmstate->conn = GetConnection(user, false);
2327 
2328 	/* Initialize state variable */
2329 	dmstate->num_tuples = -1;	/* -1 means not set yet */
2330 
2331 	/* Get private info created by planner functions. */
2332 	dmstate->query = strVal(list_nth(fsplan->fdw_private,
2333 									 FdwDirectModifyPrivateUpdateSql));
2334 	dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2335 											 FdwDirectModifyPrivateHasReturning));
2336 	dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2337 												 FdwDirectModifyPrivateRetrievedAttrs);
2338 	dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2339 											 FdwDirectModifyPrivateSetProcessed));
2340 
2341 	/* Create context for per-tuple temp workspace. */
2342 	dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2343 											  "postgres_fdw temporary data",
2344 											  ALLOCSET_SMALL_SIZES);
2345 
2346 	/* Prepare for input conversion of RETURNING results. */
2347 	if (dmstate->has_returning)
2348 		dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2349 
2350 	/*
2351 	 * Prepare for processing of parameters used in remote query, if any.
2352 	 */
2353 	numParams = list_length(fsplan->fdw_exprs);
2354 	dmstate->numParams = numParams;
2355 	if (numParams > 0)
2356 		prepare_query_params((PlanState *) node,
2357 							 fsplan->fdw_exprs,
2358 							 numParams,
2359 							 &dmstate->param_flinfo,
2360 							 &dmstate->param_exprs,
2361 							 &dmstate->param_values);
2362 }
2363 
2364 /*
2365  * postgresIterateDirectModify
2366  *		Execute a direct foreign table modification
2367  */
2368 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2369 postgresIterateDirectModify(ForeignScanState *node)
2370 {
2371 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2372 	EState	   *estate = node->ss.ps.state;
2373 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2374 
2375 	/*
2376 	 * If this is the first call after Begin, execute the statement.
2377 	 */
2378 	if (dmstate->num_tuples == -1)
2379 		execute_dml_stmt(node);
2380 
2381 	/*
2382 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
2383 	 */
2384 	if (!resultRelInfo->ri_projectReturning)
2385 	{
2386 		TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2387 		Instrumentation *instr = node->ss.ps.instrument;
2388 
2389 		Assert(!dmstate->has_returning);
2390 
2391 		/* Increment the command es_processed count if necessary. */
2392 		if (dmstate->set_processed)
2393 			estate->es_processed += dmstate->num_tuples;
2394 
2395 		/* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2396 		if (instr)
2397 			instr->tuplecount += dmstate->num_tuples;
2398 
2399 		return ExecClearTuple(slot);
2400 	}
2401 
2402 	/*
2403 	 * Get the next RETURNING tuple.
2404 	 */
2405 	return get_returning_data(node);
2406 }
2407 
2408 /*
2409  * postgresEndDirectModify
2410  *		Finish a direct foreign table modification
2411  */
2412 static void
postgresEndDirectModify(ForeignScanState * node)2413 postgresEndDirectModify(ForeignScanState *node)
2414 {
2415 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2416 
2417 	/* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2418 	if (dmstate == NULL)
2419 		return;
2420 
2421 	/* Release PGresult */
2422 	if (dmstate->result)
2423 		PQclear(dmstate->result);
2424 
2425 	/* Release remote connection */
2426 	ReleaseConnection(dmstate->conn);
2427 	dmstate->conn = NULL;
2428 
2429 	/* MemoryContext will be deleted automatically. */
2430 }
2431 
2432 /*
2433  * postgresExplainForeignScan
2434  *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2435  */
2436 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2437 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2438 {
2439 	List	   *fdw_private;
2440 	char	   *sql;
2441 	char	   *relations;
2442 
2443 	fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2444 
2445 	/*
2446 	 * Add names of relation handled by the foreign scan when the scan is a
2447 	 * join
2448 	 */
2449 	if (list_length(fdw_private) > FdwScanPrivateRelations)
2450 	{
2451 		relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2452 		ExplainPropertyText("Relations", relations, es);
2453 	}
2454 
2455 	/*
2456 	 * Add remote query, when VERBOSE option is specified.
2457 	 */
2458 	if (es->verbose)
2459 	{
2460 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2461 		ExplainPropertyText("Remote SQL", sql, es);
2462 	}
2463 }
2464 
2465 /*
2466  * postgresExplainForeignModify
2467  *		Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2468  */
2469 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2470 postgresExplainForeignModify(ModifyTableState *mtstate,
2471 							 ResultRelInfo *rinfo,
2472 							 List *fdw_private,
2473 							 int subplan_index,
2474 							 ExplainState *es)
2475 {
2476 	if (es->verbose)
2477 	{
2478 		char	   *sql = strVal(list_nth(fdw_private,
2479 										  FdwModifyPrivateUpdateSql));
2480 
2481 		ExplainPropertyText("Remote SQL", sql, es);
2482 	}
2483 }
2484 
2485 /*
2486  * postgresExplainDirectModify
2487  *		Produce extra output for EXPLAIN of a ForeignScan that modifies a
2488  *		foreign table directly
2489  */
2490 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2491 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2492 {
2493 	List	   *fdw_private;
2494 	char	   *sql;
2495 
2496 	if (es->verbose)
2497 	{
2498 		fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2499 		sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2500 		ExplainPropertyText("Remote SQL", sql, es);
2501 	}
2502 }
2503 
2504 
2505 /*
2506  * estimate_path_cost_size
2507  *		Get cost and size estimates for a foreign scan on given foreign relation
2508  *		either a base relation or a join between foreign relations or an upper
2509  *		relation containing foreign relations.
2510  *
2511  * param_join_conds are the parameterization clauses with outer relations.
2512  * pathkeys specify the expected sort order if any for given path being costed.
2513  *
2514  * The function returns the cost and size estimates in p_rows, p_width,
2515  * p_startup_cost and p_total_cost variables.
2516  */
2517 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)2518 estimate_path_cost_size(PlannerInfo *root,
2519 						RelOptInfo *foreignrel,
2520 						List *param_join_conds,
2521 						List *pathkeys,
2522 						double *p_rows, int *p_width,
2523 						Cost *p_startup_cost, Cost *p_total_cost)
2524 {
2525 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2526 	double		rows;
2527 	double		retrieved_rows;
2528 	int			width;
2529 	Cost		startup_cost;
2530 	Cost		total_cost;
2531 	Cost		cpu_per_tuple;
2532 
2533 	/*
2534 	 * If the table or the server is configured to use remote estimates,
2535 	 * connect to the foreign server and execute EXPLAIN to estimate the
2536 	 * number of rows selected by the restriction+join clauses.  Otherwise,
2537 	 * estimate rows using whatever statistics we have locally, in a way
2538 	 * similar to ordinary tables.
2539 	 */
2540 	if (fpinfo->use_remote_estimate)
2541 	{
2542 		List	   *remote_param_join_conds;
2543 		List	   *local_param_join_conds;
2544 		StringInfoData sql;
2545 		PGconn	   *conn;
2546 		Selectivity local_sel;
2547 		QualCost	local_cost;
2548 		List	   *fdw_scan_tlist = NIL;
2549 		List	   *remote_conds;
2550 
2551 		/* Required only to be passed to deparseSelectStmtForRel */
2552 		List	   *retrieved_attrs;
2553 
2554 		/*
2555 		 * param_join_conds might contain both clauses that are safe to send
2556 		 * across, and clauses that aren't.
2557 		 */
2558 		classifyConditions(root, foreignrel, param_join_conds,
2559 						   &remote_param_join_conds, &local_param_join_conds);
2560 
2561 		/* Build the list of columns to be fetched from the foreign server. */
2562 		if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2563 			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2564 		else
2565 			fdw_scan_tlist = NIL;
2566 
2567 		/*
2568 		 * The complete list of remote conditions includes everything from
2569 		 * baserestrictinfo plus any extra join_conds relevant to this
2570 		 * particular path.
2571 		 */
2572 		remote_conds = list_concat(list_copy(remote_param_join_conds),
2573 								   fpinfo->remote_conds);
2574 
2575 		/*
2576 		 * Construct EXPLAIN query including the desired SELECT, FROM, and
2577 		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2578 		 * values, so don't request params_list.
2579 		 */
2580 		initStringInfo(&sql);
2581 		appendStringInfoString(&sql, "EXPLAIN ");
2582 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2583 								remote_conds, pathkeys, false,
2584 								&retrieved_attrs, NULL);
2585 
2586 		/* Get the remote estimate */
2587 		conn = GetConnection(fpinfo->user, false);
2588 		get_remote_estimate(sql.data, conn, &rows, &width,
2589 							&startup_cost, &total_cost);
2590 		ReleaseConnection(conn);
2591 
2592 		retrieved_rows = rows;
2593 
2594 		/* Factor in the selectivity of the locally-checked quals */
2595 		local_sel = clauselist_selectivity(root,
2596 										   local_param_join_conds,
2597 										   foreignrel->relid,
2598 										   JOIN_INNER,
2599 										   NULL);
2600 		local_sel *= fpinfo->local_conds_sel;
2601 
2602 		rows = clamp_row_est(rows * local_sel);
2603 
2604 		/* Add in the eval cost of the locally-checked quals */
2605 		startup_cost += fpinfo->local_conds_cost.startup;
2606 		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2607 		cost_qual_eval(&local_cost, local_param_join_conds, root);
2608 		startup_cost += local_cost.startup;
2609 		total_cost += local_cost.per_tuple * retrieved_rows;
2610 	}
2611 	else
2612 	{
2613 		Cost		run_cost = 0;
2614 
2615 		/*
2616 		 * We don't support join conditions in this mode (hence, no
2617 		 * parameterized paths can be made).
2618 		 */
2619 		Assert(param_join_conds == NIL);
2620 
2621 		/*
2622 		 * Use rows/width estimates made by set_baserel_size_estimates() for
2623 		 * base foreign relations and set_joinrel_size_estimates() for join
2624 		 * between foreign relations.
2625 		 */
2626 		rows = foreignrel->rows;
2627 		width = foreignrel->reltarget->width;
2628 
2629 		/* Back into an estimate of the number of retrieved rows. */
2630 		retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2631 
2632 		/*
2633 		 * We will come here again and again with different set of pathkeys
2634 		 * that caller wants to cost. We don't need to calculate the cost of
2635 		 * bare scan each time. Instead, use the costs if we have cached them
2636 		 * already.
2637 		 */
2638 		if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2639 		{
2640 			startup_cost = fpinfo->rel_startup_cost;
2641 			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2642 		}
2643 		else if (IS_JOIN_REL(foreignrel))
2644 		{
2645 			PgFdwRelationInfo *fpinfo_i;
2646 			PgFdwRelationInfo *fpinfo_o;
2647 			QualCost	join_cost;
2648 			QualCost	remote_conds_cost;
2649 			double		nrows;
2650 
2651 			/* For join we expect inner and outer relations set */
2652 			Assert(fpinfo->innerrel && fpinfo->outerrel);
2653 
2654 			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2655 			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2656 
2657 			/* Estimate of number of rows in cross product */
2658 			nrows = fpinfo_i->rows * fpinfo_o->rows;
2659 			/* Clamp retrieved rows estimate to at most size of cross product */
2660 			retrieved_rows = Min(retrieved_rows, nrows);
2661 
2662 			/*
2663 			 * The cost of foreign join is estimated as cost of generating
2664 			 * rows for the joining relations + cost for applying quals on the
2665 			 * rows.
2666 			 */
2667 
2668 			/*
2669 			 * Calculate the cost of clauses pushed down to the foreign server
2670 			 */
2671 			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2672 			/* Calculate the cost of applying join clauses */
2673 			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2674 
2675 			/*
2676 			 * Startup cost includes startup cost of joining relations and the
2677 			 * startup cost for join and other clauses. We do not include the
2678 			 * startup cost specific to join strategy (e.g. setting up hash
2679 			 * tables) since we do not know what strategy the foreign server
2680 			 * is going to use.
2681 			 */
2682 			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2683 			startup_cost += join_cost.startup;
2684 			startup_cost += remote_conds_cost.startup;
2685 			startup_cost += fpinfo->local_conds_cost.startup;
2686 
2687 			/*
2688 			 * Run time cost includes:
2689 			 *
2690 			 * 1. Run time cost (total_cost - startup_cost) of relations being
2691 			 * joined
2692 			 *
2693 			 * 2. Run time cost of applying join clauses on the cross product
2694 			 * of the joining relations.
2695 			 *
2696 			 * 3. Run time cost of applying pushed down other clauses on the
2697 			 * result of join
2698 			 *
2699 			 * 4. Run time cost of applying nonpushable other clauses locally
2700 			 * on the result fetched from the foreign server.
2701 			 */
2702 			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2703 			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2704 			run_cost += nrows * join_cost.per_tuple;
2705 			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2706 			run_cost += nrows * remote_conds_cost.per_tuple;
2707 			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2708 		}
2709 		else if (IS_UPPER_REL(foreignrel))
2710 		{
2711 			PgFdwRelationInfo *ofpinfo;
2712 			PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
2713 			AggClauseCosts aggcosts;
2714 			double		input_rows;
2715 			int			numGroupCols;
2716 			double		numGroups = 1;
2717 
2718 			/*
2719 			 * This cost model is mixture of costing done for sorted and
2720 			 * hashed aggregates in cost_agg().  We are not sure which
2721 			 * strategy will be considered at remote side, thus for
2722 			 * simplicity, we put all startup related costs in startup_cost
2723 			 * and all finalization and run cost are added in total_cost.
2724 			 *
2725 			 * Also, core does not care about costing HAVING expressions and
2726 			 * adding that to the costs.  So similarly, here too we are not
2727 			 * considering remote and local conditions for costing.
2728 			 */
2729 
2730 			ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2731 
2732 			/* Get rows and width from input rel */
2733 			input_rows = ofpinfo->rows;
2734 			width = ofpinfo->width;
2735 
2736 			/* Collect statistics about aggregates for estimating costs. */
2737 			MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2738 			if (root->parse->hasAggs)
2739 			{
2740 				get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2741 									 AGGSPLIT_SIMPLE, &aggcosts);
2742 				get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2743 									 AGGSPLIT_SIMPLE, &aggcosts);
2744 			}
2745 
2746 			/* Get number of grouping columns and possible number of groups */
2747 			numGroupCols = list_length(root->parse->groupClause);
2748 			numGroups = estimate_num_groups(root,
2749 											get_sortgrouplist_exprs(root->parse->groupClause,
2750 																	fpinfo->grouped_tlist),
2751 											input_rows, NULL);
2752 
2753 			/*
2754 			 * Number of rows expected from foreign server will be same as
2755 			 * that of number of groups.
2756 			 */
2757 			rows = retrieved_rows = numGroups;
2758 
2759 			/*-----
2760 			 * Startup cost includes:
2761 			 *	  1. Startup cost for underneath input relation
2762 			 *	  2. Cost of performing aggregation, per cost_agg()
2763 			 *	  3. Startup cost for PathTarget eval
2764 			 *-----
2765 			 */
2766 			startup_cost = ofpinfo->rel_startup_cost;
2767 			startup_cost += aggcosts.transCost.startup;
2768 			startup_cost += aggcosts.transCost.per_tuple * input_rows;
2769 			startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2770 			startup_cost += ptarget->cost.startup;
2771 
2772 			/*-----
2773 			 * Run time cost includes:
2774 			 *	  1. Run time cost of underneath input relation
2775 			 *	  2. Run time cost of performing aggregation, per cost_agg()
2776 			 *	  3. PathTarget eval cost for each output row
2777 			 *-----
2778 			 */
2779 			run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2780 			run_cost += aggcosts.finalCost * numGroups;
2781 			run_cost += cpu_tuple_cost * numGroups;
2782 			run_cost += ptarget->cost.per_tuple * numGroups;
2783 		}
2784 		else
2785 		{
2786 			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2787 			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2788 
2789 			/*
2790 			 * Cost as though this were a seqscan, which is pessimistic.  We
2791 			 * effectively imagine the local_conds are being evaluated
2792 			 * remotely, too.
2793 			 */
2794 			startup_cost = 0;
2795 			run_cost = 0;
2796 			run_cost += seq_page_cost * foreignrel->pages;
2797 
2798 			startup_cost += foreignrel->baserestrictcost.startup;
2799 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2800 			run_cost += cpu_per_tuple * foreignrel->tuples;
2801 		}
2802 
2803 		/*
2804 		 * Without remote estimates, we have no real way to estimate the cost
2805 		 * of generating sorted output.  It could be free if the query plan
2806 		 * the remote side would have chosen generates properly-sorted output
2807 		 * anyway, but in most cases it will cost something.  Estimate a value
2808 		 * high enough that we won't pick the sorted path when the ordering
2809 		 * isn't locally useful, but low enough that we'll err on the side of
2810 		 * pushing down the ORDER BY clause when it's useful to do so.
2811 		 */
2812 		if (pathkeys != NIL)
2813 		{
2814 			startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2815 			run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2816 		}
2817 
2818 		total_cost = startup_cost + run_cost;
2819 	}
2820 
2821 	/*
2822 	 * Cache the costs for scans without any pathkeys or parameterization
2823 	 * before adding the costs for transferring data from the foreign server.
2824 	 * These costs are useful for costing the join between this relation and
2825 	 * another foreign relation or to calculate the costs of paths with
2826 	 * pathkeys for this relation, when the costs can not be obtained from the
2827 	 * foreign server. This function will be called at least once for every
2828 	 * foreign relation without pathkeys and parameterization.
2829 	 */
2830 	if (pathkeys == NIL && param_join_conds == NIL)
2831 	{
2832 		fpinfo->rel_startup_cost = startup_cost;
2833 		fpinfo->rel_total_cost = total_cost;
2834 	}
2835 
2836 	/*
2837 	 * Add some additional cost factors to account for connection overhead
2838 	 * (fdw_startup_cost), transferring data across the network
2839 	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2840 	 * (cpu_tuple_cost per retrieved row).
2841 	 */
2842 	startup_cost += fpinfo->fdw_startup_cost;
2843 	total_cost += fpinfo->fdw_startup_cost;
2844 	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2845 	total_cost += cpu_tuple_cost * retrieved_rows;
2846 
2847 	/* Return results. */
2848 	*p_rows = rows;
2849 	*p_width = width;
2850 	*p_startup_cost = startup_cost;
2851 	*p_total_cost = total_cost;
2852 }
2853 
2854 /*
2855  * Estimate costs of executing a SQL statement remotely.
2856  * The given "sql" must be an EXPLAIN command.
2857  */
2858 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)2859 get_remote_estimate(const char *sql, PGconn *conn,
2860 					double *rows, int *width,
2861 					Cost *startup_cost, Cost *total_cost)
2862 {
2863 	PGresult   *volatile res = NULL;
2864 
2865 	/* PGresult must be released before leaving this function. */
2866 	PG_TRY();
2867 	{
2868 		char	   *line;
2869 		char	   *p;
2870 		int			n;
2871 
2872 		/*
2873 		 * Execute EXPLAIN remotely.
2874 		 */
2875 		res = pgfdw_exec_query(conn, sql);
2876 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
2877 			pgfdw_report_error(ERROR, res, conn, false, sql);
2878 
2879 		/*
2880 		 * Extract cost numbers for topmost plan node.  Note we search for a
2881 		 * left paren from the end of the line to avoid being confused by
2882 		 * other uses of parentheses.
2883 		 */
2884 		line = PQgetvalue(res, 0, 0);
2885 		p = strrchr(line, '(');
2886 		if (p == NULL)
2887 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2888 		n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2889 				   startup_cost, total_cost, rows, width);
2890 		if (n != 4)
2891 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2892 
2893 		PQclear(res);
2894 		res = NULL;
2895 	}
2896 	PG_CATCH();
2897 	{
2898 		if (res)
2899 			PQclear(res);
2900 		PG_RE_THROW();
2901 	}
2902 	PG_END_TRY();
2903 }
2904 
2905 /*
2906  * Detect whether we want to process an EquivalenceClass member.
2907  *
2908  * This is a callback for use by generate_implied_equalities_for_column.
2909  */
2910 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)2911 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
2912 						  EquivalenceClass *ec, EquivalenceMember *em,
2913 						  void *arg)
2914 {
2915 	ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
2916 	Expr	   *expr = em->em_expr;
2917 
2918 	/*
2919 	 * If we've identified what we're processing in the current scan, we only
2920 	 * want to match that expression.
2921 	 */
2922 	if (state->current != NULL)
2923 		return equal(expr, state->current);
2924 
2925 	/*
2926 	 * Otherwise, ignore anything we've already processed.
2927 	 */
2928 	if (list_member(state->already_used, expr))
2929 		return false;
2930 
2931 	/* This is the new target to process. */
2932 	state->current = expr;
2933 	return true;
2934 }
2935 
2936 /*
2937  * Create cursor for node's query with current parameter values.
2938  */
2939 static void
create_cursor(ForeignScanState * node)2940 create_cursor(ForeignScanState *node)
2941 {
2942 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2943 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
2944 	int			numParams = fsstate->numParams;
2945 	const char **values = fsstate->param_values;
2946 	PGconn	   *conn = fsstate->conn;
2947 	StringInfoData buf;
2948 	PGresult   *res;
2949 
2950 	/*
2951 	 * Construct array of query parameter values in text format.  We do the
2952 	 * conversions in the short-lived per-tuple context, so as not to cause a
2953 	 * memory leak over repeated scans.
2954 	 */
2955 	if (numParams > 0)
2956 	{
2957 		MemoryContext oldcontext;
2958 
2959 		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2960 
2961 		process_query_params(econtext,
2962 							 fsstate->param_flinfo,
2963 							 fsstate->param_exprs,
2964 							 values);
2965 
2966 		MemoryContextSwitchTo(oldcontext);
2967 	}
2968 
2969 	/* Construct the DECLARE CURSOR command */
2970 	initStringInfo(&buf);
2971 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2972 					 fsstate->cursor_number, fsstate->query);
2973 
2974 	/*
2975 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
2976 	 * to infer types for all parameters.  Since we explicitly cast every
2977 	 * parameter (see deparse.c), the "inference" is trivial and will produce
2978 	 * the desired result.  This allows us to avoid assuming that the remote
2979 	 * server has the same OIDs we do for the parameters' types.
2980 	 */
2981 	if (!PQsendQueryParams(conn, buf.data, numParams,
2982 						   NULL, values, NULL, NULL, 0))
2983 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2984 
2985 	/*
2986 	 * Get the result, and check for success.
2987 	 *
2988 	 * We don't use a PG_TRY block here, so be careful not to throw error
2989 	 * without releasing the PGresult.
2990 	 */
2991 	res = pgfdw_get_result(conn, buf.data);
2992 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
2993 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2994 	PQclear(res);
2995 
2996 	/* Mark the cursor as created, and show no tuples have been retrieved */
2997 	fsstate->cursor_exists = true;
2998 	fsstate->tuples = NULL;
2999 	fsstate->num_tuples = 0;
3000 	fsstate->next_tuple = 0;
3001 	fsstate->fetch_ct_2 = 0;
3002 	fsstate->eof_reached = false;
3003 
3004 	/* Clean up */
3005 	pfree(buf.data);
3006 }
3007 
3008 /*
3009  * Fetch some more rows from the node's cursor.
3010  */
3011 static void
fetch_more_data(ForeignScanState * node)3012 fetch_more_data(ForeignScanState *node)
3013 {
3014 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3015 	PGresult   *volatile res = NULL;
3016 	MemoryContext oldcontext;
3017 
3018 	/*
3019 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
3020 	 * batch.
3021 	 */
3022 	fsstate->tuples = NULL;
3023 	MemoryContextReset(fsstate->batch_cxt);
3024 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3025 
3026 	/* PGresult must be released before leaving this function. */
3027 	PG_TRY();
3028 	{
3029 		PGconn	   *conn = fsstate->conn;
3030 		char		sql[64];
3031 		int			numrows;
3032 		int			i;
3033 
3034 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3035 				 fsstate->fetch_size, fsstate->cursor_number);
3036 
3037 		res = pgfdw_exec_query(conn, sql);
3038 		/* On error, report the original query, not the FETCH. */
3039 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3040 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3041 
3042 		/* Convert the data into HeapTuples */
3043 		numrows = PQntuples(res);
3044 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3045 		fsstate->num_tuples = numrows;
3046 		fsstate->next_tuple = 0;
3047 
3048 		for (i = 0; i < numrows; i++)
3049 		{
3050 			Assert(IsA(node->ss.ps.plan, ForeignScan));
3051 
3052 			fsstate->tuples[i] =
3053 				make_tuple_from_result_row(res, i,
3054 										   fsstate->rel,
3055 										   fsstate->attinmeta,
3056 										   fsstate->retrieved_attrs,
3057 										   node,
3058 										   fsstate->temp_cxt);
3059 		}
3060 
3061 		/* Update fetch_ct_2 */
3062 		if (fsstate->fetch_ct_2 < 2)
3063 			fsstate->fetch_ct_2++;
3064 
3065 		/* Must be EOF if we didn't get as many tuples as we asked for. */
3066 		fsstate->eof_reached = (numrows < fsstate->fetch_size);
3067 
3068 		PQclear(res);
3069 		res = NULL;
3070 	}
3071 	PG_CATCH();
3072 	{
3073 		if (res)
3074 			PQclear(res);
3075 		PG_RE_THROW();
3076 	}
3077 	PG_END_TRY();
3078 
3079 	MemoryContextSwitchTo(oldcontext);
3080 }
3081 
3082 /*
3083  * Force assorted GUC parameters to settings that ensure that we'll output
3084  * data values in a form that is unambiguous to the remote server.
3085  *
3086  * This is rather expensive and annoying to do once per row, but there's
3087  * little choice if we want to be sure values are transmitted accurately;
3088  * we can't leave the settings in place between rows for fear of affecting
3089  * user-visible computations.
3090  *
3091  * We use the equivalent of a function SET option to allow the settings to
3092  * persist only until the caller calls reset_transmission_modes().  If an
3093  * error is thrown in between, guc.c will take care of undoing the settings.
3094  *
3095  * The return value is the nestlevel that must be passed to
3096  * reset_transmission_modes() to undo things.
3097  */
3098 int
set_transmission_modes(void)3099 set_transmission_modes(void)
3100 {
3101 	int			nestlevel = NewGUCNestLevel();
3102 
3103 	/*
3104 	 * The values set here should match what pg_dump does.  See also
3105 	 * configure_remote_session in connection.c.
3106 	 */
3107 	if (DateStyle != USE_ISO_DATES)
3108 		(void) set_config_option("datestyle", "ISO",
3109 								 PGC_USERSET, PGC_S_SESSION,
3110 								 GUC_ACTION_SAVE, true, 0, false);
3111 	if (IntervalStyle != INTSTYLE_POSTGRES)
3112 		(void) set_config_option("intervalstyle", "postgres",
3113 								 PGC_USERSET, PGC_S_SESSION,
3114 								 GUC_ACTION_SAVE, true, 0, false);
3115 	if (extra_float_digits < 3)
3116 		(void) set_config_option("extra_float_digits", "3",
3117 								 PGC_USERSET, PGC_S_SESSION,
3118 								 GUC_ACTION_SAVE, true, 0, false);
3119 
3120 	return nestlevel;
3121 }
3122 
3123 /*
3124  * Undo the effects of set_transmission_modes().
3125  */
3126 void
reset_transmission_modes(int nestlevel)3127 reset_transmission_modes(int nestlevel)
3128 {
3129 	AtEOXact_GUC(true, nestlevel);
3130 }
3131 
3132 /*
3133  * Utility routine to close a cursor.
3134  */
3135 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3136 close_cursor(PGconn *conn, unsigned int cursor_number)
3137 {
3138 	char		sql[64];
3139 	PGresult   *res;
3140 
3141 	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3142 
3143 	/*
3144 	 * We don't use a PG_TRY block here, so be careful not to throw error
3145 	 * without releasing the PGresult.
3146 	 */
3147 	res = pgfdw_exec_query(conn, sql);
3148 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3149 		pgfdw_report_error(ERROR, res, conn, true, sql);
3150 	PQclear(res);
3151 }
3152 
3153 /*
3154  * prepare_foreign_modify
3155  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3156  */
3157 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3158 prepare_foreign_modify(PgFdwModifyState *fmstate)
3159 {
3160 	char		prep_name[NAMEDATALEN];
3161 	char	   *p_name;
3162 	PGresult   *res;
3163 
3164 	/* Construct name we'll use for the prepared statement. */
3165 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3166 			 GetPrepStmtNumber(fmstate->conn));
3167 	p_name = pstrdup(prep_name);
3168 
3169 	/*
3170 	 * We intentionally do not specify parameter types here, but leave the
3171 	 * remote server to derive them by default.  This avoids possible problems
3172 	 * with the remote server using different type OIDs than we do.  All of
3173 	 * the prepared statements we use in this module are simple enough that
3174 	 * the remote server will make the right choices.
3175 	 */
3176 	if (!PQsendPrepare(fmstate->conn,
3177 					   p_name,
3178 					   fmstate->query,
3179 					   0,
3180 					   NULL))
3181 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3182 
3183 	/*
3184 	 * Get the result, and check for success.
3185 	 *
3186 	 * We don't use a PG_TRY block here, so be careful not to throw error
3187 	 * without releasing the PGresult.
3188 	 */
3189 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3190 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3191 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3192 	PQclear(res);
3193 
3194 	/* This action shows that the prepare has been done. */
3195 	fmstate->p_name = p_name;
3196 }
3197 
3198 /*
3199  * convert_prep_stmt_params
3200  *		Create array of text strings representing parameter values
3201  *
3202  * tupleid is ctid to send, or NULL if none
3203  * slot is slot to get remaining parameters from, or NULL if none
3204  *
3205  * Data is constructed in temp_cxt; caller should reset that after use.
3206  */
3207 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3208 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3209 						 ItemPointer tupleid,
3210 						 TupleTableSlot *slot)
3211 {
3212 	const char **p_values;
3213 	int			pindex = 0;
3214 	MemoryContext oldcontext;
3215 
3216 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3217 
3218 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3219 
3220 	/* 1st parameter should be ctid, if it's in use */
3221 	if (tupleid != NULL)
3222 	{
3223 		/* don't need set_transmission_modes for TID output */
3224 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3225 											  PointerGetDatum(tupleid));
3226 		pindex++;
3227 	}
3228 
3229 	/* get following parameters from slot */
3230 	if (slot != NULL && fmstate->target_attrs != NIL)
3231 	{
3232 		int			nestlevel;
3233 		ListCell   *lc;
3234 
3235 		nestlevel = set_transmission_modes();
3236 
3237 		foreach(lc, fmstate->target_attrs)
3238 		{
3239 			int			attnum = lfirst_int(lc);
3240 			Datum		value;
3241 			bool		isnull;
3242 
3243 			value = slot_getattr(slot, attnum, &isnull);
3244 			if (isnull)
3245 				p_values[pindex] = NULL;
3246 			else
3247 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3248 													  value);
3249 			pindex++;
3250 		}
3251 
3252 		reset_transmission_modes(nestlevel);
3253 	}
3254 
3255 	Assert(pindex == fmstate->p_nums);
3256 
3257 	MemoryContextSwitchTo(oldcontext);
3258 
3259 	return p_values;
3260 }
3261 
3262 /*
3263  * store_returning_result
3264  *		Store the result of a RETURNING clause
3265  *
3266  * On error, be sure to release the PGresult on the way out.  Callers do not
3267  * have PG_TRY blocks to ensure this happens.
3268  */
3269 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3270 store_returning_result(PgFdwModifyState *fmstate,
3271 					   TupleTableSlot *slot, PGresult *res)
3272 {
3273 	PG_TRY();
3274 	{
3275 		HeapTuple	newtup;
3276 
3277 		newtup = make_tuple_from_result_row(res, 0,
3278 											fmstate->rel,
3279 											fmstate->attinmeta,
3280 											fmstate->retrieved_attrs,
3281 											NULL,
3282 											fmstate->temp_cxt);
3283 		/* tuple will be deleted when it is cleared from the slot */
3284 		ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3285 	}
3286 	PG_CATCH();
3287 	{
3288 		if (res)
3289 			PQclear(res);
3290 		PG_RE_THROW();
3291 	}
3292 	PG_END_TRY();
3293 }
3294 
3295 /*
3296  * Execute a direct UPDATE/DELETE statement.
3297  */
3298 static void
execute_dml_stmt(ForeignScanState * node)3299 execute_dml_stmt(ForeignScanState *node)
3300 {
3301 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3302 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3303 	int			numParams = dmstate->numParams;
3304 	const char **values = dmstate->param_values;
3305 
3306 	/*
3307 	 * Construct array of query parameter values in text format.
3308 	 */
3309 	if (numParams > 0)
3310 		process_query_params(econtext,
3311 							 dmstate->param_flinfo,
3312 							 dmstate->param_exprs,
3313 							 values);
3314 
3315 	/*
3316 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3317 	 * to infer types for all parameters.  Since we explicitly cast every
3318 	 * parameter (see deparse.c), the "inference" is trivial and will produce
3319 	 * the desired result.  This allows us to avoid assuming that the remote
3320 	 * server has the same OIDs we do for the parameters' types.
3321 	 */
3322 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3323 						   NULL, values, NULL, NULL, 0))
3324 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3325 
3326 	/*
3327 	 * Get the result, and check for success.
3328 	 *
3329 	 * We don't use a PG_TRY block here, so be careful not to throw error
3330 	 * without releasing the PGresult.
3331 	 */
3332 	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3333 	if (PQresultStatus(dmstate->result) !=
3334 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3335 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3336 						   dmstate->query);
3337 
3338 	/* Get the number of rows affected. */
3339 	if (dmstate->has_returning)
3340 		dmstate->num_tuples = PQntuples(dmstate->result);
3341 	else
3342 		dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3343 }
3344 
3345 /*
3346  * Get the result of a RETURNING clause.
3347  */
3348 static TupleTableSlot *
get_returning_data(ForeignScanState * node)3349 get_returning_data(ForeignScanState *node)
3350 {
3351 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3352 	EState	   *estate = node->ss.ps.state;
3353 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3354 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3355 
3356 	Assert(resultRelInfo->ri_projectReturning);
3357 
3358 	/* If we didn't get any tuples, must be end of data. */
3359 	if (dmstate->next_tuple >= dmstate->num_tuples)
3360 		return ExecClearTuple(slot);
3361 
3362 	/* Increment the command es_processed count if necessary. */
3363 	if (dmstate->set_processed)
3364 		estate->es_processed += 1;
3365 
3366 	/*
3367 	 * Store a RETURNING tuple.  If has_returning is false, just emit a dummy
3368 	 * tuple.  (has_returning is false when the local query is of the form
3369 	 * "UPDATE/DELETE .. RETURNING 1" for example.)
3370 	 */
3371 	if (!dmstate->has_returning)
3372 		ExecStoreAllNullTuple(slot);
3373 	else
3374 	{
3375 		/*
3376 		 * On error, be sure to release the PGresult on the way out.  Callers
3377 		 * do not have PG_TRY blocks to ensure this happens.
3378 		 */
3379 		PG_TRY();
3380 		{
3381 			HeapTuple	newtup;
3382 
3383 			newtup = make_tuple_from_result_row(dmstate->result,
3384 												dmstate->next_tuple,
3385 												dmstate->rel,
3386 												dmstate->attinmeta,
3387 												dmstate->retrieved_attrs,
3388 												NULL,
3389 												dmstate->temp_cxt);
3390 			ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3391 		}
3392 		PG_CATCH();
3393 		{
3394 			if (dmstate->result)
3395 				PQclear(dmstate->result);
3396 			PG_RE_THROW();
3397 		}
3398 		PG_END_TRY();
3399 	}
3400 	dmstate->next_tuple++;
3401 
3402 	/* Make slot available for evaluation of the local query RETURNING list. */
3403 	resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3404 
3405 	return slot;
3406 }
3407 
3408 /*
3409  * Prepare for processing of parameters used in remote query.
3410  */
3411 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)3412 prepare_query_params(PlanState *node,
3413 					 List *fdw_exprs,
3414 					 int numParams,
3415 					 FmgrInfo **param_flinfo,
3416 					 List **param_exprs,
3417 					 const char ***param_values)
3418 {
3419 	int			i;
3420 	ListCell   *lc;
3421 
3422 	Assert(numParams > 0);
3423 
3424 	/* Prepare for output conversion of parameters used in remote query. */
3425 	*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3426 
3427 	i = 0;
3428 	foreach(lc, fdw_exprs)
3429 	{
3430 		Node	   *param_expr = (Node *) lfirst(lc);
3431 		Oid			typefnoid;
3432 		bool		isvarlena;
3433 
3434 		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3435 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
3436 		i++;
3437 	}
3438 
3439 	/*
3440 	 * Prepare remote-parameter expressions for evaluation.  (Note: in
3441 	 * practice, we expect that all these expressions will be just Params, so
3442 	 * we could possibly do something more efficient than using the full
3443 	 * expression-eval machinery for this.  But probably there would be little
3444 	 * benefit, and it'd require postgres_fdw to know more than is desirable
3445 	 * about Param evaluation.)
3446 	 */
3447 	*param_exprs = ExecInitExprList(fdw_exprs, node);
3448 
3449 	/* Allocate buffer for text form of query parameters. */
3450 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
3451 }
3452 
3453 /*
3454  * Construct array of query parameter values in text format.
3455  */
3456 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)3457 process_query_params(ExprContext *econtext,
3458 					 FmgrInfo *param_flinfo,
3459 					 List *param_exprs,
3460 					 const char **param_values)
3461 {
3462 	int			nestlevel;
3463 	int			i;
3464 	ListCell   *lc;
3465 
3466 	nestlevel = set_transmission_modes();
3467 
3468 	i = 0;
3469 	foreach(lc, param_exprs)
3470 	{
3471 		ExprState  *expr_state = (ExprState *) lfirst(lc);
3472 		Datum		expr_value;
3473 		bool		isNull;
3474 
3475 		/* Evaluate the parameter expression */
3476 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
3477 
3478 		/*
3479 		 * Get string representation of each parameter value by invoking
3480 		 * type-specific output function, unless the value is null.
3481 		 */
3482 		if (isNull)
3483 			param_values[i] = NULL;
3484 		else
3485 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3486 
3487 		i++;
3488 	}
3489 
3490 	reset_transmission_modes(nestlevel);
3491 }
3492 
3493 /*
3494  * postgresAnalyzeForeignTable
3495  *		Test whether analyzing this foreign table is supported
3496  */
3497 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)3498 postgresAnalyzeForeignTable(Relation relation,
3499 							AcquireSampleRowsFunc *func,
3500 							BlockNumber *totalpages)
3501 {
3502 	ForeignTable *table;
3503 	UserMapping *user;
3504 	PGconn	   *conn;
3505 	StringInfoData sql;
3506 	PGresult   *volatile res = NULL;
3507 
3508 	/* Return the row-analysis function pointer */
3509 	*func = postgresAcquireSampleRowsFunc;
3510 
3511 	/*
3512 	 * Now we have to get the number of pages.  It's annoying that the ANALYZE
3513 	 * API requires us to return that now, because it forces some duplication
3514 	 * of effort between this routine and postgresAcquireSampleRowsFunc.  But
3515 	 * it's probably not worth redefining that API at this point.
3516 	 */
3517 
3518 	/*
3519 	 * Get the connection to use.  We do the remote access as the table's
3520 	 * owner, even if the ANALYZE was started by some other user.
3521 	 */
3522 	table = GetForeignTable(RelationGetRelid(relation));
3523 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3524 	conn = GetConnection(user, false);
3525 
3526 	/*
3527 	 * Construct command to get page count for relation.
3528 	 */
3529 	initStringInfo(&sql);
3530 	deparseAnalyzeSizeSql(&sql, relation);
3531 
3532 	/* In what follows, do not risk leaking any PGresults. */
3533 	PG_TRY();
3534 	{
3535 		res = pgfdw_exec_query(conn, sql.data);
3536 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3537 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
3538 
3539 		if (PQntuples(res) != 1 || PQnfields(res) != 1)
3540 			elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3541 		*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3542 
3543 		PQclear(res);
3544 		res = NULL;
3545 	}
3546 	PG_CATCH();
3547 	{
3548 		if (res)
3549 			PQclear(res);
3550 		PG_RE_THROW();
3551 	}
3552 	PG_END_TRY();
3553 
3554 	ReleaseConnection(conn);
3555 
3556 	return true;
3557 }
3558 
3559 /*
3560  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3561  *
3562  * We fetch the whole table from the remote side and pick out some sample rows.
3563  *
3564  * Selected rows are returned in the caller-allocated array rows[],
3565  * which must have at least targrows entries.
3566  * The actual number of rows selected is returned as the function result.
3567  * We also count the total number of rows in the table and return it into
3568  * *totalrows.  Note that *totaldeadrows is always set to 0.
3569  *
3570  * Note that the returned list of rows is not always in order by physical
3571  * position in the table.  Therefore, correlation estimates derived later
3572  * may be meaningless, but it's OK because we don't use the estimates
3573  * currently (the planner only pays attention to correlation for indexscans).
3574  */
3575 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)3576 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
3577 							  HeapTuple *rows, int targrows,
3578 							  double *totalrows,
3579 							  double *totaldeadrows)
3580 {
3581 	PgFdwAnalyzeState astate;
3582 	ForeignTable *table;
3583 	ForeignServer *server;
3584 	UserMapping *user;
3585 	PGconn	   *conn;
3586 	unsigned int cursor_number;
3587 	StringInfoData sql;
3588 	PGresult   *volatile res = NULL;
3589 
3590 	/* Initialize workspace state */
3591 	astate.rel = relation;
3592 	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
3593 
3594 	astate.rows = rows;
3595 	astate.targrows = targrows;
3596 	astate.numrows = 0;
3597 	astate.samplerows = 0;
3598 	astate.rowstoskip = -1;		/* -1 means not set yet */
3599 	reservoir_init_selection_state(&astate.rstate, targrows);
3600 
3601 	/* Remember ANALYZE context, and create a per-tuple temp context */
3602 	astate.anl_cxt = CurrentMemoryContext;
3603 	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
3604 											"postgres_fdw temporary data",
3605 											ALLOCSET_SMALL_SIZES);
3606 
3607 	/*
3608 	 * Get the connection to use.  We do the remote access as the table's
3609 	 * owner, even if the ANALYZE was started by some other user.
3610 	 */
3611 	table = GetForeignTable(RelationGetRelid(relation));
3612 	server = GetForeignServer(table->serverid);
3613 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3614 	conn = GetConnection(user, false);
3615 
3616 	/*
3617 	 * Construct cursor that retrieves whole rows from remote.
3618 	 */
3619 	cursor_number = GetCursorNumber(conn);
3620 	initStringInfo(&sql);
3621 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3622 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3623 
3624 	/* In what follows, do not risk leaking any PGresults. */
3625 	PG_TRY();
3626 	{
3627 		res = pgfdw_exec_query(conn, sql.data);
3628 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
3629 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
3630 		PQclear(res);
3631 		res = NULL;
3632 
3633 		/* Retrieve and process rows a batch at a time. */
3634 		for (;;)
3635 		{
3636 			char		fetch_sql[64];
3637 			int			fetch_size;
3638 			int			numrows;
3639 			int			i;
3640 			ListCell   *lc;
3641 
3642 			/* Allow users to cancel long query */
3643 			CHECK_FOR_INTERRUPTS();
3644 
3645 			/*
3646 			 * XXX possible future improvement: if rowstoskip is large, we
3647 			 * could issue a MOVE rather than physically fetching the rows,
3648 			 * then just adjust rowstoskip and samplerows appropriately.
3649 			 */
3650 
3651 			/* The fetch size is arbitrary, but shouldn't be enormous. */
3652 			fetch_size = 100;
3653 			foreach(lc, server->options)
3654 			{
3655 				DefElem    *def = (DefElem *) lfirst(lc);
3656 
3657 				if (strcmp(def->defname, "fetch_size") == 0)
3658 				{
3659 					fetch_size = strtol(defGetString(def), NULL, 10);
3660 					break;
3661 				}
3662 			}
3663 			foreach(lc, table->options)
3664 			{
3665 				DefElem    *def = (DefElem *) lfirst(lc);
3666 
3667 				if (strcmp(def->defname, "fetch_size") == 0)
3668 				{
3669 					fetch_size = strtol(defGetString(def), NULL, 10);
3670 					break;
3671 				}
3672 			}
3673 
3674 			/* Fetch some rows */
3675 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3676 					 fetch_size, cursor_number);
3677 
3678 			res = pgfdw_exec_query(conn, fetch_sql);
3679 			/* On error, report the original query, not the FETCH. */
3680 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
3681 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
3682 
3683 			/* Process whatever we got. */
3684 			numrows = PQntuples(res);
3685 			for (i = 0; i < numrows; i++)
3686 				analyze_row_processor(res, i, &astate);
3687 
3688 			PQclear(res);
3689 			res = NULL;
3690 
3691 			/* Must be EOF if we didn't get all the rows requested. */
3692 			if (numrows < fetch_size)
3693 				break;
3694 		}
3695 
3696 		/* Close the cursor, just to be tidy. */
3697 		close_cursor(conn, cursor_number);
3698 	}
3699 	PG_CATCH();
3700 	{
3701 		if (res)
3702 			PQclear(res);
3703 		PG_RE_THROW();
3704 	}
3705 	PG_END_TRY();
3706 
3707 	ReleaseConnection(conn);
3708 
3709 	/* We assume that we have no dead tuple. */
3710 	*totaldeadrows = 0.0;
3711 
3712 	/* We've retrieved all living tuples from foreign server. */
3713 	*totalrows = astate.samplerows;
3714 
3715 	/*
3716 	 * Emit some interesting relation info
3717 	 */
3718 	ereport(elevel,
3719 			(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3720 					RelationGetRelationName(relation),
3721 					astate.samplerows, astate.numrows)));
3722 
3723 	return astate.numrows;
3724 }
3725 
3726 /*
3727  * Collect sample rows from the result of query.
3728  *	 - Use all tuples in sample until target # of samples are collected.
3729  *	 - Subsequently, replace already-sampled tuples randomly.
3730  */
3731 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)3732 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
3733 {
3734 	int			targrows = astate->targrows;
3735 	int			pos;			/* array index to store tuple in */
3736 	MemoryContext oldcontext;
3737 
3738 	/* Always increment sample row counter. */
3739 	astate->samplerows += 1;
3740 
3741 	/*
3742 	 * Determine the slot where this sample row should be stored.  Set pos to
3743 	 * negative value to indicate the row should be skipped.
3744 	 */
3745 	if (astate->numrows < targrows)
3746 	{
3747 		/* First targrows rows are always included into the sample */
3748 		pos = astate->numrows++;
3749 	}
3750 	else
3751 	{
3752 		/*
3753 		 * Now we start replacing tuples in the sample until we reach the end
3754 		 * of the relation.  Same algorithm as in acquire_sample_rows in
3755 		 * analyze.c; see Jeff Vitter's paper.
3756 		 */
3757 		if (astate->rowstoskip < 0)
3758 			astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3759 
3760 		if (astate->rowstoskip <= 0)
3761 		{
3762 			/* Choose a random reservoir element to replace. */
3763 			pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3764 			Assert(pos >= 0 && pos < targrows);
3765 			heap_freetuple(astate->rows[pos]);
3766 		}
3767 		else
3768 		{
3769 			/* Skip this tuple. */
3770 			pos = -1;
3771 		}
3772 
3773 		astate->rowstoskip -= 1;
3774 	}
3775 
3776 	if (pos >= 0)
3777 	{
3778 		/*
3779 		 * Create sample tuple from current result row, and store it in the
3780 		 * position determined above.  The tuple has to be created in anl_cxt.
3781 		 */
3782 		oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3783 
3784 		astate->rows[pos] = make_tuple_from_result_row(res, row,
3785 													   astate->rel,
3786 													   astate->attinmeta,
3787 													   astate->retrieved_attrs,
3788 													   NULL,
3789 													   astate->temp_cxt);
3790 
3791 		MemoryContextSwitchTo(oldcontext);
3792 	}
3793 }
3794 
3795 /*
3796  * Import a foreign schema
3797  */
3798 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)3799 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
3800 {
3801 	List	   *commands = NIL;
3802 	bool		import_collate = true;
3803 	bool		import_default = false;
3804 	bool		import_not_null = true;
3805 	ForeignServer *server;
3806 	UserMapping *mapping;
3807 	PGconn	   *conn;
3808 	StringInfoData buf;
3809 	PGresult   *volatile res = NULL;
3810 	int			numrows,
3811 				i;
3812 	ListCell   *lc;
3813 
3814 	/* Parse statement options */
3815 	foreach(lc, stmt->options)
3816 	{
3817 		DefElem    *def = (DefElem *) lfirst(lc);
3818 
3819 		if (strcmp(def->defname, "import_collate") == 0)
3820 			import_collate = defGetBoolean(def);
3821 		else if (strcmp(def->defname, "import_default") == 0)
3822 			import_default = defGetBoolean(def);
3823 		else if (strcmp(def->defname, "import_not_null") == 0)
3824 			import_not_null = defGetBoolean(def);
3825 		else
3826 			ereport(ERROR,
3827 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3828 					 errmsg("invalid option \"%s\"", def->defname)));
3829 	}
3830 
3831 	/*
3832 	 * Get connection to the foreign server.  Connection manager will
3833 	 * establish new connection if necessary.
3834 	 */
3835 	server = GetForeignServer(serverOid);
3836 	mapping = GetUserMapping(GetUserId(), server->serverid);
3837 	conn = GetConnection(mapping, false);
3838 
3839 	/* Don't attempt to import collation if remote server hasn't got it */
3840 	if (PQserverVersion(conn) < 90100)
3841 		import_collate = false;
3842 
3843 	/* Create workspace for strings */
3844 	initStringInfo(&buf);
3845 
3846 	/* In what follows, do not risk leaking any PGresults. */
3847 	PG_TRY();
3848 	{
3849 		/* Check that the schema really exists */
3850 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3851 		deparseStringLiteral(&buf, stmt->remote_schema);
3852 
3853 		res = pgfdw_exec_query(conn, buf.data);
3854 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3855 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
3856 
3857 		if (PQntuples(res) != 1)
3858 			ereport(ERROR,
3859 					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3860 					 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3861 							stmt->remote_schema, server->servername)));
3862 
3863 		PQclear(res);
3864 		res = NULL;
3865 		resetStringInfo(&buf);
3866 
3867 		/*
3868 		 * Fetch all table data from this schema, possibly restricted by
3869 		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
3870 		 * to EXCEPT/LIMIT TO here, because the core code will filter the
3871 		 * statements we return according to those lists anyway.  But it
3872 		 * should save a few cycles to not process excluded tables in the
3873 		 * first place.)
3874 		 *
3875 		 * Ignore table data for partitions and only include the definitions
3876 		 * of the root partitioned tables to allow access to the complete
3877 		 * remote data set locally in the schema imported.
3878 		 *
3879 		 * Note: because we run the connection with search_path restricted to
3880 		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
3881 		 * include a schema name for types/functions in other schemas, which
3882 		 * is what we want.
3883 		 */
3884 		if (import_collate)
3885 			appendStringInfoString(&buf,
3886 								   "SELECT relname, "
3887 								   "  attname, "
3888 								   "  format_type(atttypid, atttypmod), "
3889 								   "  attnotnull, "
3890 								   "  pg_get_expr(adbin, adrelid), "
3891 								   "  collname, "
3892 								   "  collnsp.nspname "
3893 								   "FROM pg_class c "
3894 								   "  JOIN pg_namespace n ON "
3895 								   "    relnamespace = n.oid "
3896 								   "  LEFT JOIN pg_attribute a ON "
3897 								   "    attrelid = c.oid AND attnum > 0 "
3898 								   "      AND NOT attisdropped "
3899 								   "  LEFT JOIN pg_attrdef ad ON "
3900 								   "    adrelid = c.oid AND adnum = attnum "
3901 								   "  LEFT JOIN pg_collation coll ON "
3902 								   "    coll.oid = attcollation "
3903 								   "  LEFT JOIN pg_namespace collnsp ON "
3904 								   "    collnsp.oid = collnamespace ");
3905 		else
3906 			appendStringInfoString(&buf,
3907 								   "SELECT relname, "
3908 								   "  attname, "
3909 								   "  format_type(atttypid, atttypmod), "
3910 								   "  attnotnull, "
3911 								   "  pg_get_expr(adbin, adrelid), "
3912 								   "  NULL, NULL "
3913 								   "FROM pg_class c "
3914 								   "  JOIN pg_namespace n ON "
3915 								   "    relnamespace = n.oid "
3916 								   "  LEFT JOIN pg_attribute a ON "
3917 								   "    attrelid = c.oid AND attnum > 0 "
3918 								   "      AND NOT attisdropped "
3919 								   "  LEFT JOIN pg_attrdef ad ON "
3920 								   "    adrelid = c.oid AND adnum = attnum ");
3921 
3922 		appendStringInfoString(&buf,
3923 							   "WHERE c.relkind IN ("
3924 							   CppAsString2(RELKIND_RELATION) ","
3925 							   CppAsString2(RELKIND_VIEW) ","
3926 							   CppAsString2(RELKIND_FOREIGN_TABLE) ","
3927 							   CppAsString2(RELKIND_MATVIEW) ","
3928 							   CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
3929 							   "  AND n.nspname = ");
3930 		deparseStringLiteral(&buf, stmt->remote_schema);
3931 
3932 		/* Partitions are supported since Postgres 10 */
3933 		if (PQserverVersion(conn) >= 100000)
3934 			appendStringInfoString(&buf, " AND NOT c.relispartition ");
3935 
3936 		/* Apply restrictions for LIMIT TO and EXCEPT */
3937 		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3938 			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3939 		{
3940 			bool		first_item = true;
3941 
3942 			appendStringInfoString(&buf, " AND c.relname ");
3943 			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3944 				appendStringInfoString(&buf, "NOT ");
3945 			appendStringInfoString(&buf, "IN (");
3946 
3947 			/* Append list of table names within IN clause */
3948 			foreach(lc, stmt->table_list)
3949 			{
3950 				RangeVar   *rv = (RangeVar *) lfirst(lc);
3951 
3952 				if (first_item)
3953 					first_item = false;
3954 				else
3955 					appendStringInfoString(&buf, ", ");
3956 				deparseStringLiteral(&buf, rv->relname);
3957 			}
3958 			appendStringInfoChar(&buf, ')');
3959 		}
3960 
3961 		/* Append ORDER BY at the end of query to ensure output ordering */
3962 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3963 
3964 		/* Fetch the data */
3965 		res = pgfdw_exec_query(conn, buf.data);
3966 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3967 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
3968 
3969 		/* Process results */
3970 		numrows = PQntuples(res);
3971 		/* note: incrementation of i happens in inner loop's while() test */
3972 		for (i = 0; i < numrows;)
3973 		{
3974 			char	   *tablename = PQgetvalue(res, i, 0);
3975 			bool		first_item = true;
3976 
3977 			resetStringInfo(&buf);
3978 			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3979 							 quote_identifier(tablename));
3980 
3981 			/* Scan all rows for this table */
3982 			do
3983 			{
3984 				char	   *attname;
3985 				char	   *typename;
3986 				char	   *attnotnull;
3987 				char	   *attdefault;
3988 				char	   *collname;
3989 				char	   *collnamespace;
3990 
3991 				/* If table has no columns, we'll see nulls here */
3992 				if (PQgetisnull(res, i, 1))
3993 					continue;
3994 
3995 				attname = PQgetvalue(res, i, 1);
3996 				typename = PQgetvalue(res, i, 2);
3997 				attnotnull = PQgetvalue(res, i, 3);
3998 				attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3999 					PQgetvalue(res, i, 4);
4000 				collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4001 					PQgetvalue(res, i, 5);
4002 				collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4003 					PQgetvalue(res, i, 6);
4004 
4005 				if (first_item)
4006 					first_item = false;
4007 				else
4008 					appendStringInfoString(&buf, ",\n");
4009 
4010 				/* Print column name and type */
4011 				appendStringInfo(&buf, "  %s %s",
4012 								 quote_identifier(attname),
4013 								 typename);
4014 
4015 				/*
4016 				 * Add column_name option so that renaming the foreign table's
4017 				 * column doesn't break the association to the underlying
4018 				 * column.
4019 				 */
4020 				appendStringInfoString(&buf, " OPTIONS (column_name ");
4021 				deparseStringLiteral(&buf, attname);
4022 				appendStringInfoChar(&buf, ')');
4023 
4024 				/* Add COLLATE if needed */
4025 				if (import_collate && collname != NULL && collnamespace != NULL)
4026 					appendStringInfo(&buf, " COLLATE %s.%s",
4027 									 quote_identifier(collnamespace),
4028 									 quote_identifier(collname));
4029 
4030 				/* Add DEFAULT if needed */
4031 				if (import_default && attdefault != NULL)
4032 					appendStringInfo(&buf, " DEFAULT %s", attdefault);
4033 
4034 				/* Add NOT NULL if needed */
4035 				if (import_not_null && attnotnull[0] == 't')
4036 					appendStringInfoString(&buf, " NOT NULL");
4037 			}
4038 			while (++i < numrows &&
4039 				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4040 
4041 			/*
4042 			 * Add server name and table-level options.  We specify remote
4043 			 * schema and table name as options (the latter to ensure that
4044 			 * renaming the foreign table doesn't break the association).
4045 			 */
4046 			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4047 							 quote_identifier(server->servername));
4048 
4049 			appendStringInfoString(&buf, "schema_name ");
4050 			deparseStringLiteral(&buf, stmt->remote_schema);
4051 			appendStringInfoString(&buf, ", table_name ");
4052 			deparseStringLiteral(&buf, tablename);
4053 
4054 			appendStringInfoString(&buf, ");");
4055 
4056 			commands = lappend(commands, pstrdup(buf.data));
4057 		}
4058 
4059 		/* Clean up */
4060 		PQclear(res);
4061 		res = NULL;
4062 	}
4063 	PG_CATCH();
4064 	{
4065 		if (res)
4066 			PQclear(res);
4067 		PG_RE_THROW();
4068 	}
4069 	PG_END_TRY();
4070 
4071 	ReleaseConnection(conn);
4072 
4073 	return commands;
4074 }
4075 
4076 /*
4077  * Assess whether the join between inner and outer relations can be pushed down
4078  * to the foreign server. As a side effect, save information we obtain in this
4079  * function to PgFdwRelationInfo passed in.
4080  */
4081 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)4082 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
4083 				RelOptInfo *outerrel, RelOptInfo *innerrel,
4084 				JoinPathExtraData *extra)
4085 {
4086 	PgFdwRelationInfo *fpinfo;
4087 	PgFdwRelationInfo *fpinfo_o;
4088 	PgFdwRelationInfo *fpinfo_i;
4089 	ListCell   *lc;
4090 	List	   *joinclauses;
4091 
4092 	/*
4093 	 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4094 	 * Constructing queries representing SEMI and ANTI joins is hard, hence
4095 	 * not considered right now.
4096 	 */
4097 	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4098 		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4099 		return false;
4100 
4101 	/*
4102 	 * If either of the joining relations is marked as unsafe to pushdown, the
4103 	 * join can not be pushed down.
4104 	 */
4105 	fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4106 	fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4107 	fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4108 	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4109 		!fpinfo_i || !fpinfo_i->pushdown_safe)
4110 		return false;
4111 
4112 	/*
4113 	 * If joining relations have local conditions, those conditions are
4114 	 * required to be applied before joining the relations. Hence the join can
4115 	 * not be pushed down.
4116 	 */
4117 	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4118 		return false;
4119 
4120 	/*
4121 	 * Merge FDW options.  We might be tempted to do this after we have deemed
4122 	 * the foreign join to be OK.  But we must do this beforehand so that we
4123 	 * know which quals can be evaluated on the foreign server, which might
4124 	 * depend on shippable_extensions.
4125 	 */
4126 	fpinfo->server = fpinfo_o->server;
4127 	merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4128 
4129 	/*
4130 	 * Separate restrict list into join quals and pushed-down (other) quals.
4131 	 *
4132 	 * Join quals belonging to an outer join must all be shippable, else we
4133 	 * cannot execute the join remotely.  Add such quals to 'joinclauses'.
4134 	 *
4135 	 * Add other quals to fpinfo->remote_conds if they are shippable, else to
4136 	 * fpinfo->local_conds.  In an inner join it's okay to execute conditions
4137 	 * either locally or remotely; the same is true for pushed-down conditions
4138 	 * at an outer join.
4139 	 *
4140 	 * Note we might return failure after having already scribbled on
4141 	 * fpinfo->remote_conds and fpinfo->local_conds.  That's okay because we
4142 	 * won't consult those lists again if we deem the join unshippable.
4143 	 */
4144 	joinclauses = NIL;
4145 	foreach(lc, extra->restrictlist)
4146 	{
4147 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4148 		bool		is_remote_clause = is_foreign_expr(root, joinrel,
4149 													   rinfo->clause);
4150 
4151 		if (IS_OUTER_JOIN(jointype) &&
4152 			!RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
4153 		{
4154 			if (!is_remote_clause)
4155 				return false;
4156 			joinclauses = lappend(joinclauses, rinfo);
4157 		}
4158 		else
4159 		{
4160 			if (is_remote_clause)
4161 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4162 			else
4163 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4164 		}
4165 	}
4166 
4167 	/*
4168 	 * deparseExplicitTargetList() isn't smart enough to handle anything other
4169 	 * than a Var.  In particular, if there's some PlaceHolderVar that would
4170 	 * need to be evaluated within this join tree (because there's an upper
4171 	 * reference to a quantity that may go to NULL as a result of an outer
4172 	 * join), then we can't try to push the join down because we'll fail when
4173 	 * we get to deparseExplicitTargetList().  However, a PlaceHolderVar that
4174 	 * needs to be evaluated *at the top* of this join tree is OK, because we
4175 	 * can do that locally after fetching the results from the remote side.
4176 	 */
4177 	foreach(lc, root->placeholder_list)
4178 	{
4179 		PlaceHolderInfo *phinfo = lfirst(lc);
4180 		Relids		relids = joinrel->relids;
4181 
4182 		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4183 			bms_nonempty_difference(relids, phinfo->ph_eval_at))
4184 			return false;
4185 	}
4186 
4187 	/* Save the join clauses, for later use. */
4188 	fpinfo->joinclauses = joinclauses;
4189 
4190 	fpinfo->outerrel = outerrel;
4191 	fpinfo->innerrel = innerrel;
4192 	fpinfo->jointype = jointype;
4193 
4194 	/*
4195 	 * By default, both the input relations are not required to be deparsed as
4196 	 * subqueries, but there might be some relations covered by the input
4197 	 * relations that are required to be deparsed as subqueries, so save the
4198 	 * relids of those relations for later use by the deparser.
4199 	 */
4200 	fpinfo->make_outerrel_subquery = false;
4201 	fpinfo->make_innerrel_subquery = false;
4202 	Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4203 	Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4204 	fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
4205 											fpinfo_i->lower_subquery_rels);
4206 
4207 	/*
4208 	 * Pull the other remote conditions from the joining relations into join
4209 	 * clauses or other remote clauses (remote_conds) of this relation
4210 	 * wherever possible. This avoids building subqueries at every join step.
4211 	 *
4212 	 * For an inner join, clauses from both the relations are added to the
4213 	 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4214 	 * the outer side are added to remote_conds since those can be evaluated
4215 	 * after the join is evaluated. The clauses from inner side are added to
4216 	 * the joinclauses, since they need to be evaluated while constructing the
4217 	 * join.
4218 	 *
4219 	 * For a FULL OUTER JOIN, the other clauses from either relation can not
4220 	 * be added to the joinclauses or remote_conds, since each relation acts
4221 	 * as an outer relation for the other.
4222 	 *
4223 	 * The joining sides can not have local conditions, thus no need to test
4224 	 * shippability of the clauses being pulled up.
4225 	 */
4226 	switch (jointype)
4227 	{
4228 		case JOIN_INNER:
4229 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4230 											   list_copy(fpinfo_i->remote_conds));
4231 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4232 											   list_copy(fpinfo_o->remote_conds));
4233 			break;
4234 
4235 		case JOIN_LEFT:
4236 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4237 											  list_copy(fpinfo_i->remote_conds));
4238 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4239 											   list_copy(fpinfo_o->remote_conds));
4240 			break;
4241 
4242 		case JOIN_RIGHT:
4243 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4244 											  list_copy(fpinfo_o->remote_conds));
4245 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4246 											   list_copy(fpinfo_i->remote_conds));
4247 			break;
4248 
4249 		case JOIN_FULL:
4250 
4251 			/*
4252 			 * In this case, if any of the input relations has conditions, we
4253 			 * need to deparse that relation as a subquery so that the
4254 			 * conditions can be evaluated before the join.  Remember it in
4255 			 * the fpinfo of this relation so that the deparser can take
4256 			 * appropriate action.  Also, save the relids of base relations
4257 			 * covered by that relation for later use by the deparser.
4258 			 */
4259 			if (fpinfo_o->remote_conds)
4260 			{
4261 				fpinfo->make_outerrel_subquery = true;
4262 				fpinfo->lower_subquery_rels =
4263 					bms_add_members(fpinfo->lower_subquery_rels,
4264 									outerrel->relids);
4265 			}
4266 			if (fpinfo_i->remote_conds)
4267 			{
4268 				fpinfo->make_innerrel_subquery = true;
4269 				fpinfo->lower_subquery_rels =
4270 					bms_add_members(fpinfo->lower_subquery_rels,
4271 									innerrel->relids);
4272 			}
4273 			break;
4274 
4275 		default:
4276 			/* Should not happen, we have just checked this above */
4277 			elog(ERROR, "unsupported join type %d", jointype);
4278 	}
4279 
4280 	/*
4281 	 * For an inner join, all restrictions can be treated alike. Treating the
4282 	 * pushed down conditions as join conditions allows a top level full outer
4283 	 * join to be deparsed without requiring subqueries.
4284 	 */
4285 	if (jointype == JOIN_INNER)
4286 	{
4287 		Assert(!fpinfo->joinclauses);
4288 		fpinfo->joinclauses = fpinfo->remote_conds;
4289 		fpinfo->remote_conds = NIL;
4290 	}
4291 
4292 	/* Mark that this join can be pushed down safely */
4293 	fpinfo->pushdown_safe = true;
4294 
4295 	/* Get user mapping */
4296 	if (fpinfo->use_remote_estimate)
4297 	{
4298 		if (fpinfo_o->use_remote_estimate)
4299 			fpinfo->user = fpinfo_o->user;
4300 		else
4301 			fpinfo->user = fpinfo_i->user;
4302 	}
4303 	else
4304 		fpinfo->user = NULL;
4305 
4306 	/*
4307 	 * Set cached relation costs to some negative value, so that we can detect
4308 	 * when they are set to some sensible costs, during one (usually the
4309 	 * first) of the calls to estimate_path_cost_size().
4310 	 */
4311 	fpinfo->rel_startup_cost = -1;
4312 	fpinfo->rel_total_cost = -1;
4313 
4314 	/*
4315 	 * Set the string describing this join relation to be used in EXPLAIN
4316 	 * output of corresponding ForeignScan.
4317 	 */
4318 	fpinfo->relation_name = makeStringInfo();
4319 	appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4320 					 fpinfo_o->relation_name->data,
4321 					 get_jointype_name(fpinfo->jointype),
4322 					 fpinfo_i->relation_name->data);
4323 
4324 	/*
4325 	 * Set the relation index.  This is defined as the position of this
4326 	 * joinrel in the join_rel_list list plus the length of the rtable list.
4327 	 * Note that since this joinrel is at the end of the join_rel_list list
4328 	 * when we are called, we can get the position by list_length.
4329 	 */
4330 	Assert(fpinfo->relation_index == 0);	/* shouldn't be set yet */
4331 	fpinfo->relation_index =
4332 		list_length(root->parse->rtable) + list_length(root->join_rel_list);
4333 
4334 	return true;
4335 }
4336 
4337 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)4338 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
4339 								Path *epq_path)
4340 {
4341 	List	   *useful_pathkeys_list = NIL; /* List of all pathkeys */
4342 	ListCell   *lc;
4343 
4344 	useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4345 
4346 	/* Create one path for each set of pathkeys we found above. */
4347 	foreach(lc, useful_pathkeys_list)
4348 	{
4349 		double		rows;
4350 		int			width;
4351 		Cost		startup_cost;
4352 		Cost		total_cost;
4353 		List	   *useful_pathkeys = lfirst(lc);
4354 		Path	   *sorted_epq_path;
4355 
4356 		estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4357 								&rows, &width, &startup_cost, &total_cost);
4358 
4359 		/*
4360 		 * The EPQ path must be at least as well sorted as the path itself,
4361 		 * in case it gets used as input to a mergejoin.
4362 		 */
4363 		sorted_epq_path = epq_path;
4364 		if (sorted_epq_path != NULL &&
4365 			!pathkeys_contained_in(useful_pathkeys,
4366 								   sorted_epq_path->pathkeys))
4367 			sorted_epq_path = (Path *)
4368 				create_sort_path(root,
4369 								 rel,
4370 								 sorted_epq_path,
4371 								 useful_pathkeys,
4372 								 -1.0);
4373 
4374 		add_path(rel, (Path *)
4375 				 create_foreignscan_path(root, rel,
4376 										 NULL,
4377 										 rows,
4378 										 startup_cost,
4379 										 total_cost,
4380 										 useful_pathkeys,
4381 										 rel->lateral_relids,
4382 										 sorted_epq_path,
4383 										 NIL));
4384 	}
4385 }
4386 
4387 /*
4388  * Parse options from foreign server and apply them to fpinfo.
4389  *
4390  * New options might also require tweaking merge_fdw_options().
4391  */
4392 static void
apply_server_options(PgFdwRelationInfo * fpinfo)4393 apply_server_options(PgFdwRelationInfo *fpinfo)
4394 {
4395 	ListCell   *lc;
4396 
4397 	foreach(lc, fpinfo->server->options)
4398 	{
4399 		DefElem    *def = (DefElem *) lfirst(lc);
4400 
4401 		if (strcmp(def->defname, "use_remote_estimate") == 0)
4402 			fpinfo->use_remote_estimate = defGetBoolean(def);
4403 		else if (strcmp(def->defname, "fdw_startup_cost") == 0)
4404 			fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
4405 		else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
4406 			fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
4407 		else if (strcmp(def->defname, "extensions") == 0)
4408 			fpinfo->shippable_extensions =
4409 				ExtractExtensionList(defGetString(def), false);
4410 		else if (strcmp(def->defname, "fetch_size") == 0)
4411 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4412 	}
4413 }
4414 
4415 /*
4416  * Parse options from foreign table and apply them to fpinfo.
4417  *
4418  * New options might also require tweaking merge_fdw_options().
4419  */
4420 static void
apply_table_options(PgFdwRelationInfo * fpinfo)4421 apply_table_options(PgFdwRelationInfo *fpinfo)
4422 {
4423 	ListCell   *lc;
4424 
4425 	foreach(lc, fpinfo->table->options)
4426 	{
4427 		DefElem    *def = (DefElem *) lfirst(lc);
4428 
4429 		if (strcmp(def->defname, "use_remote_estimate") == 0)
4430 			fpinfo->use_remote_estimate = defGetBoolean(def);
4431 		else if (strcmp(def->defname, "fetch_size") == 0)
4432 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4433 	}
4434 }
4435 
4436 /*
4437  * Merge FDW options from input relations into a new set of options for a join
4438  * or an upper rel.
4439  *
4440  * For a join relation, FDW-specific information about the inner and outer
4441  * relations is provided using fpinfo_i and fpinfo_o.  For an upper relation,
4442  * fpinfo_o provides the information for the input relation; fpinfo_i is
4443  * expected to NULL.
4444  */
4445 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)4446 merge_fdw_options(PgFdwRelationInfo *fpinfo,
4447 				  const PgFdwRelationInfo *fpinfo_o,
4448 				  const PgFdwRelationInfo *fpinfo_i)
4449 {
4450 	/* We must always have fpinfo_o. */
4451 	Assert(fpinfo_o);
4452 
4453 	/* fpinfo_i may be NULL, but if present the servers must both match. */
4454 	Assert(!fpinfo_i ||
4455 		   fpinfo_i->server->serverid == fpinfo_o->server->serverid);
4456 
4457 	/*
4458 	 * Copy the server specific FDW options.  (For a join, both relations come
4459 	 * from the same server, so the server options should have the same value
4460 	 * for both relations.)
4461 	 */
4462 	fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4463 	fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4464 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
4465 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
4466 	fpinfo->fetch_size = fpinfo_o->fetch_size;
4467 
4468 	/* Merge the table level options from either side of the join. */
4469 	if (fpinfo_i)
4470 	{
4471 		/*
4472 		 * We'll prefer to use remote estimates for this join if any table
4473 		 * from either side of the join is using remote estimates.  This is
4474 		 * most likely going to be preferred since they're already willing to
4475 		 * pay the price of a round trip to get the remote EXPLAIN.  In any
4476 		 * case it's not entirely clear how we might otherwise handle this
4477 		 * best.
4478 		 */
4479 		fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4480 			fpinfo_i->use_remote_estimate;
4481 
4482 		/*
4483 		 * Set fetch size to maximum of the joining sides, since we are
4484 		 * expecting the rows returned by the join to be proportional to the
4485 		 * relation sizes.
4486 		 */
4487 		fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
4488 	}
4489 }
4490 
4491 /*
4492  * postgresGetForeignJoinPaths
4493  *		Add possible ForeignPath to joinrel, if join is safe to push down.
4494  */
4495 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)4496 postgresGetForeignJoinPaths(PlannerInfo *root,
4497 							RelOptInfo *joinrel,
4498 							RelOptInfo *outerrel,
4499 							RelOptInfo *innerrel,
4500 							JoinType jointype,
4501 							JoinPathExtraData *extra)
4502 {
4503 	PgFdwRelationInfo *fpinfo;
4504 	ForeignPath *joinpath;
4505 	double		rows;
4506 	int			width;
4507 	Cost		startup_cost;
4508 	Cost		total_cost;
4509 	Path	   *epq_path;		/* Path to create plan to be executed when
4510 								 * EvalPlanQual gets triggered. */
4511 
4512 	/*
4513 	 * Skip if this join combination has been considered already.
4514 	 */
4515 	if (joinrel->fdw_private)
4516 		return;
4517 
4518 	/*
4519 	 * This code does not work for joins with lateral references, since those
4520 	 * must have parameterized paths, which we don't generate yet.
4521 	 */
4522 	if (!bms_is_empty(joinrel->lateral_relids))
4523 		return;
4524 
4525 	/*
4526 	 * Create unfinished PgFdwRelationInfo entry which is used to indicate
4527 	 * that the join relation is already considered, so that we won't waste
4528 	 * time in judging safety of join pushdown and adding the same paths again
4529 	 * if found safe. Once we know that this join can be pushed down, we fill
4530 	 * the entry.
4531 	 */
4532 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4533 	fpinfo->pushdown_safe = false;
4534 	joinrel->fdw_private = fpinfo;
4535 	/* attrs_used is only for base relations. */
4536 	fpinfo->attrs_used = NULL;
4537 
4538 	/*
4539 	 * If there is a possibility that EvalPlanQual will be executed, we need
4540 	 * to be able to reconstruct the row using scans of the base relations.
4541 	 * GetExistingLocalJoinPath will find a suitable path for this purpose in
4542 	 * the path list of the joinrel, if one exists.  We must be careful to
4543 	 * call it before adding any ForeignPath, since the ForeignPath might
4544 	 * dominate the only suitable local path available.  We also do it before
4545 	 * calling foreign_join_ok(), since that function updates fpinfo and marks
4546 	 * it as pushable if the join is found to be pushable.
4547 	 */
4548 	if (root->parse->commandType == CMD_DELETE ||
4549 		root->parse->commandType == CMD_UPDATE ||
4550 		root->rowMarks)
4551 	{
4552 		epq_path = GetExistingLocalJoinPath(joinrel);
4553 		if (!epq_path)
4554 		{
4555 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4556 			return;
4557 		}
4558 	}
4559 	else
4560 		epq_path = NULL;
4561 
4562 	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4563 	{
4564 		/* Free path required for EPQ if we copied one; we don't need it now */
4565 		if (epq_path)
4566 			pfree(epq_path);
4567 		return;
4568 	}
4569 
4570 	/*
4571 	 * Compute the selectivity and cost of the local_conds, so we don't have
4572 	 * to do it over again for each path. The best we can do for these
4573 	 * conditions is to estimate selectivity on the basis of local statistics.
4574 	 * The local conditions are applied after the join has been computed on
4575 	 * the remote side like quals in WHERE clause, so pass jointype as
4576 	 * JOIN_INNER.
4577 	 */
4578 	fpinfo->local_conds_sel = clauselist_selectivity(root,
4579 													 fpinfo->local_conds,
4580 													 0,
4581 													 JOIN_INNER,
4582 													 NULL);
4583 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4584 
4585 	/*
4586 	 * If we are going to estimate costs locally, estimate the join clause
4587 	 * selectivity here while we have special join info.
4588 	 */
4589 	if (!fpinfo->use_remote_estimate)
4590 		fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4591 														0, fpinfo->jointype,
4592 														extra->sjinfo);
4593 
4594 	/* Estimate costs for bare join relation */
4595 	estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4596 							&width, &startup_cost, &total_cost);
4597 	/* Now update this information in the joinrel */
4598 	joinrel->rows = rows;
4599 	joinrel->reltarget->width = width;
4600 	fpinfo->rows = rows;
4601 	fpinfo->width = width;
4602 	fpinfo->startup_cost = startup_cost;
4603 	fpinfo->total_cost = total_cost;
4604 
4605 	/*
4606 	 * Create a new join path and add it to the joinrel which represents a
4607 	 * join between foreign tables.
4608 	 */
4609 	joinpath = create_foreignscan_path(root,
4610 									   joinrel,
4611 									   NULL,	/* default pathtarget */
4612 									   rows,
4613 									   startup_cost,
4614 									   total_cost,
4615 									   NIL, /* no pathkeys */
4616 									   joinrel->lateral_relids,
4617 									   epq_path,
4618 									   NIL);	/* no fdw_private */
4619 
4620 	/* Add generated path into joinrel by add_path(). */
4621 	add_path(joinrel, (Path *) joinpath);
4622 
4623 	/* Consider pathkeys for the join relation */
4624 	add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4625 
4626 	/* XXX Consider parameterized paths for the join relation */
4627 }
4628 
4629 /*
4630  * Assess whether the aggregation, grouping and having operations can be pushed
4631  * down to the foreign server.  As a side effect, save information we obtain in
4632  * this function to PgFdwRelationInfo of the input relation.
4633  */
4634 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel)4635 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel)
4636 {
4637 	Query	   *query = root->parse;
4638 	PathTarget *grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4639 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
4640 	PgFdwRelationInfo *ofpinfo;
4641 	ListCell   *lc;
4642 	int			i;
4643 	List	   *tlist = NIL;
4644 
4645 	/* We currently don't support pushing Grouping Sets. */
4646 	if (query->groupingSets)
4647 		return false;
4648 
4649 	/* Get the fpinfo of the underlying scan relation. */
4650 	ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
4651 
4652 	/*
4653 	 * If underlying scan relation has any local conditions, those conditions
4654 	 * are required to be applied before performing aggregation.  Hence the
4655 	 * aggregate cannot be pushed down.
4656 	 */
4657 	if (ofpinfo->local_conds)
4658 		return false;
4659 
4660 	/*
4661 	 * Examine grouping expressions, as well as other expressions we'd need to
4662 	 * compute, and check whether they are safe to push down to the foreign
4663 	 * server.  All GROUP BY expressions will be part of the grouping target
4664 	 * and thus there is no need to search for them separately.  Add grouping
4665 	 * expressions into target list which will be passed to foreign server.
4666 	 *
4667 	 * A tricky fine point is that we must not put any expression into the
4668 	 * target list that is just a foreign param (that is, something that
4669 	 * deparse.c would conclude has to be sent to the foreign server).  If we
4670 	 * do, the expression will also appear in the fdw_exprs list of the plan
4671 	 * node, and setrefs.c will get confused and decide that the fdw_exprs
4672 	 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
4673 	 * a broken plan.  Somewhat oddly, it's OK if the expression contains such
4674 	 * a node, as long as it's not at top level; then no match is possible.
4675 	 */
4676 	i = 0;
4677 	foreach(lc, grouping_target->exprs)
4678 	{
4679 		Expr	   *expr = (Expr *) lfirst(lc);
4680 		Index		sgref = get_pathtarget_sortgroupref(grouping_target, i);
4681 		ListCell   *l;
4682 
4683 		/* Check whether this expression is part of GROUP BY clause */
4684 		if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
4685 		{
4686 			TargetEntry *tle;
4687 
4688 			/*
4689 			 * If any GROUP BY expression is not shippable, then we cannot
4690 			 * push down aggregation to the foreign server.
4691 			 */
4692 			if (!is_foreign_expr(root, grouped_rel, expr))
4693 				return false;
4694 
4695 			/*
4696 			 * If it would be a foreign param, we can't put it into the tlist,
4697 			 * so we have to fail.
4698 			 */
4699 			if (is_foreign_param(root, grouped_rel, expr))
4700 				return false;
4701 
4702 			/*
4703 			 * Pushable, so add to tlist.  We need to create a TLE for this
4704 			 * expression and apply the sortgroupref to it.  We cannot use
4705 			 * add_to_flat_tlist() here because that avoids making duplicate
4706 			 * entries in the tlist.  If there are duplicate entries with
4707 			 * distinct sortgrouprefs, we have to duplicate that situation in
4708 			 * the output tlist.
4709 			 */
4710 			tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
4711 			tle->ressortgroupref = sgref;
4712 			tlist = lappend(tlist, tle);
4713 		}
4714 		else
4715 		{
4716 			/*
4717 			 * Non-grouping expression we need to compute.  Can we ship it
4718 			 * as-is to the foreign server?
4719 			 */
4720 			if (is_foreign_expr(root, grouped_rel, expr) &&
4721 				!is_foreign_param(root, grouped_rel, expr))
4722 			{
4723 				/* Yes, so add to tlist as-is; OK to suppress duplicates */
4724 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
4725 			}
4726 			else
4727 			{
4728 				/* Not pushable as a whole; extract its Vars and aggregates */
4729 				List	   *aggvars;
4730 
4731 				aggvars = pull_var_clause((Node *) expr,
4732 										  PVC_INCLUDE_AGGREGATES);
4733 
4734 				/*
4735 				 * If any aggregate expression is not shippable, then we
4736 				 * cannot push down aggregation to the foreign server.  (We
4737 				 * don't have to check is_foreign_param, since that certainly
4738 				 * won't return true for any such expression.)
4739 				 */
4740 				if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
4741 					return false;
4742 
4743 				/*
4744 				 * Add aggregates, if any, into the targetlist.  Plain Vars
4745 				 * outside an aggregate can be ignored, because they should be
4746 				 * either same as some GROUP BY column or part of some GROUP
4747 				 * BY expression.  In either case, they are already part of
4748 				 * the targetlist and thus no need to add them again.  In fact
4749 				 * including plain Vars in the tlist when they do not match a
4750 				 * GROUP BY column would cause the foreign server to complain
4751 				 * that the shipped query is invalid.
4752 				 */
4753 				foreach(l, aggvars)
4754 				{
4755 					Expr	   *expr = (Expr *) lfirst(l);
4756 
4757 					if (IsA(expr, Aggref))
4758 						tlist = add_to_flat_tlist(tlist, list_make1(expr));
4759 				}
4760 			}
4761 		}
4762 
4763 		i++;
4764 	}
4765 
4766 	/*
4767 	 * Classify the pushable and non-pushable HAVING clauses and save them in
4768 	 * remote_conds and local_conds of the grouped rel's fpinfo.
4769 	 */
4770 	if (root->hasHavingQual && query->havingQual)
4771 	{
4772 		ListCell   *lc;
4773 
4774 		foreach(lc, (List *) query->havingQual)
4775 		{
4776 			Expr	   *expr = (Expr *) lfirst(lc);
4777 			RestrictInfo *rinfo;
4778 
4779 			/*
4780 			 * Currently, the core code doesn't wrap havingQuals in
4781 			 * RestrictInfos, so we must make our own.
4782 			 */
4783 			Assert(!IsA(expr, RestrictInfo));
4784 			rinfo = make_restrictinfo(expr,
4785 									  true,
4786 									  false,
4787 									  false,
4788 									  root->qual_security_level,
4789 									  grouped_rel->relids,
4790 									  NULL,
4791 									  NULL);
4792 			if (is_foreign_expr(root, grouped_rel, expr))
4793 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4794 			else
4795 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4796 		}
4797 	}
4798 
4799 	/*
4800 	 * If there are any local conditions, pull Vars and aggregates from it and
4801 	 * check whether they are safe to pushdown or not.
4802 	 */
4803 	if (fpinfo->local_conds)
4804 	{
4805 		List	   *aggvars = NIL;
4806 		ListCell   *lc;
4807 
4808 		foreach(lc, fpinfo->local_conds)
4809 		{
4810 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4811 
4812 			aggvars = list_concat(aggvars,
4813 								  pull_var_clause((Node *) rinfo->clause,
4814 												  PVC_INCLUDE_AGGREGATES));
4815 		}
4816 
4817 		foreach(lc, aggvars)
4818 		{
4819 			Expr	   *expr = (Expr *) lfirst(lc);
4820 
4821 			/*
4822 			 * If aggregates within local conditions are not safe to push
4823 			 * down, then we cannot push down the query.  Vars are already
4824 			 * part of GROUP BY clause which are checked above, so no need to
4825 			 * access them again here.  Again, we need not check
4826 			 * is_foreign_param for a foreign aggregate.
4827 			 */
4828 			if (IsA(expr, Aggref))
4829 			{
4830 				if (!is_foreign_expr(root, grouped_rel, expr))
4831 					return false;
4832 
4833 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
4834 			}
4835 		}
4836 	}
4837 
4838 	/* Store generated targetlist */
4839 	fpinfo->grouped_tlist = tlist;
4840 
4841 	/* Safe to pushdown */
4842 	fpinfo->pushdown_safe = true;
4843 
4844 	/*
4845 	 * Set cached relation costs to some negative value, so that we can detect
4846 	 * when they are set to some sensible costs, during one (usually the
4847 	 * first) of the calls to estimate_path_cost_size().
4848 	 */
4849 	fpinfo->rel_startup_cost = -1;
4850 	fpinfo->rel_total_cost = -1;
4851 
4852 	/*
4853 	 * Set the string describing this grouped relation to be used in EXPLAIN
4854 	 * output of corresponding ForeignScan.
4855 	 */
4856 	fpinfo->relation_name = makeStringInfo();
4857 	appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
4858 					 ofpinfo->relation_name->data);
4859 
4860 	return true;
4861 }
4862 
4863 /*
4864  * postgresGetForeignUpperPaths
4865  *		Add paths for post-join operations like aggregation, grouping etc. if
4866  *		corresponding operations are safe to push down.
4867  *
4868  * Right now, we only support aggregate, grouping and having clause pushdown.
4869  */
4870 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel)4871 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
4872 							 RelOptInfo *input_rel, RelOptInfo *output_rel)
4873 {
4874 	PgFdwRelationInfo *fpinfo;
4875 
4876 	/*
4877 	 * If input rel is not safe to pushdown, then simply return as we cannot
4878 	 * perform any post-join operations on the foreign server.
4879 	 */
4880 	if (!input_rel->fdw_private ||
4881 		!((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
4882 		return;
4883 
4884 	/* Ignore stages we don't support; and skip any duplicate calls. */
4885 	if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
4886 		return;
4887 
4888 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4889 	fpinfo->pushdown_safe = false;
4890 	output_rel->fdw_private = fpinfo;
4891 
4892 	add_foreign_grouping_paths(root, input_rel, output_rel);
4893 }
4894 
4895 /*
4896  * add_foreign_grouping_paths
4897  *		Add foreign path for grouping and/or aggregation.
4898  *
4899  * Given input_rel represents the underlying scan.  The paths are added to the
4900  * given grouped_rel.
4901  */
4902 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel)4903 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
4904 						   RelOptInfo *grouped_rel)
4905 {
4906 	Query	   *parse = root->parse;
4907 	PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
4908 	PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
4909 	ForeignPath *grouppath;
4910 	PathTarget *grouping_target;
4911 	double		rows;
4912 	int			width;
4913 	Cost		startup_cost;
4914 	Cost		total_cost;
4915 
4916 	/* Nothing to be done, if there is no grouping or aggregation required. */
4917 	if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
4918 		!root->hasHavingQual)
4919 		return;
4920 
4921 	grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4922 
4923 	/* save the input_rel as outerrel in fpinfo */
4924 	fpinfo->outerrel = input_rel;
4925 
4926 	/*
4927 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
4928 	 * details from the input relation's fpinfo.
4929 	 */
4930 	fpinfo->table = ifpinfo->table;
4931 	fpinfo->server = ifpinfo->server;
4932 	fpinfo->user = ifpinfo->user;
4933 	merge_fdw_options(fpinfo, ifpinfo, NULL);
4934 
4935 	/* Assess if it is safe to push down aggregation and grouping. */
4936 	if (!foreign_grouping_ok(root, grouped_rel))
4937 		return;
4938 
4939 	/* Estimate the cost of push down */
4940 	estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
4941 							&width, &startup_cost, &total_cost);
4942 
4943 	/* Now update this information in the fpinfo */
4944 	fpinfo->rows = rows;
4945 	fpinfo->width = width;
4946 	fpinfo->startup_cost = startup_cost;
4947 	fpinfo->total_cost = total_cost;
4948 
4949 	/* Create and add foreign path to the grouping relation. */
4950 	grouppath = create_foreignscan_path(root,
4951 										grouped_rel,
4952 										grouping_target,
4953 										rows,
4954 										startup_cost,
4955 										total_cost,
4956 										NIL,	/* no pathkeys */
4957 										grouped_rel->lateral_relids,
4958 										NULL,
4959 										NIL);	/* no fdw_private */
4960 
4961 	/* Add generated path into grouped_rel by add_path(). */
4962 	add_path(grouped_rel, (Path *) grouppath);
4963 }
4964 
4965 /*
4966  * Create a tuple from the specified row of the PGresult.
4967  *
4968  * rel is the local representation of the foreign table, attinmeta is
4969  * conversion data for the rel's tupdesc, and retrieved_attrs is an
4970  * integer list of the table column numbers present in the PGresult.
4971  * fsstate is the ForeignScan plan node's execution state.
4972  * temp_context is a working context that can be reset after each tuple.
4973  *
4974  * Note: either rel or fsstate, but not both, can be NULL.  rel is NULL
4975  * if we're processing a remote join, while fsstate is NULL in a non-query
4976  * context such as ANALYZE, or if we're processing a non-scan query node.
4977  */
4978 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)4979 make_tuple_from_result_row(PGresult *res,
4980 						   int row,
4981 						   Relation rel,
4982 						   AttInMetadata *attinmeta,
4983 						   List *retrieved_attrs,
4984 						   ForeignScanState *fsstate,
4985 						   MemoryContext temp_context)
4986 {
4987 	HeapTuple	tuple;
4988 	TupleDesc	tupdesc;
4989 	Datum	   *values;
4990 	bool	   *nulls;
4991 	ItemPointer ctid = NULL;
4992 	Oid			oid = InvalidOid;
4993 	ConversionLocation errpos;
4994 	ErrorContextCallback errcallback;
4995 	MemoryContext oldcontext;
4996 	ListCell   *lc;
4997 	int			j;
4998 
4999 	Assert(row < PQntuples(res));
5000 
5001 	/*
5002 	 * Do the following work in a temp context that we reset after each tuple.
5003 	 * This cleans up not only the data we have direct access to, but any
5004 	 * cruft the I/O functions might leak.
5005 	 */
5006 	oldcontext = MemoryContextSwitchTo(temp_context);
5007 
5008 	/*
5009 	 * Get the tuple descriptor for the row.  Use the rel's tupdesc if rel is
5010 	 * provided, otherwise look to the scan node's ScanTupleSlot.
5011 	 */
5012 	if (rel)
5013 		tupdesc = RelationGetDescr(rel);
5014 	else
5015 	{
5016 		PgFdwScanState *fdw_sstate;
5017 
5018 		Assert(fsstate);
5019 		fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
5020 		tupdesc = fdw_sstate->tupdesc;
5021 	}
5022 
5023 	values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5024 	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5025 	/* Initialize to nulls for any columns not present in result */
5026 	memset(nulls, true, tupdesc->natts * sizeof(bool));
5027 
5028 	/*
5029 	 * Set up and install callback to report where conversion error occurs.
5030 	 */
5031 	errpos.cur_attno = 0;
5032 	errpos.rel = rel;
5033 	errpos.fsstate = fsstate;
5034 	errcallback.callback = conversion_error_callback;
5035 	errcallback.arg = (void *) &errpos;
5036 	errcallback.previous = error_context_stack;
5037 	error_context_stack = &errcallback;
5038 
5039 	/*
5040 	 * i indexes columns in the relation, j indexes columns in the PGresult.
5041 	 */
5042 	j = 0;
5043 	foreach(lc, retrieved_attrs)
5044 	{
5045 		int			i = lfirst_int(lc);
5046 		char	   *valstr;
5047 
5048 		/* fetch next column's textual value */
5049 		if (PQgetisnull(res, row, j))
5050 			valstr = NULL;
5051 		else
5052 			valstr = PQgetvalue(res, row, j);
5053 
5054 		/*
5055 		 * convert value to internal representation
5056 		 *
5057 		 * Note: we ignore system columns other than ctid and oid in result
5058 		 */
5059 		errpos.cur_attno = i;
5060 		if (i > 0)
5061 		{
5062 			/* ordinary column */
5063 			Assert(i <= tupdesc->natts);
5064 			nulls[i - 1] = (valstr == NULL);
5065 			/* Apply the input function even to nulls, to support domains */
5066 			values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5067 											  valstr,
5068 											  attinmeta->attioparams[i - 1],
5069 											  attinmeta->atttypmods[i - 1]);
5070 		}
5071 		else if (i == SelfItemPointerAttributeNumber)
5072 		{
5073 			/* ctid */
5074 			if (valstr != NULL)
5075 			{
5076 				Datum		datum;
5077 
5078 				datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5079 				ctid = (ItemPointer) DatumGetPointer(datum);
5080 			}
5081 		}
5082 		else if (i == ObjectIdAttributeNumber)
5083 		{
5084 			/* oid */
5085 			if (valstr != NULL)
5086 			{
5087 				Datum		datum;
5088 
5089 				datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5090 				oid = DatumGetObjectId(datum);
5091 			}
5092 		}
5093 		errpos.cur_attno = 0;
5094 
5095 		j++;
5096 	}
5097 
5098 	/* Uninstall error context callback. */
5099 	error_context_stack = errcallback.previous;
5100 
5101 	/*
5102 	 * Check we got the expected number of columns.  Note: j == 0 and
5103 	 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5104 	 */
5105 	if (j > 0 && j != PQnfields(res))
5106 		elog(ERROR, "remote query result does not match the foreign table");
5107 
5108 	/*
5109 	 * Build the result tuple in caller's memory context.
5110 	 */
5111 	MemoryContextSwitchTo(oldcontext);
5112 
5113 	tuple = heap_form_tuple(tupdesc, values, nulls);
5114 
5115 	/*
5116 	 * If we have a CTID to return, install it in both t_self and t_ctid.
5117 	 * t_self is the normal place, but if the tuple is converted to a
5118 	 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5119 	 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5120 	 */
5121 	if (ctid)
5122 		tuple->t_self = tuple->t_data->t_ctid = *ctid;
5123 
5124 	/*
5125 	 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5126 	 * heap_form_tuple.  heap_form_tuple actually creates the tuple with
5127 	 * DatumTupleFields, not HeapTupleFields, but the executor expects
5128 	 * HeapTupleFields and will happily extract system columns on that
5129 	 * assumption.  If we don't do this then, for example, the tuple length
5130 	 * ends up in the xmin field, which isn't what we want.
5131 	 */
5132 	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
5133 	HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
5134 	HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
5135 
5136 	/*
5137 	 * If we have an OID to return, install it.
5138 	 */
5139 	if (OidIsValid(oid))
5140 		HeapTupleSetOid(tuple, oid);
5141 
5142 	/* Clean up */
5143 	MemoryContextReset(temp_context);
5144 
5145 	return tuple;
5146 }
5147 
5148 /*
5149  * Callback function which is called when error occurs during column value
5150  * conversion.  Print names of column and relation.
5151  *
5152  * Note that this function mustn't do any catalog lookups, since we are in
5153  * an already-failed transaction.  Fortunately, we can get the needed info
5154  * from the relation or the query's rangetable instead.
5155  */
5156 static void
conversion_error_callback(void * arg)5157 conversion_error_callback(void *arg)
5158 {
5159 	ConversionLocation *errpos = (ConversionLocation *) arg;
5160 	Relation	rel = errpos->rel;
5161 	ForeignScanState *fsstate = errpos->fsstate;
5162 	const char *attname = NULL;
5163 	const char *relname = NULL;
5164 	bool		is_wholerow = false;
5165 
5166 	/*
5167 	 * If we're in a scan node, always use aliases from the rangetable, for
5168 	 * consistency between the simple-relation and remote-join cases.  Look at
5169 	 * the relation's tupdesc only if we're not in a scan node.
5170 	 */
5171 	if (fsstate)
5172 	{
5173 		/* ForeignScan case */
5174 		ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5175 		int			varno = 0;
5176 		AttrNumber	colno = 0;
5177 
5178 		if (fsplan->scan.scanrelid > 0)
5179 		{
5180 			/* error occurred in a scan against a foreign table */
5181 			varno = fsplan->scan.scanrelid;
5182 			colno = errpos->cur_attno;
5183 		}
5184 		else
5185 		{
5186 			/* error occurred in a scan against a foreign join */
5187 			TargetEntry *tle;
5188 
5189 			tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
5190 								errpos->cur_attno - 1);
5191 
5192 			/*
5193 			 * Target list can have Vars and expressions.  For Vars, we can
5194 			 * get some information, however for expressions we can't.  Thus
5195 			 * for expressions, just show generic context message.
5196 			 */
5197 			if (IsA(tle->expr, Var))
5198 			{
5199 				Var		   *var = (Var *) tle->expr;
5200 
5201 				varno = var->varno;
5202 				colno = var->varattno;
5203 			}
5204 		}
5205 
5206 		if (varno > 0)
5207 		{
5208 			EState	   *estate = fsstate->ss.ps.state;
5209 			RangeTblEntry *rte = rt_fetch(varno, estate->es_range_table);
5210 
5211 			relname = rte->eref->aliasname;
5212 
5213 			if (colno == 0)
5214 				is_wholerow = true;
5215 			else if (colno > 0 && colno <= list_length(rte->eref->colnames))
5216 				attname = strVal(list_nth(rte->eref->colnames, colno - 1));
5217 			else if (colno == SelfItemPointerAttributeNumber)
5218 				attname = "ctid";
5219 			else if (colno == ObjectIdAttributeNumber)
5220 				attname = "oid";
5221 		}
5222 	}
5223 	else if (rel)
5224 	{
5225 		/* Non-ForeignScan case (we should always have a rel here) */
5226 		TupleDesc	tupdesc = RelationGetDescr(rel);
5227 
5228 		relname = RelationGetRelationName(rel);
5229 		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5230 		{
5231 			Form_pg_attribute attr = TupleDescAttr(tupdesc,
5232 												   errpos->cur_attno - 1);
5233 
5234 			attname = NameStr(attr->attname);
5235 		}
5236 		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5237 			attname = "ctid";
5238 	}
5239 
5240 	if (relname && is_wholerow)
5241 		errcontext("whole-row reference to foreign table \"%s\"", relname);
5242 	else if (relname && attname)
5243 		errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5244 	else
5245 		errcontext("processing expression at position %d in select list",
5246 				   errpos->cur_attno);
5247 }
5248 
5249 /*
5250  * Find an equivalence class member expression, all of whose Vars, come from
5251  * the indicated relation.
5252  */
5253 extern Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)5254 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
5255 {
5256 	ListCell   *lc_em;
5257 
5258 	foreach(lc_em, ec->ec_members)
5259 	{
5260 		EquivalenceMember *em = lfirst(lc_em);
5261 
5262 		if (bms_is_subset(em->em_relids, rel->relids) &&
5263 			!bms_is_empty(em->em_relids))
5264 		{
5265 			/*
5266 			 * If there is more than one equivalence member whose Vars are
5267 			 * taken entirely from this relation, we'll be content to choose
5268 			 * any one of those.
5269 			 */
5270 			return em->em_expr;
5271 		}
5272 	}
5273 
5274 	/* We didn't find any suitable equivalence class expression */
5275 	return NULL;
5276 }
5277