1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  *		  Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "foreign/fdwapi.h"
25 #include "funcapi.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "nodes/nodeFuncs.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/optimizer.h"
32 #include "optimizer/pathnode.h"
33 #include "optimizer/paths.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "optimizer/tlist.h"
37 #include "parser/parsetree.h"
38 #include "utils/builtins.h"
39 #include "utils/float.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/rel.h"
44 #include "utils/sampling.h"
45 #include "utils/selfuncs.h"
46 
/*
 * Source-code-compatibility shim accompanying the pull_varnos() API change:
 * each nine-argument make_restrictinfo() call in this file is forwarded,
 * argument for argument, to make_restrictinfo_new().
 */
#define make_restrictinfo(a, b, c, d, e, f, g, h, i) \
	make_restrictinfo_new(a, b, c, d, e, f, g, h, i)

PG_MODULE_MAGIC;
51 
/* Default CPU cost charged for starting up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	(100.0)

/*
 * Default CPU cost charged for processing one row; this is in addition to
 * cpu_tuple_cost.
 */
#define DEFAULT_FDW_TUPLE_COST		(0.01)

/* Without remote estimates, a sort is assumed to cost 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER (1.2)
60 
/*
 * Positions of the FDW-private items kept in a scan's fdw_private list.
 *
 * The list is indexed by this enum, so list_nth() retrieves any item
 * directly.  For instance, the SELECT statement is obtained with:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
	/* String node: the SQL statement to run on the remote server */
	FdwScanPrivateSelectSql = 0,

	/* Integer list: attribute numbers the SELECT retrieves */
	FdwScanPrivateRetrievedAttrs = 1,

	/* Integer: the desired fetch_size */
	FdwScanPrivateFetchSize = 2,

	/*
	 * String describing the join (names of the joined relations and the join
	 * types); added when the scan is a join
	 */
	FdwScanPrivateRelations = 3
};
83 
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a postgres_fdw foreign table.  In order, the list holds:
 *
 * 1) the INSERT/UPDATE/DELETE statement text sent to the remote server
 * 2) an integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) a boolean flag telling whether the remote query has a RETURNING clause
 * 4) an integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* String node: the SQL statement to run on the remote server */
	FdwModifyPrivateUpdateSql = 0,

	/* Integer list: target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums = 1,

	/* Integer Value node: the has-returning flag */
	FdwModifyPrivateHasReturning = 2,

	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs = 3
};
105 
/*
 * Positions of the items kept in the fdw_private list of a ForeignScan node
 * that modifies a foreign table directly.  In order, the list holds:
 *
 * 1) the UPDATE/DELETE statement text sent to the remote server
 * 2) a boolean flag telling whether the remote query has a RETURNING clause
 * 3) an integer list of attribute numbers retrieved by RETURNING, if any
 * 4) a boolean flag telling whether we set the command es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	/* String node: the SQL statement to run on the remote server */
	FdwDirectModifyPrivateUpdateSql = 0,

	/* Integer Value node: the has-returning flag */
	FdwDirectModifyPrivateHasReturning = 1,

	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs = 2,

	/* Integer Value node: the set-processed flag */
	FdwDirectModifyPrivateSetProcessed = 3
};
126 
127 /*
128  * Execution state of a foreign scan using postgres_fdw.
129  */
130 typedef struct PgFdwScanState
131 {
132 	Relation	rel;			/* relcache entry for the foreign table. NULL
133 								 * for a foreign join scan. */
134 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
135 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
136 
137 	/* extracted fdw_private data */
138 	char	   *query;			/* text of SELECT command */
139 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
140 
141 	/* for remote query execution */
142 	PGconn	   *conn;			/* connection for the scan */
143 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
144 	bool		cursor_exists;	/* have we created the cursor? */
145 	int			numParams;		/* number of parameters passed to query */
146 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
147 	List	   *param_exprs;	/* executable expressions for param values */
148 	const char **param_values;	/* textual values of query parameters */
149 
150 	/* for storing result tuples */
151 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
152 	int			num_tuples;		/* # of tuples in array */
153 	int			next_tuple;		/* index of next one to return */
154 
155 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
156 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
157 	bool		eof_reached;	/* true if last fetch reached EOF */
158 
159 	/* working memory contexts */
160 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
161 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
162 
163 	int			fetch_size;		/* number of tuples per fetch */
164 } PgFdwScanState;
165 
166 /*
167  * Execution state of a foreign insert/update/delete operation.
168  */
169 typedef struct PgFdwModifyState
170 {
171 	Relation	rel;			/* relcache entry for the foreign table */
172 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
173 
174 	/* for remote query execution */
175 	PGconn	   *conn;			/* connection for the scan */
176 	char	   *p_name;			/* name of prepared statement, if created */
177 
178 	/* extracted fdw_private data */
179 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
180 	List	   *target_attrs;	/* list of target attribute numbers */
181 	bool		has_returning;	/* is there a RETURNING clause? */
182 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
183 
184 	/* info about parameters for prepared statement */
185 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
186 	int			p_nums;			/* number of parameters to transmit */
187 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
188 
189 	/* working memory context */
190 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
191 
192 	/* for update row movement if subplan result rel */
193 	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
194 											 * created */
195 } PgFdwModifyState;
196 
197 /*
198  * Execution state of a foreign scan that modifies a foreign table directly.
199  */
200 typedef struct PgFdwDirectModifyState
201 {
202 	Relation	rel;			/* relcache entry for the foreign table */
203 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
204 
205 	/* extracted fdw_private data */
206 	char	   *query;			/* text of UPDATE/DELETE command */
207 	bool		has_returning;	/* is there a RETURNING clause? */
208 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
209 	bool		set_processed;	/* do we set the command es_processed? */
210 
211 	/* for remote query execution */
212 	PGconn	   *conn;			/* connection for the update */
213 	int			numParams;		/* number of parameters passed to query */
214 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
215 	List	   *param_exprs;	/* executable expressions for param values */
216 	const char **param_values;	/* textual values of query parameters */
217 
218 	/* for storing result tuples */
219 	PGresult   *result;			/* result for query */
220 	int			num_tuples;		/* # of result tuples */
221 	int			next_tuple;		/* index of next one to return */
222 	Relation	resultRel;		/* relcache entry for the target relation */
223 	AttrNumber *attnoMap;		/* array of attnums of input user columns */
224 	AttrNumber	ctidAttno;		/* attnum of input ctid column */
225 	AttrNumber	oidAttno;		/* attnum of input oid column */
226 	bool		hasSystemCols;	/* are there system columns of resultRel? */
227 
228 	/* working memory context */
229 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
230 } PgFdwDirectModifyState;
231 
232 /*
233  * Workspace for analyzing a foreign table.
234  */
235 typedef struct PgFdwAnalyzeState
236 {
237 	Relation	rel;			/* relcache entry for the foreign table */
238 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
239 	List	   *retrieved_attrs;	/* attr numbers retrieved by query */
240 
241 	/* collected sample rows */
242 	HeapTuple  *rows;			/* array of size targrows */
243 	int			targrows;		/* target # of sample rows */
244 	int			numrows;		/* # of sample rows collected */
245 
246 	/* for random sampling */
247 	double		samplerows;		/* # of rows fetched */
248 	double		rowstoskip;		/* # of rows to skip before next sample */
249 	ReservoirStateData rstate;	/* state for reservoir sampling */
250 
251 	/* working memory contexts */
252 	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
253 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
254 } PgFdwAnalyzeState;
255 
/*
 * Positions of the items kept in the fdw_private list of a ForeignPath.
 * In order, the list holds:
 *
 * 1) a boolean flag telling whether the remote query has the final sort
 * 2) a boolean flag telling whether the remote query has the LIMIT clause
 */
enum FdwPathPrivateIndex
{
	/* Integer Value node: the has-final-sort flag */
	FdwPathPrivateHasFinalSort = 0,

	/* Integer Value node: the has-limit flag */
	FdwPathPrivateHasLimit = 1
};
270 
271 /* Struct for extra information passed to estimate_path_cost_size() */
272 typedef struct
273 {
274 	PathTarget *target;
275 	bool		has_final_sort;
276 	bool		has_limit;
277 	double		limit_tuples;
278 	int64		count_est;
279 	int64		offset_est;
280 } PgFdwPathExtraData;
281 
282 /*
283  * Identify the attribute where data conversion fails.
284  */
285 typedef struct ConversionLocation
286 {
287 	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
288 	Relation	rel;			/* foreign table being processed, or NULL */
289 	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
290 } ConversionLocation;
291 
292 /* Callback argument for ec_member_matches_foreign */
293 typedef struct
294 {
295 	Expr	   *current;		/* current expr, or NULL if not yet found */
296 	List	   *already_used;	/* expressions already dealt with */
297 } ec_member_foreign_arg;
298 
299 /*
300  * SQL functions
301  */
302 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
303 
304 /*
305  * FDW callback routines
306  */
307 static void postgresGetForeignRelSize(PlannerInfo *root,
308 									  RelOptInfo *baserel,
309 									  Oid foreigntableid);
310 static void postgresGetForeignPaths(PlannerInfo *root,
311 									RelOptInfo *baserel,
312 									Oid foreigntableid);
313 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
314 										   RelOptInfo *foreignrel,
315 										   Oid foreigntableid,
316 										   ForeignPath *best_path,
317 										   List *tlist,
318 										   List *scan_clauses,
319 										   Plan *outer_plan);
320 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
321 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
322 static void postgresReScanForeignScan(ForeignScanState *node);
323 static void postgresEndForeignScan(ForeignScanState *node);
324 static void postgresAddForeignUpdateTargets(Query *parsetree,
325 											RangeTblEntry *target_rte,
326 											Relation target_relation);
327 static List *postgresPlanForeignModify(PlannerInfo *root,
328 									   ModifyTable *plan,
329 									   Index resultRelation,
330 									   int subplan_index);
331 static void postgresBeginForeignModify(ModifyTableState *mtstate,
332 									   ResultRelInfo *resultRelInfo,
333 									   List *fdw_private,
334 									   int subplan_index,
335 									   int eflags);
336 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
337 												 ResultRelInfo *resultRelInfo,
338 												 TupleTableSlot *slot,
339 												 TupleTableSlot *planSlot);
340 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
341 												 ResultRelInfo *resultRelInfo,
342 												 TupleTableSlot *slot,
343 												 TupleTableSlot *planSlot);
344 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
345 												 ResultRelInfo *resultRelInfo,
346 												 TupleTableSlot *slot,
347 												 TupleTableSlot *planSlot);
348 static void postgresEndForeignModify(EState *estate,
349 									 ResultRelInfo *resultRelInfo);
350 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
351 									   ResultRelInfo *resultRelInfo);
352 static void postgresEndForeignInsert(EState *estate,
353 									 ResultRelInfo *resultRelInfo);
354 static int	postgresIsForeignRelUpdatable(Relation rel);
355 static bool postgresPlanDirectModify(PlannerInfo *root,
356 									 ModifyTable *plan,
357 									 Index resultRelation,
358 									 int subplan_index);
359 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
360 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
361 static void postgresEndDirectModify(ForeignScanState *node);
362 static void postgresExplainForeignScan(ForeignScanState *node,
363 									   ExplainState *es);
364 static void postgresExplainForeignModify(ModifyTableState *mtstate,
365 										 ResultRelInfo *rinfo,
366 										 List *fdw_private,
367 										 int subplan_index,
368 										 ExplainState *es);
369 static void postgresExplainDirectModify(ForeignScanState *node,
370 										ExplainState *es);
371 static bool postgresAnalyzeForeignTable(Relation relation,
372 										AcquireSampleRowsFunc *func,
373 										BlockNumber *totalpages);
374 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
375 										 Oid serverOid);
376 static void postgresGetForeignJoinPaths(PlannerInfo *root,
377 										RelOptInfo *joinrel,
378 										RelOptInfo *outerrel,
379 										RelOptInfo *innerrel,
380 										JoinType jointype,
381 										JoinPathExtraData *extra);
382 static bool postgresRecheckForeignScan(ForeignScanState *node,
383 									   TupleTableSlot *slot);
384 static void postgresGetForeignUpperPaths(PlannerInfo *root,
385 										 UpperRelationKind stage,
386 										 RelOptInfo *input_rel,
387 										 RelOptInfo *output_rel,
388 										 void *extra);
389 
390 /*
391  * Helper functions
392  */
393 static void estimate_path_cost_size(PlannerInfo *root,
394 									RelOptInfo *foreignrel,
395 									List *param_join_conds,
396 									List *pathkeys,
397 									PgFdwPathExtraData *fpextra,
398 									double *p_rows, int *p_width,
399 									Cost *p_startup_cost, Cost *p_total_cost);
400 static void get_remote_estimate(const char *sql,
401 								PGconn *conn,
402 								double *rows,
403 								int *width,
404 								Cost *startup_cost,
405 								Cost *total_cost);
406 static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
407 											  List *pathkeys,
408 											  double retrieved_rows,
409 											  double width,
410 											  double limit_tuples,
411 											  Cost *p_startup_cost,
412 											  Cost *p_run_cost);
413 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
414 									  EquivalenceClass *ec, EquivalenceMember *em,
415 									  void *arg);
416 static void create_cursor(ForeignScanState *node);
417 static void fetch_more_data(ForeignScanState *node);
418 static void close_cursor(PGconn *conn, unsigned int cursor_number);
419 static PgFdwModifyState *create_foreign_modify(EState *estate,
420 											   RangeTblEntry *rte,
421 											   ResultRelInfo *resultRelInfo,
422 											   CmdType operation,
423 											   Plan *subplan,
424 											   char *query,
425 											   List *target_attrs,
426 											   bool has_returning,
427 											   List *retrieved_attrs);
428 static TupleTableSlot *execute_foreign_modify(EState *estate,
429 											  ResultRelInfo *resultRelInfo,
430 											  CmdType operation,
431 											  TupleTableSlot *slot,
432 											  TupleTableSlot *planSlot);
433 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
434 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
435 											 ItemPointer tupleid,
436 											 TupleTableSlot *slot);
437 static void store_returning_result(PgFdwModifyState *fmstate,
438 								   TupleTableSlot *slot, PGresult *res);
439 static void finish_foreign_modify(PgFdwModifyState *fmstate);
440 static List *build_remote_returning(Index rtindex, Relation rel,
441 									List *returningList);
442 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
443 static void execute_dml_stmt(ForeignScanState *node);
444 static TupleTableSlot *get_returning_data(ForeignScanState *node);
445 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
446 								  List *fdw_scan_tlist,
447 								  Index rtindex);
448 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
449 											  TupleTableSlot *slot,
450 											  EState *estate);
451 static void prepare_query_params(PlanState *node,
452 								 List *fdw_exprs,
453 								 int numParams,
454 								 FmgrInfo **param_flinfo,
455 								 List **param_exprs,
456 								 const char ***param_values);
457 static void process_query_params(ExprContext *econtext,
458 								 FmgrInfo *param_flinfo,
459 								 List *param_exprs,
460 								 const char **param_values);
461 static int	postgresAcquireSampleRowsFunc(Relation relation, int elevel,
462 										  HeapTuple *rows, int targrows,
463 										  double *totalrows,
464 										  double *totaldeadrows);
465 static void analyze_row_processor(PGresult *res, int row,
466 								  PgFdwAnalyzeState *astate);
467 static HeapTuple make_tuple_from_result_row(PGresult *res,
468 											int row,
469 											Relation rel,
470 											AttInMetadata *attinmeta,
471 											List *retrieved_attrs,
472 											ForeignScanState *fsstate,
473 											MemoryContext temp_context);
474 static void conversion_error_callback(void *arg);
475 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
476 							JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
477 							JoinPathExtraData *extra);
478 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
479 								Node *havingQual);
480 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
481 											  RelOptInfo *rel);
482 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
483 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
484 											Path *epq_path);
485 static void add_foreign_grouping_paths(PlannerInfo *root,
486 									   RelOptInfo *input_rel,
487 									   RelOptInfo *grouped_rel,
488 									   GroupPathExtraData *extra);
489 static void add_foreign_ordered_paths(PlannerInfo *root,
490 									  RelOptInfo *input_rel,
491 									  RelOptInfo *ordered_rel);
492 static void add_foreign_final_paths(PlannerInfo *root,
493 									RelOptInfo *input_rel,
494 									RelOptInfo *final_rel,
495 									FinalPathExtraData *extra);
496 static void apply_server_options(PgFdwRelationInfo *fpinfo);
497 static void apply_table_options(PgFdwRelationInfo *fpinfo);
498 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
499 							  const PgFdwRelationInfo *fpinfo_o,
500 							  const PgFdwRelationInfo *fpinfo_i);
501 
502 
503 /*
504  * Foreign-data wrapper handler function: return a struct with pointers
505  * to my callback routines.
506  */
507 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)508 postgres_fdw_handler(PG_FUNCTION_ARGS)
509 {
510 	FdwRoutine *routine = makeNode(FdwRoutine);
511 
512 	/* Functions for scanning foreign tables */
513 	routine->GetForeignRelSize = postgresGetForeignRelSize;
514 	routine->GetForeignPaths = postgresGetForeignPaths;
515 	routine->GetForeignPlan = postgresGetForeignPlan;
516 	routine->BeginForeignScan = postgresBeginForeignScan;
517 	routine->IterateForeignScan = postgresIterateForeignScan;
518 	routine->ReScanForeignScan = postgresReScanForeignScan;
519 	routine->EndForeignScan = postgresEndForeignScan;
520 
521 	/* Functions for updating foreign tables */
522 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
523 	routine->PlanForeignModify = postgresPlanForeignModify;
524 	routine->BeginForeignModify = postgresBeginForeignModify;
525 	routine->ExecForeignInsert = postgresExecForeignInsert;
526 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
527 	routine->ExecForeignDelete = postgresExecForeignDelete;
528 	routine->EndForeignModify = postgresEndForeignModify;
529 	routine->BeginForeignInsert = postgresBeginForeignInsert;
530 	routine->EndForeignInsert = postgresEndForeignInsert;
531 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
532 	routine->PlanDirectModify = postgresPlanDirectModify;
533 	routine->BeginDirectModify = postgresBeginDirectModify;
534 	routine->IterateDirectModify = postgresIterateDirectModify;
535 	routine->EndDirectModify = postgresEndDirectModify;
536 
537 	/* Function for EvalPlanQual rechecks */
538 	routine->RecheckForeignScan = postgresRecheckForeignScan;
539 	/* Support functions for EXPLAIN */
540 	routine->ExplainForeignScan = postgresExplainForeignScan;
541 	routine->ExplainForeignModify = postgresExplainForeignModify;
542 	routine->ExplainDirectModify = postgresExplainDirectModify;
543 
544 	/* Support functions for ANALYZE */
545 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
546 
547 	/* Support functions for IMPORT FOREIGN SCHEMA */
548 	routine->ImportForeignSchema = postgresImportForeignSchema;
549 
550 	/* Support functions for join push-down */
551 	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
552 
553 	/* Support functions for upper relation push-down */
554 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
555 
556 	PG_RETURN_POINTER(routine);
557 }
558 
559 /*
560  * postgresGetForeignRelSize
561  *		Estimate # of rows and width of the result of the scan
562  *
563  * We should consider the effect of all baserestrictinfo clauses here, but
564  * not any join clauses.
565  */
566 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)567 postgresGetForeignRelSize(PlannerInfo *root,
568 						  RelOptInfo *baserel,
569 						  Oid foreigntableid)
570 {
571 	PgFdwRelationInfo *fpinfo;
572 	ListCell   *lc;
573 	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
574 	const char *namespace;
575 	const char *relname;
576 	const char *refname;
577 
578 	/*
579 	 * We use PgFdwRelationInfo to pass various information to subsequent
580 	 * functions.
581 	 */
582 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
583 	baserel->fdw_private = (void *) fpinfo;
584 
585 	/* Base foreign tables need to be pushed down always. */
586 	fpinfo->pushdown_safe = true;
587 
588 	/* Look up foreign-table catalog info. */
589 	fpinfo->table = GetForeignTable(foreigntableid);
590 	fpinfo->server = GetForeignServer(fpinfo->table->serverid);
591 
592 	/*
593 	 * Extract user-settable option values.  Note that per-table settings of
594 	 * use_remote_estimate and fetch_size override per-server settings of
595 	 * them, respectively.
596 	 */
597 	fpinfo->use_remote_estimate = false;
598 	fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
599 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
600 	fpinfo->shippable_extensions = NIL;
601 	fpinfo->fetch_size = 100;
602 
603 	apply_server_options(fpinfo);
604 	apply_table_options(fpinfo);
605 
606 	/*
607 	 * If the table or the server is configured to use remote estimates,
608 	 * identify which user to do remote access as during planning.  This
609 	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
610 	 * permissions, the query would have failed at runtime anyway.
611 	 */
612 	if (fpinfo->use_remote_estimate)
613 	{
614 		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
615 
616 		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
617 	}
618 	else
619 		fpinfo->user = NULL;
620 
621 	/*
622 	 * Identify which baserestrictinfo clauses can be sent to the remote
623 	 * server and which can't.
624 	 */
625 	classifyConditions(root, baserel, baserel->baserestrictinfo,
626 					   &fpinfo->remote_conds, &fpinfo->local_conds);
627 
628 	/*
629 	 * Identify which attributes will need to be retrieved from the remote
630 	 * server.  These include all attrs needed for joins or final output, plus
631 	 * all attrs used in the local_conds.  (Note: if we end up using a
632 	 * parameterized scan, it's possible that some of the join clauses will be
633 	 * sent to the remote and thus we wouldn't really need to retrieve the
634 	 * columns used in them.  Doesn't seem worth detecting that case though.)
635 	 */
636 	fpinfo->attrs_used = NULL;
637 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
638 				   &fpinfo->attrs_used);
639 	foreach(lc, fpinfo->local_conds)
640 	{
641 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
642 
643 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
644 					   &fpinfo->attrs_used);
645 	}
646 
647 	/*
648 	 * Compute the selectivity and cost of the local_conds, so we don't have
649 	 * to do it over again for each path.  The best we can do for these
650 	 * conditions is to estimate selectivity on the basis of local statistics.
651 	 */
652 	fpinfo->local_conds_sel = clauselist_selectivity(root,
653 													 fpinfo->local_conds,
654 													 baserel->relid,
655 													 JOIN_INNER,
656 													 NULL);
657 
658 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
659 
660 	/*
661 	 * Set # of retrieved rows and cached relation costs to some negative
662 	 * value, so that we can detect when they are set to some sensible values,
663 	 * during one (usually the first) of the calls to estimate_path_cost_size.
664 	 */
665 	fpinfo->retrieved_rows = -1;
666 	fpinfo->rel_startup_cost = -1;
667 	fpinfo->rel_total_cost = -1;
668 
669 	/*
670 	 * If the table or the server is configured to use remote estimates,
671 	 * connect to the foreign server and execute EXPLAIN to estimate the
672 	 * number of rows selected by the restriction clauses, as well as the
673 	 * average row width.  Otherwise, estimate using whatever statistics we
674 	 * have locally, in a way similar to ordinary tables.
675 	 */
676 	if (fpinfo->use_remote_estimate)
677 	{
678 		/*
679 		 * Get cost/size estimates with help of remote server.  Save the
680 		 * values in fpinfo so we don't need to do it again to generate the
681 		 * basic foreign path.
682 		 */
683 		estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
684 								&fpinfo->rows, &fpinfo->width,
685 								&fpinfo->startup_cost, &fpinfo->total_cost);
686 
687 		/* Report estimated baserel size to planner. */
688 		baserel->rows = fpinfo->rows;
689 		baserel->reltarget->width = fpinfo->width;
690 	}
691 	else
692 	{
693 		/*
694 		 * If the foreign table has never been ANALYZEd, it will have relpages
695 		 * and reltuples equal to zero, which most likely has nothing to do
696 		 * with reality.  We can't do a whole lot about that if we're not
697 		 * allowed to consult the remote server, but we can use a hack similar
698 		 * to plancat.c's treatment of empty relations: use a minimum size
699 		 * estimate of 10 pages, and divide by the column-datatype-based width
700 		 * estimate to get the corresponding number of tuples.
701 		 */
702 		if (baserel->pages == 0 && baserel->tuples == 0)
703 		{
704 			baserel->pages = 10;
705 			baserel->tuples =
706 				(10 * BLCKSZ) / (baserel->reltarget->width +
707 								 MAXALIGN(SizeofHeapTupleHeader));
708 		}
709 
710 		/* Estimate baserel size as best we can with local statistics. */
711 		set_baserel_size_estimates(root, baserel);
712 
713 		/* Fill in basically-bogus cost estimates for use later. */
714 		estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
715 								&fpinfo->rows, &fpinfo->width,
716 								&fpinfo->startup_cost, &fpinfo->total_cost);
717 	}
718 
719 	/*
720 	 * Set the name of relation in fpinfo, while we are constructing it here.
721 	 * It will be used to build the string describing the join relation in
722 	 * EXPLAIN output. We can't know whether VERBOSE option is specified or
723 	 * not, so always schema-qualify the foreign table name.
724 	 */
725 	fpinfo->relation_name = makeStringInfo();
726 	namespace = get_namespace_name(get_rel_namespace(foreigntableid));
727 	relname = get_rel_name(foreigntableid);
728 	refname = rte->eref->aliasname;
729 	appendStringInfo(fpinfo->relation_name, "%s.%s",
730 					 quote_identifier(namespace),
731 					 quote_identifier(relname));
732 	if (*refname && strcmp(refname, relname) != 0)
733 		appendStringInfo(fpinfo->relation_name, " %s",
734 						 quote_identifier(rte->eref->aliasname));
735 
736 	/* No outer and inner relations. */
737 	fpinfo->make_outerrel_subquery = false;
738 	fpinfo->make_innerrel_subquery = false;
739 	fpinfo->lower_subquery_rels = NULL;
740 	/* Set the relation index. */
741 	fpinfo->relation_index = baserel->relid;
742 }
743 
744 /*
745  * get_useful_ecs_for_relation
746  *		Determine which EquivalenceClasses might be involved in useful
747  *		orderings of this relation.
748  *
749  * This function is in some respects a mirror image of the core function
750  * pathkeys_useful_for_merging: for a regular table, we know what indexes
751  * we have and want to test whether any of them are useful.  For a foreign
752  * table, we don't know what indexes are present on the remote side but
753  * want to speculate about which ones we'd like to use if they existed.
754  *
755  * This function returns a list of potentially-useful equivalence classes,
756  * but it does not guarantee that an EquivalenceMember exists which contains
757  * Vars only from the given relation.  For example, given ft1 JOIN t1 ON
758  * ft1.x + t1.x = 0, this function will say that the equivalence class
759  * containing ft1.x + t1.x is potentially useful.  Supposing ft1 is remote and
760  * t1 is local (or on a different server), it will turn out that no useful
761  * ORDER BY clause can be generated.  It's not our job to figure that out
762  * here; we're only interested in identifying relevant ECs.
763  */
764 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)765 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
766 {
767 	List	   *useful_eclass_list = NIL;
768 	ListCell   *lc;
769 	Relids		relids;
770 
771 	/*
772 	 * First, consider whether any active EC is potentially useful for a merge
773 	 * join against this relation.
774 	 */
775 	if (rel->has_eclass_joins)
776 	{
777 		foreach(lc, root->eq_classes)
778 		{
779 			EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
780 
781 			if (eclass_useful_for_merging(root, cur_ec, rel))
782 				useful_eclass_list = lappend(useful_eclass_list, cur_ec);
783 		}
784 	}
785 
786 	/*
787 	 * Next, consider whether there are any non-EC derivable join clauses that
788 	 * are merge-joinable.  If the joininfo list is empty, we can exit
789 	 * quickly.
790 	 */
791 	if (rel->joininfo == NIL)
792 		return useful_eclass_list;
793 
794 	/* If this is a child rel, we must use the topmost parent rel to search. */
795 	if (IS_OTHER_REL(rel))
796 	{
797 		Assert(!bms_is_empty(rel->top_parent_relids));
798 		relids = rel->top_parent_relids;
799 	}
800 	else
801 		relids = rel->relids;
802 
803 	/* Check each join clause in turn. */
804 	foreach(lc, rel->joininfo)
805 	{
806 		RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
807 
808 		/* Consider only mergejoinable clauses */
809 		if (restrictinfo->mergeopfamilies == NIL)
810 			continue;
811 
812 		/* Make sure we've got canonical ECs. */
813 		update_mergeclause_eclasses(root, restrictinfo);
814 
815 		/*
816 		 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
817 		 * that left_ec and right_ec will be initialized, per comments in
818 		 * distribute_qual_to_rels.
819 		 *
820 		 * We want to identify which side of this merge-joinable clause
821 		 * contains columns from the relation produced by this RelOptInfo. We
822 		 * test for overlap, not containment, because there could be extra
823 		 * relations on either side.  For example, suppose we've got something
824 		 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
825 		 * A.y = D.y.  The input rel might be the joinrel between A and B, and
826 		 * we'll consider the join clause A.y = D.y. relids contains a
827 		 * relation not involved in the join class (B) and the equivalence
828 		 * class for the left-hand side of the clause contains a relation not
829 		 * involved in the input rel (C).  Despite the fact that we have only
830 		 * overlap and not containment in either direction, A.y is potentially
831 		 * useful as a sort column.
832 		 *
833 		 * Note that it's even possible that relids overlaps neither side of
834 		 * the join clause.  For example, consider A LEFT JOIN B ON A.x = B.x
835 		 * AND A.x = 1.  The clause A.x = 1 will appear in B's joininfo list,
836 		 * but overlaps neither side of B.  In that case, we just skip this
837 		 * join clause, since it doesn't suggest a useful sort order for this
838 		 * relation.
839 		 */
840 		if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
841 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
842 														restrictinfo->right_ec);
843 		else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
844 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
845 														restrictinfo->left_ec);
846 	}
847 
848 	return useful_eclass_list;
849 }
850 
851 /*
852  * get_useful_pathkeys_for_relation
853  *		Determine which orderings of a relation might be useful.
854  *
855  * Getting data in sorted order can be useful either because the requested
856  * order matches the final output ordering for the overall query we're
857  * planning, or because it enables an efficient merge join.  Here, we try
858  * to figure out which pathkeys to consider.
859  */
860 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)861 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
862 {
863 	List	   *useful_pathkeys_list = NIL;
864 	List	   *useful_eclass_list;
865 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
866 	EquivalenceClass *query_ec = NULL;
867 	ListCell   *lc;
868 
869 	/*
870 	 * Pushing the query_pathkeys to the remote server is always worth
871 	 * considering, because it might let us avoid a local sort.
872 	 */
873 	fpinfo->qp_is_pushdown_safe = false;
874 	if (root->query_pathkeys)
875 	{
876 		bool		query_pathkeys_ok = true;
877 
878 		foreach(lc, root->query_pathkeys)
879 		{
880 			PathKey    *pathkey = (PathKey *) lfirst(lc);
881 			EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
882 			Expr	   *em_expr;
883 
884 			/*
885 			 * The planner and executor don't have any clever strategy for
886 			 * taking data sorted by a prefix of the query's pathkeys and
887 			 * getting it to be sorted by all of those pathkeys. We'll just
888 			 * end up resorting the entire data set.  So, unless we can push
889 			 * down all of the query pathkeys, forget it.
890 			 *
891 			 * is_foreign_expr would detect volatile expressions as well, but
892 			 * checking ec_has_volatile here saves some cycles.
893 			 */
894 			if (pathkey_ec->ec_has_volatile ||
895 				!(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
896 				!is_foreign_expr(root, rel, em_expr))
897 			{
898 				query_pathkeys_ok = false;
899 				break;
900 			}
901 		}
902 
903 		if (query_pathkeys_ok)
904 		{
905 			useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
906 			fpinfo->qp_is_pushdown_safe = true;
907 		}
908 	}
909 
910 	/*
911 	 * Even if we're not using remote estimates, having the remote side do the
912 	 * sort generally won't be any worse than doing it locally, and it might
913 	 * be much better if the remote side can generate data in the right order
914 	 * without needing a sort at all.  However, what we're going to do next is
915 	 * try to generate pathkeys that seem promising for possible merge joins,
916 	 * and that's more speculative.  A wrong choice might hurt quite a bit, so
917 	 * bail out if we can't use remote estimates.
918 	 */
919 	if (!fpinfo->use_remote_estimate)
920 		return useful_pathkeys_list;
921 
922 	/* Get the list of interesting EquivalenceClasses. */
923 	useful_eclass_list = get_useful_ecs_for_relation(root, rel);
924 
925 	/* Extract unique EC for query, if any, so we don't consider it again. */
926 	if (list_length(root->query_pathkeys) == 1)
927 	{
928 		PathKey    *query_pathkey = linitial(root->query_pathkeys);
929 
930 		query_ec = query_pathkey->pk_eclass;
931 	}
932 
933 	/*
934 	 * As a heuristic, the only pathkeys we consider here are those of length
935 	 * one.  It's surely possible to consider more, but since each one we
936 	 * choose to consider will generate a round-trip to the remote side, we
937 	 * need to be a bit cautious here.  It would sure be nice to have a local
938 	 * cache of information about remote index definitions...
939 	 */
940 	foreach(lc, useful_eclass_list)
941 	{
942 		EquivalenceClass *cur_ec = lfirst(lc);
943 		Expr	   *em_expr;
944 		PathKey    *pathkey;
945 
946 		/* If redundant with what we did above, skip it. */
947 		if (cur_ec == query_ec)
948 			continue;
949 
950 		/* If no pushable expression for this rel, skip it. */
951 		em_expr = find_em_expr_for_rel(cur_ec, rel);
952 		if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
953 			continue;
954 
955 		/* Looks like we can generate a pathkey, so let's do it. */
956 		pathkey = make_canonical_pathkey(root, cur_ec,
957 										 linitial_oid(cur_ec->ec_opfamilies),
958 										 BTLessStrategyNumber,
959 										 false);
960 		useful_pathkeys_list = lappend(useful_pathkeys_list,
961 									   list_make1(pathkey));
962 	}
963 
964 	return useful_pathkeys_list;
965 }
966 
967 /*
968  * postgresGetForeignPaths
969  *		Create possible scan paths for a scan on the foreign table
970  */
971 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)972 postgresGetForeignPaths(PlannerInfo *root,
973 						RelOptInfo *baserel,
974 						Oid foreigntableid)
975 {
976 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
977 	ForeignPath *path;
978 	List	   *ppi_list;
979 	ListCell   *lc;
980 
981 	/*
982 	 * Create simplest ForeignScan path node and add it to baserel.  This path
983 	 * corresponds to SeqScan path of regular tables (though depending on what
984 	 * baserestrict conditions we were able to send to remote, there might
985 	 * actually be an indexscan happening there).  We already did all the work
986 	 * to estimate cost and size of this path.
987 	 *
988 	 * Although this path uses no join clauses, it could still have required
989 	 * parameterization due to LATERAL refs in its tlist.
990 	 */
991 	path = create_foreignscan_path(root, baserel,
992 								   NULL,	/* default pathtarget */
993 								   fpinfo->rows,
994 								   fpinfo->startup_cost,
995 								   fpinfo->total_cost,
996 								   NIL, /* no pathkeys */
997 								   baserel->lateral_relids,
998 								   NULL,	/* no extra plan */
999 								   NIL);	/* no fdw_private list */
1000 	add_path(baserel, (Path *) path);
1001 
1002 	/* Add paths with pathkeys */
1003 	add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1004 
1005 	/*
1006 	 * If we're not using remote estimates, stop here.  We have no way to
1007 	 * estimate whether any join clauses would be worth sending across, so
1008 	 * don't bother building parameterized paths.
1009 	 */
1010 	if (!fpinfo->use_remote_estimate)
1011 		return;
1012 
1013 	/*
1014 	 * Thumb through all join clauses for the rel to identify which outer
1015 	 * relations could supply one or more safe-to-send-to-remote join clauses.
1016 	 * We'll build a parameterized path for each such outer relation.
1017 	 *
1018 	 * It's convenient to manage this by representing each candidate outer
1019 	 * relation by the ParamPathInfo node for it.  We can then use the
1020 	 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1021 	 * interesting join clauses for that rel.  This takes care of the
1022 	 * possibility that there are multiple safe join clauses for such a rel,
1023 	 * and also ensures that we account for unsafe join clauses that we'll
1024 	 * still have to enforce locally (since the parameterized-path machinery
1025 	 * insists that we handle all movable clauses).
1026 	 */
1027 	ppi_list = NIL;
1028 	foreach(lc, baserel->joininfo)
1029 	{
1030 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1031 		Relids		required_outer;
1032 		ParamPathInfo *param_info;
1033 
1034 		/* Check if clause can be moved to this rel */
1035 		if (!join_clause_is_movable_to(rinfo, baserel))
1036 			continue;
1037 
1038 		/* See if it is safe to send to remote */
1039 		if (!is_foreign_expr(root, baserel, rinfo->clause))
1040 			continue;
1041 
1042 		/* Calculate required outer rels for the resulting path */
1043 		required_outer = bms_union(rinfo->clause_relids,
1044 								   baserel->lateral_relids);
1045 		/* We do not want the foreign rel itself listed in required_outer */
1046 		required_outer = bms_del_member(required_outer, baserel->relid);
1047 
1048 		/*
1049 		 * required_outer probably can't be empty here, but if it were, we
1050 		 * couldn't make a parameterized path.
1051 		 */
1052 		if (bms_is_empty(required_outer))
1053 			continue;
1054 
1055 		/* Get the ParamPathInfo */
1056 		param_info = get_baserel_parampathinfo(root, baserel,
1057 											   required_outer);
1058 		Assert(param_info != NULL);
1059 
1060 		/*
1061 		 * Add it to list unless we already have it.  Testing pointer equality
1062 		 * is OK since get_baserel_parampathinfo won't make duplicates.
1063 		 */
1064 		ppi_list = list_append_unique_ptr(ppi_list, param_info);
1065 	}
1066 
1067 	/*
1068 	 * The above scan examined only "generic" join clauses, not those that
1069 	 * were absorbed into EquivalenceClauses.  See if we can make anything out
1070 	 * of EquivalenceClauses.
1071 	 */
1072 	if (baserel->has_eclass_joins)
1073 	{
1074 		/*
1075 		 * We repeatedly scan the eclass list looking for column references
1076 		 * (or expressions) belonging to the foreign rel.  Each time we find
1077 		 * one, we generate a list of equivalence joinclauses for it, and then
1078 		 * see if any are safe to send to the remote.  Repeat till there are
1079 		 * no more candidate EC members.
1080 		 */
1081 		ec_member_foreign_arg arg;
1082 
1083 		arg.already_used = NIL;
1084 		for (;;)
1085 		{
1086 			List	   *clauses;
1087 
1088 			/* Make clauses, skipping any that join to lateral_referencers */
1089 			arg.current = NULL;
1090 			clauses = generate_implied_equalities_for_column(root,
1091 															 baserel,
1092 															 ec_member_matches_foreign,
1093 															 (void *) &arg,
1094 															 baserel->lateral_referencers);
1095 
1096 			/* Done if there are no more expressions in the foreign rel */
1097 			if (arg.current == NULL)
1098 			{
1099 				Assert(clauses == NIL);
1100 				break;
1101 			}
1102 
1103 			/* Scan the extracted join clauses */
1104 			foreach(lc, clauses)
1105 			{
1106 				RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1107 				Relids		required_outer;
1108 				ParamPathInfo *param_info;
1109 
1110 				/* Check if clause can be moved to this rel */
1111 				if (!join_clause_is_movable_to(rinfo, baserel))
1112 					continue;
1113 
1114 				/* See if it is safe to send to remote */
1115 				if (!is_foreign_expr(root, baserel, rinfo->clause))
1116 					continue;
1117 
1118 				/* Calculate required outer rels for the resulting path */
1119 				required_outer = bms_union(rinfo->clause_relids,
1120 										   baserel->lateral_relids);
1121 				required_outer = bms_del_member(required_outer, baserel->relid);
1122 				if (bms_is_empty(required_outer))
1123 					continue;
1124 
1125 				/* Get the ParamPathInfo */
1126 				param_info = get_baserel_parampathinfo(root, baserel,
1127 													   required_outer);
1128 				Assert(param_info != NULL);
1129 
1130 				/* Add it to list unless we already have it */
1131 				ppi_list = list_append_unique_ptr(ppi_list, param_info);
1132 			}
1133 
1134 			/* Try again, now ignoring the expression we found this time */
1135 			arg.already_used = lappend(arg.already_used, arg.current);
1136 		}
1137 	}
1138 
1139 	/*
1140 	 * Now build a path for each useful outer relation.
1141 	 */
1142 	foreach(lc, ppi_list)
1143 	{
1144 		ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1145 		double		rows;
1146 		int			width;
1147 		Cost		startup_cost;
1148 		Cost		total_cost;
1149 
1150 		/* Get a cost estimate from the remote */
1151 		estimate_path_cost_size(root, baserel,
1152 								param_info->ppi_clauses, NIL, NULL,
1153 								&rows, &width,
1154 								&startup_cost, &total_cost);
1155 
1156 		/*
1157 		 * ppi_rows currently won't get looked at by anything, but still we
1158 		 * may as well ensure that it matches our idea of the rowcount.
1159 		 */
1160 		param_info->ppi_rows = rows;
1161 
1162 		/* Make the path */
1163 		path = create_foreignscan_path(root, baserel,
1164 									   NULL,	/* default pathtarget */
1165 									   rows,
1166 									   startup_cost,
1167 									   total_cost,
1168 									   NIL, /* no pathkeys */
1169 									   param_info->ppi_req_outer,
1170 									   NULL,
1171 									   NIL);	/* no fdw_private list */
1172 		add_path(baserel, (Path *) path);
1173 	}
1174 }
1175 
1176 /*
1177  * postgresGetForeignPlan
1178  *		Create ForeignScan plan node which implements selected best path
1179  */
1180 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1181 postgresGetForeignPlan(PlannerInfo *root,
1182 					   RelOptInfo *foreignrel,
1183 					   Oid foreigntableid,
1184 					   ForeignPath *best_path,
1185 					   List *tlist,
1186 					   List *scan_clauses,
1187 					   Plan *outer_plan)
1188 {
1189 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1190 	Index		scan_relid;
1191 	List	   *fdw_private;
1192 	List	   *remote_exprs = NIL;
1193 	List	   *local_exprs = NIL;
1194 	List	   *params_list = NIL;
1195 	List	   *fdw_scan_tlist = NIL;
1196 	List	   *fdw_recheck_quals = NIL;
1197 	List	   *retrieved_attrs;
1198 	StringInfoData sql;
1199 	bool		has_final_sort = false;
1200 	bool		has_limit = false;
1201 	ListCell   *lc;
1202 
1203 	/*
1204 	 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1205 	 */
1206 	if (best_path->fdw_private)
1207 	{
1208 		has_final_sort = intVal(list_nth(best_path->fdw_private,
1209 										 FdwPathPrivateHasFinalSort));
1210 		has_limit = intVal(list_nth(best_path->fdw_private,
1211 									FdwPathPrivateHasLimit));
1212 	}
1213 
1214 	if (IS_SIMPLE_REL(foreignrel))
1215 	{
1216 		/*
1217 		 * For base relations, set scan_relid as the relid of the relation.
1218 		 */
1219 		scan_relid = foreignrel->relid;
1220 
1221 		/*
1222 		 * In a base-relation scan, we must apply the given scan_clauses.
1223 		 *
1224 		 * Separate the scan_clauses into those that can be executed remotely
1225 		 * and those that can't.  baserestrictinfo clauses that were
1226 		 * previously determined to be safe or unsafe by classifyConditions
1227 		 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1228 		 * else in the scan_clauses list will be a join clause, which we have
1229 		 * to check for remote-safety.
1230 		 *
1231 		 * Note: the join clauses we see here should be the exact same ones
1232 		 * previously examined by postgresGetForeignPaths.  Possibly it'd be
1233 		 * worth passing forward the classification work done then, rather
1234 		 * than repeating it here.
1235 		 *
1236 		 * This code must match "extract_actual_clauses(scan_clauses, false)"
1237 		 * except for the additional decision about remote versus local
1238 		 * execution.
1239 		 */
1240 		foreach(lc, scan_clauses)
1241 		{
1242 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1243 
1244 			/* Ignore any pseudoconstants, they're dealt with elsewhere */
1245 			if (rinfo->pseudoconstant)
1246 				continue;
1247 
1248 			if (list_member_ptr(fpinfo->remote_conds, rinfo))
1249 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1250 			else if (list_member_ptr(fpinfo->local_conds, rinfo))
1251 				local_exprs = lappend(local_exprs, rinfo->clause);
1252 			else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1253 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1254 			else
1255 				local_exprs = lappend(local_exprs, rinfo->clause);
1256 		}
1257 
1258 		/*
1259 		 * For a base-relation scan, we have to support EPQ recheck, which
1260 		 * should recheck all the remote quals.
1261 		 */
1262 		fdw_recheck_quals = remote_exprs;
1263 	}
1264 	else
1265 	{
1266 		/*
1267 		 * Join relation or upper relation - set scan_relid to 0.
1268 		 */
1269 		scan_relid = 0;
1270 
1271 		/*
1272 		 * For a join rel, baserestrictinfo is NIL and we are not considering
1273 		 * parameterization right now, so there should be no scan_clauses for
1274 		 * a joinrel or an upper rel either.
1275 		 */
1276 		Assert(!scan_clauses);
1277 
1278 		/*
1279 		 * Instead we get the conditions to apply from the fdw_private
1280 		 * structure.
1281 		 */
1282 		remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1283 		local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1284 
1285 		/*
1286 		 * We leave fdw_recheck_quals empty in this case, since we never need
1287 		 * to apply EPQ recheck clauses.  In the case of a joinrel, EPQ
1288 		 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1289 		 * If we're planning an upperrel (ie, remote grouping or aggregation)
1290 		 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1291 		 * allowed, and indeed we *can't* put the remote clauses into
1292 		 * fdw_recheck_quals because the unaggregated Vars won't be available
1293 		 * locally.
1294 		 */
1295 
1296 		/* Build the list of columns to be fetched from the foreign server. */
1297 		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1298 
1299 		/*
1300 		 * Ensure that the outer plan produces a tuple whose descriptor
1301 		 * matches our scan tuple slot.  Also, remove the local conditions
1302 		 * from outer plan's quals, lest they be evaluated twice, once by the
1303 		 * local plan and once by the scan.
1304 		 */
1305 		if (outer_plan)
1306 		{
1307 			ListCell   *lc;
1308 
1309 			/*
1310 			 * Right now, we only consider grouping and aggregation beyond
1311 			 * joins. Queries involving aggregates or grouping do not require
1312 			 * EPQ mechanism, hence should not have an outer plan here.
1313 			 */
1314 			Assert(!IS_UPPER_REL(foreignrel));
1315 
1316 			/*
1317 			 * First, update the plan's qual list if possible.  In some cases
1318 			 * the quals might be enforced below the topmost plan level, in
1319 			 * which case we'll fail to remove them; it's not worth working
1320 			 * harder than this.
1321 			 */
1322 			foreach(lc, local_exprs)
1323 			{
1324 				Node	   *qual = lfirst(lc);
1325 
1326 				outer_plan->qual = list_delete(outer_plan->qual, qual);
1327 
1328 				/*
1329 				 * For an inner join the local conditions of foreign scan plan
1330 				 * can be part of the joinquals as well.  (They might also be
1331 				 * in the mergequals or hashquals, but we can't touch those
1332 				 * without breaking the plan.)
1333 				 */
1334 				if (IsA(outer_plan, NestLoop) ||
1335 					IsA(outer_plan, MergeJoin) ||
1336 					IsA(outer_plan, HashJoin))
1337 				{
1338 					Join	   *join_plan = (Join *) outer_plan;
1339 
1340 					if (join_plan->jointype == JOIN_INNER)
1341 						join_plan->joinqual = list_delete(join_plan->joinqual,
1342 														  qual);
1343 				}
1344 			}
1345 
1346 			/*
1347 			 * Now fix the subplan's tlist --- this might result in inserting
1348 			 * a Result node atop the plan tree.
1349 			 */
1350 			outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1351 												best_path->path.parallel_safe);
1352 		}
1353 	}
1354 
1355 	/*
1356 	 * Build the query string to be sent for execution, and identify
1357 	 * expressions to be sent as parameters.
1358 	 */
1359 	initStringInfo(&sql);
1360 	deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1361 							remote_exprs, best_path->path.pathkeys,
1362 							has_final_sort, has_limit, false,
1363 							&retrieved_attrs, &params_list);
1364 
1365 	/* Remember remote_exprs for possible use by postgresPlanDirectModify */
1366 	fpinfo->final_remote_exprs = remote_exprs;
1367 
1368 	/*
1369 	 * Build the fdw_private list that will be available to the executor.
1370 	 * Items in the list must match order in enum FdwScanPrivateIndex.
1371 	 */
1372 	fdw_private = list_make3(makeString(sql.data),
1373 							 retrieved_attrs,
1374 							 makeInteger(fpinfo->fetch_size));
1375 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1376 		fdw_private = lappend(fdw_private,
1377 							  makeString(fpinfo->relation_name->data));
1378 
1379 	/*
1380 	 * Create the ForeignScan node for the given relation.
1381 	 *
1382 	 * Note that the remote parameter expressions are stored in the fdw_exprs
1383 	 * field of the finished plan node; we can't keep them in private state
1384 	 * because then they wouldn't be subject to later planner processing.
1385 	 */
1386 	return make_foreignscan(tlist,
1387 							local_exprs,
1388 							scan_relid,
1389 							params_list,
1390 							fdw_private,
1391 							fdw_scan_tlist,
1392 							fdw_recheck_quals,
1393 							outer_plan);
1394 }
1395 
1396 /*
1397  * postgresBeginForeignScan
1398  *		Initiate an executor scan of a foreign PostgreSQL table.
1399  */
1400 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1401 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1402 {
1403 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1404 	EState	   *estate = node->ss.ps.state;
1405 	PgFdwScanState *fsstate;
1406 	RangeTblEntry *rte;
1407 	Oid			userid;
1408 	ForeignTable *table;
1409 	UserMapping *user;
1410 	int			rtindex;
1411 	int			numParams;
1412 
1413 	/*
1414 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
1415 	 */
1416 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1417 		return;
1418 
1419 	/*
1420 	 * We'll save private state in node->fdw_state.
1421 	 */
1422 	fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1423 	node->fdw_state = (void *) fsstate;
1424 
1425 	/*
1426 	 * Identify which user to do the remote access as.  This should match what
1427 	 * ExecCheckRTEPerms() does.  In case of a join or aggregate, use the
1428 	 * lowest-numbered member RTE as a representative; we would get the same
1429 	 * result from any.
1430 	 */
1431 	if (fsplan->scan.scanrelid > 0)
1432 		rtindex = fsplan->scan.scanrelid;
1433 	else
1434 		rtindex = bms_next_member(fsplan->fs_relids, -1);
1435 	rte = exec_rt_fetch(rtindex, estate);
1436 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1437 
1438 	/* Get info about foreign table. */
1439 	table = GetForeignTable(rte->relid);
1440 	user = GetUserMapping(userid, table->serverid);
1441 
1442 	/*
1443 	 * Get connection to the foreign server.  Connection manager will
1444 	 * establish new connection if necessary.
1445 	 */
1446 	fsstate->conn = GetConnection(user, false);
1447 
1448 	/* Assign a unique ID for my cursor */
1449 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1450 	fsstate->cursor_exists = false;
1451 
1452 	/* Get private info created by planner functions. */
1453 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
1454 									 FdwScanPrivateSelectSql));
1455 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1456 												 FdwScanPrivateRetrievedAttrs);
1457 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1458 										  FdwScanPrivateFetchSize));
1459 
1460 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
1461 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1462 											   "postgres_fdw tuple data",
1463 											   ALLOCSET_DEFAULT_SIZES);
1464 	fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1465 											  "postgres_fdw temporary data",
1466 											  ALLOCSET_SMALL_SIZES);
1467 
1468 	/*
1469 	 * Get info we'll need for converting data fetched from the foreign server
1470 	 * into local representation and error reporting during that process.
1471 	 */
1472 	if (fsplan->scan.scanrelid > 0)
1473 	{
1474 		fsstate->rel = node->ss.ss_currentRelation;
1475 		fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1476 	}
1477 	else
1478 	{
1479 		fsstate->rel = NULL;
1480 		fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1481 	}
1482 
1483 	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1484 
1485 	/*
1486 	 * Prepare for processing of parameters used in remote query, if any.
1487 	 */
1488 	numParams = list_length(fsplan->fdw_exprs);
1489 	fsstate->numParams = numParams;
1490 	if (numParams > 0)
1491 		prepare_query_params((PlanState *) node,
1492 							 fsplan->fdw_exprs,
1493 							 numParams,
1494 							 &fsstate->param_flinfo,
1495 							 &fsstate->param_exprs,
1496 							 &fsstate->param_values);
1497 }
1498 
1499 /*
1500  * postgresIterateForeignScan
1501  *		Retrieve next row from the result set, or clear tuple slot to indicate
1502  *		EOF.
1503  */
1504 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1505 postgresIterateForeignScan(ForeignScanState *node)
1506 {
1507 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1508 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1509 
1510 	/*
1511 	 * If this is the first call after Begin or ReScan, we need to create the
1512 	 * cursor on the remote side.
1513 	 */
1514 	if (!fsstate->cursor_exists)
1515 		create_cursor(node);
1516 
1517 	/*
1518 	 * Get some more tuples, if we've run out.
1519 	 */
1520 	if (fsstate->next_tuple >= fsstate->num_tuples)
1521 	{
1522 		/* No point in another fetch if we already detected EOF, though. */
1523 		if (!fsstate->eof_reached)
1524 			fetch_more_data(node);
1525 		/* If we didn't get any tuples, must be end of data. */
1526 		if (fsstate->next_tuple >= fsstate->num_tuples)
1527 			return ExecClearTuple(slot);
1528 	}
1529 
1530 	/*
1531 	 * Return the next tuple.
1532 	 */
1533 	ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1534 					   slot,
1535 					   false);
1536 
1537 	return slot;
1538 }
1539 
1540 /*
1541  * postgresReScanForeignScan
1542  *		Restart the scan.
1543  */
1544 static void
postgresReScanForeignScan(ForeignScanState * node)1545 postgresReScanForeignScan(ForeignScanState *node)
1546 {
1547 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1548 	char		sql[64];
1549 	PGresult   *res;
1550 
1551 	/* If we haven't created the cursor yet, nothing to do. */
1552 	if (!fsstate->cursor_exists)
1553 		return;
1554 
1555 	/*
1556 	 * If any internal parameters affecting this node have changed, we'd
1557 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
1558 	 * be good enough.  If we've only fetched zero or one batch, we needn't
1559 	 * even rewind the cursor, just rescan what we have.
1560 	 */
1561 	if (node->ss.ps.chgParam != NULL)
1562 	{
1563 		fsstate->cursor_exists = false;
1564 		snprintf(sql, sizeof(sql), "CLOSE c%u",
1565 				 fsstate->cursor_number);
1566 	}
1567 	else if (fsstate->fetch_ct_2 > 1)
1568 	{
1569 		snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1570 				 fsstate->cursor_number);
1571 	}
1572 	else
1573 	{
1574 		/* Easy: just rescan what we already have in memory, if anything */
1575 		fsstate->next_tuple = 0;
1576 		return;
1577 	}
1578 
1579 	/*
1580 	 * We don't use a PG_TRY block here, so be careful not to throw error
1581 	 * without releasing the PGresult.
1582 	 */
1583 	res = pgfdw_exec_query(fsstate->conn, sql);
1584 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1585 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1586 	PQclear(res);
1587 
1588 	/* Now force a fresh FETCH. */
1589 	fsstate->tuples = NULL;
1590 	fsstate->num_tuples = 0;
1591 	fsstate->next_tuple = 0;
1592 	fsstate->fetch_ct_2 = 0;
1593 	fsstate->eof_reached = false;
1594 }
1595 
1596 /*
1597  * postgresEndForeignScan
1598  *		Finish scanning foreign table and dispose objects used for this scan
1599  */
1600 static void
postgresEndForeignScan(ForeignScanState * node)1601 postgresEndForeignScan(ForeignScanState *node)
1602 {
1603 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1604 
1605 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1606 	if (fsstate == NULL)
1607 		return;
1608 
1609 	/* Close the cursor if open, to prevent accumulation of cursors */
1610 	if (fsstate->cursor_exists)
1611 		close_cursor(fsstate->conn, fsstate->cursor_number);
1612 
1613 	/* Release remote connection */
1614 	ReleaseConnection(fsstate->conn);
1615 	fsstate->conn = NULL;
1616 
1617 	/* MemoryContexts will be deleted automatically. */
1618 }
1619 
1620 /*
1621  * postgresAddForeignUpdateTargets
1622  *		Add resjunk column(s) needed for update/delete on a foreign table
1623  */
1624 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1625 postgresAddForeignUpdateTargets(Query *parsetree,
1626 								RangeTblEntry *target_rte,
1627 								Relation target_relation)
1628 {
1629 	Var		   *var;
1630 	const char *attrname;
1631 	TargetEntry *tle;
1632 
1633 	/*
1634 	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1635 	 */
1636 
1637 	/* Make a Var representing the desired value */
1638 	var = makeVar(parsetree->resultRelation,
1639 				  SelfItemPointerAttributeNumber,
1640 				  TIDOID,
1641 				  -1,
1642 				  InvalidOid,
1643 				  0);
1644 
1645 	/* Wrap it in a resjunk TLE with the right name ... */
1646 	attrname = "ctid";
1647 
1648 	tle = makeTargetEntry((Expr *) var,
1649 						  list_length(parsetree->targetList) + 1,
1650 						  pstrdup(attrname),
1651 						  true);
1652 
1653 	/* ... and add it to the query's targetlist */
1654 	parsetree->targetList = lappend(parsetree->targetList, tle);
1655 }
1656 
1657 /*
1658  * postgresPlanForeignModify
1659  *		Plan an insert/update/delete operation on a foreign table
1660  */
1661 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1662 postgresPlanForeignModify(PlannerInfo *root,
1663 						  ModifyTable *plan,
1664 						  Index resultRelation,
1665 						  int subplan_index)
1666 {
1667 	CmdType		operation = plan->operation;
1668 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1669 	Relation	rel;
1670 	StringInfoData sql;
1671 	List	   *targetAttrs = NIL;
1672 	List	   *withCheckOptionList = NIL;
1673 	List	   *returningList = NIL;
1674 	List	   *retrieved_attrs = NIL;
1675 	bool		doNothing = false;
1676 
1677 	initStringInfo(&sql);
1678 
1679 	/*
1680 	 * Core code already has some lock on each rel being planned, so we can
1681 	 * use NoLock here.
1682 	 */
1683 	rel = table_open(rte->relid, NoLock);
1684 
1685 	/*
1686 	 * In an INSERT, we transmit all columns that are defined in the foreign
1687 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1688 	 * foreign table, we transmit all columns like INSERT; else we transmit
1689 	 * only columns that were explicitly targets of the UPDATE, so as to avoid
1690 	 * unnecessary data transmission.  (We can't do that for INSERT since we
1691 	 * would miss sending default values for columns not listed in the source
1692 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1693 	 * those triggers might change values for non-target columns, in which
1694 	 * case we would miss sending changed values for those columns.)
1695 	 */
1696 	if (operation == CMD_INSERT ||
1697 		(operation == CMD_UPDATE &&
1698 		 rel->trigdesc &&
1699 		 rel->trigdesc->trig_update_before_row))
1700 	{
1701 		TupleDesc	tupdesc = RelationGetDescr(rel);
1702 		int			attnum;
1703 
1704 		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1705 		{
1706 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1707 
1708 			if (!attr->attisdropped)
1709 				targetAttrs = lappend_int(targetAttrs, attnum);
1710 		}
1711 	}
1712 	else if (operation == CMD_UPDATE)
1713 	{
1714 		int			col;
1715 		Bitmapset  *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1716 
1717 		col = -1;
1718 		while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1719 		{
1720 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1721 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
1722 
1723 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
1724 				elog(ERROR, "system-column update is not supported");
1725 			targetAttrs = lappend_int(targetAttrs, attno);
1726 		}
1727 	}
1728 
1729 	/*
1730 	 * Extract the relevant WITH CHECK OPTION list if any.
1731 	 */
1732 	if (plan->withCheckOptionLists)
1733 		withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1734 												subplan_index);
1735 
1736 	/*
1737 	 * Extract the relevant RETURNING list if any.
1738 	 */
1739 	if (plan->returningLists)
1740 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
1741 
1742 	/*
1743 	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1744 	 * should have already been rejected in the optimizer, as presently there
1745 	 * is no way to recognize an arbiter index on a foreign table.  Only DO
1746 	 * NOTHING is supported without an inference specification.
1747 	 */
1748 	if (plan->onConflictAction == ONCONFLICT_NOTHING)
1749 		doNothing = true;
1750 	else if (plan->onConflictAction != ONCONFLICT_NONE)
1751 		elog(ERROR, "unexpected ON CONFLICT specification: %d",
1752 			 (int) plan->onConflictAction);
1753 
1754 	/*
1755 	 * Construct the SQL command string.
1756 	 */
1757 	switch (operation)
1758 	{
1759 		case CMD_INSERT:
1760 			deparseInsertSql(&sql, rte, resultRelation, rel,
1761 							 targetAttrs, doNothing,
1762 							 withCheckOptionList, returningList,
1763 							 &retrieved_attrs);
1764 			break;
1765 		case CMD_UPDATE:
1766 			deparseUpdateSql(&sql, rte, resultRelation, rel,
1767 							 targetAttrs,
1768 							 withCheckOptionList, returningList,
1769 							 &retrieved_attrs);
1770 			break;
1771 		case CMD_DELETE:
1772 			deparseDeleteSql(&sql, rte, resultRelation, rel,
1773 							 returningList,
1774 							 &retrieved_attrs);
1775 			break;
1776 		default:
1777 			elog(ERROR, "unexpected operation: %d", (int) operation);
1778 			break;
1779 	}
1780 
1781 	table_close(rel, NoLock);
1782 
1783 	/*
1784 	 * Build the fdw_private list that will be available to the executor.
1785 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
1786 	 */
1787 	return list_make4(makeString(sql.data),
1788 					  targetAttrs,
1789 					  makeInteger((retrieved_attrs != NIL)),
1790 					  retrieved_attrs);
1791 }
1792 
1793 /*
1794  * postgresBeginForeignModify
1795  *		Begin an insert/update/delete operation on a foreign table
1796  */
1797 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1798 postgresBeginForeignModify(ModifyTableState *mtstate,
1799 						   ResultRelInfo *resultRelInfo,
1800 						   List *fdw_private,
1801 						   int subplan_index,
1802 						   int eflags)
1803 {
1804 	PgFdwModifyState *fmstate;
1805 	char	   *query;
1806 	List	   *target_attrs;
1807 	bool		has_returning;
1808 	List	   *retrieved_attrs;
1809 	RangeTblEntry *rte;
1810 
1811 	/*
1812 	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
1813 	 * stays NULL.
1814 	 */
1815 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1816 		return;
1817 
1818 	/* Deconstruct fdw_private data. */
1819 	query = strVal(list_nth(fdw_private,
1820 							FdwModifyPrivateUpdateSql));
1821 	target_attrs = (List *) list_nth(fdw_private,
1822 									 FdwModifyPrivateTargetAttnums);
1823 	has_returning = intVal(list_nth(fdw_private,
1824 									FdwModifyPrivateHasReturning));
1825 	retrieved_attrs = (List *) list_nth(fdw_private,
1826 										FdwModifyPrivateRetrievedAttrs);
1827 
1828 	/* Find RTE. */
1829 	rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1830 						mtstate->ps.state);
1831 
1832 	/* Construct an execution state. */
1833 	fmstate = create_foreign_modify(mtstate->ps.state,
1834 									rte,
1835 									resultRelInfo,
1836 									mtstate->operation,
1837 									mtstate->mt_plans[subplan_index]->plan,
1838 									query,
1839 									target_attrs,
1840 									has_returning,
1841 									retrieved_attrs);
1842 
1843 	resultRelInfo->ri_FdwState = fmstate;
1844 }
1845 
1846 /*
1847  * postgresExecForeignInsert
1848  *		Insert one row into a foreign table
1849  */
1850 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1851 postgresExecForeignInsert(EState *estate,
1852 						  ResultRelInfo *resultRelInfo,
1853 						  TupleTableSlot *slot,
1854 						  TupleTableSlot *planSlot)
1855 {
1856 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1857 	TupleTableSlot *rslot;
1858 
1859 	/*
1860 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1861 	 * postgresBeginForeignInsert())
1862 	 */
1863 	if (fmstate->aux_fmstate)
1864 		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1865 	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1866 								   slot, planSlot);
1867 	/* Revert that change */
1868 	if (fmstate->aux_fmstate)
1869 		resultRelInfo->ri_FdwState = fmstate;
1870 
1871 	return rslot;
1872 }
1873 
1874 /*
1875  * postgresExecForeignUpdate
1876  *		Update one row in a foreign table
1877  */
1878 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1879 postgresExecForeignUpdate(EState *estate,
1880 						  ResultRelInfo *resultRelInfo,
1881 						  TupleTableSlot *slot,
1882 						  TupleTableSlot *planSlot)
1883 {
1884 	return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
1885 								  slot, planSlot);
1886 }
1887 
1888 /*
1889  * postgresExecForeignDelete
1890  *		Delete one row from a foreign table
1891  */
1892 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1893 postgresExecForeignDelete(EState *estate,
1894 						  ResultRelInfo *resultRelInfo,
1895 						  TupleTableSlot *slot,
1896 						  TupleTableSlot *planSlot)
1897 {
1898 	return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
1899 								  slot, planSlot);
1900 }
1901 
1902 /*
1903  * postgresEndForeignModify
1904  *		Finish an insert/update/delete operation on a foreign table
1905  */
1906 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1907 postgresEndForeignModify(EState *estate,
1908 						 ResultRelInfo *resultRelInfo)
1909 {
1910 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1911 
1912 	/* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1913 	if (fmstate == NULL)
1914 		return;
1915 
1916 	/* Destroy the execution state */
1917 	finish_foreign_modify(fmstate);
1918 }
1919 
1920 /*
1921  * postgresBeginForeignInsert
1922  *		Begin an insert operation on a foreign table
1923  */
1924 static void
postgresBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)1925 postgresBeginForeignInsert(ModifyTableState *mtstate,
1926 						   ResultRelInfo *resultRelInfo)
1927 {
1928 	PgFdwModifyState *fmstate;
1929 	ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1930 	EState	   *estate = mtstate->ps.state;
1931 	Index		resultRelation;
1932 	Relation	rel = resultRelInfo->ri_RelationDesc;
1933 	RangeTblEntry *rte;
1934 	TupleDesc	tupdesc = RelationGetDescr(rel);
1935 	int			attnum;
1936 	StringInfoData sql;
1937 	List	   *targetAttrs = NIL;
1938 	List	   *retrieved_attrs = NIL;
1939 	bool		doNothing = false;
1940 
1941 	/*
1942 	 * If the foreign table we are about to insert routed rows into is also an
1943 	 * UPDATE subplan result rel that will be updated later, proceeding with
1944 	 * the INSERT will result in the later UPDATE incorrectly modifying those
1945 	 * routed rows, so prevent the INSERT --- it would be nice if we could
1946 	 * handle this case; but for now, throw an error for safety.
1947 	 */
1948 	if (plan && plan->operation == CMD_UPDATE &&
1949 		(resultRelInfo->ri_usesFdwDirectModify ||
1950 		 resultRelInfo->ri_FdwState) &&
1951 		resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
1952 		ereport(ERROR,
1953 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1954 				 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
1955 						RelationGetRelationName(rel))));
1956 
1957 	initStringInfo(&sql);
1958 
1959 	/* We transmit all columns that are defined in the foreign table. */
1960 	for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1961 	{
1962 		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1963 
1964 		if (!attr->attisdropped)
1965 			targetAttrs = lappend_int(targetAttrs, attnum);
1966 	}
1967 
1968 	/* Check if we add the ON CONFLICT clause to the remote query. */
1969 	if (plan)
1970 	{
1971 		OnConflictAction onConflictAction = plan->onConflictAction;
1972 
1973 		/* We only support DO NOTHING without an inference specification. */
1974 		if (onConflictAction == ONCONFLICT_NOTHING)
1975 			doNothing = true;
1976 		else if (onConflictAction != ONCONFLICT_NONE)
1977 			elog(ERROR, "unexpected ON CONFLICT specification: %d",
1978 				 (int) onConflictAction);
1979 	}
1980 
1981 	/*
1982 	 * If the foreign table is a partition that doesn't have a corresponding
1983 	 * RTE entry, we need to create a new RTE
1984 	 * describing the foreign table for use by deparseInsertSql and
1985 	 * create_foreign_modify() below, after first copying the parent's RTE and
1986 	 * modifying some fields to describe the foreign partition to work on.
1987 	 * However, if this is invoked by UPDATE, the existing RTE may already
1988 	 * correspond to this partition if it is one of the UPDATE subplan target
1989 	 * rels; in that case, we can just use the existing RTE as-is.
1990 	 */
1991 	if (resultRelInfo->ri_RangeTableIndex == 0)
1992 	{
1993 		ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
1994 
1995 		rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
1996 		rte = copyObject(rte);
1997 		rte->relid = RelationGetRelid(rel);
1998 		rte->relkind = RELKIND_FOREIGN_TABLE;
1999 
2000 		/*
2001 		 * For UPDATE, we must use the RT index of the first subplan target
2002 		 * rel's RTE, because the core code would have built expressions for
2003 		 * the partition, such as RETURNING, using that RT index as varno of
2004 		 * Vars contained in those expressions.
2005 		 */
2006 		if (plan && plan->operation == CMD_UPDATE &&
2007 			rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2008 			resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2009 		else
2010 			resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2011 	}
2012 	else
2013 	{
2014 		resultRelation = resultRelInfo->ri_RangeTableIndex;
2015 		rte = exec_rt_fetch(resultRelation, estate);
2016 	}
2017 
2018 	/* Construct the SQL command string. */
2019 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2020 					 resultRelInfo->ri_WithCheckOptions,
2021 					 resultRelInfo->ri_returningList,
2022 					 &retrieved_attrs);
2023 
2024 	/* Construct an execution state. */
2025 	fmstate = create_foreign_modify(mtstate->ps.state,
2026 									rte,
2027 									resultRelInfo,
2028 									CMD_INSERT,
2029 									NULL,
2030 									sql.data,
2031 									targetAttrs,
2032 									retrieved_attrs != NIL,
2033 									retrieved_attrs);
2034 
2035 	/*
2036 	 * If the given resultRelInfo already has PgFdwModifyState set, it means
2037 	 * the foreign table is an UPDATE subplan result rel; in which case, store
2038 	 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2039 	 */
2040 	if (resultRelInfo->ri_FdwState)
2041 	{
2042 		Assert(plan && plan->operation == CMD_UPDATE);
2043 		Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2044 		((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2045 	}
2046 	else
2047 		resultRelInfo->ri_FdwState = fmstate;
2048 }
2049 
2050 /*
2051  * postgresEndForeignInsert
2052  *		Finish an insert operation on a foreign table
2053  */
2054 static void
postgresEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2055 postgresEndForeignInsert(EState *estate,
2056 						 ResultRelInfo *resultRelInfo)
2057 {
2058 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2059 
2060 	Assert(fmstate != NULL);
2061 
2062 	/*
2063 	 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2064 	 * postgresBeginForeignInsert())
2065 	 */
2066 	if (fmstate->aux_fmstate)
2067 		fmstate = fmstate->aux_fmstate;
2068 
2069 	/* Destroy the execution state */
2070 	finish_foreign_modify(fmstate);
2071 }
2072 
2073 /*
2074  * postgresIsForeignRelUpdatable
2075  *		Determine whether a foreign table supports INSERT, UPDATE and/or
2076  *		DELETE.
2077  */
2078 static int
postgresIsForeignRelUpdatable(Relation rel)2079 postgresIsForeignRelUpdatable(Relation rel)
2080 {
2081 	bool		updatable;
2082 	ForeignTable *table;
2083 	ForeignServer *server;
2084 	ListCell   *lc;
2085 
2086 	/*
2087 	 * By default, all postgres_fdw foreign tables are assumed updatable. This
2088 	 * can be overridden by a per-server setting, which in turn can be
2089 	 * overridden by a per-table setting.
2090 	 */
2091 	updatable = true;
2092 
2093 	table = GetForeignTable(RelationGetRelid(rel));
2094 	server = GetForeignServer(table->serverid);
2095 
2096 	foreach(lc, server->options)
2097 	{
2098 		DefElem    *def = (DefElem *) lfirst(lc);
2099 
2100 		if (strcmp(def->defname, "updatable") == 0)
2101 			updatable = defGetBoolean(def);
2102 	}
2103 	foreach(lc, table->options)
2104 	{
2105 		DefElem    *def = (DefElem *) lfirst(lc);
2106 
2107 		if (strcmp(def->defname, "updatable") == 0)
2108 			updatable = defGetBoolean(def);
2109 	}
2110 
2111 	/*
2112 	 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2113 	 */
2114 	return updatable ?
2115 		(1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2116 }
2117 
2118 /*
2119  * postgresRecheckForeignScan
2120  *		Execute a local join execution plan for a foreign join
2121  */
2122 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2123 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2124 {
2125 	Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2126 	PlanState  *outerPlan = outerPlanState(node);
2127 	TupleTableSlot *result;
2128 
2129 	/* For base foreign relations, it suffices to set fdw_recheck_quals */
2130 	if (scanrelid > 0)
2131 		return true;
2132 
2133 	Assert(outerPlan != NULL);
2134 
2135 	/* Execute a local join execution plan */
2136 	result = ExecProcNode(outerPlan);
2137 	if (TupIsNull(result))
2138 		return false;
2139 
2140 	/* Store result in the given slot */
2141 	ExecCopySlot(slot, result);
2142 
2143 	return true;
2144 }
2145 
2146 /*
2147  * postgresPlanDirectModify
2148  *		Consider a direct foreign table modification
2149  *
2150  * Decide whether it is safe to modify a foreign table directly, and if so,
2151  * rewrite subplan accordingly.
2152  */
2153 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2154 postgresPlanDirectModify(PlannerInfo *root,
2155 						 ModifyTable *plan,
2156 						 Index resultRelation,
2157 						 int subplan_index)
2158 {
2159 	CmdType		operation = plan->operation;
2160 	Plan	   *subplan;
2161 	RelOptInfo *foreignrel;
2162 	RangeTblEntry *rte;
2163 	PgFdwRelationInfo *fpinfo;
2164 	Relation	rel;
2165 	StringInfoData sql;
2166 	ForeignScan *fscan;
2167 	List	   *targetAttrs = NIL;
2168 	List	   *remote_exprs;
2169 	List	   *params_list = NIL;
2170 	List	   *returningList = NIL;
2171 	List	   *retrieved_attrs = NIL;
2172 
2173 	/*
2174 	 * Decide whether it is safe to modify a foreign table directly.
2175 	 */
2176 
2177 	/*
2178 	 * The table modification must be an UPDATE or DELETE.
2179 	 */
2180 	if (operation != CMD_UPDATE && operation != CMD_DELETE)
2181 		return false;
2182 
2183 	/*
2184 	 * It's unsafe to modify a foreign table directly if there are any local
2185 	 * joins needed.
2186 	 */
2187 	subplan = (Plan *) list_nth(plan->plans, subplan_index);
2188 	if (!IsA(subplan, ForeignScan))
2189 		return false;
2190 	fscan = (ForeignScan *) subplan;
2191 
2192 	/*
2193 	 * It's unsafe to modify a foreign table directly if there are any quals
2194 	 * that should be evaluated locally.
2195 	 */
2196 	if (subplan->qual != NIL)
2197 		return false;
2198 
2199 	/* Safe to fetch data about the target foreign rel */
2200 	if (fscan->scan.scanrelid == 0)
2201 	{
2202 		foreignrel = find_join_rel(root, fscan->fs_relids);
2203 		/* We should have a rel for this foreign join. */
2204 		Assert(foreignrel);
2205 	}
2206 	else
2207 		foreignrel = root->simple_rel_array[resultRelation];
2208 	rte = root->simple_rte_array[resultRelation];
2209 	fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2210 
2211 	/*
2212 	 * It's unsafe to update a foreign table directly, if any expressions to
2213 	 * assign to the target columns are unsafe to evaluate remotely.
2214 	 */
2215 	if (operation == CMD_UPDATE)
2216 	{
2217 		int			col;
2218 
2219 		/*
2220 		 * We transmit only columns that were explicitly targets of the
2221 		 * UPDATE, so as to avoid unnecessary data transmission.
2222 		 */
2223 		col = -1;
2224 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2225 		{
2226 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2227 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
2228 			TargetEntry *tle;
2229 
2230 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
2231 				elog(ERROR, "system-column update is not supported");
2232 
2233 			tle = get_tle_by_resno(subplan->targetlist, attno);
2234 
2235 			if (!tle)
2236 				elog(ERROR, "attribute number %d not found in subplan targetlist",
2237 					 attno);
2238 
2239 			if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2240 				return false;
2241 
2242 			targetAttrs = lappend_int(targetAttrs, attno);
2243 		}
2244 	}
2245 
2246 	/*
2247 	 * Ok, rewrite subplan so as to modify the foreign table directly.
2248 	 */
2249 	initStringInfo(&sql);
2250 
2251 	/*
2252 	 * Core code already has some lock on each rel being planned, so we can
2253 	 * use NoLock here.
2254 	 */
2255 	rel = table_open(rte->relid, NoLock);
2256 
2257 	/*
2258 	 * Recall the qual clauses that must be evaluated remotely.  (These are
2259 	 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2260 	 * doesn't care.)
2261 	 */
2262 	remote_exprs = fpinfo->final_remote_exprs;
2263 
2264 	/*
2265 	 * Extract the relevant RETURNING list if any.
2266 	 */
2267 	if (plan->returningLists)
2268 	{
2269 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
2270 
2271 		/*
2272 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2273 		 * we fetch from the foreign server any Vars specified in RETURNING
2274 		 * that refer not only to the target relation but to non-target
2275 		 * relations.  So we'll deparse them into the RETURNING clause of the
2276 		 * remote query; use a targetlist consisting of them instead, which
2277 		 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2278 		 * node below.
2279 		 */
2280 		if (fscan->scan.scanrelid == 0)
2281 			returningList = build_remote_returning(resultRelation, rel,
2282 												   returningList);
2283 	}
2284 
2285 	/*
2286 	 * Construct the SQL command string.
2287 	 */
2288 	switch (operation)
2289 	{
2290 		case CMD_UPDATE:
2291 			deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2292 								   foreignrel,
2293 								   ((Plan *) fscan)->targetlist,
2294 								   targetAttrs,
2295 								   remote_exprs, &params_list,
2296 								   returningList, &retrieved_attrs);
2297 			break;
2298 		case CMD_DELETE:
2299 			deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2300 								   foreignrel,
2301 								   remote_exprs, &params_list,
2302 								   returningList, &retrieved_attrs);
2303 			break;
2304 		default:
2305 			elog(ERROR, "unexpected operation: %d", (int) operation);
2306 			break;
2307 	}
2308 
2309 	/*
2310 	 * Update the operation info.
2311 	 */
2312 	fscan->operation = operation;
2313 
2314 	/*
2315 	 * Update the fdw_exprs list that will be available to the executor.
2316 	 */
2317 	fscan->fdw_exprs = params_list;
2318 
2319 	/*
2320 	 * Update the fdw_private list that will be available to the executor.
2321 	 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2322 	 */
2323 	fscan->fdw_private = list_make4(makeString(sql.data),
2324 									makeInteger((retrieved_attrs != NIL)),
2325 									retrieved_attrs,
2326 									makeInteger(plan->canSetTag));
2327 
2328 	/*
2329 	 * Update the foreign-join-related fields.
2330 	 */
2331 	if (fscan->scan.scanrelid == 0)
2332 	{
2333 		/* No need for the outer subplan. */
2334 		fscan->scan.plan.lefttree = NULL;
2335 
2336 		/* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2337 		if (returningList)
2338 			rebuild_fdw_scan_tlist(fscan, returningList);
2339 	}
2340 
2341 	table_close(rel, NoLock);
2342 	return true;
2343 }
2344 
2345 /*
2346  * postgresBeginDirectModify
2347  *		Prepare a direct foreign table modification
2348  */
2349 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2350 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2351 {
2352 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2353 	EState	   *estate = node->ss.ps.state;
2354 	PgFdwDirectModifyState *dmstate;
2355 	Index		rtindex;
2356 	RangeTblEntry *rte;
2357 	Oid			userid;
2358 	ForeignTable *table;
2359 	UserMapping *user;
2360 	int			numParams;
2361 
2362 	/*
2363 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
2364 	 */
2365 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2366 		return;
2367 
2368 	/*
2369 	 * We'll save private state in node->fdw_state.
2370 	 */
2371 	dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2372 	node->fdw_state = (void *) dmstate;
2373 
2374 	/*
2375 	 * Identify which user to do the remote access as.  This should match what
2376 	 * ExecCheckRTEPerms() does.
2377 	 */
2378 	rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2379 	rte = exec_rt_fetch(rtindex, estate);
2380 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2381 
2382 	/* Get info about foreign table. */
2383 	if (fsplan->scan.scanrelid == 0)
2384 		dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2385 	else
2386 		dmstate->rel = node->ss.ss_currentRelation;
2387 	table = GetForeignTable(RelationGetRelid(dmstate->rel));
2388 	user = GetUserMapping(userid, table->serverid);
2389 
2390 	/*
2391 	 * Get connection to the foreign server.  Connection manager will
2392 	 * establish new connection if necessary.
2393 	 */
2394 	dmstate->conn = GetConnection(user, false);
2395 
2396 	/* Update the foreign-join-related fields. */
2397 	if (fsplan->scan.scanrelid == 0)
2398 	{
2399 		/* Save info about foreign table. */
2400 		dmstate->resultRel = dmstate->rel;
2401 
2402 		/*
2403 		 * Set dmstate->rel to NULL to teach get_returning_data() and
2404 		 * make_tuple_from_result_row() that columns fetched from the remote
2405 		 * server are described by fdw_scan_tlist of the foreign-scan plan
2406 		 * node, not the tuple descriptor for the target relation.
2407 		 */
2408 		dmstate->rel = NULL;
2409 	}
2410 
2411 	/* Initialize state variable */
2412 	dmstate->num_tuples = -1;	/* -1 means not set yet */
2413 
2414 	/* Get private info created by planner functions. */
2415 	dmstate->query = strVal(list_nth(fsplan->fdw_private,
2416 									 FdwDirectModifyPrivateUpdateSql));
2417 	dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2418 											 FdwDirectModifyPrivateHasReturning));
2419 	dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2420 												 FdwDirectModifyPrivateRetrievedAttrs);
2421 	dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2422 											 FdwDirectModifyPrivateSetProcessed));
2423 
2424 	/* Create context for per-tuple temp workspace. */
2425 	dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2426 											  "postgres_fdw temporary data",
2427 											  ALLOCSET_SMALL_SIZES);
2428 
2429 	/* Prepare for input conversion of RETURNING results. */
2430 	if (dmstate->has_returning)
2431 	{
2432 		TupleDesc	tupdesc;
2433 
2434 		if (fsplan->scan.scanrelid == 0)
2435 			tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2436 		else
2437 			tupdesc = RelationGetDescr(dmstate->rel);
2438 
2439 		dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2440 
2441 		/*
2442 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2443 		 * initialize a filter to extract an updated/deleted tuple from a scan
2444 		 * tuple.
2445 		 */
2446 		if (fsplan->scan.scanrelid == 0)
2447 			init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2448 	}
2449 
2450 	/*
2451 	 * Prepare for processing of parameters used in remote query, if any.
2452 	 */
2453 	numParams = list_length(fsplan->fdw_exprs);
2454 	dmstate->numParams = numParams;
2455 	if (numParams > 0)
2456 		prepare_query_params((PlanState *) node,
2457 							 fsplan->fdw_exprs,
2458 							 numParams,
2459 							 &dmstate->param_flinfo,
2460 							 &dmstate->param_exprs,
2461 							 &dmstate->param_values);
2462 }
2463 
2464 /*
2465  * postgresIterateDirectModify
2466  *		Execute a direct foreign table modification
2467  */
2468 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2469 postgresIterateDirectModify(ForeignScanState *node)
2470 {
2471 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2472 	EState	   *estate = node->ss.ps.state;
2473 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2474 
2475 	/*
2476 	 * If this is the first call after Begin, execute the statement.
2477 	 */
2478 	if (dmstate->num_tuples == -1)
2479 		execute_dml_stmt(node);
2480 
2481 	/*
2482 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
2483 	 */
2484 	if (!resultRelInfo->ri_projectReturning)
2485 	{
2486 		TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2487 		Instrumentation *instr = node->ss.ps.instrument;
2488 
2489 		Assert(!dmstate->has_returning);
2490 
2491 		/* Increment the command es_processed count if necessary. */
2492 		if (dmstate->set_processed)
2493 			estate->es_processed += dmstate->num_tuples;
2494 
2495 		/* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2496 		if (instr)
2497 			instr->tuplecount += dmstate->num_tuples;
2498 
2499 		return ExecClearTuple(slot);
2500 	}
2501 
2502 	/*
2503 	 * Get the next RETURNING tuple.
2504 	 */
2505 	return get_returning_data(node);
2506 }
2507 
2508 /*
2509  * postgresEndDirectModify
2510  *		Finish a direct foreign table modification
2511  */
2512 static void
postgresEndDirectModify(ForeignScanState * node)2513 postgresEndDirectModify(ForeignScanState *node)
2514 {
2515 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2516 
2517 	/* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2518 	if (dmstate == NULL)
2519 		return;
2520 
2521 	/* Release PGresult */
2522 	if (dmstate->result)
2523 		PQclear(dmstate->result);
2524 
2525 	/* Release remote connection */
2526 	ReleaseConnection(dmstate->conn);
2527 	dmstate->conn = NULL;
2528 
2529 	/* MemoryContext will be deleted automatically. */
2530 }
2531 
2532 /*
2533  * postgresExplainForeignScan
2534  *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2535  */
2536 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2537 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2538 {
2539 	List	   *fdw_private;
2540 	char	   *sql;
2541 	char	   *relations;
2542 
2543 	fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2544 
2545 	/*
2546 	 * Add names of relation handled by the foreign scan when the scan is a
2547 	 * join
2548 	 */
2549 	if (list_length(fdw_private) > FdwScanPrivateRelations)
2550 	{
2551 		relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2552 		ExplainPropertyText("Relations", relations, es);
2553 	}
2554 
2555 	/*
2556 	 * Add remote query, when VERBOSE option is specified.
2557 	 */
2558 	if (es->verbose)
2559 	{
2560 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2561 		ExplainPropertyText("Remote SQL", sql, es);
2562 	}
2563 }
2564 
2565 /*
2566  * postgresExplainForeignModify
2567  *		Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2568  */
2569 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2570 postgresExplainForeignModify(ModifyTableState *mtstate,
2571 							 ResultRelInfo *rinfo,
2572 							 List *fdw_private,
2573 							 int subplan_index,
2574 							 ExplainState *es)
2575 {
2576 	if (es->verbose)
2577 	{
2578 		char	   *sql = strVal(list_nth(fdw_private,
2579 										  FdwModifyPrivateUpdateSql));
2580 
2581 		ExplainPropertyText("Remote SQL", sql, es);
2582 	}
2583 }
2584 
2585 /*
2586  * postgresExplainDirectModify
2587  *		Produce extra output for EXPLAIN of a ForeignScan that modifies a
2588  *		foreign table directly
2589  */
2590 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2591 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2592 {
2593 	List	   *fdw_private;
2594 	char	   *sql;
2595 
2596 	if (es->verbose)
2597 	{
2598 		fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2599 		sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2600 		ExplainPropertyText("Remote SQL", sql, es);
2601 	}
2602 }
2603 
2604 
/*
 * estimate_path_cost_size
 *		Get cost and size estimates for a foreign scan on given foreign relation
 *		either a base relation or a join between foreign relations or an upper
 *		relation containing foreign relations.
 *
 * root is the planner state; foreignrel is the relation being costed, whose
 * fdw_private is read as a PgFdwRelationInfo.
 * param_join_conds are the parameterization clauses with outer relations.
 * pathkeys specify the expected sort order if any for given path being costed.
 * fpextra specifies additional post-scan/join-processing steps such as the
 * final sort and the LIMIT restriction.
 *
 * The function returns the cost and size estimates in p_rows, p_width,
 * p_startup_cost and p_total_cost variables.
 *
 * There are two estimation modes, selected by fpinfo->use_remote_estimate:
 * either the remote server is asked via EXPLAIN, or the numbers are derived
 * locally, with one sub-case each for cached results, join rels, upper rels
 * and base rels.
 *
 * Side effect: when called with no pathkeys, no param_join_conds and no
 * fpextra, the retrieved-row estimate and the startup/total costs (before
 * transfer overhead is added) are stored into fpinfo for reuse by later
 * calls.
 */
static void
estimate_path_cost_size(PlannerInfo *root,
						RelOptInfo *foreignrel,
						List *param_join_conds,
						List *pathkeys,
						PgFdwPathExtraData *fpextra,
						double *p_rows, int *p_width,
						Cost *p_startup_cost, Cost *p_total_cost)
{
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
	double		rows;
	double		retrieved_rows;
	int			width;
	Cost		startup_cost;
	Cost		total_cost;

	/* Make sure the core code has set up the relation's reltarget */
	Assert(foreignrel->reltarget);

	/*
	 * If the table or the server is configured to use remote estimates,
	 * connect to the foreign server and execute EXPLAIN to estimate the
	 * number of rows selected by the restriction+join clauses.  Otherwise,
	 * estimate rows using whatever statistics we have locally, in a way
	 * similar to ordinary tables.
	 */
	if (fpinfo->use_remote_estimate)
	{
		List	   *remote_param_join_conds;
		List	   *local_param_join_conds;
		StringInfoData sql;
		PGconn	   *conn;
		Selectivity local_sel;
		QualCost	local_cost;
		List	   *fdw_scan_tlist = NIL;
		List	   *remote_conds;

		/* Required only to be passed to deparseSelectStmtForRel */
		List	   *retrieved_attrs;

		/*
		 * param_join_conds might contain both clauses that are safe to send
		 * across, and clauses that aren't.
		 */
		classifyConditions(root, foreignrel, param_join_conds,
						   &remote_param_join_conds, &local_param_join_conds);

		/* Build the list of columns to be fetched from the foreign server. */
		if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
		else
			fdw_scan_tlist = NIL;

		/*
		 * The complete list of remote conditions includes everything from
		 * baserestrictinfo plus any extra join_conds relevant to this
		 * particular path.  The parameterized list is copied first so that
		 * the concatenation works on a private copy of it.
		 */
		remote_conds = list_concat(list_copy(remote_param_join_conds),
								   fpinfo->remote_conds);

		/*
		 * Construct EXPLAIN query including the desired SELECT, FROM, and
		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
		 * values, so don't request params_list.
		 */
		initStringInfo(&sql);
		appendStringInfoString(&sql, "EXPLAIN ");
		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
								remote_conds, pathkeys,
								fpextra ? fpextra->has_final_sort : false,
								fpextra ? fpextra->has_limit : false,
								false, &retrieved_attrs, NULL);

		/*
		 * Get the remote estimate.  This fills rows, width, startup_cost and
		 * total_cost from the remote EXPLAIN output; the connection is held
		 * only for the duration of that one query.
		 */
		conn = GetConnection(fpinfo->user, false);
		get_remote_estimate(sql.data, conn, &rows, &width,
							&startup_cost, &total_cost);
		ReleaseConnection(conn);

		/*
		 * Remember the remote row estimate before local quals reduce it;
		 * per-row local qual costs and the transfer costs added at the end
		 * of this function are charged per retrieved row.
		 */
		retrieved_rows = rows;

		/* Factor in the selectivity of the locally-checked quals */
		local_sel = clauselist_selectivity(root,
										   local_param_join_conds,
										   foreignrel->relid,
										   JOIN_INNER,
										   NULL);
		local_sel *= fpinfo->local_conds_sel;

		rows = clamp_row_est(rows * local_sel);

		/* Add in the eval cost of the locally-checked quals */
		startup_cost += fpinfo->local_conds_cost.startup;
		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
		cost_qual_eval(&local_cost, local_param_join_conds, root);
		startup_cost += local_cost.startup;
		total_cost += local_cost.per_tuple * retrieved_rows;

		/*
		 * Add in tlist eval cost for each output row.  In case of an
		 * aggregate, some of the tlist expressions such as grouping
		 * expressions will be evaluated remotely, so adjust the costs.
		 *
		 * Note that total_cost is a running total here (there is no separate
		 * run_cost in this branch), so every startup adjustment is applied
		 * to both startup_cost and total_cost.
		 */
		startup_cost += foreignrel->reltarget->cost.startup;
		total_cost += foreignrel->reltarget->cost.startup;
		total_cost += foreignrel->reltarget->cost.per_tuple * rows;
		if (IS_UPPER_REL(foreignrel))
		{
			QualCost	tlist_cost;

			/* Back out the cost of the tlist that is deparsed for the remote */
			cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
			startup_cost -= tlist_cost.startup;
			total_cost -= tlist_cost.startup;
			total_cost -= tlist_cost.per_tuple * rows;
		}
	}
	else
	{
		/*
		 * In this branch costs are accumulated as startup_cost + run_cost
		 * and only combined into total_cost after the sort adjustment below.
		 */
		Cost		run_cost = 0;

		/*
		 * We don't support join conditions in this mode (hence, no
		 * parameterized paths can be made).
		 */
		Assert(param_join_conds == NIL);

		/*
		 * We will come here again and again with different set of pathkeys or
		 * additional post-scan/join-processing steps that caller wants to
		 * cost.  We don't need to calculate the cost/size estimates for the
		 * underlying scan, join, or grouping each time.  Instead, use those
		 * estimates if we have cached them already.
		 */
		if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
		{
			Assert(fpinfo->retrieved_rows >= 1);

			/*
			 * NOTE(review): fpinfo->rows and fpinfo->width are read here but
			 * are not written anywhere in this function; presumably the
			 * caller stores them after the first un-parameterized call --
			 * confirm.
			 */
			rows = fpinfo->rows;
			retrieved_rows = fpinfo->retrieved_rows;
			width = fpinfo->width;
			startup_cost = fpinfo->rel_startup_cost;
			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;

			/*
			 * If we estimate the costs of a foreign scan or a foreign join
			 * with additional post-scan/join-processing steps, the scan or
			 * join costs obtained from the cache wouldn't yet contain the
			 * eval costs for the final scan/join target, which would've been
			 * updated by apply_scanjoin_target_to_paths(); add the eval costs
			 * now.
			 */
			if (fpextra && !IS_UPPER_REL(foreignrel))
			{
				/* Shouldn't get here unless we have LIMIT */
				Assert(fpextra->has_limit);
				Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
					   foreignrel->reloptkind == RELOPT_JOINREL);
				startup_cost += foreignrel->reltarget->cost.startup;
				run_cost += foreignrel->reltarget->cost.per_tuple * rows;
			}
		}
		else if (IS_JOIN_REL(foreignrel))
		{
			PgFdwRelationInfo *fpinfo_i;
			PgFdwRelationInfo *fpinfo_o;
			QualCost	join_cost;
			QualCost	remote_conds_cost;
			double		nrows;

			/* Use rows/width estimates made by the core code. */
			rows = foreignrel->rows;
			width = foreignrel->reltarget->width;

			/* For join we expect inner and outer relations set */
			Assert(fpinfo->innerrel && fpinfo->outerrel);

			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;

			/* Estimate of number of rows in cross product */
			nrows = fpinfo_i->rows * fpinfo_o->rows;

			/*
			 * Back into an estimate of the number of retrieved rows.  Just in
			 * case this is nuts, clamp to at most nrow.
			 */
			retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
			retrieved_rows = Min(retrieved_rows, nrows);

			/*
			 * The cost of foreign join is estimated as cost of generating
			 * rows for the joining relations + cost for applying quals on the
			 * rows.
			 */

			/*
			 * Calculate the cost of clauses pushed down to the foreign server
			 */
			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
			/* Calculate the cost of applying join clauses */
			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);

			/*
			 * Startup cost includes startup cost of joining relations and the
			 * startup cost for join and other clauses. We do not include the
			 * startup cost specific to join strategy (e.g. setting up hash
			 * tables) since we do not know what strategy the foreign server
			 * is going to use.
			 */
			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
			startup_cost += join_cost.startup;
			startup_cost += remote_conds_cost.startup;
			startup_cost += fpinfo->local_conds_cost.startup;

			/*
			 * Run time cost includes:
			 *
			 * 1. Run time cost (total_cost - startup_cost) of relations being
			 * joined
			 *
			 * 2. Run time cost of applying join clauses on the cross product
			 * of the joining relations.
			 *
			 * 3. Run time cost of applying pushed down other clauses on the
			 * result of join
			 *
			 * 4. Run time cost of applying nonpushable other clauses locally
			 * on the result fetched from the foreign server.
			 */
			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
			run_cost += nrows * join_cost.per_tuple;
			/* From here on nrows is the cross product reduced by the join clauses */
			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
			run_cost += nrows * remote_conds_cost.per_tuple;
			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;

			/* Add in tlist eval cost for each output row */
			startup_cost += foreignrel->reltarget->cost.startup;
			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
		}
		else if (IS_UPPER_REL(foreignrel))
		{
			RelOptInfo *outerrel = fpinfo->outerrel;
			PgFdwRelationInfo *ofpinfo;
			AggClauseCosts aggcosts;
			double		input_rows;
			int			numGroupCols;
			double		numGroups = 1;

			/* The upper relation should have its outer relation set */
			Assert(outerrel);
			/* and that outer relation should have its reltarget set */
			Assert(outerrel->reltarget);

			/*
			 * This cost model is mixture of costing done for sorted and
			 * hashed aggregates in cost_agg().  We are not sure which
			 * strategy will be considered at remote side, thus for
			 * simplicity, we put all startup related costs in startup_cost
			 * and all finalization and run cost are added in total_cost.
			 */

			ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;

			/* Get rows from input rel */
			input_rows = ofpinfo->rows;

			/* Collect statistics about aggregates for estimating costs. */
			MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
			if (root->parse->hasAggs)
			{
				get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
									 AGGSPLIT_SIMPLE, &aggcosts);

				/*
				 * The cost of aggregates in the HAVING qual will be the same
				 * for each child as it is for the parent, so there's no need
				 * to use a translated version of havingQual.
				 */
				get_agg_clause_costs(root, (Node *) root->parse->havingQual,
									 AGGSPLIT_SIMPLE, &aggcosts);
			}

			/* Get number of grouping columns and possible number of groups */
			numGroupCols = list_length(root->parse->groupClause);
			numGroups = estimate_num_groups(root,
											get_sortgrouplist_exprs(root->parse->groupClause,
																	fpinfo->grouped_tlist),
											input_rows, NULL);

			/*
			 * Get the retrieved_rows and rows estimates.  If there are HAVING
			 * quals, account for their selectivity.
			 */
			if (root->parse->havingQual)
			{
				/* Factor in the selectivity of the remotely-checked quals */
				retrieved_rows =
					clamp_row_est(numGroups *
								  clauselist_selectivity(root,
														 fpinfo->remote_conds,
														 0,
														 JOIN_INNER,
														 NULL));
				/* Factor in the selectivity of the locally-checked quals */
				rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
			}
			else
			{
				/* No HAVING: every group is both retrieved and output */
				rows = retrieved_rows = numGroups;
			}

			/* Use width estimate made by the core code. */
			width = foreignrel->reltarget->width;

			/*-----
			 * Startup cost includes:
			 *	  1. Startup cost for underneath input relation, adjusted for
			 *	     tlist replacement by apply_scanjoin_target_to_paths()
			 *	  2. Cost of performing aggregation, per cost_agg()
			 *-----
			 */
			startup_cost = ofpinfo->rel_startup_cost;
			startup_cost += outerrel->reltarget->cost.startup;
			startup_cost += aggcosts.transCost.startup;
			startup_cost += aggcosts.transCost.per_tuple * input_rows;
			startup_cost += aggcosts.finalCost.startup;
			startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;

			/*-----
			 * Run time cost includes:
			 *	  1. Run time cost of underneath input relation, adjusted for
			 *	     tlist replacement by apply_scanjoin_target_to_paths()
			 *	  2. Run time cost of performing aggregation, per cost_agg()
			 *-----
			 */
			run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
			run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
			run_cost += aggcosts.finalCost.per_tuple * numGroups;
			run_cost += cpu_tuple_cost * numGroups;

			/* Account for the eval cost of HAVING quals, if any */
			if (root->parse->havingQual)
			{
				QualCost	remote_cost;

				/* Add in the eval cost of the remotely-checked quals */
				cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
				startup_cost += remote_cost.startup;
				run_cost += remote_cost.per_tuple * numGroups;
				/* Add in the eval cost of the locally-checked quals */
				startup_cost += fpinfo->local_conds_cost.startup;
				run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
			}

			/* Add in tlist eval cost for each output row */
			startup_cost += foreignrel->reltarget->cost.startup;
			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
		}
		else
		{
			Cost		cpu_per_tuple;

			/* Use rows/width estimates made by set_baserel_size_estimates. */
			rows = foreignrel->rows;
			width = foreignrel->reltarget->width;

			/*
			 * Back into an estimate of the number of retrieved rows.  Just in
			 * case this is nuts, clamp to at most foreignrel->tuples.
			 */
			retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);

			/*
			 * Cost as though this were a seqscan, which is pessimistic.  We
			 * effectively imagine the local_conds are being evaluated
			 * remotely, too.
			 */
			startup_cost = 0;
			run_cost = 0;
			run_cost += seq_page_cost * foreignrel->pages;

			startup_cost += foreignrel->baserestrictcost.startup;
			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
			run_cost += cpu_per_tuple * foreignrel->tuples;

			/* Add in tlist eval cost for each output row */
			startup_cost += foreignrel->reltarget->cost.startup;
			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
		}

		/*
		 * Without remote estimates, we have no real way to estimate the cost
		 * of generating sorted output.  It could be free if the query plan
		 * the remote side would have chosen generates properly-sorted output
		 * anyway, but in most cases it will cost something.  Estimate a value
		 * high enough that we won't pick the sorted path when the ordering
		 * isn't locally useful, but low enough that we'll err on the side of
		 * pushing down the ORDER BY clause when it's useful to do so.
		 */
		if (pathkeys != NIL)
		{
			if (IS_UPPER_REL(foreignrel))
			{
				Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
					   fpinfo->stage == UPPERREL_GROUP_AGG);

				/*
				 * NOTE(review): fpextra is dereferenced here without a NULL
				 * check, unlike the other uses in this function; presumably
				 * callers always supply fpextra when costing a sorted
				 * upper-rel path -- confirm.
				 */
				adjust_foreign_grouping_path_cost(root, pathkeys,
												  retrieved_rows, width,
												  fpextra->limit_tuples,
												  &startup_cost, &run_cost);
			}
			else
			{
				/* Scan or join: inflate both components by the flat factor */
				startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
				run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
			}
		}

		total_cost = startup_cost + run_cost;

		/* Adjust the cost estimates if we have LIMIT */
		if (fpextra && fpextra->has_limit)
		{
			adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
									fpextra->offset_est, fpextra->count_est);
			/* With a remote LIMIT, only the limited rows are transferred */
			retrieved_rows = rows;
		}
	}

	/*
	 * If this includes the final sort step, the given target, which will be
	 * applied to the resulting path, might have different expressions from
	 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
	 * eval costs.
	 */
	if (fpextra && fpextra->has_final_sort &&
		fpextra->target != foreignrel->reltarget)
	{
		QualCost	oldcost = foreignrel->reltarget->cost;
		QualCost	newcost = fpextra->target->cost;

		/* Swap the reltarget's eval cost for the final target's eval cost */
		startup_cost += newcost.startup - oldcost.startup;
		total_cost += newcost.startup - oldcost.startup;
		total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
	}

	/*
	 * Cache the retrieved rows and cost estimates for scans, joins, or
	 * groupings without any parameterization, pathkeys, or additional
	 * post-scan/join-processing steps, before adding the costs for
	 * transferring data from the foreign server.  These estimates are useful
	 * for costing remote joins involving this relation or costing other
	 * remote operations on this relation such as remote sorts and remote
	 * LIMIT restrictions, when the costs can not be obtained from the foreign
	 * server.  This function will be called at least once for every foreign
	 * relation without any parameterization, pathkeys, or additional
	 * post-scan/join-processing steps.
	 */
	if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
	{
		fpinfo->retrieved_rows = retrieved_rows;
		fpinfo->rel_startup_cost = startup_cost;
		fpinfo->rel_total_cost = total_cost;
	}

	/*
	 * Add some additional cost factors to account for connection overhead
	 * (fdw_startup_cost), transferring data across the network
	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
	 * (cpu_tuple_cost per retrieved row).
	 */
	startup_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
	total_cost += cpu_tuple_cost * retrieved_rows;

	/*
	 * If we have LIMIT, we should prefer performing the restriction remotely
	 * rather than locally, as the former avoids extra row fetches from the
	 * remote that the latter might cause.  But since the core code doesn't
	 * account for such fetches when estimating the costs of the local
	 * restriction (see create_limit_path()), there would be no difference
	 * between the costs of the local restriction and the costs of the remote
	 * restriction estimated above if we don't use remote estimates (except
	 * for the case where the foreignrel is a grouping relation, the given
	 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
	 * accounted for in costing the remote restriction).  Tweak the costs of
	 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
	 * one.
	 */
	if (!fpinfo->use_remote_estimate &&
		fpextra && fpextra->has_limit &&
		fpextra->limit_tuples > 0 &&
		fpextra->limit_tuples < fpinfo->rows)
	{
		Assert(fpinfo->rows > 0);

		/*
		 * Discount the run cost (total - startup) by 5%, scaled by the
		 * fraction of fpinfo->rows that the limit leaves unfetched.
		 */
		total_cost -= (total_cost - startup_cost) * 0.05 *
			(fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
	}

	/* Return results. */
	*p_rows = rows;
	*p_width = width;
	*p_startup_cost = startup_cost;
	*p_total_cost = total_cost;
}
3127 
3128 /*
3129  * Estimate costs of executing a SQL statement remotely.
3130  * The given "sql" must be an EXPLAIN command.
3131  */
3132 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)3133 get_remote_estimate(const char *sql, PGconn *conn,
3134 					double *rows, int *width,
3135 					Cost *startup_cost, Cost *total_cost)
3136 {
3137 	PGresult   *volatile res = NULL;
3138 
3139 	/* PGresult must be released before leaving this function. */
3140 	PG_TRY();
3141 	{
3142 		char	   *line;
3143 		char	   *p;
3144 		int			n;
3145 
3146 		/*
3147 		 * Execute EXPLAIN remotely.
3148 		 */
3149 		res = pgfdw_exec_query(conn, sql);
3150 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3151 			pgfdw_report_error(ERROR, res, conn, false, sql);
3152 
3153 		/*
3154 		 * Extract cost numbers for topmost plan node.  Note we search for a
3155 		 * left paren from the end of the line to avoid being confused by
3156 		 * other uses of parentheses.
3157 		 */
3158 		line = PQgetvalue(res, 0, 0);
3159 		p = strrchr(line, '(');
3160 		if (p == NULL)
3161 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3162 		n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3163 				   startup_cost, total_cost, rows, width);
3164 		if (n != 4)
3165 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3166 
3167 		PQclear(res);
3168 		res = NULL;
3169 	}
3170 	PG_CATCH();
3171 	{
3172 		if (res)
3173 			PQclear(res);
3174 		PG_RE_THROW();
3175 	}
3176 	PG_END_TRY();
3177 }
3178 
3179 /*
3180  * Adjust the cost estimates of a foreign grouping path to include the cost of
3181  * generating properly-sorted output.
3182  */
3183 static void
adjust_foreign_grouping_path_cost(PlannerInfo * root,List * pathkeys,double retrieved_rows,double width,double limit_tuples,Cost * p_startup_cost,Cost * p_run_cost)3184 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3185 								  List *pathkeys,
3186 								  double retrieved_rows,
3187 								  double width,
3188 								  double limit_tuples,
3189 								  Cost *p_startup_cost,
3190 								  Cost *p_run_cost)
3191 {
3192 	/*
3193 	 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3194 	 * side is unlikely to generate properly-sorted output, so it would need
3195 	 * an explicit sort; adjust the given costs with cost_sort().  Likewise,
3196 	 * if the GROUP BY clause is sort-able but isn't a superset of the given
3197 	 * pathkeys, adjust the costs with that function.  Otherwise, adjust the
3198 	 * costs by applying the same heuristic as for the scan or join case.
3199 	 */
3200 	if (!grouping_is_sortable(root->parse->groupClause) ||
3201 		!pathkeys_contained_in(pathkeys, root->group_pathkeys))
3202 	{
3203 		Path		sort_path;	/* dummy for result of cost_sort */
3204 
3205 		cost_sort(&sort_path,
3206 				  root,
3207 				  pathkeys,
3208 				  *p_startup_cost + *p_run_cost,
3209 				  retrieved_rows,
3210 				  width,
3211 				  0.0,
3212 				  work_mem,
3213 				  limit_tuples);
3214 
3215 		*p_startup_cost = sort_path.startup_cost;
3216 		*p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3217 	}
3218 	else
3219 	{
3220 		/*
3221 		 * The default extra cost seems too large for foreign-grouping cases;
3222 		 * add 1/4th of that default.
3223 		 */
3224 		double		sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3225 											 - 1.0) * 0.25;
3226 
3227 		*p_startup_cost *= sort_multiplier;
3228 		*p_run_cost *= sort_multiplier;
3229 	}
3230 }
3231 
3232 /*
3233  * Detect whether we want to process an EquivalenceClass member.
3234  *
3235  * This is a callback for use by generate_implied_equalities_for_column.
3236  */
3237 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)3238 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3239 						  EquivalenceClass *ec, EquivalenceMember *em,
3240 						  void *arg)
3241 {
3242 	ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3243 	Expr	   *expr = em->em_expr;
3244 
3245 	/*
3246 	 * If we've identified what we're processing in the current scan, we only
3247 	 * want to match that expression.
3248 	 */
3249 	if (state->current != NULL)
3250 		return equal(expr, state->current);
3251 
3252 	/*
3253 	 * Otherwise, ignore anything we've already processed.
3254 	 */
3255 	if (list_member(state->already_used, expr))
3256 		return false;
3257 
3258 	/* This is the new target to process. */
3259 	state->current = expr;
3260 	return true;
3261 }
3262 
3263 /*
3264  * Create cursor for node's query with current parameter values.
3265  */
3266 static void
create_cursor(ForeignScanState * node)3267 create_cursor(ForeignScanState *node)
3268 {
3269 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3270 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3271 	int			numParams = fsstate->numParams;
3272 	const char **values = fsstate->param_values;
3273 	PGconn	   *conn = fsstate->conn;
3274 	StringInfoData buf;
3275 	PGresult   *res;
3276 
3277 	/*
3278 	 * Construct array of query parameter values in text format.  We do the
3279 	 * conversions in the short-lived per-tuple context, so as not to cause a
3280 	 * memory leak over repeated scans.
3281 	 */
3282 	if (numParams > 0)
3283 	{
3284 		MemoryContext oldcontext;
3285 
3286 		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3287 
3288 		process_query_params(econtext,
3289 							 fsstate->param_flinfo,
3290 							 fsstate->param_exprs,
3291 							 values);
3292 
3293 		MemoryContextSwitchTo(oldcontext);
3294 	}
3295 
3296 	/* Construct the DECLARE CURSOR command */
3297 	initStringInfo(&buf);
3298 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3299 					 fsstate->cursor_number, fsstate->query);
3300 
3301 	/*
3302 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3303 	 * to infer types for all parameters.  Since we explicitly cast every
3304 	 * parameter (see deparse.c), the "inference" is trivial and will produce
3305 	 * the desired result.  This allows us to avoid assuming that the remote
3306 	 * server has the same OIDs we do for the parameters' types.
3307 	 */
3308 	if (!PQsendQueryParams(conn, buf.data, numParams,
3309 						   NULL, values, NULL, NULL, 0))
3310 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3311 
3312 	/*
3313 	 * Get the result, and check for success.
3314 	 *
3315 	 * We don't use a PG_TRY block here, so be careful not to throw error
3316 	 * without releasing the PGresult.
3317 	 */
3318 	res = pgfdw_get_result(conn, buf.data);
3319 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3320 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3321 	PQclear(res);
3322 
3323 	/* Mark the cursor as created, and show no tuples have been retrieved */
3324 	fsstate->cursor_exists = true;
3325 	fsstate->tuples = NULL;
3326 	fsstate->num_tuples = 0;
3327 	fsstate->next_tuple = 0;
3328 	fsstate->fetch_ct_2 = 0;
3329 	fsstate->eof_reached = false;
3330 
3331 	/* Clean up */
3332 	pfree(buf.data);
3333 }
3334 
3335 /*
3336  * Fetch some more rows from the node's cursor.
3337  */
3338 static void
fetch_more_data(ForeignScanState * node)3339 fetch_more_data(ForeignScanState *node)
3340 {
3341 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3342 	PGresult   *volatile res = NULL;
3343 	MemoryContext oldcontext;
3344 
3345 	/*
3346 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
3347 	 * batch.
3348 	 */
3349 	fsstate->tuples = NULL;
3350 	MemoryContextReset(fsstate->batch_cxt);
3351 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3352 
3353 	/* PGresult must be released before leaving this function. */
3354 	PG_TRY();
3355 	{
3356 		PGconn	   *conn = fsstate->conn;
3357 		char		sql[64];
3358 		int			numrows;
3359 		int			i;
3360 
3361 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3362 				 fsstate->fetch_size, fsstate->cursor_number);
3363 
3364 		res = pgfdw_exec_query(conn, sql);
3365 		/* On error, report the original query, not the FETCH. */
3366 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3367 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3368 
3369 		/* Convert the data into HeapTuples */
3370 		numrows = PQntuples(res);
3371 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3372 		fsstate->num_tuples = numrows;
3373 		fsstate->next_tuple = 0;
3374 
3375 		for (i = 0; i < numrows; i++)
3376 		{
3377 			Assert(IsA(node->ss.ps.plan, ForeignScan));
3378 
3379 			fsstate->tuples[i] =
3380 				make_tuple_from_result_row(res, i,
3381 										   fsstate->rel,
3382 										   fsstate->attinmeta,
3383 										   fsstate->retrieved_attrs,
3384 										   node,
3385 										   fsstate->temp_cxt);
3386 		}
3387 
3388 		/* Update fetch_ct_2 */
3389 		if (fsstate->fetch_ct_2 < 2)
3390 			fsstate->fetch_ct_2++;
3391 
3392 		/* Must be EOF if we didn't get as many tuples as we asked for. */
3393 		fsstate->eof_reached = (numrows < fsstate->fetch_size);
3394 
3395 		PQclear(res);
3396 		res = NULL;
3397 	}
3398 	PG_CATCH();
3399 	{
3400 		if (res)
3401 			PQclear(res);
3402 		PG_RE_THROW();
3403 	}
3404 	PG_END_TRY();
3405 
3406 	MemoryContextSwitchTo(oldcontext);
3407 }
3408 
3409 /*
3410  * Force assorted GUC parameters to settings that ensure that we'll output
3411  * data values in a form that is unambiguous to the remote server.
3412  *
3413  * This is rather expensive and annoying to do once per row, but there's
3414  * little choice if we want to be sure values are transmitted accurately;
3415  * we can't leave the settings in place between rows for fear of affecting
3416  * user-visible computations.
3417  *
3418  * We use the equivalent of a function SET option to allow the settings to
3419  * persist only until the caller calls reset_transmission_modes().  If an
3420  * error is thrown in between, guc.c will take care of undoing the settings.
3421  *
3422  * The return value is the nestlevel that must be passed to
3423  * reset_transmission_modes() to undo things.
3424  */
3425 int
set_transmission_modes(void)3426 set_transmission_modes(void)
3427 {
3428 	int			nestlevel = NewGUCNestLevel();
3429 
3430 	/*
3431 	 * The values set here should match what pg_dump does.  See also
3432 	 * configure_remote_session in connection.c.
3433 	 */
3434 	if (DateStyle != USE_ISO_DATES)
3435 		(void) set_config_option("datestyle", "ISO",
3436 								 PGC_USERSET, PGC_S_SESSION,
3437 								 GUC_ACTION_SAVE, true, 0, false);
3438 	if (IntervalStyle != INTSTYLE_POSTGRES)
3439 		(void) set_config_option("intervalstyle", "postgres",
3440 								 PGC_USERSET, PGC_S_SESSION,
3441 								 GUC_ACTION_SAVE, true, 0, false);
3442 	if (extra_float_digits < 3)
3443 		(void) set_config_option("extra_float_digits", "3",
3444 								 PGC_USERSET, PGC_S_SESSION,
3445 								 GUC_ACTION_SAVE, true, 0, false);
3446 
3447 	return nestlevel;
3448 }
3449 
3450 /*
3451  * Undo the effects of set_transmission_modes().
3452  */
3453 void
reset_transmission_modes(int nestlevel)3454 reset_transmission_modes(int nestlevel)
3455 {
3456 	AtEOXact_GUC(true, nestlevel);
3457 }
3458 
3459 /*
3460  * Utility routine to close a cursor.
3461  */
3462 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3463 close_cursor(PGconn *conn, unsigned int cursor_number)
3464 {
3465 	char		sql[64];
3466 	PGresult   *res;
3467 
3468 	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3469 
3470 	/*
3471 	 * We don't use a PG_TRY block here, so be careful not to throw error
3472 	 * without releasing the PGresult.
3473 	 */
3474 	res = pgfdw_exec_query(conn, sql);
3475 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3476 		pgfdw_report_error(ERROR, res, conn, true, sql);
3477 	PQclear(res);
3478 }
3479 
3480 /*
3481  * create_foreign_modify
3482  *		Construct an execution state of a foreign insert/update/delete
3483  *		operation
3484  */
3485 static PgFdwModifyState *
create_foreign_modify(EState * estate,RangeTblEntry * rte,ResultRelInfo * resultRelInfo,CmdType operation,Plan * subplan,char * query,List * target_attrs,bool has_returning,List * retrieved_attrs)3486 create_foreign_modify(EState *estate,
3487 					  RangeTblEntry *rte,
3488 					  ResultRelInfo *resultRelInfo,
3489 					  CmdType operation,
3490 					  Plan *subplan,
3491 					  char *query,
3492 					  List *target_attrs,
3493 					  bool has_returning,
3494 					  List *retrieved_attrs)
3495 {
3496 	PgFdwModifyState *fmstate;
3497 	Relation	rel = resultRelInfo->ri_RelationDesc;
3498 	TupleDesc	tupdesc = RelationGetDescr(rel);
3499 	Oid			userid;
3500 	ForeignTable *table;
3501 	UserMapping *user;
3502 	AttrNumber	n_params;
3503 	Oid			typefnoid;
3504 	bool		isvarlena;
3505 	ListCell   *lc;
3506 
3507 	/* Begin constructing PgFdwModifyState. */
3508 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3509 	fmstate->rel = rel;
3510 
3511 	/*
3512 	 * Identify which user to do the remote access as.  This should match what
3513 	 * ExecCheckRTEPerms() does.
3514 	 */
3515 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3516 
3517 	/* Get info about foreign table. */
3518 	table = GetForeignTable(RelationGetRelid(rel));
3519 	user = GetUserMapping(userid, table->serverid);
3520 
3521 	/* Open connection; report that we'll create a prepared statement. */
3522 	fmstate->conn = GetConnection(user, true);
3523 	fmstate->p_name = NULL;		/* prepared statement not made yet */
3524 
3525 	/* Set up remote query information. */
3526 	fmstate->query = query;
3527 	fmstate->target_attrs = target_attrs;
3528 	fmstate->has_returning = has_returning;
3529 	fmstate->retrieved_attrs = retrieved_attrs;
3530 
3531 	/* Create context for per-tuple temp workspace. */
3532 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3533 											  "postgres_fdw temporary data",
3534 											  ALLOCSET_SMALL_SIZES);
3535 
3536 	/* Prepare for input conversion of RETURNING results. */
3537 	if (fmstate->has_returning)
3538 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3539 
3540 	/* Prepare for output conversion of parameters used in prepared stmt. */
3541 	n_params = list_length(fmstate->target_attrs) + 1;
3542 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3543 	fmstate->p_nums = 0;
3544 
3545 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
3546 	{
3547 		Assert(subplan != NULL);
3548 
3549 		/* Find the ctid resjunk column in the subplan's result */
3550 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3551 														  "ctid");
3552 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
3553 			elog(ERROR, "could not find junk ctid column");
3554 
3555 		/* First transmittable parameter will be ctid */
3556 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3557 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3558 		fmstate->p_nums++;
3559 	}
3560 
3561 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
3562 	{
3563 		/* Set up for remaining transmittable parameters */
3564 		foreach(lc, fmstate->target_attrs)
3565 		{
3566 			int			attnum = lfirst_int(lc);
3567 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3568 
3569 			Assert(!attr->attisdropped);
3570 
3571 			/* Ignore generated columns; they are set to DEFAULT */
3572 			if (attr->attgenerated)
3573 				continue;
3574 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3575 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3576 			fmstate->p_nums++;
3577 		}
3578 	}
3579 
3580 	Assert(fmstate->p_nums <= n_params);
3581 
3582 	/* Initialize auxiliary state */
3583 	fmstate->aux_fmstate = NULL;
3584 
3585 	return fmstate;
3586 }
3587 
3588 /*
3589  * execute_foreign_modify
3590  *		Perform foreign-table modification as required, and fetch RETURNING
3591  *		result if any.  (This is the shared guts of postgresExecForeignInsert,
3592  *		postgresExecForeignUpdate, and postgresExecForeignDelete.)
3593  */
3594 static TupleTableSlot *
execute_foreign_modify(EState * estate,ResultRelInfo * resultRelInfo,CmdType operation,TupleTableSlot * slot,TupleTableSlot * planSlot)3595 execute_foreign_modify(EState *estate,
3596 					   ResultRelInfo *resultRelInfo,
3597 					   CmdType operation,
3598 					   TupleTableSlot *slot,
3599 					   TupleTableSlot *planSlot)
3600 {
3601 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
3602 	ItemPointer ctid = NULL;
3603 	const char **p_values;
3604 	PGresult   *res;
3605 	int			n_rows;
3606 
3607 	/* The operation should be INSERT, UPDATE, or DELETE */
3608 	Assert(operation == CMD_INSERT ||
3609 		   operation == CMD_UPDATE ||
3610 		   operation == CMD_DELETE);
3611 
3612 	/* Set up the prepared statement on the remote server, if we didn't yet */
3613 	if (!fmstate->p_name)
3614 		prepare_foreign_modify(fmstate);
3615 
3616 	/*
3617 	 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
3618 	 */
3619 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
3620 	{
3621 		Datum		datum;
3622 		bool		isNull;
3623 
3624 		datum = ExecGetJunkAttribute(planSlot,
3625 									 fmstate->ctidAttno,
3626 									 &isNull);
3627 		/* shouldn't ever get a null result... */
3628 		if (isNull)
3629 			elog(ERROR, "ctid is NULL");
3630 		ctid = (ItemPointer) DatumGetPointer(datum);
3631 	}
3632 
3633 	/* Convert parameters needed by prepared statement to text form */
3634 	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
3635 
3636 	/*
3637 	 * Execute the prepared statement.
3638 	 */
3639 	if (!PQsendQueryPrepared(fmstate->conn,
3640 							 fmstate->p_name,
3641 							 fmstate->p_nums,
3642 							 p_values,
3643 							 NULL,
3644 							 NULL,
3645 							 0))
3646 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3647 
3648 	/*
3649 	 * Get the result, and check for success.
3650 	 *
3651 	 * We don't use a PG_TRY block here, so be careful not to throw error
3652 	 * without releasing the PGresult.
3653 	 */
3654 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3655 	if (PQresultStatus(res) !=
3656 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3657 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3658 
3659 	/* Check number of rows affected, and fetch RETURNING tuple if any */
3660 	if (fmstate->has_returning)
3661 	{
3662 		n_rows = PQntuples(res);
3663 		if (n_rows > 0)
3664 			store_returning_result(fmstate, slot, res);
3665 	}
3666 	else
3667 		n_rows = atoi(PQcmdTuples(res));
3668 
3669 	/* And clean up */
3670 	PQclear(res);
3671 
3672 	MemoryContextReset(fmstate->temp_cxt);
3673 
3674 	/*
3675 	 * Return NULL if nothing was inserted/updated/deleted on the remote end
3676 	 */
3677 	return (n_rows > 0) ? slot : NULL;
3678 }
3679 
3680 /*
3681  * prepare_foreign_modify
3682  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3683  */
3684 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3685 prepare_foreign_modify(PgFdwModifyState *fmstate)
3686 {
3687 	char		prep_name[NAMEDATALEN];
3688 	char	   *p_name;
3689 	PGresult   *res;
3690 
3691 	/* Construct name we'll use for the prepared statement. */
3692 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3693 			 GetPrepStmtNumber(fmstate->conn));
3694 	p_name = pstrdup(prep_name);
3695 
3696 	/*
3697 	 * We intentionally do not specify parameter types here, but leave the
3698 	 * remote server to derive them by default.  This avoids possible problems
3699 	 * with the remote server using different type OIDs than we do.  All of
3700 	 * the prepared statements we use in this module are simple enough that
3701 	 * the remote server will make the right choices.
3702 	 */
3703 	if (!PQsendPrepare(fmstate->conn,
3704 					   p_name,
3705 					   fmstate->query,
3706 					   0,
3707 					   NULL))
3708 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3709 
3710 	/*
3711 	 * Get the result, and check for success.
3712 	 *
3713 	 * We don't use a PG_TRY block here, so be careful not to throw error
3714 	 * without releasing the PGresult.
3715 	 */
3716 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3717 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3718 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3719 	PQclear(res);
3720 
3721 	/* This action shows that the prepare has been done. */
3722 	fmstate->p_name = p_name;
3723 }
3724 
3725 /*
3726  * convert_prep_stmt_params
3727  *		Create array of text strings representing parameter values
3728  *
3729  * tupleid is ctid to send, or NULL if none
3730  * slot is slot to get remaining parameters from, or NULL if none
3731  *
3732  * Data is constructed in temp_cxt; caller should reset that after use.
3733  */
3734 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3735 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3736 						 ItemPointer tupleid,
3737 						 TupleTableSlot *slot)
3738 {
3739 	const char **p_values;
3740 	int			pindex = 0;
3741 	MemoryContext oldcontext;
3742 
3743 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3744 
3745 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3746 
3747 	/* 1st parameter should be ctid, if it's in use */
3748 	if (tupleid != NULL)
3749 	{
3750 		/* don't need set_transmission_modes for TID output */
3751 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3752 											  PointerGetDatum(tupleid));
3753 		pindex++;
3754 	}
3755 
3756 	/* get following parameters from slot */
3757 	if (slot != NULL && fmstate->target_attrs != NIL)
3758 	{
3759 		TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
3760 		int			nestlevel;
3761 		ListCell   *lc;
3762 
3763 		nestlevel = set_transmission_modes();
3764 
3765 		foreach(lc, fmstate->target_attrs)
3766 		{
3767 			int			attnum = lfirst_int(lc);
3768 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3769 			Datum		value;
3770 			bool		isnull;
3771 
3772 			/* Ignore generated columns; they are set to DEFAULT */
3773 			if (attr->attgenerated)
3774 				continue;
3775 			value = slot_getattr(slot, attnum, &isnull);
3776 			if (isnull)
3777 				p_values[pindex] = NULL;
3778 			else
3779 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3780 													  value);
3781 			pindex++;
3782 		}
3783 
3784 		reset_transmission_modes(nestlevel);
3785 	}
3786 
3787 	Assert(pindex == fmstate->p_nums);
3788 
3789 	MemoryContextSwitchTo(oldcontext);
3790 
3791 	return p_values;
3792 }
3793 
3794 /*
3795  * store_returning_result
3796  *		Store the result of a RETURNING clause
3797  *
3798  * On error, be sure to release the PGresult on the way out.  Callers do not
3799  * have PG_TRY blocks to ensure this happens.
3800  */
3801 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3802 store_returning_result(PgFdwModifyState *fmstate,
3803 					   TupleTableSlot *slot, PGresult *res)
3804 {
3805 	PG_TRY();
3806 	{
3807 		HeapTuple	newtup;
3808 
3809 		newtup = make_tuple_from_result_row(res, 0,
3810 											fmstate->rel,
3811 											fmstate->attinmeta,
3812 											fmstate->retrieved_attrs,
3813 											NULL,
3814 											fmstate->temp_cxt);
3815 
3816 		/*
3817 		 * The returning slot will not necessarily be suitable to store
3818 		 * heaptuples directly, so allow for conversion.
3819 		 */
3820 		ExecForceStoreHeapTuple(newtup, slot, true);
3821 	}
3822 	PG_CATCH();
3823 	{
3824 		if (res)
3825 			PQclear(res);
3826 		PG_RE_THROW();
3827 	}
3828 	PG_END_TRY();
3829 }
3830 
3831 /*
3832  * finish_foreign_modify
3833  *		Release resources for a foreign insert/update/delete operation
3834  */
3835 static void
finish_foreign_modify(PgFdwModifyState * fmstate)3836 finish_foreign_modify(PgFdwModifyState *fmstate)
3837 {
3838 	Assert(fmstate != NULL);
3839 
3840 	/* If we created a prepared statement, destroy it */
3841 	if (fmstate->p_name)
3842 	{
3843 		char		sql[64];
3844 		PGresult   *res;
3845 
3846 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3847 
3848 		/*
3849 		 * We don't use a PG_TRY block here, so be careful not to throw error
3850 		 * without releasing the PGresult.
3851 		 */
3852 		res = pgfdw_exec_query(fmstate->conn, sql);
3853 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
3854 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3855 		PQclear(res);
3856 		fmstate->p_name = NULL;
3857 	}
3858 
3859 	/* Release remote connection */
3860 	ReleaseConnection(fmstate->conn);
3861 	fmstate->conn = NULL;
3862 }
3863 
3864 /*
3865  * build_remote_returning
3866  *		Build a RETURNING targetlist of a remote query for performing an
3867  *		UPDATE/DELETE .. RETURNING on a join directly
3868  */
3869 static List *
build_remote_returning(Index rtindex,Relation rel,List * returningList)3870 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3871 {
3872 	bool		have_wholerow = false;
3873 	List	   *tlist = NIL;
3874 	List	   *vars;
3875 	ListCell   *lc;
3876 
3877 	Assert(returningList);
3878 
3879 	vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3880 
3881 	/*
3882 	 * If there's a whole-row reference to the target relation, then we'll
3883 	 * need all the columns of the relation.
3884 	 */
3885 	foreach(lc, vars)
3886 	{
3887 		Var		   *var = (Var *) lfirst(lc);
3888 
3889 		if (IsA(var, Var) &&
3890 			var->varno == rtindex &&
3891 			var->varattno == InvalidAttrNumber)
3892 		{
3893 			have_wholerow = true;
3894 			break;
3895 		}
3896 	}
3897 
3898 	if (have_wholerow)
3899 	{
3900 		TupleDesc	tupdesc = RelationGetDescr(rel);
3901 		int			i;
3902 
3903 		for (i = 1; i <= tupdesc->natts; i++)
3904 		{
3905 			Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3906 			Var		   *var;
3907 
3908 			/* Ignore dropped attributes. */
3909 			if (attr->attisdropped)
3910 				continue;
3911 
3912 			var = makeVar(rtindex,
3913 						  i,
3914 						  attr->atttypid,
3915 						  attr->atttypmod,
3916 						  attr->attcollation,
3917 						  0);
3918 
3919 			tlist = lappend(tlist,
3920 							makeTargetEntry((Expr *) var,
3921 											list_length(tlist) + 1,
3922 											NULL,
3923 											false));
3924 		}
3925 	}
3926 
3927 	/* Now add any remaining columns to tlist. */
3928 	foreach(lc, vars)
3929 	{
3930 		Var		   *var = (Var *) lfirst(lc);
3931 
3932 		/*
3933 		 * No need for whole-row references to the target relation.  We don't
3934 		 * need system columns other than ctid and oid either, since those are
3935 		 * set locally.
3936 		 */
3937 		if (IsA(var, Var) &&
3938 			var->varno == rtindex &&
3939 			var->varattno <= InvalidAttrNumber &&
3940 			var->varattno != SelfItemPointerAttributeNumber)
3941 			continue;			/* don't need it */
3942 
3943 		if (tlist_member((Expr *) var, tlist))
3944 			continue;			/* already got it */
3945 
3946 		tlist = lappend(tlist,
3947 						makeTargetEntry((Expr *) var,
3948 										list_length(tlist) + 1,
3949 										NULL,
3950 										false));
3951 	}
3952 
3953 	list_free(vars);
3954 
3955 	return tlist;
3956 }
3957 
3958 /*
3959  * rebuild_fdw_scan_tlist
3960  *		Build new fdw_scan_tlist of given foreign-scan plan node from given
3961  *		tlist
3962  *
3963  * There might be columns that the fdw_scan_tlist of the given foreign-scan
3964  * plan node contains that the given tlist doesn't.  The fdw_scan_tlist would
3965  * have contained resjunk columns such as 'ctid' of the target relation and
3966  * 'wholerow' of non-target relations, but the tlist might not contain them,
3967  * for example.  So, adjust the tlist so it contains all the columns specified
3968  * in the fdw_scan_tlist; else setrefs.c will get confused.
3969  */
3970 static void
rebuild_fdw_scan_tlist(ForeignScan * fscan,List * tlist)3971 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
3972 {
3973 	List	   *new_tlist = tlist;
3974 	List	   *old_tlist = fscan->fdw_scan_tlist;
3975 	ListCell   *lc;
3976 
3977 	foreach(lc, old_tlist)
3978 	{
3979 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
3980 
3981 		if (tlist_member(tle->expr, new_tlist))
3982 			continue;			/* already got it */
3983 
3984 		new_tlist = lappend(new_tlist,
3985 							makeTargetEntry(tle->expr,
3986 											list_length(new_tlist) + 1,
3987 											NULL,
3988 											false));
3989 	}
3990 	fscan->fdw_scan_tlist = new_tlist;
3991 }
3992 
3993 /*
3994  * Execute a direct UPDATE/DELETE statement.
3995  */
3996 static void
execute_dml_stmt(ForeignScanState * node)3997 execute_dml_stmt(ForeignScanState *node)
3998 {
3999 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4000 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
4001 	int			numParams = dmstate->numParams;
4002 	const char **values = dmstate->param_values;
4003 
4004 	/*
4005 	 * Construct array of query parameter values in text format.
4006 	 */
4007 	if (numParams > 0)
4008 		process_query_params(econtext,
4009 							 dmstate->param_flinfo,
4010 							 dmstate->param_exprs,
4011 							 values);
4012 
4013 	/*
4014 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4015 	 * to infer types for all parameters.  Since we explicitly cast every
4016 	 * parameter (see deparse.c), the "inference" is trivial and will produce
4017 	 * the desired result.  This allows us to avoid assuming that the remote
4018 	 * server has the same OIDs we do for the parameters' types.
4019 	 */
4020 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4021 						   NULL, values, NULL, NULL, 0))
4022 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4023 
4024 	/*
4025 	 * Get the result, and check for success.
4026 	 *
4027 	 * We don't use a PG_TRY block here, so be careful not to throw error
4028 	 * without releasing the PGresult.
4029 	 */
4030 	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4031 	if (PQresultStatus(dmstate->result) !=
4032 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4033 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4034 						   dmstate->query);
4035 
4036 	/* Get the number of rows affected. */
4037 	if (dmstate->has_returning)
4038 		dmstate->num_tuples = PQntuples(dmstate->result);
4039 	else
4040 		dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4041 }
4042 
4043 /*
4044  * Get the result of a RETURNING clause.
4045  */
4046 static TupleTableSlot *
get_returning_data(ForeignScanState * node)4047 get_returning_data(ForeignScanState *node)
4048 {
4049 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4050 	EState	   *estate = node->ss.ps.state;
4051 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
4052 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4053 	TupleTableSlot *resultSlot;
4054 
4055 	Assert(resultRelInfo->ri_projectReturning);
4056 
4057 	/* If we didn't get any tuples, must be end of data. */
4058 	if (dmstate->next_tuple >= dmstate->num_tuples)
4059 		return ExecClearTuple(slot);
4060 
4061 	/* Increment the command es_processed count if necessary. */
4062 	if (dmstate->set_processed)
4063 		estate->es_processed += 1;
4064 
4065 	/*
4066 	 * Store a RETURNING tuple.  If has_returning is false, just emit a dummy
4067 	 * tuple.  (has_returning is false when the local query is of the form
4068 	 * "UPDATE/DELETE .. RETURNING 1" for example.)
4069 	 */
4070 	if (!dmstate->has_returning)
4071 	{
4072 		ExecStoreAllNullTuple(slot);
4073 		resultSlot = slot;
4074 	}
4075 	else
4076 	{
4077 		/*
4078 		 * On error, be sure to release the PGresult on the way out.  Callers
4079 		 * do not have PG_TRY blocks to ensure this happens.
4080 		 */
4081 		PG_TRY();
4082 		{
4083 			HeapTuple	newtup;
4084 
4085 			newtup = make_tuple_from_result_row(dmstate->result,
4086 												dmstate->next_tuple,
4087 												dmstate->rel,
4088 												dmstate->attinmeta,
4089 												dmstate->retrieved_attrs,
4090 												node,
4091 												dmstate->temp_cxt);
4092 			ExecStoreHeapTuple(newtup, slot, false);
4093 		}
4094 		PG_CATCH();
4095 		{
4096 			if (dmstate->result)
4097 				PQclear(dmstate->result);
4098 			PG_RE_THROW();
4099 		}
4100 		PG_END_TRY();
4101 
4102 		/* Get the updated/deleted tuple. */
4103 		if (dmstate->rel)
4104 			resultSlot = slot;
4105 		else
4106 			resultSlot = apply_returning_filter(dmstate, slot, estate);
4107 	}
4108 	dmstate->next_tuple++;
4109 
4110 	/* Make slot available for evaluation of the local query RETURNING list. */
4111 	resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4112 		resultSlot;
4113 
4114 	return slot;
4115 }
4116 
4117 /*
4118  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4119  */
4120 static void
init_returning_filter(PgFdwDirectModifyState * dmstate,List * fdw_scan_tlist,Index rtindex)4121 init_returning_filter(PgFdwDirectModifyState *dmstate,
4122 					  List *fdw_scan_tlist,
4123 					  Index rtindex)
4124 {
4125 	TupleDesc	resultTupType = RelationGetDescr(dmstate->resultRel);
4126 	ListCell   *lc;
4127 	int			i;
4128 
4129 	/*
4130 	 * Calculate the mapping between the fdw_scan_tlist's entries and the
4131 	 * result tuple's attributes.
4132 	 *
4133 	 * The "map" is an array of indexes of the result tuple's attributes in
4134 	 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4135 	 * tuple.  We store zero for any attributes that don't have the
4136 	 * corresponding entries in that list, marking that a NULL is needed in
4137 	 * the result tuple.
4138 	 *
4139 	 * Also get the indexes of the entries for ctid and oid if any.
4140 	 */
4141 	dmstate->attnoMap = (AttrNumber *)
4142 		palloc0(resultTupType->natts * sizeof(AttrNumber));
4143 
4144 	dmstate->ctidAttno = dmstate->oidAttno = 0;
4145 
4146 	i = 1;
4147 	dmstate->hasSystemCols = false;
4148 	foreach(lc, fdw_scan_tlist)
4149 	{
4150 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
4151 		Var		   *var = (Var *) tle->expr;
4152 
4153 		Assert(IsA(var, Var));
4154 
4155 		/*
4156 		 * If the Var is a column of the target relation to be retrieved from
4157 		 * the foreign server, get the index of the entry.
4158 		 */
4159 		if (var->varno == rtindex &&
4160 			list_member_int(dmstate->retrieved_attrs, i))
4161 		{
4162 			int			attrno = var->varattno;
4163 
4164 			if (attrno < 0)
4165 			{
4166 				/*
4167 				 * We don't retrieve system columns other than ctid and oid.
4168 				 */
4169 				if (attrno == SelfItemPointerAttributeNumber)
4170 					dmstate->ctidAttno = i;
4171 				else
4172 					Assert(false);
4173 				dmstate->hasSystemCols = true;
4174 			}
4175 			else
4176 			{
4177 				/*
4178 				 * We don't retrieve whole-row references to the target
4179 				 * relation either.
4180 				 */
4181 				Assert(attrno > 0);
4182 
4183 				dmstate->attnoMap[attrno - 1] = i;
4184 			}
4185 		}
4186 		i++;
4187 	}
4188 }
4189 
4190 /*
4191  * Extract and return an updated/deleted tuple from a scan tuple.
4192  */
4193 static TupleTableSlot *
apply_returning_filter(PgFdwDirectModifyState * dmstate,TupleTableSlot * slot,EState * estate)4194 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4195 					   TupleTableSlot *slot,
4196 					   EState *estate)
4197 {
4198 	ResultRelInfo *relInfo = estate->es_result_relation_info;
4199 	TupleDesc	resultTupType = RelationGetDescr(dmstate->resultRel);
4200 	TupleTableSlot *resultSlot;
4201 	Datum	   *values;
4202 	bool	   *isnull;
4203 	Datum	   *old_values;
4204 	bool	   *old_isnull;
4205 	int			i;
4206 
4207 	/*
4208 	 * Use the return tuple slot as a place to store the result tuple.
4209 	 */
4210 	resultSlot = ExecGetReturningSlot(estate, relInfo);
4211 
4212 	/*
4213 	 * Extract all the values of the scan tuple.
4214 	 */
4215 	slot_getallattrs(slot);
4216 	old_values = slot->tts_values;
4217 	old_isnull = slot->tts_isnull;
4218 
4219 	/*
4220 	 * Prepare to build the result tuple.
4221 	 */
4222 	ExecClearTuple(resultSlot);
4223 	values = resultSlot->tts_values;
4224 	isnull = resultSlot->tts_isnull;
4225 
4226 	/*
4227 	 * Transpose data into proper fields of the result tuple.
4228 	 */
4229 	for (i = 0; i < resultTupType->natts; i++)
4230 	{
4231 		int			j = dmstate->attnoMap[i];
4232 
4233 		if (j == 0)
4234 		{
4235 			values[i] = (Datum) 0;
4236 			isnull[i] = true;
4237 		}
4238 		else
4239 		{
4240 			values[i] = old_values[j - 1];
4241 			isnull[i] = old_isnull[j - 1];
4242 		}
4243 	}
4244 
4245 	/*
4246 	 * Build the virtual tuple.
4247 	 */
4248 	ExecStoreVirtualTuple(resultSlot);
4249 
4250 	/*
4251 	 * If we have any system columns to return, materialize a heap tuple in
4252 	 * the slot from column values set above and install system columns in
4253 	 * that tuple.
4254 	 */
4255 	if (dmstate->hasSystemCols)
4256 	{
4257 		HeapTuple	resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4258 
4259 		/* ctid */
4260 		if (dmstate->ctidAttno)
4261 		{
4262 			ItemPointer ctid = NULL;
4263 
4264 			ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4265 			resultTup->t_self = *ctid;
4266 		}
4267 
4268 		/*
4269 		 * And remaining columns
4270 		 *
4271 		 * Note: since we currently don't allow the target relation to appear
4272 		 * on the nullable side of an outer join, any system columns wouldn't
4273 		 * go to NULL.
4274 		 *
4275 		 * Note: no need to care about tableoid here because it will be
4276 		 * initialized in ExecProcessReturning().
4277 		 */
4278 		HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4279 		HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4280 		HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4281 	}
4282 
4283 	/*
4284 	 * And return the result tuple.
4285 	 */
4286 	return resultSlot;
4287 }
4288 
4289 /*
4290  * Prepare for processing of parameters used in remote query.
4291  */
4292 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)4293 prepare_query_params(PlanState *node,
4294 					 List *fdw_exprs,
4295 					 int numParams,
4296 					 FmgrInfo **param_flinfo,
4297 					 List **param_exprs,
4298 					 const char ***param_values)
4299 {
4300 	int			i;
4301 	ListCell   *lc;
4302 
4303 	Assert(numParams > 0);
4304 
4305 	/* Prepare for output conversion of parameters used in remote query. */
4306 	*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4307 
4308 	i = 0;
4309 	foreach(lc, fdw_exprs)
4310 	{
4311 		Node	   *param_expr = (Node *) lfirst(lc);
4312 		Oid			typefnoid;
4313 		bool		isvarlena;
4314 
4315 		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4316 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
4317 		i++;
4318 	}
4319 
4320 	/*
4321 	 * Prepare remote-parameter expressions for evaluation.  (Note: in
4322 	 * practice, we expect that all these expressions will be just Params, so
4323 	 * we could possibly do something more efficient than using the full
4324 	 * expression-eval machinery for this.  But probably there would be little
4325 	 * benefit, and it'd require postgres_fdw to know more than is desirable
4326 	 * about Param evaluation.)
4327 	 */
4328 	*param_exprs = ExecInitExprList(fdw_exprs, node);
4329 
4330 	/* Allocate buffer for text form of query parameters. */
4331 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
4332 }
4333 
4334 /*
4335  * Construct array of query parameter values in text format.
4336  */
4337 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)4338 process_query_params(ExprContext *econtext,
4339 					 FmgrInfo *param_flinfo,
4340 					 List *param_exprs,
4341 					 const char **param_values)
4342 {
4343 	int			nestlevel;
4344 	int			i;
4345 	ListCell   *lc;
4346 
4347 	nestlevel = set_transmission_modes();
4348 
4349 	i = 0;
4350 	foreach(lc, param_exprs)
4351 	{
4352 		ExprState  *expr_state = (ExprState *) lfirst(lc);
4353 		Datum		expr_value;
4354 		bool		isNull;
4355 
4356 		/* Evaluate the parameter expression */
4357 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4358 
4359 		/*
4360 		 * Get string representation of each parameter value by invoking
4361 		 * type-specific output function, unless the value is null.
4362 		 */
4363 		if (isNull)
4364 			param_values[i] = NULL;
4365 		else
4366 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4367 
4368 		i++;
4369 	}
4370 
4371 	reset_transmission_modes(nestlevel);
4372 }
4373 
4374 /*
4375  * postgresAnalyzeForeignTable
4376  *		Test whether analyzing this foreign table is supported
4377  */
4378 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)4379 postgresAnalyzeForeignTable(Relation relation,
4380 							AcquireSampleRowsFunc *func,
4381 							BlockNumber *totalpages)
4382 {
4383 	ForeignTable *table;
4384 	UserMapping *user;
4385 	PGconn	   *conn;
4386 	StringInfoData sql;
4387 	PGresult   *volatile res = NULL;
4388 
4389 	/* Return the row-analysis function pointer */
4390 	*func = postgresAcquireSampleRowsFunc;
4391 
4392 	/*
4393 	 * Now we have to get the number of pages.  It's annoying that the ANALYZE
4394 	 * API requires us to return that now, because it forces some duplication
4395 	 * of effort between this routine and postgresAcquireSampleRowsFunc.  But
4396 	 * it's probably not worth redefining that API at this point.
4397 	 */
4398 
4399 	/*
4400 	 * Get the connection to use.  We do the remote access as the table's
4401 	 * owner, even if the ANALYZE was started by some other user.
4402 	 */
4403 	table = GetForeignTable(RelationGetRelid(relation));
4404 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4405 	conn = GetConnection(user, false);
4406 
4407 	/*
4408 	 * Construct command to get page count for relation.
4409 	 */
4410 	initStringInfo(&sql);
4411 	deparseAnalyzeSizeSql(&sql, relation);
4412 
4413 	/* In what follows, do not risk leaking any PGresults. */
4414 	PG_TRY();
4415 	{
4416 		res = pgfdw_exec_query(conn, sql.data);
4417 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4418 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
4419 
4420 		if (PQntuples(res) != 1 || PQnfields(res) != 1)
4421 			elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4422 		*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4423 
4424 		PQclear(res);
4425 		res = NULL;
4426 	}
4427 	PG_CATCH();
4428 	{
4429 		if (res)
4430 			PQclear(res);
4431 		PG_RE_THROW();
4432 	}
4433 	PG_END_TRY();
4434 
4435 	ReleaseConnection(conn);
4436 
4437 	return true;
4438 }
4439 
4440 /*
4441  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4442  *
4443  * We fetch the whole table from the remote side and pick out some sample rows.
4444  *
4445  * Selected rows are returned in the caller-allocated array rows[],
4446  * which must have at least targrows entries.
4447  * The actual number of rows selected is returned as the function result.
4448  * We also count the total number of rows in the table and return it into
4449  * *totalrows.  Note that *totaldeadrows is always set to 0.
4450  *
4451  * Note that the returned list of rows is not always in order by physical
4452  * position in the table.  Therefore, correlation estimates derived later
4453  * may be meaningless, but it's OK because we don't use the estimates
4454  * currently (the planner only pays attention to correlation for indexscans).
4455  */
4456 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)4457 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4458 							  HeapTuple *rows, int targrows,
4459 							  double *totalrows,
4460 							  double *totaldeadrows)
4461 {
4462 	PgFdwAnalyzeState astate;
4463 	ForeignTable *table;
4464 	ForeignServer *server;
4465 	UserMapping *user;
4466 	PGconn	   *conn;
4467 	unsigned int cursor_number;
4468 	StringInfoData sql;
4469 	PGresult   *volatile res = NULL;
4470 
4471 	/* Initialize workspace state */
4472 	astate.rel = relation;
4473 	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4474 
4475 	astate.rows = rows;
4476 	astate.targrows = targrows;
4477 	astate.numrows = 0;
4478 	astate.samplerows = 0;
4479 	astate.rowstoskip = -1;		/* -1 means not set yet */
4480 	reservoir_init_selection_state(&astate.rstate, targrows);
4481 
4482 	/* Remember ANALYZE context, and create a per-tuple temp context */
4483 	astate.anl_cxt = CurrentMemoryContext;
4484 	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4485 											"postgres_fdw temporary data",
4486 											ALLOCSET_SMALL_SIZES);
4487 
4488 	/*
4489 	 * Get the connection to use.  We do the remote access as the table's
4490 	 * owner, even if the ANALYZE was started by some other user.
4491 	 */
4492 	table = GetForeignTable(RelationGetRelid(relation));
4493 	server = GetForeignServer(table->serverid);
4494 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4495 	conn = GetConnection(user, false);
4496 
4497 	/*
4498 	 * Construct cursor that retrieves whole rows from remote.
4499 	 */
4500 	cursor_number = GetCursorNumber(conn);
4501 	initStringInfo(&sql);
4502 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4503 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4504 
4505 	/* In what follows, do not risk leaking any PGresults. */
4506 	PG_TRY();
4507 	{
4508 		res = pgfdw_exec_query(conn, sql.data);
4509 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
4510 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
4511 		PQclear(res);
4512 		res = NULL;
4513 
4514 		/* Retrieve and process rows a batch at a time. */
4515 		for (;;)
4516 		{
4517 			char		fetch_sql[64];
4518 			int			fetch_size;
4519 			int			numrows;
4520 			int			i;
4521 			ListCell   *lc;
4522 
4523 			/* Allow users to cancel long query */
4524 			CHECK_FOR_INTERRUPTS();
4525 
4526 			/*
4527 			 * XXX possible future improvement: if rowstoskip is large, we
4528 			 * could issue a MOVE rather than physically fetching the rows,
4529 			 * then just adjust rowstoskip and samplerows appropriately.
4530 			 */
4531 
4532 			/* The fetch size is arbitrary, but shouldn't be enormous. */
4533 			fetch_size = 100;
4534 			foreach(lc, server->options)
4535 			{
4536 				DefElem    *def = (DefElem *) lfirst(lc);
4537 
4538 				if (strcmp(def->defname, "fetch_size") == 0)
4539 				{
4540 					fetch_size = strtol(defGetString(def), NULL, 10);
4541 					break;
4542 				}
4543 			}
4544 			foreach(lc, table->options)
4545 			{
4546 				DefElem    *def = (DefElem *) lfirst(lc);
4547 
4548 				if (strcmp(def->defname, "fetch_size") == 0)
4549 				{
4550 					fetch_size = strtol(defGetString(def), NULL, 10);
4551 					break;
4552 				}
4553 			}
4554 
4555 			/* Fetch some rows */
4556 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4557 					 fetch_size, cursor_number);
4558 
4559 			res = pgfdw_exec_query(conn, fetch_sql);
4560 			/* On error, report the original query, not the FETCH. */
4561 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
4562 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
4563 
4564 			/* Process whatever we got. */
4565 			numrows = PQntuples(res);
4566 			for (i = 0; i < numrows; i++)
4567 				analyze_row_processor(res, i, &astate);
4568 
4569 			PQclear(res);
4570 			res = NULL;
4571 
4572 			/* Must be EOF if we didn't get all the rows requested. */
4573 			if (numrows < fetch_size)
4574 				break;
4575 		}
4576 
4577 		/* Close the cursor, just to be tidy. */
4578 		close_cursor(conn, cursor_number);
4579 	}
4580 	PG_CATCH();
4581 	{
4582 		if (res)
4583 			PQclear(res);
4584 		PG_RE_THROW();
4585 	}
4586 	PG_END_TRY();
4587 
4588 	ReleaseConnection(conn);
4589 
4590 	/* We assume that we have no dead tuple. */
4591 	*totaldeadrows = 0.0;
4592 
4593 	/* We've retrieved all living tuples from foreign server. */
4594 	*totalrows = astate.samplerows;
4595 
4596 	/*
4597 	 * Emit some interesting relation info
4598 	 */
4599 	ereport(elevel,
4600 			(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4601 					RelationGetRelationName(relation),
4602 					astate.samplerows, astate.numrows)));
4603 
4604 	return astate.numrows;
4605 }
4606 
4607 /*
4608  * Collect sample rows from the result of query.
4609  *	 - Use all tuples in sample until target # of samples are collected.
4610  *	 - Subsequently, replace already-sampled tuples randomly.
4611  */
4612 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)4613 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
4614 {
4615 	int			targrows = astate->targrows;
4616 	int			pos;			/* array index to store tuple in */
4617 	MemoryContext oldcontext;
4618 
4619 	/* Always increment sample row counter. */
4620 	astate->samplerows += 1;
4621 
4622 	/*
4623 	 * Determine the slot where this sample row should be stored.  Set pos to
4624 	 * negative value to indicate the row should be skipped.
4625 	 */
4626 	if (astate->numrows < targrows)
4627 	{
4628 		/* First targrows rows are always included into the sample */
4629 		pos = astate->numrows++;
4630 	}
4631 	else
4632 	{
4633 		/*
4634 		 * Now we start replacing tuples in the sample until we reach the end
4635 		 * of the relation.  Same algorithm as in acquire_sample_rows in
4636 		 * analyze.c; see Jeff Vitter's paper.
4637 		 */
4638 		if (astate->rowstoskip < 0)
4639 			astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4640 
4641 		if (astate->rowstoskip <= 0)
4642 		{
4643 			/* Choose a random reservoir element to replace. */
4644 			pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4645 			Assert(pos >= 0 && pos < targrows);
4646 			heap_freetuple(astate->rows[pos]);
4647 		}
4648 		else
4649 		{
4650 			/* Skip this tuple. */
4651 			pos = -1;
4652 		}
4653 
4654 		astate->rowstoskip -= 1;
4655 	}
4656 
4657 	if (pos >= 0)
4658 	{
4659 		/*
4660 		 * Create sample tuple from current result row, and store it in the
4661 		 * position determined above.  The tuple has to be created in anl_cxt.
4662 		 */
4663 		oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4664 
4665 		astate->rows[pos] = make_tuple_from_result_row(res, row,
4666 													   astate->rel,
4667 													   astate->attinmeta,
4668 													   astate->retrieved_attrs,
4669 													   NULL,
4670 													   astate->temp_cxt);
4671 
4672 		MemoryContextSwitchTo(oldcontext);
4673 	}
4674 }
4675 
4676 /*
4677  * Import a foreign schema
4678  */
4679 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)4680 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
4681 {
4682 	List	   *commands = NIL;
4683 	bool		import_collate = true;
4684 	bool		import_default = false;
4685 	bool		import_generated = true;
4686 	bool		import_not_null = true;
4687 	ForeignServer *server;
4688 	UserMapping *mapping;
4689 	PGconn	   *conn;
4690 	StringInfoData buf;
4691 	PGresult   *volatile res = NULL;
4692 	int			numrows,
4693 				i;
4694 	ListCell   *lc;
4695 
4696 	/* Parse statement options */
4697 	foreach(lc, stmt->options)
4698 	{
4699 		DefElem    *def = (DefElem *) lfirst(lc);
4700 
4701 		if (strcmp(def->defname, "import_collate") == 0)
4702 			import_collate = defGetBoolean(def);
4703 		else if (strcmp(def->defname, "import_default") == 0)
4704 			import_default = defGetBoolean(def);
4705 		else if (strcmp(def->defname, "import_generated") == 0)
4706 			import_generated = defGetBoolean(def);
4707 		else if (strcmp(def->defname, "import_not_null") == 0)
4708 			import_not_null = defGetBoolean(def);
4709 		else
4710 			ereport(ERROR,
4711 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4712 					 errmsg("invalid option \"%s\"", def->defname)));
4713 	}
4714 
4715 	/*
4716 	 * Get connection to the foreign server.  Connection manager will
4717 	 * establish new connection if necessary.
4718 	 */
4719 	server = GetForeignServer(serverOid);
4720 	mapping = GetUserMapping(GetUserId(), server->serverid);
4721 	conn = GetConnection(mapping, false);
4722 
4723 	/* Don't attempt to import collation if remote server hasn't got it */
4724 	if (PQserverVersion(conn) < 90100)
4725 		import_collate = false;
4726 
4727 	/* Create workspace for strings */
4728 	initStringInfo(&buf);
4729 
4730 	/* In what follows, do not risk leaking any PGresults. */
4731 	PG_TRY();
4732 	{
4733 		/* Check that the schema really exists */
4734 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4735 		deparseStringLiteral(&buf, stmt->remote_schema);
4736 
4737 		res = pgfdw_exec_query(conn, buf.data);
4738 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4739 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
4740 
4741 		if (PQntuples(res) != 1)
4742 			ereport(ERROR,
4743 					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4744 					 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4745 							stmt->remote_schema, server->servername)));
4746 
4747 		PQclear(res);
4748 		res = NULL;
4749 		resetStringInfo(&buf);
4750 
4751 		/*
4752 		 * Fetch all table data from this schema, possibly restricted by
4753 		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
4754 		 * to EXCEPT/LIMIT TO here, because the core code will filter the
4755 		 * statements we return according to those lists anyway.  But it
4756 		 * should save a few cycles to not process excluded tables in the
4757 		 * first place.)
4758 		 *
4759 		 * Ignore table data for partitions and only include the definitions
4760 		 * of the root partitioned tables to allow access to the complete
4761 		 * remote data set locally in the schema imported.
4762 		 *
4763 		 * Note: because we run the connection with search_path restricted to
4764 		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
4765 		 * include a schema name for types/functions in other schemas, which
4766 		 * is what we want.
4767 		 */
4768 		appendStringInfoString(&buf,
4769 							   "SELECT relname, "
4770 							   "  attname, "
4771 							   "  format_type(atttypid, atttypmod), "
4772 							   "  attnotnull, ");
4773 
4774 		/* Generated columns are supported since Postgres 12 */
4775 		if (PQserverVersion(conn) >= 120000)
4776 			appendStringInfoString(&buf,
4777 								   "  attgenerated, "
4778 								   "  pg_get_expr(adbin, adrelid), ");
4779 		else
4780 			appendStringInfoString(&buf,
4781 								   "  NULL, "
4782 								   "  pg_get_expr(adbin, adrelid), ");
4783 
4784 		if (import_collate)
4785 			appendStringInfoString(&buf,
4786 								   "  collname, "
4787 								   "  collnsp.nspname "
4788 								   "FROM pg_class c "
4789 								   "  JOIN pg_namespace n ON "
4790 								   "    relnamespace = n.oid "
4791 								   "  LEFT JOIN pg_attribute a ON "
4792 								   "    attrelid = c.oid AND attnum > 0 "
4793 								   "      AND NOT attisdropped "
4794 								   "  LEFT JOIN pg_attrdef ad ON "
4795 								   "    adrelid = c.oid AND adnum = attnum "
4796 								   "  LEFT JOIN pg_collation coll ON "
4797 								   "    coll.oid = attcollation "
4798 								   "  LEFT JOIN pg_namespace collnsp ON "
4799 								   "    collnsp.oid = collnamespace ");
4800 		else
4801 			appendStringInfoString(&buf,
4802 								   "  NULL, NULL "
4803 								   "FROM pg_class c "
4804 								   "  JOIN pg_namespace n ON "
4805 								   "    relnamespace = n.oid "
4806 								   "  LEFT JOIN pg_attribute a ON "
4807 								   "    attrelid = c.oid AND attnum > 0 "
4808 								   "      AND NOT attisdropped "
4809 								   "  LEFT JOIN pg_attrdef ad ON "
4810 								   "    adrelid = c.oid AND adnum = attnum ");
4811 
4812 		appendStringInfoString(&buf,
4813 							   "WHERE c.relkind IN ("
4814 							   CppAsString2(RELKIND_RELATION) ","
4815 							   CppAsString2(RELKIND_VIEW) ","
4816 							   CppAsString2(RELKIND_FOREIGN_TABLE) ","
4817 							   CppAsString2(RELKIND_MATVIEW) ","
4818 							   CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4819 							   "  AND n.nspname = ");
4820 		deparseStringLiteral(&buf, stmt->remote_schema);
4821 
4822 		/* Partitions are supported since Postgres 10 */
4823 		if (PQserverVersion(conn) >= 100000)
4824 			appendStringInfoString(&buf, " AND NOT c.relispartition ");
4825 
4826 		/* Apply restrictions for LIMIT TO and EXCEPT */
4827 		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4828 			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4829 		{
4830 			bool		first_item = true;
4831 
4832 			appendStringInfoString(&buf, " AND c.relname ");
4833 			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4834 				appendStringInfoString(&buf, "NOT ");
4835 			appendStringInfoString(&buf, "IN (");
4836 
4837 			/* Append list of table names within IN clause */
4838 			foreach(lc, stmt->table_list)
4839 			{
4840 				RangeVar   *rv = (RangeVar *) lfirst(lc);
4841 
4842 				if (first_item)
4843 					first_item = false;
4844 				else
4845 					appendStringInfoString(&buf, ", ");
4846 				deparseStringLiteral(&buf, rv->relname);
4847 			}
4848 			appendStringInfoChar(&buf, ')');
4849 		}
4850 
4851 		/* Append ORDER BY at the end of query to ensure output ordering */
4852 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4853 
4854 		/* Fetch the data */
4855 		res = pgfdw_exec_query(conn, buf.data);
4856 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4857 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
4858 
4859 		/* Process results */
4860 		numrows = PQntuples(res);
4861 		/* note: incrementation of i happens in inner loop's while() test */
4862 		for (i = 0; i < numrows;)
4863 		{
4864 			char	   *tablename = PQgetvalue(res, i, 0);
4865 			bool		first_item = true;
4866 
4867 			resetStringInfo(&buf);
4868 			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4869 							 quote_identifier(tablename));
4870 
4871 			/* Scan all rows for this table */
4872 			do
4873 			{
4874 				char	   *attname;
4875 				char	   *typename;
4876 				char	   *attnotnull;
4877 				char	   *attgenerated;
4878 				char	   *attdefault;
4879 				char	   *collname;
4880 				char	   *collnamespace;
4881 
4882 				/* If table has no columns, we'll see nulls here */
4883 				if (PQgetisnull(res, i, 1))
4884 					continue;
4885 
4886 				attname = PQgetvalue(res, i, 1);
4887 				typename = PQgetvalue(res, i, 2);
4888 				attnotnull = PQgetvalue(res, i, 3);
4889 				attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL :
4890 					PQgetvalue(res, i, 4);
4891 				attdefault = PQgetisnull(res, i, 5) ? (char *) NULL :
4892 					PQgetvalue(res, i, 5);
4893 				collname = PQgetisnull(res, i, 6) ? (char *) NULL :
4894 					PQgetvalue(res, i, 6);
4895 				collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
4896 					PQgetvalue(res, i, 7);
4897 
4898 				if (first_item)
4899 					first_item = false;
4900 				else
4901 					appendStringInfoString(&buf, ",\n");
4902 
4903 				/* Print column name and type */
4904 				appendStringInfo(&buf, "  %s %s",
4905 								 quote_identifier(attname),
4906 								 typename);
4907 
4908 				/*
4909 				 * Add column_name option so that renaming the foreign table's
4910 				 * column doesn't break the association to the underlying
4911 				 * column.
4912 				 */
4913 				appendStringInfoString(&buf, " OPTIONS (column_name ");
4914 				deparseStringLiteral(&buf, attname);
4915 				appendStringInfoChar(&buf, ')');
4916 
4917 				/* Add COLLATE if needed */
4918 				if (import_collate && collname != NULL && collnamespace != NULL)
4919 					appendStringInfo(&buf, " COLLATE %s.%s",
4920 									 quote_identifier(collnamespace),
4921 									 quote_identifier(collname));
4922 
4923 				/* Add DEFAULT if needed */
4924 				if (import_default && attdefault != NULL &&
4925 					(!attgenerated || !attgenerated[0]))
4926 					appendStringInfo(&buf, " DEFAULT %s", attdefault);
4927 
4928 				/* Add GENERATED if needed */
4929 				if (import_generated && attgenerated != NULL &&
4930 					attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
4931 				{
4932 					Assert(attdefault != NULL);
4933 					appendStringInfo(&buf,
4934 									 " GENERATED ALWAYS AS (%s) STORED",
4935 									 attdefault);
4936 				}
4937 
4938 				/* Add NOT NULL if needed */
4939 				if (import_not_null && attnotnull[0] == 't')
4940 					appendStringInfoString(&buf, " NOT NULL");
4941 			}
4942 			while (++i < numrows &&
4943 				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4944 
4945 			/*
4946 			 * Add server name and table-level options.  We specify remote
4947 			 * schema and table name as options (the latter to ensure that
4948 			 * renaming the foreign table doesn't break the association).
4949 			 */
4950 			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4951 							 quote_identifier(server->servername));
4952 
4953 			appendStringInfoString(&buf, "schema_name ");
4954 			deparseStringLiteral(&buf, stmt->remote_schema);
4955 			appendStringInfoString(&buf, ", table_name ");
4956 			deparseStringLiteral(&buf, tablename);
4957 
4958 			appendStringInfoString(&buf, ");");
4959 
4960 			commands = lappend(commands, pstrdup(buf.data));
4961 		}
4962 
4963 		/* Clean up */
4964 		PQclear(res);
4965 		res = NULL;
4966 	}
4967 	PG_CATCH();
4968 	{
4969 		if (res)
4970 			PQclear(res);
4971 		PG_RE_THROW();
4972 	}
4973 	PG_END_TRY();
4974 
4975 	ReleaseConnection(conn);
4976 
4977 	return commands;
4978 }
4979 
4980 /*
4981  * Assess whether the join between inner and outer relations can be pushed down
4982  * to the foreign server. As a side effect, save information we obtain in this
4983  * function to PgFdwRelationInfo passed in.
4984  */
4985 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)4986 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
4987 				RelOptInfo *outerrel, RelOptInfo *innerrel,
4988 				JoinPathExtraData *extra)
4989 {
4990 	PgFdwRelationInfo *fpinfo;
4991 	PgFdwRelationInfo *fpinfo_o;
4992 	PgFdwRelationInfo *fpinfo_i;
4993 	ListCell   *lc;
4994 	List	   *joinclauses;
4995 
4996 	/*
4997 	 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4998 	 * Constructing queries representing SEMI and ANTI joins is hard, hence
4999 	 * not considered right now.
5000 	 */
5001 	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5002 		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5003 		return false;
5004 
5005 	/*
5006 	 * If either of the joining relations is marked as unsafe to pushdown, the
5007 	 * join can not be pushed down.
5008 	 */
5009 	fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5010 	fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5011 	fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5012 	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5013 		!fpinfo_i || !fpinfo_i->pushdown_safe)
5014 		return false;
5015 
5016 	/*
5017 	 * If joining relations have local conditions, those conditions are
5018 	 * required to be applied before joining the relations. Hence the join can
5019 	 * not be pushed down.
5020 	 */
5021 	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5022 		return false;
5023 
5024 	/*
5025 	 * Merge FDW options.  We might be tempted to do this after we have deemed
5026 	 * the foreign join to be OK.  But we must do this beforehand so that we
5027 	 * know which quals can be evaluated on the foreign server, which might
5028 	 * depend on shippable_extensions.
5029 	 */
5030 	fpinfo->server = fpinfo_o->server;
5031 	merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5032 
5033 	/*
5034 	 * Separate restrict list into join quals and pushed-down (other) quals.
5035 	 *
5036 	 * Join quals belonging to an outer join must all be shippable, else we
5037 	 * cannot execute the join remotely.  Add such quals to 'joinclauses'.
5038 	 *
5039 	 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5040 	 * fpinfo->local_conds.  In an inner join it's okay to execute conditions
5041 	 * either locally or remotely; the same is true for pushed-down conditions
5042 	 * at an outer join.
5043 	 *
5044 	 * Note we might return failure after having already scribbled on
5045 	 * fpinfo->remote_conds and fpinfo->local_conds.  That's okay because we
5046 	 * won't consult those lists again if we deem the join unshippable.
5047 	 */
5048 	joinclauses = NIL;
5049 	foreach(lc, extra->restrictlist)
5050 	{
5051 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5052 		bool		is_remote_clause = is_foreign_expr(root, joinrel,
5053 													   rinfo->clause);
5054 
5055 		if (IS_OUTER_JOIN(jointype) &&
5056 			!RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5057 		{
5058 			if (!is_remote_clause)
5059 				return false;
5060 			joinclauses = lappend(joinclauses, rinfo);
5061 		}
5062 		else
5063 		{
5064 			if (is_remote_clause)
5065 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5066 			else
5067 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5068 		}
5069 	}
5070 
5071 	/*
5072 	 * deparseExplicitTargetList() isn't smart enough to handle anything other
5073 	 * than a Var.  In particular, if there's some PlaceHolderVar that would
5074 	 * need to be evaluated within this join tree (because there's an upper
5075 	 * reference to a quantity that may go to NULL as a result of an outer
5076 	 * join), then we can't try to push the join down because we'll fail when
5077 	 * we get to deparseExplicitTargetList().  However, a PlaceHolderVar that
5078 	 * needs to be evaluated *at the top* of this join tree is OK, because we
5079 	 * can do that locally after fetching the results from the remote side.
5080 	 */
5081 	foreach(lc, root->placeholder_list)
5082 	{
5083 		PlaceHolderInfo *phinfo = lfirst(lc);
5084 		Relids		relids;
5085 
5086 		/* PlaceHolderInfo refers to parent relids, not child relids. */
5087 		relids = IS_OTHER_REL(joinrel) ?
5088 			joinrel->top_parent_relids : joinrel->relids;
5089 
5090 		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5091 			bms_nonempty_difference(relids, phinfo->ph_eval_at))
5092 			return false;
5093 	}
5094 
5095 	/* Save the join clauses, for later use. */
5096 	fpinfo->joinclauses = joinclauses;
5097 
5098 	fpinfo->outerrel = outerrel;
5099 	fpinfo->innerrel = innerrel;
5100 	fpinfo->jointype = jointype;
5101 
5102 	/*
5103 	 * By default, both the input relations are not required to be deparsed as
5104 	 * subqueries, but there might be some relations covered by the input
5105 	 * relations that are required to be deparsed as subqueries, so save the
5106 	 * relids of those relations for later use by the deparser.
5107 	 */
5108 	fpinfo->make_outerrel_subquery = false;
5109 	fpinfo->make_innerrel_subquery = false;
5110 	Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5111 	Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5112 	fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5113 											fpinfo_i->lower_subquery_rels);
5114 
5115 	/*
5116 	 * Pull the other remote conditions from the joining relations into join
5117 	 * clauses or other remote clauses (remote_conds) of this relation
5118 	 * wherever possible. This avoids building subqueries at every join step.
5119 	 *
5120 	 * For an inner join, clauses from both the relations are added to the
5121 	 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5122 	 * the outer side are added to remote_conds since those can be evaluated
5123 	 * after the join is evaluated. The clauses from inner side are added to
5124 	 * the joinclauses, since they need to be evaluated while constructing the
5125 	 * join.
5126 	 *
5127 	 * For a FULL OUTER JOIN, the other clauses from either relation can not
5128 	 * be added to the joinclauses or remote_conds, since each relation acts
5129 	 * as an outer relation for the other.
5130 	 *
5131 	 * The joining sides can not have local conditions, thus no need to test
5132 	 * shippability of the clauses being pulled up.
5133 	 */
5134 	switch (jointype)
5135 	{
5136 		case JOIN_INNER:
5137 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5138 											   list_copy(fpinfo_i->remote_conds));
5139 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5140 											   list_copy(fpinfo_o->remote_conds));
5141 			break;
5142 
5143 		case JOIN_LEFT:
5144 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5145 											  list_copy(fpinfo_i->remote_conds));
5146 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5147 											   list_copy(fpinfo_o->remote_conds));
5148 			break;
5149 
5150 		case JOIN_RIGHT:
5151 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5152 											  list_copy(fpinfo_o->remote_conds));
5153 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5154 											   list_copy(fpinfo_i->remote_conds));
5155 			break;
5156 
5157 		case JOIN_FULL:
5158 
5159 			/*
5160 			 * In this case, if any of the input relations has conditions, we
5161 			 * need to deparse that relation as a subquery so that the
5162 			 * conditions can be evaluated before the join.  Remember it in
5163 			 * the fpinfo of this relation so that the deparser can take
5164 			 * appropriate action.  Also, save the relids of base relations
5165 			 * covered by that relation for later use by the deparser.
5166 			 */
5167 			if (fpinfo_o->remote_conds)
5168 			{
5169 				fpinfo->make_outerrel_subquery = true;
5170 				fpinfo->lower_subquery_rels =
5171 					bms_add_members(fpinfo->lower_subquery_rels,
5172 									outerrel->relids);
5173 			}
5174 			if (fpinfo_i->remote_conds)
5175 			{
5176 				fpinfo->make_innerrel_subquery = true;
5177 				fpinfo->lower_subquery_rels =
5178 					bms_add_members(fpinfo->lower_subquery_rels,
5179 									innerrel->relids);
5180 			}
5181 			break;
5182 
5183 		default:
5184 			/* Should not happen, we have just checked this above */
5185 			elog(ERROR, "unsupported join type %d", jointype);
5186 	}
5187 
5188 	/*
5189 	 * For an inner join, all restrictions can be treated alike. Treating the
5190 	 * pushed down conditions as join conditions allows a top level full outer
5191 	 * join to be deparsed without requiring subqueries.
5192 	 */
5193 	if (jointype == JOIN_INNER)
5194 	{
5195 		Assert(!fpinfo->joinclauses);
5196 		fpinfo->joinclauses = fpinfo->remote_conds;
5197 		fpinfo->remote_conds = NIL;
5198 	}
5199 
5200 	/* Mark that this join can be pushed down safely */
5201 	fpinfo->pushdown_safe = true;
5202 
5203 	/* Get user mapping */
5204 	if (fpinfo->use_remote_estimate)
5205 	{
5206 		if (fpinfo_o->use_remote_estimate)
5207 			fpinfo->user = fpinfo_o->user;
5208 		else
5209 			fpinfo->user = fpinfo_i->user;
5210 	}
5211 	else
5212 		fpinfo->user = NULL;
5213 
5214 	/*
5215 	 * Set # of retrieved rows and cached relation costs to some negative
5216 	 * value, so that we can detect when they are set to some sensible values,
5217 	 * during one (usually the first) of the calls to estimate_path_cost_size.
5218 	 */
5219 	fpinfo->retrieved_rows = -1;
5220 	fpinfo->rel_startup_cost = -1;
5221 	fpinfo->rel_total_cost = -1;
5222 
5223 	/*
5224 	 * Set the string describing this join relation to be used in EXPLAIN
5225 	 * output of corresponding ForeignScan.
5226 	 */
5227 	fpinfo->relation_name = makeStringInfo();
5228 	appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
5229 					 fpinfo_o->relation_name->data,
5230 					 get_jointype_name(fpinfo->jointype),
5231 					 fpinfo_i->relation_name->data);
5232 
5233 	/*
5234 	 * Set the relation index.  This is defined as the position of this
5235 	 * joinrel in the join_rel_list list plus the length of the rtable list.
5236 	 * Note that since this joinrel is at the end of the join_rel_list list
5237 	 * when we are called, we can get the position by list_length.
5238 	 */
5239 	Assert(fpinfo->relation_index == 0);	/* shouldn't be set yet */
5240 	fpinfo->relation_index =
5241 		list_length(root->parse->rtable) + list_length(root->join_rel_list);
5242 
5243 	return true;
5244 }
5245 
5246 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)5247 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5248 								Path *epq_path)
5249 {
5250 	List	   *useful_pathkeys_list = NIL; /* List of all pathkeys */
5251 	ListCell   *lc;
5252 
5253 	useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5254 
5255 	/* Create one path for each set of pathkeys we found above. */
5256 	foreach(lc, useful_pathkeys_list)
5257 	{
5258 		double		rows;
5259 		int			width;
5260 		Cost		startup_cost;
5261 		Cost		total_cost;
5262 		List	   *useful_pathkeys = lfirst(lc);
5263 		Path	   *sorted_epq_path;
5264 
5265 		estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5266 								&rows, &width, &startup_cost, &total_cost);
5267 
5268 		/*
5269 		 * The EPQ path must be at least as well sorted as the path itself, in
5270 		 * case it gets used as input to a mergejoin.
5271 		 */
5272 		sorted_epq_path = epq_path;
5273 		if (sorted_epq_path != NULL &&
5274 			!pathkeys_contained_in(useful_pathkeys,
5275 								   sorted_epq_path->pathkeys))
5276 			sorted_epq_path = (Path *)
5277 				create_sort_path(root,
5278 								 rel,
5279 								 sorted_epq_path,
5280 								 useful_pathkeys,
5281 								 -1.0);
5282 
5283 		if (IS_SIMPLE_REL(rel))
5284 			add_path(rel, (Path *)
5285 					 create_foreignscan_path(root, rel,
5286 											 NULL,
5287 											 rows,
5288 											 startup_cost,
5289 											 total_cost,
5290 											 useful_pathkeys,
5291 											 rel->lateral_relids,
5292 											 sorted_epq_path,
5293 											 NIL));
5294 		else
5295 			add_path(rel, (Path *)
5296 					 create_foreign_join_path(root, rel,
5297 											  NULL,
5298 											  rows,
5299 											  startup_cost,
5300 											  total_cost,
5301 											  useful_pathkeys,
5302 											  rel->lateral_relids,
5303 											  sorted_epq_path,
5304 											  NIL));
5305 	}
5306 }
5307 
5308 /*
5309  * Parse options from foreign server and apply them to fpinfo.
5310  *
5311  * New options might also require tweaking merge_fdw_options().
5312  */
5313 static void
apply_server_options(PgFdwRelationInfo * fpinfo)5314 apply_server_options(PgFdwRelationInfo *fpinfo)
5315 {
5316 	ListCell   *lc;
5317 
5318 	foreach(lc, fpinfo->server->options)
5319 	{
5320 		DefElem    *def = (DefElem *) lfirst(lc);
5321 
5322 		if (strcmp(def->defname, "use_remote_estimate") == 0)
5323 			fpinfo->use_remote_estimate = defGetBoolean(def);
5324 		else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5325 			fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5326 		else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5327 			fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5328 		else if (strcmp(def->defname, "extensions") == 0)
5329 			fpinfo->shippable_extensions =
5330 				ExtractExtensionList(defGetString(def), false);
5331 		else if (strcmp(def->defname, "fetch_size") == 0)
5332 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5333 	}
5334 }
5335 
5336 /*
5337  * Parse options from foreign table and apply them to fpinfo.
5338  *
5339  * New options might also require tweaking merge_fdw_options().
5340  */
5341 static void
apply_table_options(PgFdwRelationInfo * fpinfo)5342 apply_table_options(PgFdwRelationInfo *fpinfo)
5343 {
5344 	ListCell   *lc;
5345 
5346 	foreach(lc, fpinfo->table->options)
5347 	{
5348 		DefElem    *def = (DefElem *) lfirst(lc);
5349 
5350 		if (strcmp(def->defname, "use_remote_estimate") == 0)
5351 			fpinfo->use_remote_estimate = defGetBoolean(def);
5352 		else if (strcmp(def->defname, "fetch_size") == 0)
5353 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5354 	}
5355 }
5356 
5357 /*
5358  * Merge FDW options from input relations into a new set of options for a join
5359  * or an upper rel.
5360  *
5361  * For a join relation, FDW-specific information about the inner and outer
5362  * relations is provided using fpinfo_i and fpinfo_o.  For an upper relation,
5363  * fpinfo_o provides the information for the input relation; fpinfo_i is
5364  * expected to NULL.
5365  */
5366 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)5367 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5368 				  const PgFdwRelationInfo *fpinfo_o,
5369 				  const PgFdwRelationInfo *fpinfo_i)
5370 {
5371 	/* We must always have fpinfo_o. */
5372 	Assert(fpinfo_o);
5373 
5374 	/* fpinfo_i may be NULL, but if present the servers must both match. */
5375 	Assert(!fpinfo_i ||
5376 		   fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5377 
5378 	/*
5379 	 * Copy the server specific FDW options.  (For a join, both relations come
5380 	 * from the same server, so the server options should have the same value
5381 	 * for both relations.)
5382 	 */
5383 	fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5384 	fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5385 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5386 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5387 	fpinfo->fetch_size = fpinfo_o->fetch_size;
5388 
5389 	/* Merge the table level options from either side of the join. */
5390 	if (fpinfo_i)
5391 	{
5392 		/*
5393 		 * We'll prefer to use remote estimates for this join if any table
5394 		 * from either side of the join is using remote estimates.  This is
5395 		 * most likely going to be preferred since they're already willing to
5396 		 * pay the price of a round trip to get the remote EXPLAIN.  In any
5397 		 * case it's not entirely clear how we might otherwise handle this
5398 		 * best.
5399 		 */
5400 		fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5401 			fpinfo_i->use_remote_estimate;
5402 
5403 		/*
5404 		 * Set fetch size to maximum of the joining sides, since we are
5405 		 * expecting the rows returned by the join to be proportional to the
5406 		 * relation sizes.
5407 		 */
5408 		fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5409 	}
5410 }
5411 
5412 /*
5413  * postgresGetForeignJoinPaths
5414  *		Add possible ForeignPath to joinrel, if join is safe to push down.
5415  */
5416 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)5417 postgresGetForeignJoinPaths(PlannerInfo *root,
5418 							RelOptInfo *joinrel,
5419 							RelOptInfo *outerrel,
5420 							RelOptInfo *innerrel,
5421 							JoinType jointype,
5422 							JoinPathExtraData *extra)
5423 {
5424 	PgFdwRelationInfo *fpinfo;
5425 	ForeignPath *joinpath;
5426 	double		rows;
5427 	int			width;
5428 	Cost		startup_cost;
5429 	Cost		total_cost;
5430 	Path	   *epq_path;		/* Path to create plan to be executed when
5431 								 * EvalPlanQual gets triggered. */
5432 
5433 	/*
5434 	 * Skip if this join combination has been considered already.
5435 	 */
5436 	if (joinrel->fdw_private)
5437 		return;
5438 
5439 	/*
5440 	 * This code does not work for joins with lateral references, since those
5441 	 * must have parameterized paths, which we don't generate yet.
5442 	 */
5443 	if (!bms_is_empty(joinrel->lateral_relids))
5444 		return;
5445 
5446 	/*
5447 	 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5448 	 * that the join relation is already considered, so that we won't waste
5449 	 * time in judging safety of join pushdown and adding the same paths again
5450 	 * if found safe. Once we know that this join can be pushed down, we fill
5451 	 * the entry.
5452 	 */
5453 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5454 	fpinfo->pushdown_safe = false;
5455 	joinrel->fdw_private = fpinfo;
5456 	/* attrs_used is only for base relations. */
5457 	fpinfo->attrs_used = NULL;
5458 
5459 	/*
5460 	 * If there is a possibility that EvalPlanQual will be executed, we need
5461 	 * to be able to reconstruct the row using scans of the base relations.
5462 	 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5463 	 * the path list of the joinrel, if one exists.  We must be careful to
5464 	 * call it before adding any ForeignPath, since the ForeignPath might
5465 	 * dominate the only suitable local path available.  We also do it before
5466 	 * calling foreign_join_ok(), since that function updates fpinfo and marks
5467 	 * it as pushable if the join is found to be pushable.
5468 	 */
5469 	if (root->parse->commandType == CMD_DELETE ||
5470 		root->parse->commandType == CMD_UPDATE ||
5471 		root->rowMarks)
5472 	{
5473 		epq_path = GetExistingLocalJoinPath(joinrel);
5474 		if (!epq_path)
5475 		{
5476 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5477 			return;
5478 		}
5479 	}
5480 	else
5481 		epq_path = NULL;
5482 
5483 	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5484 	{
5485 		/* Free path required for EPQ if we copied one; we don't need it now */
5486 		if (epq_path)
5487 			pfree(epq_path);
5488 		return;
5489 	}
5490 
5491 	/*
5492 	 * Compute the selectivity and cost of the local_conds, so we don't have
5493 	 * to do it over again for each path. The best we can do for these
5494 	 * conditions is to estimate selectivity on the basis of local statistics.
5495 	 * The local conditions are applied after the join has been computed on
5496 	 * the remote side like quals in WHERE clause, so pass jointype as
5497 	 * JOIN_INNER.
5498 	 */
5499 	fpinfo->local_conds_sel = clauselist_selectivity(root,
5500 													 fpinfo->local_conds,
5501 													 0,
5502 													 JOIN_INNER,
5503 													 NULL);
5504 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5505 
5506 	/*
5507 	 * If we are going to estimate costs locally, estimate the join clause
5508 	 * selectivity here while we have special join info.
5509 	 */
5510 	if (!fpinfo->use_remote_estimate)
5511 		fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5512 														0, fpinfo->jointype,
5513 														extra->sjinfo);
5514 
5515 	/* Estimate costs for bare join relation */
5516 	estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
5517 							&rows, &width, &startup_cost, &total_cost);
5518 	/* Now update this information in the joinrel */
5519 	joinrel->rows = rows;
5520 	joinrel->reltarget->width = width;
5521 	fpinfo->rows = rows;
5522 	fpinfo->width = width;
5523 	fpinfo->startup_cost = startup_cost;
5524 	fpinfo->total_cost = total_cost;
5525 
5526 	/*
5527 	 * Create a new join path and add it to the joinrel which represents a
5528 	 * join between foreign tables.
5529 	 */
5530 	joinpath = create_foreign_join_path(root,
5531 										joinrel,
5532 										NULL,	/* default pathtarget */
5533 										rows,
5534 										startup_cost,
5535 										total_cost,
5536 										NIL,	/* no pathkeys */
5537 										joinrel->lateral_relids,
5538 										epq_path,
5539 										NIL);	/* no fdw_private */
5540 
5541 	/* Add generated path into joinrel by add_path(). */
5542 	add_path(joinrel, (Path *) joinpath);
5543 
5544 	/* Consider pathkeys for the join relation */
5545 	add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5546 
5547 	/* XXX Consider parameterized paths for the join relation */
5548 }
5549 
5550 /*
5551  * Assess whether the aggregation, grouping and having operations can be pushed
5552  * down to the foreign server.  As a side effect, save information we obtain in
5553  * this function to PgFdwRelationInfo of the input relation.
5554  */
5555 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel,Node * havingQual)5556 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
5557 					Node *havingQual)
5558 {
5559 	Query	   *query = root->parse;
5560 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5561 	PathTarget *grouping_target = grouped_rel->reltarget;
5562 	PgFdwRelationInfo *ofpinfo;
5563 	ListCell   *lc;
5564 	int			i;
5565 	List	   *tlist = NIL;
5566 
5567 	/* We currently don't support pushing Grouping Sets. */
5568 	if (query->groupingSets)
5569 		return false;
5570 
5571 	/* Get the fpinfo of the underlying scan relation. */
5572 	ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5573 
5574 	/*
5575 	 * If underlying scan relation has any local conditions, those conditions
5576 	 * are required to be applied before performing aggregation.  Hence the
5577 	 * aggregate cannot be pushed down.
5578 	 */
5579 	if (ofpinfo->local_conds)
5580 		return false;
5581 
5582 	/*
5583 	 * Examine grouping expressions, as well as other expressions we'd need to
5584 	 * compute, and check whether they are safe to push down to the foreign
5585 	 * server.  All GROUP BY expressions will be part of the grouping target
5586 	 * and thus there is no need to search for them separately.  Add grouping
5587 	 * expressions into target list which will be passed to foreign server.
5588 	 *
5589 	 * A tricky fine point is that we must not put any expression into the
5590 	 * target list that is just a foreign param (that is, something that
5591 	 * deparse.c would conclude has to be sent to the foreign server).  If we
5592 	 * do, the expression will also appear in the fdw_exprs list of the plan
5593 	 * node, and setrefs.c will get confused and decide that the fdw_exprs
5594 	 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5595 	 * a broken plan.  Somewhat oddly, it's OK if the expression contains such
5596 	 * a node, as long as it's not at top level; then no match is possible.
5597 	 */
5598 	i = 0;
5599 	foreach(lc, grouping_target->exprs)
5600 	{
5601 		Expr	   *expr = (Expr *) lfirst(lc);
5602 		Index		sgref = get_pathtarget_sortgroupref(grouping_target, i);
5603 		ListCell   *l;
5604 
5605 		/* Check whether this expression is part of GROUP BY clause */
5606 		if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5607 		{
5608 			TargetEntry *tle;
5609 
5610 			/*
5611 			 * If any GROUP BY expression is not shippable, then we cannot
5612 			 * push down aggregation to the foreign server.
5613 			 */
5614 			if (!is_foreign_expr(root, grouped_rel, expr))
5615 				return false;
5616 
5617 			/*
5618 			 * If it would be a foreign param, we can't put it into the tlist,
5619 			 * so we have to fail.
5620 			 */
5621 			if (is_foreign_param(root, grouped_rel, expr))
5622 				return false;
5623 
5624 			/*
5625 			 * Pushable, so add to tlist.  We need to create a TLE for this
5626 			 * expression and apply the sortgroupref to it.  We cannot use
5627 			 * add_to_flat_tlist() here because that avoids making duplicate
5628 			 * entries in the tlist.  If there are duplicate entries with
5629 			 * distinct sortgrouprefs, we have to duplicate that situation in
5630 			 * the output tlist.
5631 			 */
5632 			tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5633 			tle->ressortgroupref = sgref;
5634 			tlist = lappend(tlist, tle);
5635 		}
5636 		else
5637 		{
5638 			/*
5639 			 * Non-grouping expression we need to compute.  Can we ship it
5640 			 * as-is to the foreign server?
5641 			 */
5642 			if (is_foreign_expr(root, grouped_rel, expr) &&
5643 				!is_foreign_param(root, grouped_rel, expr))
5644 			{
5645 				/* Yes, so add to tlist as-is; OK to suppress duplicates */
5646 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
5647 			}
5648 			else
5649 			{
5650 				/* Not pushable as a whole; extract its Vars and aggregates */
5651 				List	   *aggvars;
5652 
5653 				aggvars = pull_var_clause((Node *) expr,
5654 										  PVC_INCLUDE_AGGREGATES);
5655 
5656 				/*
5657 				 * If any aggregate expression is not shippable, then we
5658 				 * cannot push down aggregation to the foreign server.  (We
5659 				 * don't have to check is_foreign_param, since that certainly
5660 				 * won't return true for any such expression.)
5661 				 */
5662 				if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5663 					return false;
5664 
5665 				/*
5666 				 * Add aggregates, if any, into the targetlist.  Plain Vars
5667 				 * outside an aggregate can be ignored, because they should be
5668 				 * either same as some GROUP BY column or part of some GROUP
5669 				 * BY expression.  In either case, they are already part of
5670 				 * the targetlist and thus no need to add them again.  In fact
5671 				 * including plain Vars in the tlist when they do not match a
5672 				 * GROUP BY column would cause the foreign server to complain
5673 				 * that the shipped query is invalid.
5674 				 */
5675 				foreach(l, aggvars)
5676 				{
5677 					Expr	   *expr = (Expr *) lfirst(l);
5678 
5679 					if (IsA(expr, Aggref))
5680 						tlist = add_to_flat_tlist(tlist, list_make1(expr));
5681 				}
5682 			}
5683 		}
5684 
5685 		i++;
5686 	}
5687 
5688 	/*
5689 	 * Classify the pushable and non-pushable HAVING clauses and save them in
5690 	 * remote_conds and local_conds of the grouped rel's fpinfo.
5691 	 */
5692 	if (havingQual)
5693 	{
5694 		ListCell   *lc;
5695 
5696 		foreach(lc, (List *) havingQual)
5697 		{
5698 			Expr	   *expr = (Expr *) lfirst(lc);
5699 			RestrictInfo *rinfo;
5700 
5701 			/*
5702 			 * Currently, the core code doesn't wrap havingQuals in
5703 			 * RestrictInfos, so we must make our own.
5704 			 */
5705 			Assert(!IsA(expr, RestrictInfo));
5706 			rinfo = make_restrictinfo(root,
5707 									  expr,
5708 									  true,
5709 									  false,
5710 									  false,
5711 									  root->qual_security_level,
5712 									  grouped_rel->relids,
5713 									  NULL,
5714 									  NULL);
5715 			if (is_foreign_expr(root, grouped_rel, expr))
5716 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5717 			else
5718 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5719 		}
5720 	}
5721 
5722 	/*
5723 	 * If there are any local conditions, pull Vars and aggregates from it and
5724 	 * check whether they are safe to pushdown or not.
5725 	 */
5726 	if (fpinfo->local_conds)
5727 	{
5728 		List	   *aggvars = NIL;
5729 		ListCell   *lc;
5730 
5731 		foreach(lc, fpinfo->local_conds)
5732 		{
5733 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5734 
5735 			aggvars = list_concat(aggvars,
5736 								  pull_var_clause((Node *) rinfo->clause,
5737 												  PVC_INCLUDE_AGGREGATES));
5738 		}
5739 
5740 		foreach(lc, aggvars)
5741 		{
5742 			Expr	   *expr = (Expr *) lfirst(lc);
5743 
5744 			/*
5745 			 * If aggregates within local conditions are not safe to push
5746 			 * down, then we cannot push down the query.  Vars are already
5747 			 * part of GROUP BY clause which are checked above, so no need to
5748 			 * access them again here.  Again, we need not check
5749 			 * is_foreign_param for a foreign aggregate.
5750 			 */
5751 			if (IsA(expr, Aggref))
5752 			{
5753 				if (!is_foreign_expr(root, grouped_rel, expr))
5754 					return false;
5755 
5756 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
5757 			}
5758 		}
5759 	}
5760 
5761 	/* Store generated targetlist */
5762 	fpinfo->grouped_tlist = tlist;
5763 
5764 	/* Safe to pushdown */
5765 	fpinfo->pushdown_safe = true;
5766 
5767 	/*
5768 	 * Set # of retrieved rows and cached relation costs to some negative
5769 	 * value, so that we can detect when they are set to some sensible values,
5770 	 * during one (usually the first) of the calls to estimate_path_cost_size.
5771 	 */
5772 	fpinfo->retrieved_rows = -1;
5773 	fpinfo->rel_startup_cost = -1;
5774 	fpinfo->rel_total_cost = -1;
5775 
5776 	/*
5777 	 * Set the string describing this grouped relation to be used in EXPLAIN
5778 	 * output of corresponding ForeignScan.
5779 	 */
5780 	fpinfo->relation_name = makeStringInfo();
5781 	appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5782 					 ofpinfo->relation_name->data);
5783 
5784 	return true;
5785 }
5786 
5787 /*
5788  * postgresGetForeignUpperPaths
5789  *		Add paths for post-join operations like aggregation, grouping etc. if
5790  *		corresponding operations are safe to push down.
5791  */
5792 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel,void * extra)5793 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
5794 							 RelOptInfo *input_rel, RelOptInfo *output_rel,
5795 							 void *extra)
5796 {
5797 	PgFdwRelationInfo *fpinfo;
5798 
5799 	/*
5800 	 * If input rel is not safe to pushdown, then simply return as we cannot
5801 	 * perform any post-join operations on the foreign server.
5802 	 */
5803 	if (!input_rel->fdw_private ||
5804 		!((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5805 		return;
5806 
5807 	/* Ignore stages we don't support; and skip any duplicate calls. */
5808 	if ((stage != UPPERREL_GROUP_AGG &&
5809 		 stage != UPPERREL_ORDERED &&
5810 		 stage != UPPERREL_FINAL) ||
5811 		output_rel->fdw_private)
5812 		return;
5813 
5814 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5815 	fpinfo->pushdown_safe = false;
5816 	fpinfo->stage = stage;
5817 	output_rel->fdw_private = fpinfo;
5818 
5819 	switch (stage)
5820 	{
5821 		case UPPERREL_GROUP_AGG:
5822 			add_foreign_grouping_paths(root, input_rel, output_rel,
5823 									   (GroupPathExtraData *) extra);
5824 			break;
5825 		case UPPERREL_ORDERED:
5826 			add_foreign_ordered_paths(root, input_rel, output_rel);
5827 			break;
5828 		case UPPERREL_FINAL:
5829 			add_foreign_final_paths(root, input_rel, output_rel,
5830 									(FinalPathExtraData *) extra);
5831 			break;
5832 		default:
5833 			elog(ERROR, "unexpected upper relation: %d", (int) stage);
5834 			break;
5835 	}
5836 }
5837 
5838 /*
5839  * add_foreign_grouping_paths
5840  *		Add foreign path for grouping and/or aggregation.
5841  *
5842  * Given input_rel represents the underlying scan.  The paths are added to the
5843  * given grouped_rel.
5844  */
5845 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel,GroupPathExtraData * extra)5846 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
5847 						   RelOptInfo *grouped_rel,
5848 						   GroupPathExtraData *extra)
5849 {
5850 	Query	   *parse = root->parse;
5851 	PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5852 	PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5853 	ForeignPath *grouppath;
5854 	double		rows;
5855 	int			width;
5856 	Cost		startup_cost;
5857 	Cost		total_cost;
5858 
5859 	/* Nothing to be done, if there is no grouping or aggregation required. */
5860 	if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5861 		!root->hasHavingQual)
5862 		return;
5863 
5864 	Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
5865 		   extra->patype == PARTITIONWISE_AGGREGATE_FULL);
5866 
5867 	/* save the input_rel as outerrel in fpinfo */
5868 	fpinfo->outerrel = input_rel;
5869 
5870 	/*
5871 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
5872 	 * details from the input relation's fpinfo.
5873 	 */
5874 	fpinfo->table = ifpinfo->table;
5875 	fpinfo->server = ifpinfo->server;
5876 	fpinfo->user = ifpinfo->user;
5877 	merge_fdw_options(fpinfo, ifpinfo, NULL);
5878 
5879 	/*
5880 	 * Assess if it is safe to push down aggregation and grouping.
5881 	 *
5882 	 * Use HAVING qual from extra. In case of child partition, it will have
5883 	 * translated Vars.
5884 	 */
5885 	if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5886 		return;
5887 
5888 	/*
5889 	 * Compute the selectivity and cost of the local_conds, so we don't have
5890 	 * to do it over again for each path.  (Currently we create just a single
5891 	 * path here, but in future it would be possible that we build more paths
5892 	 * such as pre-sorted paths as in postgresGetForeignPaths and
5893 	 * postgresGetForeignJoinPaths.)  The best we can do for these conditions
5894 	 * is to estimate selectivity on the basis of local statistics.
5895 	 */
5896 	fpinfo->local_conds_sel = clauselist_selectivity(root,
5897 													 fpinfo->local_conds,
5898 													 0,
5899 													 JOIN_INNER,
5900 													 NULL);
5901 
5902 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5903 
5904 	/* Estimate the cost of push down */
5905 	estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
5906 							&rows, &width, &startup_cost, &total_cost);
5907 
5908 	/* Now update this information in the fpinfo */
5909 	fpinfo->rows = rows;
5910 	fpinfo->width = width;
5911 	fpinfo->startup_cost = startup_cost;
5912 	fpinfo->total_cost = total_cost;
5913 
5914 	/* Create and add foreign path to the grouping relation. */
5915 	grouppath = create_foreign_upper_path(root,
5916 										  grouped_rel,
5917 										  grouped_rel->reltarget,
5918 										  rows,
5919 										  startup_cost,
5920 										  total_cost,
5921 										  NIL,	/* no pathkeys */
5922 										  NULL,
5923 										  NIL); /* no fdw_private */
5924 
5925 	/* Add generated path into grouped_rel by add_path(). */
5926 	add_path(grouped_rel, (Path *) grouppath);
5927 }
5928 
5929 /*
5930  * add_foreign_ordered_paths
5931  *		Add foreign paths for performing the final sort remotely.
5932  *
5933  * Given input_rel contains the source-data Paths.  The paths are added to the
5934  * given ordered_rel.
5935  */
5936 static void
add_foreign_ordered_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * ordered_rel)5937 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
5938 						  RelOptInfo *ordered_rel)
5939 {
5940 	Query	   *parse = root->parse;
5941 	PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5942 	PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
5943 	PgFdwPathExtraData *fpextra;
5944 	double		rows;
5945 	int			width;
5946 	Cost		startup_cost;
5947 	Cost		total_cost;
5948 	List	   *fdw_private;
5949 	ForeignPath *ordered_path;
5950 	ListCell   *lc;
5951 
5952 	/* Shouldn't get here unless the query has ORDER BY */
5953 	Assert(parse->sortClause);
5954 
5955 	/* We don't support cases where there are any SRFs in the targetlist */
5956 	if (parse->hasTargetSRFs)
5957 		return;
5958 
5959 	/* Save the input_rel as outerrel in fpinfo */
5960 	fpinfo->outerrel = input_rel;
5961 
5962 	/*
5963 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
5964 	 * details from the input relation's fpinfo.
5965 	 */
5966 	fpinfo->table = ifpinfo->table;
5967 	fpinfo->server = ifpinfo->server;
5968 	fpinfo->user = ifpinfo->user;
5969 	merge_fdw_options(fpinfo, ifpinfo, NULL);
5970 
5971 	/*
5972 	 * If the input_rel is a base or join relation, we would already have
5973 	 * considered pushing down the final sort to the remote server when
5974 	 * creating pre-sorted foreign paths for that relation, because the
5975 	 * query_pathkeys is set to the root->sort_pathkeys in that case (see
5976 	 * standard_qp_callback()).
5977 	 */
5978 	if (input_rel->reloptkind == RELOPT_BASEREL ||
5979 		input_rel->reloptkind == RELOPT_JOINREL)
5980 	{
5981 		Assert(root->query_pathkeys == root->sort_pathkeys);
5982 
5983 		/* Safe to push down if the query_pathkeys is safe to push down */
5984 		fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
5985 
5986 		return;
5987 	}
5988 
5989 	/* The input_rel should be a grouping relation */
5990 	Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
5991 		   ifpinfo->stage == UPPERREL_GROUP_AGG);
5992 
5993 	/*
5994 	 * We try to create a path below by extending a simple foreign path for
5995 	 * the underlying grouping relation to perform the final sort remotely,
5996 	 * which is stored into the fdw_private list of the resulting path.
5997 	 */
5998 
5999 	/* Assess if it is safe to push down the final sort */
6000 	foreach(lc, root->sort_pathkeys)
6001 	{
6002 		PathKey    *pathkey = (PathKey *) lfirst(lc);
6003 		EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6004 		Expr	   *sort_expr;
6005 
6006 		/*
6007 		 * is_foreign_expr would detect volatile expressions as well, but
6008 		 * checking ec_has_volatile here saves some cycles.
6009 		 */
6010 		if (pathkey_ec->ec_has_volatile)
6011 			return;
6012 
6013 		/* Get the sort expression for the pathkey_ec */
6014 		sort_expr = find_em_expr_for_input_target(root,
6015 												  pathkey_ec,
6016 												  input_rel->reltarget);
6017 
6018 		/* If it's unsafe to remote, we cannot push down the final sort */
6019 		if (!is_foreign_expr(root, input_rel, sort_expr))
6020 			return;
6021 	}
6022 
6023 	/* Safe to push down */
6024 	fpinfo->pushdown_safe = true;
6025 
6026 	/* Construct PgFdwPathExtraData */
6027 	fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6028 	fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6029 	fpextra->has_final_sort = true;
6030 
6031 	/* Estimate the costs of performing the final sort remotely */
6032 	estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6033 							&rows, &width, &startup_cost, &total_cost);
6034 
6035 	/*
6036 	 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6037 	 * Items in the list must match order in enum FdwPathPrivateIndex.
6038 	 */
6039 	fdw_private = list_make2(makeInteger(true), makeInteger(false));
6040 
6041 	/* Create foreign ordering path */
6042 	ordered_path = create_foreign_upper_path(root,
6043 											 input_rel,
6044 											 root->upper_targets[UPPERREL_ORDERED],
6045 											 rows,
6046 											 startup_cost,
6047 											 total_cost,
6048 											 root->sort_pathkeys,
6049 											 NULL,	/* no extra plan */
6050 											 fdw_private);
6051 
6052 	/* and add it to the ordered_rel */
6053 	add_path(ordered_rel, (Path *) ordered_path);
6054 }
6055 
6056 /*
6057  * add_foreign_final_paths
6058  *		Add foreign paths for performing the final processing remotely.
6059  *
6060  * Given input_rel contains the source-data Paths.  The paths are added to the
6061  * given final_rel.
6062  */
6063 static void
add_foreign_final_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * final_rel,FinalPathExtraData * extra)6064 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6065 						RelOptInfo *final_rel,
6066 						FinalPathExtraData *extra)
6067 {
6068 	Query	   *parse = root->parse;
6069 	PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6070 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6071 	bool		has_final_sort = false;
6072 	List	   *pathkeys = NIL;
6073 	PgFdwPathExtraData *fpextra;
6074 	bool		save_use_remote_estimate = false;
6075 	double		rows;
6076 	int			width;
6077 	Cost		startup_cost;
6078 	Cost		total_cost;
6079 	List	   *fdw_private;
6080 	ForeignPath *final_path;
6081 
6082 	/*
6083 	 * Currently, we only support this for SELECT commands
6084 	 */
6085 	if (parse->commandType != CMD_SELECT)
6086 		return;
6087 
6088 	/*
6089 	 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6090 	 * to add a LIMIT node
6091 	 */
6092 	if (!parse->rowMarks && !extra->limit_needed)
6093 		return;
6094 
6095 	/* We don't support cases where there are any SRFs in the targetlist */
6096 	if (parse->hasTargetSRFs)
6097 		return;
6098 
6099 	/* Save the input_rel as outerrel in fpinfo */
6100 	fpinfo->outerrel = input_rel;
6101 
6102 	/*
6103 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
6104 	 * details from the input relation's fpinfo.
6105 	 */
6106 	fpinfo->table = ifpinfo->table;
6107 	fpinfo->server = ifpinfo->server;
6108 	fpinfo->user = ifpinfo->user;
6109 	merge_fdw_options(fpinfo, ifpinfo, NULL);
6110 
6111 	/*
6112 	 * If there is no need to add a LIMIT node, there might be a ForeignPath
6113 	 * in the input_rel's pathlist that implements all behavior of the query.
6114 	 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6115 	 * (if any) before we get here.
6116 	 */
6117 	if (!extra->limit_needed)
6118 	{
6119 		ListCell   *lc;
6120 
6121 		Assert(parse->rowMarks);
6122 
6123 		/*
6124 		 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6125 		 * so the input_rel should be a base, join, or ordered relation; and
6126 		 * if it's an ordered relation, its input relation should be a base or
6127 		 * join relation.
6128 		 */
6129 		Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6130 			   input_rel->reloptkind == RELOPT_JOINREL ||
6131 			   (input_rel->reloptkind == RELOPT_UPPER_REL &&
6132 				ifpinfo->stage == UPPERREL_ORDERED &&
6133 				(ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
6134 				 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
6135 
6136 		foreach(lc, input_rel->pathlist)
6137 		{
6138 			Path	   *path = (Path *) lfirst(lc);
6139 
6140 			/*
6141 			 * apply_scanjoin_target_to_paths() uses create_projection_path()
6142 			 * to adjust each of its input paths if needed, whereas
6143 			 * create_ordered_paths() uses apply_projection_to_path() to do
6144 			 * that.  So the former might have put a ProjectionPath on top of
6145 			 * the ForeignPath; look through ProjectionPath and see if the
6146 			 * path underneath it is ForeignPath.
6147 			 */
6148 			if (IsA(path, ForeignPath) ||
6149 				(IsA(path, ProjectionPath) &&
6150 				 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
6151 			{
6152 				/*
6153 				 * Create foreign final path; this gets rid of a
6154 				 * no-longer-needed outer plan (if any), which makes the
6155 				 * EXPLAIN output look cleaner
6156 				 */
6157 				final_path = create_foreign_upper_path(root,
6158 													   path->parent,
6159 													   path->pathtarget,
6160 													   path->rows,
6161 													   path->startup_cost,
6162 													   path->total_cost,
6163 													   path->pathkeys,
6164 													   NULL,	/* no extra plan */
6165 													   NULL);	/* no fdw_private */
6166 
6167 				/* and add it to the final_rel */
6168 				add_path(final_rel, (Path *) final_path);
6169 
6170 				/* Safe to push down */
6171 				fpinfo->pushdown_safe = true;
6172 
6173 				return;
6174 			}
6175 		}
6176 
6177 		/*
6178 		 * If we get here it means no ForeignPaths; since we would already
6179 		 * have considered pushing down all operations for the query to the
6180 		 * remote server, give up on it.
6181 		 */
6182 		return;
6183 	}
6184 
6185 	Assert(extra->limit_needed);
6186 
6187 	/*
6188 	 * If the input_rel is an ordered relation, replace the input_rel with its
6189 	 * input relation
6190 	 */
6191 	if (input_rel->reloptkind == RELOPT_UPPER_REL &&
6192 		ifpinfo->stage == UPPERREL_ORDERED)
6193 	{
6194 		input_rel = ifpinfo->outerrel;
6195 		ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6196 		has_final_sort = true;
6197 		pathkeys = root->sort_pathkeys;
6198 	}
6199 
6200 	/* The input_rel should be a base, join, or grouping relation */
6201 	Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6202 		   input_rel->reloptkind == RELOPT_JOINREL ||
6203 		   (input_rel->reloptkind == RELOPT_UPPER_REL &&
6204 			ifpinfo->stage == UPPERREL_GROUP_AGG));
6205 
6206 	/*
6207 	 * We try to create a path below by extending a simple foreign path for
6208 	 * the underlying base, join, or grouping relation to perform the final
6209 	 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
6210 	 * stored into the fdw_private list of the resulting path.  (We
6211 	 * re-estimate the costs of sorting the underlying relation, if
6212 	 * has_final_sort.)
6213 	 */
6214 
6215 	/*
6216 	 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
6217 	 * server
6218 	 */
6219 
6220 	/*
6221 	 * If the underlying relation has any local conditions, the LIMIT/OFFSET
6222 	 * cannot be pushed down.
6223 	 */
6224 	if (ifpinfo->local_conds)
6225 		return;
6226 
6227 	/*
6228 	 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
6229 	 * not safe to remote.
6230 	 */
6231 	if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
6232 		!is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
6233 		return;
6234 
6235 	/* Safe to push down */
6236 	fpinfo->pushdown_safe = true;
6237 
6238 	/* Construct PgFdwPathExtraData */
6239 	fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6240 	fpextra->target = root->upper_targets[UPPERREL_FINAL];
6241 	fpextra->has_final_sort = has_final_sort;
6242 	fpextra->has_limit = extra->limit_needed;
6243 	fpextra->limit_tuples = extra->limit_tuples;
6244 	fpextra->count_est = extra->count_est;
6245 	fpextra->offset_est = extra->offset_est;
6246 
6247 	/*
6248 	 * Estimate the costs of performing the final sort and the LIMIT
6249 	 * restriction remotely.  If has_final_sort is false, we wouldn't need to
6250 	 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
6251 	 * roughly estimated using the costs we already have for the underlying
6252 	 * relation, in the same way as when use_remote_estimate is false.  Since
6253 	 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
6254 	 * false in that case.
6255 	 */
6256 	if (!fpextra->has_final_sort)
6257 	{
6258 		save_use_remote_estimate = ifpinfo->use_remote_estimate;
6259 		ifpinfo->use_remote_estimate = false;
6260 	}
6261 	estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
6262 							&rows, &width, &startup_cost, &total_cost);
6263 	if (!fpextra->has_final_sort)
6264 		ifpinfo->use_remote_estimate = save_use_remote_estimate;
6265 
6266 	/*
6267 	 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6268 	 * Items in the list must match order in enum FdwPathPrivateIndex.
6269 	 */
6270 	fdw_private = list_make2(makeInteger(has_final_sort),
6271 							 makeInteger(extra->limit_needed));
6272 
6273 	/*
6274 	 * Create foreign final path; this gets rid of a no-longer-needed outer
6275 	 * plan (if any), which makes the EXPLAIN output look cleaner
6276 	 */
6277 	final_path = create_foreign_upper_path(root,
6278 										   input_rel,
6279 										   root->upper_targets[UPPERREL_FINAL],
6280 										   rows,
6281 										   startup_cost,
6282 										   total_cost,
6283 										   pathkeys,
6284 										   NULL,	/* no extra plan */
6285 										   fdw_private);
6286 
6287 	/* and add it to the final_rel */
6288 	add_path(final_rel, (Path *) final_path);
6289 }
6290 
6291 /*
6292  * Create a tuple from the specified row of the PGresult.
6293  *
6294  * rel is the local representation of the foreign table, attinmeta is
6295  * conversion data for the rel's tupdesc, and retrieved_attrs is an
6296  * integer list of the table column numbers present in the PGresult.
6297  * fsstate is the ForeignScan plan node's execution state.
6298  * temp_context is a working context that can be reset after each tuple.
6299  *
6300  * Note: either rel or fsstate, but not both, can be NULL.  rel is NULL
6301  * if we're processing a remote join, while fsstate is NULL in a non-query
6302  * context such as ANALYZE, or if we're processing a non-scan query node.
6303  */
6304 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)6305 make_tuple_from_result_row(PGresult *res,
6306 						   int row,
6307 						   Relation rel,
6308 						   AttInMetadata *attinmeta,
6309 						   List *retrieved_attrs,
6310 						   ForeignScanState *fsstate,
6311 						   MemoryContext temp_context)
6312 {
6313 	HeapTuple	tuple;
6314 	TupleDesc	tupdesc;
6315 	Datum	   *values;
6316 	bool	   *nulls;
6317 	ItemPointer ctid = NULL;
6318 	ConversionLocation errpos;
6319 	ErrorContextCallback errcallback;
6320 	MemoryContext oldcontext;
6321 	ListCell   *lc;
6322 	int			j;
6323 
6324 	Assert(row < PQntuples(res));
6325 
6326 	/*
6327 	 * Do the following work in a temp context that we reset after each tuple.
6328 	 * This cleans up not only the data we have direct access to, but any
6329 	 * cruft the I/O functions might leak.
6330 	 */
6331 	oldcontext = MemoryContextSwitchTo(temp_context);
6332 
6333 	/*
6334 	 * Get the tuple descriptor for the row.  Use the rel's tupdesc if rel is
6335 	 * provided, otherwise look to the scan node's ScanTupleSlot.
6336 	 */
6337 	if (rel)
6338 		tupdesc = RelationGetDescr(rel);
6339 	else
6340 	{
6341 		Assert(fsstate);
6342 		tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
6343 	}
6344 
6345 	values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
6346 	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
6347 	/* Initialize to nulls for any columns not present in result */
6348 	memset(nulls, true, tupdesc->natts * sizeof(bool));
6349 
6350 	/*
6351 	 * Set up and install callback to report where conversion error occurs.
6352 	 */
6353 	errpos.cur_attno = 0;
6354 	errpos.rel = rel;
6355 	errpos.fsstate = fsstate;
6356 	errcallback.callback = conversion_error_callback;
6357 	errcallback.arg = (void *) &errpos;
6358 	errcallback.previous = error_context_stack;
6359 	error_context_stack = &errcallback;
6360 
6361 	/*
6362 	 * i indexes columns in the relation, j indexes columns in the PGresult.
6363 	 */
6364 	j = 0;
6365 	foreach(lc, retrieved_attrs)
6366 	{
6367 		int			i = lfirst_int(lc);
6368 		char	   *valstr;
6369 
6370 		/* fetch next column's textual value */
6371 		if (PQgetisnull(res, row, j))
6372 			valstr = NULL;
6373 		else
6374 			valstr = PQgetvalue(res, row, j);
6375 
6376 		/*
6377 		 * convert value to internal representation
6378 		 *
6379 		 * Note: we ignore system columns other than ctid and oid in result
6380 		 */
6381 		errpos.cur_attno = i;
6382 		if (i > 0)
6383 		{
6384 			/* ordinary column */
6385 			Assert(i <= tupdesc->natts);
6386 			nulls[i - 1] = (valstr == NULL);
6387 			/* Apply the input function even to nulls, to support domains */
6388 			values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
6389 											  valstr,
6390 											  attinmeta->attioparams[i - 1],
6391 											  attinmeta->atttypmods[i - 1]);
6392 		}
6393 		else if (i == SelfItemPointerAttributeNumber)
6394 		{
6395 			/* ctid */
6396 			if (valstr != NULL)
6397 			{
6398 				Datum		datum;
6399 
6400 				datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
6401 				ctid = (ItemPointer) DatumGetPointer(datum);
6402 			}
6403 		}
6404 		errpos.cur_attno = 0;
6405 
6406 		j++;
6407 	}
6408 
6409 	/* Uninstall error context callback. */
6410 	error_context_stack = errcallback.previous;
6411 
6412 	/*
6413 	 * Check we got the expected number of columns.  Note: j == 0 and
6414 	 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
6415 	 */
6416 	if (j > 0 && j != PQnfields(res))
6417 		elog(ERROR, "remote query result does not match the foreign table");
6418 
6419 	/*
6420 	 * Build the result tuple in caller's memory context.
6421 	 */
6422 	MemoryContextSwitchTo(oldcontext);
6423 
6424 	tuple = heap_form_tuple(tupdesc, values, nulls);
6425 
6426 	/*
6427 	 * If we have a CTID to return, install it in both t_self and t_ctid.
6428 	 * t_self is the normal place, but if the tuple is converted to a
6429 	 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
6430 	 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
6431 	 */
6432 	if (ctid)
6433 		tuple->t_self = tuple->t_data->t_ctid = *ctid;
6434 
6435 	/*
6436 	 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
6437 	 * heap_form_tuple.  heap_form_tuple actually creates the tuple with
6438 	 * DatumTupleFields, not HeapTupleFields, but the executor expects
6439 	 * HeapTupleFields and will happily extract system columns on that
6440 	 * assumption.  If we don't do this then, for example, the tuple length
6441 	 * ends up in the xmin field, which isn't what we want.
6442 	 */
6443 	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
6444 	HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
6445 	HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
6446 
6447 	/* Clean up */
6448 	MemoryContextReset(temp_context);
6449 
6450 	return tuple;
6451 }
6452 
6453 /*
6454  * Callback function which is called when error occurs during column value
6455  * conversion.  Print names of column and relation.
6456  *
6457  * Note that this function mustn't do any catalog lookups, since we are in
6458  * an already-failed transaction.  Fortunately, we can get the needed info
6459  * from the relation or the query's rangetable instead.
6460  */
6461 static void
conversion_error_callback(void * arg)6462 conversion_error_callback(void *arg)
6463 {
6464 	ConversionLocation *errpos = (ConversionLocation *) arg;
6465 	Relation	rel = errpos->rel;
6466 	ForeignScanState *fsstate = errpos->fsstate;
6467 	const char *attname = NULL;
6468 	const char *relname = NULL;
6469 	bool		is_wholerow = false;
6470 
6471 	/*
6472 	 * If we're in a scan node, always use aliases from the rangetable, for
6473 	 * consistency between the simple-relation and remote-join cases.  Look at
6474 	 * the relation's tupdesc only if we're not in a scan node.
6475 	 */
6476 	if (fsstate)
6477 	{
6478 		/* ForeignScan case */
6479 		ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
6480 		int			varno = 0;
6481 		AttrNumber	colno = 0;
6482 
6483 		if (fsplan->scan.scanrelid > 0)
6484 		{
6485 			/* error occurred in a scan against a foreign table */
6486 			varno = fsplan->scan.scanrelid;
6487 			colno = errpos->cur_attno;
6488 		}
6489 		else
6490 		{
6491 			/* error occurred in a scan against a foreign join */
6492 			TargetEntry *tle;
6493 
6494 			tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
6495 								errpos->cur_attno - 1);
6496 
6497 			/*
6498 			 * Target list can have Vars and expressions.  For Vars, we can
6499 			 * get some information, however for expressions we can't.  Thus
6500 			 * for expressions, just show generic context message.
6501 			 */
6502 			if (IsA(tle->expr, Var))
6503 			{
6504 				Var		   *var = (Var *) tle->expr;
6505 
6506 				varno = var->varno;
6507 				colno = var->varattno;
6508 			}
6509 		}
6510 
6511 		if (varno > 0)
6512 		{
6513 			EState	   *estate = fsstate->ss.ps.state;
6514 			RangeTblEntry *rte = exec_rt_fetch(varno, estate);
6515 
6516 			relname = rte->eref->aliasname;
6517 
6518 			if (colno == 0)
6519 				is_wholerow = true;
6520 			else if (colno > 0 && colno <= list_length(rte->eref->colnames))
6521 				attname = strVal(list_nth(rte->eref->colnames, colno - 1));
6522 			else if (colno == SelfItemPointerAttributeNumber)
6523 				attname = "ctid";
6524 		}
6525 	}
6526 	else if (rel)
6527 	{
6528 		/* Non-ForeignScan case (we should always have a rel here) */
6529 		TupleDesc	tupdesc = RelationGetDescr(rel);
6530 
6531 		relname = RelationGetRelationName(rel);
6532 		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
6533 		{
6534 			Form_pg_attribute attr = TupleDescAttr(tupdesc,
6535 												   errpos->cur_attno - 1);
6536 
6537 			attname = NameStr(attr->attname);
6538 		}
6539 		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
6540 			attname = "ctid";
6541 	}
6542 
6543 	if (relname && is_wholerow)
6544 		errcontext("whole-row reference to foreign table \"%s\"", relname);
6545 	else if (relname && attname)
6546 		errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
6547 	else
6548 		errcontext("processing expression at position %d in select list",
6549 				   errpos->cur_attno);
6550 }
6551 
6552 /*
6553  * Find an equivalence class member expression, all of whose Vars, come from
6554  * the indicated relation.
6555  */
6556 Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)6557 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
6558 {
6559 	ListCell   *lc_em;
6560 
6561 	foreach(lc_em, ec->ec_members)
6562 	{
6563 		EquivalenceMember *em = lfirst(lc_em);
6564 
6565 		if (bms_is_subset(em->em_relids, rel->relids) &&
6566 			!bms_is_empty(em->em_relids))
6567 		{
6568 			/*
6569 			 * If there is more than one equivalence member whose Vars are
6570 			 * taken entirely from this relation, we'll be content to choose
6571 			 * any one of those.
6572 			 */
6573 			return em->em_expr;
6574 		}
6575 	}
6576 
6577 	/* We didn't find any suitable equivalence class expression */
6578 	return NULL;
6579 }
6580 
6581 /*
6582  * Find an equivalence class member expression to be computed as a sort column
6583  * in the given target.
6584  */
6585 Expr *
find_em_expr_for_input_target(PlannerInfo * root,EquivalenceClass * ec,PathTarget * target)6586 find_em_expr_for_input_target(PlannerInfo *root,
6587 							  EquivalenceClass *ec,
6588 							  PathTarget *target)
6589 {
6590 	ListCell   *lc1;
6591 	int			i;
6592 
6593 	i = 0;
6594 	foreach(lc1, target->exprs)
6595 	{
6596 		Expr	   *expr = (Expr *) lfirst(lc1);
6597 		Index		sgref = get_pathtarget_sortgroupref(target, i);
6598 		ListCell   *lc2;
6599 
6600 		/* Ignore non-sort expressions */
6601 		if (sgref == 0 ||
6602 			get_sortgroupref_clause_noerr(sgref,
6603 										  root->parse->sortClause) == NULL)
6604 		{
6605 			i++;
6606 			continue;
6607 		}
6608 
6609 		/* We ignore binary-compatible relabeling on both ends */
6610 		while (expr && IsA(expr, RelabelType))
6611 			expr = ((RelabelType *) expr)->arg;
6612 
6613 		/* Locate an EquivalenceClass member matching this expr, if any */
6614 		foreach(lc2, ec->ec_members)
6615 		{
6616 			EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
6617 			Expr	   *em_expr;
6618 
6619 			/* Don't match constants */
6620 			if (em->em_is_const)
6621 				continue;
6622 
6623 			/* Ignore child members */
6624 			if (em->em_is_child)
6625 				continue;
6626 
6627 			/* Match if same expression (after stripping relabel) */
6628 			em_expr = em->em_expr;
6629 			while (em_expr && IsA(em_expr, RelabelType))
6630 				em_expr = ((RelabelType *) em_expr)->arg;
6631 
6632 			if (equal(em_expr, expr))
6633 				return em->em_expr;
6634 		}
6635 
6636 		i++;
6637 	}
6638 
6639 	elog(ERROR, "could not find pathkey item to sort");
6640 	return NULL;				/* keep compiler quiet */
6641 }
6642