1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  *		  Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <limits.h>
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "foreign/fdwapi.h"
25 #include "funcapi.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "nodes/nodeFuncs.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/optimizer.h"
32 #include "optimizer/pathnode.h"
33 #include "optimizer/paths.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "optimizer/tlist.h"
37 #include "parser/parsetree.h"
38 #include "postgres_fdw.h"
39 #include "utils/builtins.h"
40 #include "utils/float.h"
41 #include "utils/guc.h"
42 #include "utils/lsyscache.h"
43 #include "utils/memutils.h"
44 #include "utils/rel.h"
45 #include "utils/sampling.h"
46 #include "utils/selfuncs.h"
47 
/*
 * Source-code-compatibility hack for the pull_varnos() API change: any
 * nine-argument make_restrictinfo() call in this file is redirected to
 * make_restrictinfo_new() with the same arguments.
 */
#define make_restrictinfo(a,b,c,d,e,f,g,h,i) make_restrictinfo_new(a,b,c,d,e,f,g,h,i)

PG_MODULE_MAGIC;
52 
/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
61 
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * The enumerators rely on the default numbering (starting at 0), so their
 * order must match the order in which the list items are stored.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is a join
	 */
	FdwScanPrivateRelations
};
84 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Boolean flag showing if the remote query has a RETURNING clause
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 *
 * The enumerators use the default numbering (starting at 0), in the same
 * order as the items listed above.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* has-returning flag (as an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
106 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 *
 * The enumerators use the default numbering (starting at 0), in the same
 * order as the items listed above.
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
127 
128 /*
129  * Execution state of a foreign scan using postgres_fdw.
130  */
131 typedef struct PgFdwScanState
132 {
133 	Relation	rel;			/* relcache entry for the foreign table. NULL
134 								 * for a foreign join scan. */
135 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
136 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
137 
digest_reset(PX_MD * h)138 	/* extracted fdw_private data */
139 	char	   *query;			/* text of SELECT command */
140 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
141 
142 	/* for remote query execution */
143 	PGconn	   *conn;			/* connection for the scan */
144 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
145 	bool		cursor_exists;	/* have we created the cursor? */
146 	int			numParams;		/* number of parameters passed to query */
digest_update(PX_MD * h,const uint8 * data,unsigned dlen)147 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
148 	List	   *param_exprs;	/* executable expressions for param values */
149 	const char **param_values;	/* textual values of query parameters */
150 
151 	/* for storing result tuples */
152 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
153 	int			num_tuples;		/* # of tuples in array */
154 	int			next_tuple;		/* index of next one to return */
155 
digest_finish(PX_MD * h,uint8 * dst)156 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
157 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
158 	bool		eof_reached;	/* true if last fetch reached EOF */
159 
160 	/* working memory contexts */
161 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
162 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
163 
164 	int			fetch_size;		/* number of tuples per fetch */
digest_free(PX_MD * h)165 } PgFdwScanState;
166 
167 /*
168  * Execution state of a foreign insert/update/delete operation.
169  */
170 typedef struct PgFdwModifyState
171 {
172 	Relation	rel;			/* relcache entry for the foreign table */
173 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
174 
175 	/* for remote query execution */
176 	PGconn	   *conn;			/* connection for the scan */
177 	char	   *p_name;			/* name of prepared statement, if created */
px_find_digest(const char * name,PX_MD ** res)178 
179 	/* extracted fdw_private data */
180 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
181 	List	   *target_attrs;	/* list of target attribute numbers */
182 	bool		has_returning;	/* is there a RETURNING clause? */
183 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
184 
185 	/* info about parameters for prepared statement */
186 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
187 	int			p_nums;			/* number of parameters to transmit */
188 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
189 
190 	/* working memory context */
191 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
192 
193 	/* for update row movement if subplan result rel */
194 	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
195 											 * created */
196 } PgFdwModifyState;
197 
198 /*
199  * Execution state of a foreign scan that modifies a foreign table directly.
200  */
201 typedef struct PgFdwDirectModifyState
202 {
203 	Relation	rel;			/* relcache entry for the foreign table */
204 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
205 
206 	/* extracted fdw_private data */
207 	char	   *query;			/* text of UPDATE/DELETE command */
208 	bool		has_returning;	/* is there a RETURNING clause? */
209 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
210 	bool		set_processed;	/* do we set the command es_processed? */
211 
212 	/* for remote query execution */
213 	PGconn	   *conn;			/* connection for the update */
214 	int			numParams;		/* number of parameters passed to query */
215 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
216 	List	   *param_exprs;	/* executable expressions for param values */
217 	const char **param_values;	/* textual values of query parameters */
218 
219 	/* for storing result tuples */
220 	PGresult   *result;			/* result for query */
221 	int			num_tuples;		/* # of result tuples */
222 	int			next_tuple;		/* index of next one to return */
223 	Relation	resultRel;		/* relcache entry for the target relation */
224 	AttrNumber *attnoMap;		/* array of attnums of input user columns */
225 	AttrNumber	ctidAttno;		/* attnum of input ctid column */
226 	AttrNumber	oidAttno;		/* attnum of input oid column */
227 	bool		hasSystemCols;	/* are there system columns of resultRel? */
228 
229 	/* working memory context */
230 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
231 } PgFdwDirectModifyState;
232 
233 /*
234  * Workspace for analyzing a foreign table.
235  */
236 typedef struct PgFdwAnalyzeState
237 {
238 	Relation	rel;			/* relcache entry for the foreign table */
239 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
240 	List	   *retrieved_attrs;	/* attr numbers retrieved by query */
241 
242 	/* collected sample rows */
243 	HeapTuple  *rows;			/* array of size targrows */
244 	int			targrows;		/* target # of sample rows */
245 	int			numrows;		/* # of sample rows collected */
246 
247 	/* for random sampling */
248 	double		samplerows;		/* # of rows fetched */
249 	double		rowstoskip;		/* # of rows to skip before next sample */
250 	ReservoirStateData rstate;	/* state for reservoir sampling */
251 
252 	/* working memory contexts */
253 	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
254 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
255 } PgFdwAnalyzeState;
256 
/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 *
 * The enumerators use the default numbering (starting at 0), in the same
 * order as the items listed above.
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as an integer Value node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as an integer Value node) */
	FdwPathPrivateHasLimit
};
271 
272 /* Struct for extra information passed to estimate_path_cost_size() */
273 typedef struct
274 {
275 	PathTarget *target;
276 	bool		has_final_sort;
277 	bool		has_limit;
278 	double		limit_tuples;
279 	int64		count_est;
280 	int64		offset_est;
281 } PgFdwPathExtraData;
282 
283 /*
284  * Identify the attribute where data conversion fails.
285  */
286 typedef struct ConversionLocation
287 {
288 	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
289 	Relation	rel;			/* foreign table being processed, or NULL */
290 	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
291 } ConversionLocation;
free_openssl_cipher(OSSLCipher * od)292 
293 /* Callback argument for ec_member_matches_foreign */
294 typedef struct
295 {
296 	Expr	   *current;		/* current expr, or NULL if not yet found */
297 	List	   *already_used;	/* expressions already dealt with */
298 } ec_member_foreign_arg;
299 
300 /*
301  * SQL functions
302  */
303 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
304 
305 /*
306  * FDW callback routines
307  */
cipher_free_callback(ResourceReleasePhase phase,bool isCommit,bool isTopLevel,void * arg)308 static void postgresGetForeignRelSize(PlannerInfo *root,
309 									  RelOptInfo *baserel,
310 									  Oid foreigntableid);
311 static void postgresGetForeignPaths(PlannerInfo *root,
312 									RelOptInfo *baserel,
313 									Oid foreigntableid);
314 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
315 										   RelOptInfo *foreignrel,
316 										   Oid foreigntableid,
317 										   ForeignPath *best_path,
318 										   List *tlist,
319 										   List *scan_clauses,
320 										   Plan *outer_plan);
321 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
322 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
323 static void postgresReScanForeignScan(ForeignScanState *node);
324 static void postgresEndForeignScan(ForeignScanState *node);
325 static void postgresAddForeignUpdateTargets(Query *parsetree,
326 											RangeTblEntry *target_rte,
327 											Relation target_relation);
328 static List *postgresPlanForeignModify(PlannerInfo *root,
329 									   ModifyTable *plan,
330 									   Index resultRelation,
331 									   int subplan_index);
332 static void postgresBeginForeignModify(ModifyTableState *mtstate,
333 									   ResultRelInfo *resultRelInfo,
334 									   List *fdw_private,
335 									   int subplan_index,
336 									   int eflags);
gen_ossl_block_size(PX_Cipher * c)337 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
338 												 ResultRelInfo *resultRelInfo,
339 												 TupleTableSlot *slot,
340 												 TupleTableSlot *planSlot);
341 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
342 												 ResultRelInfo *resultRelInfo,
343 												 TupleTableSlot *slot,
344 												 TupleTableSlot *planSlot);
gen_ossl_key_size(PX_Cipher * c)345 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
346 												 ResultRelInfo *resultRelInfo,
347 												 TupleTableSlot *slot,
348 												 TupleTableSlot *planSlot);
349 static void postgresEndForeignModify(EState *estate,
350 									 ResultRelInfo *resultRelInfo);
351 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
352 									   ResultRelInfo *resultRelInfo);
gen_ossl_iv_size(PX_Cipher * c)353 static void postgresEndForeignInsert(EState *estate,
354 									 ResultRelInfo *resultRelInfo);
355 static int	postgresIsForeignRelUpdatable(Relation rel);
356 static bool postgresPlanDirectModify(PlannerInfo *root,
357 									 ModifyTable *plan,
358 									 Index resultRelation,
359 									 int subplan_index);
360 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
361 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
362 static void postgresEndDirectModify(ForeignScanState *node);
gen_ossl_free(PX_Cipher * c)363 static void postgresExplainForeignScan(ForeignScanState *node,
364 									   ExplainState *es);
365 static void postgresExplainForeignModify(ModifyTableState *mtstate,
366 										 ResultRelInfo *rinfo,
367 										 List *fdw_private,
368 										 int subplan_index,
369 										 ExplainState *es);
370 static void postgresExplainDirectModify(ForeignScanState *node,
371 										ExplainState *es);
gen_ossl_decrypt(PX_Cipher * c,const uint8 * data,unsigned dlen,uint8 * res)372 static bool postgresAnalyzeForeignTable(Relation relation,
373 										AcquireSampleRowsFunc *func,
374 										BlockNumber *totalpages);
375 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
376 										 Oid serverOid);
377 static void postgresGetForeignJoinPaths(PlannerInfo *root,
378 										RelOptInfo *joinrel,
379 										RelOptInfo *outerrel,
380 										RelOptInfo *innerrel,
381 										JoinType jointype,
382 										JoinPathExtraData *extra);
383 static bool postgresRecheckForeignScan(ForeignScanState *node,
384 									   TupleTableSlot *slot);
385 static void postgresGetForeignUpperPaths(PlannerInfo *root,
386 										 UpperRelationKind stage,
387 										 RelOptInfo *input_rel,
388 										 RelOptInfo *output_rel,
389 										 void *extra);
390 
391 /*
392  * Helper functions
393  */
394 static void estimate_path_cost_size(PlannerInfo *root,
395 									RelOptInfo *foreignrel,
396 									List *param_join_conds,
397 									List *pathkeys,
gen_ossl_encrypt(PX_Cipher * c,const uint8 * data,unsigned dlen,uint8 * res)398 									PgFdwPathExtraData *fpextra,
399 									double *p_rows, int *p_width,
400 									Cost *p_startup_cost, Cost *p_total_cost);
401 static void get_remote_estimate(const char *sql,
402 								PGconn *conn,
403 								double *rows,
404 								int *width,
405 								Cost *startup_cost,
406 								Cost *total_cost);
407 static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
408 											  List *pathkeys,
409 											  double retrieved_rows,
410 											  double width,
411 											  double limit_tuples,
412 											  Cost *p_startup_cost,
413 											  Cost *p_run_cost);
414 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
415 									  EquivalenceClass *ec, EquivalenceMember *em,
416 									  void *arg);
417 static void create_cursor(ForeignScanState *node);
418 static void fetch_more_data(ForeignScanState *node);
419 static void close_cursor(PGconn *conn, unsigned int cursor_number);
420 static PgFdwModifyState *create_foreign_modify(EState *estate,
421 											   RangeTblEntry *rte,
422 											   ResultRelInfo *resultRelInfo,
423 											   CmdType operation,
424 											   Plan *subplan,
425 											   char *query,
426 											   List *target_attrs,
427 											   bool has_returning,
428 											   List *retrieved_attrs);
429 static TupleTableSlot *execute_foreign_modify(EState *estate,
430 											  ResultRelInfo *resultRelInfo,
bf_check_supported_key_len(void)431 											  CmdType operation,
432 											  TupleTableSlot *slot,
433 											  TupleTableSlot *planSlot);
434 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
435 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
436 											 ItemPointer tupleid,
437 											 TupleTableSlot *slot);
438 static void store_returning_result(PgFdwModifyState *fmstate,
439 								   TupleTableSlot *slot, PGresult *res);
440 static void finish_foreign_modify(PgFdwModifyState *fmstate);
441 static List *build_remote_returning(Index rtindex, Relation rel,
442 									List *returningList);
443 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
444 static void execute_dml_stmt(ForeignScanState *node);
445 static TupleTableSlot *get_returning_data(ForeignScanState *node);
446 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
447 								  List *fdw_scan_tlist,
448 								  Index rtindex);
449 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
450 											  TupleTableSlot *slot,
451 											  EState *estate);
452 static void prepare_query_params(PlanState *node,
453 								 List *fdw_exprs,
454 								 int numParams,
455 								 FmgrInfo **param_flinfo,
456 								 List **param_exprs,
457 								 const char ***param_values);
458 static void process_query_params(ExprContext *econtext,
459 								 FmgrInfo *param_flinfo,
460 								 List *param_exprs,
461 								 const char **param_values);
462 static int	postgresAcquireSampleRowsFunc(Relation relation, int elevel,
463 										  HeapTuple *rows, int targrows,
464 										  double *totalrows,
465 										  double *totaldeadrows);
466 static void analyze_row_processor(PGresult *res, int row,
467 								  PgFdwAnalyzeState *astate);
468 static HeapTuple make_tuple_from_result_row(PGresult *res,
469 											int row,
470 											Relation rel,
471 											AttInMetadata *attinmeta,
472 											List *retrieved_attrs,
473 											ForeignScanState *fsstate,
bf_init(PX_Cipher * c,const uint8 * key,unsigned klen,const uint8 * iv)474 											MemoryContext temp_context);
475 static void conversion_error_callback(void *arg);
476 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
477 							JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
478 							JoinPathExtraData *extra);
479 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
480 								Node *havingQual);
481 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
482 											  RelOptInfo *rel);
483 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
484 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
485 											Path *epq_path);
486 static void add_foreign_grouping_paths(PlannerInfo *root,
487 									   RelOptInfo *input_rel,
488 									   RelOptInfo *grouped_rel,
489 									   GroupPathExtraData *extra);
490 static void add_foreign_ordered_paths(PlannerInfo *root,
491 									  RelOptInfo *input_rel,
492 									  RelOptInfo *ordered_rel);
493 static void add_foreign_final_paths(PlannerInfo *root,
494 									RelOptInfo *input_rel,
495 									RelOptInfo *final_rel,
496 									FinalPathExtraData *extra);
497 static void apply_server_options(PgFdwRelationInfo *fpinfo);
498 static void apply_table_options(PgFdwRelationInfo *fpinfo);
499 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
500 							  const PgFdwRelationInfo *fpinfo_o,
501 							  const PgFdwRelationInfo *fpinfo_i);
502 
503 
504 /*
505  * Foreign-data wrapper handler function: return a struct with pointers
ossl_des_init(PX_Cipher * c,const uint8 * key,unsigned klen,const uint8 * iv)506  * to my callback routines.
507  */
508 Datum
509 postgres_fdw_handler(PG_FUNCTION_ARGS)
510 {
511 	FdwRoutine *routine = makeNode(FdwRoutine);
512 
513 	/* Functions for scanning foreign tables */
514 	routine->GetForeignRelSize = postgresGetForeignRelSize;
515 	routine->GetForeignPaths = postgresGetForeignPaths;
516 	routine->GetForeignPlan = postgresGetForeignPlan;
517 	routine->BeginForeignScan = postgresBeginForeignScan;
518 	routine->IterateForeignScan = postgresIterateForeignScan;
519 	routine->ReScanForeignScan = postgresReScanForeignScan;
520 	routine->EndForeignScan = postgresEndForeignScan;
521 
522 	/* Functions for updating foreign tables */
523 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
524 	routine->PlanForeignModify = postgresPlanForeignModify;
525 	routine->BeginForeignModify = postgresBeginForeignModify;
526 	routine->ExecForeignInsert = postgresExecForeignInsert;
527 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
528 	routine->ExecForeignDelete = postgresExecForeignDelete;
529 	routine->EndForeignModify = postgresEndForeignModify;
530 	routine->BeginForeignInsert = postgresBeginForeignInsert;
531 	routine->EndForeignInsert = postgresEndForeignInsert;
532 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
533 	routine->PlanDirectModify = postgresPlanDirectModify;
534 	routine->BeginDirectModify = postgresBeginDirectModify;
535 	routine->IterateDirectModify = postgresIterateDirectModify;
536 	routine->EndDirectModify = postgresEndDirectModify;
537 
538 	/* Function for EvalPlanQual rechecks */
539 	routine->RecheckForeignScan = postgresRecheckForeignScan;
540 	/* Support functions for EXPLAIN */
541 	routine->ExplainForeignScan = postgresExplainForeignScan;
542 	routine->ExplainForeignModify = postgresExplainForeignModify;
543 	routine->ExplainDirectModify = postgresExplainDirectModify;
544 
545 	/* Support functions for ANALYZE */
546 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
547 
548 	/* Support functions for IMPORT FOREIGN SCHEMA */
549 	routine->ImportForeignSchema = postgresImportForeignSchema;
550 
551 	/* Support functions for join push-down */
552 	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
553 
554 	/* Support functions for upper relation push-down */
555 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
556 
557 	PG_RETURN_POINTER(routine);
558 }
559 
560 /*
561  * postgresGetForeignRelSize
ossl_aes_init(PX_Cipher * c,const uint8 * key,unsigned klen,const uint8 * iv)562  *		Estimate # of rows and width of the result of the scan
563  *
564  * We should consider the effect of all baserestrictinfo clauses here, but
565  * not any join clauses.
566  */
567 static void
568 postgresGetForeignRelSize(PlannerInfo *root,
569 						  RelOptInfo *baserel,
570 						  Oid foreigntableid)
571 {
572 	PgFdwRelationInfo *fpinfo;
573 	ListCell   *lc;
574 	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
575 
576 	/*
577 	 * We use PgFdwRelationInfo to pass various information to subsequent
578 	 * functions.
579 	 */
580 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
581 	baserel->fdw_private = (void *) fpinfo;
582 
583 	/* Base foreign tables need to be pushed down always. */
584 	fpinfo->pushdown_safe = true;
585 
586 	/* Look up foreign-table catalog info. */
587 	fpinfo->table = GetForeignTable(foreigntableid);
588 	fpinfo->server = GetForeignServer(fpinfo->table->serverid);
589 
590 	/*
591 	 * Extract user-settable option values.  Note that per-table settings of
592 	 * use_remote_estimate and fetch_size override per-server settings of
593 	 * them, respectively.
594 	 */
595 	fpinfo->use_remote_estimate = false;
596 	fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
597 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
598 	fpinfo->shippable_extensions = NIL;
599 	fpinfo->fetch_size = 100;
600 
601 	apply_server_options(fpinfo);
602 	apply_table_options(fpinfo);
603 
604 	/*
605 	 * If the table or the server is configured to use remote estimates,
606 	 * identify which user to do remote access as during planning.  This
607 	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
608 	 * permissions, the query would have failed at runtime anyway.
609 	 */
610 	if (fpinfo->use_remote_estimate)
611 	{
612 		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
613 
614 		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
615 	}
616 	else
617 		fpinfo->user = NULL;
618 
619 	/*
620 	 * Identify which baserestrictinfo clauses can be sent to the remote
621 	 * server and which can't.
622 	 */
623 	classifyConditions(root, baserel, baserel->baserestrictinfo,
624 					   &fpinfo->remote_conds, &fpinfo->local_conds);
625 
626 	/*
627 	 * Identify which attributes will need to be retrieved from the remote
628 	 * server.  These include all attrs needed for joins or final output, plus
629 	 * all attrs used in the local_conds.  (Note: if we end up using a
630 	 * parameterized scan, it's possible that some of the join clauses will be
631 	 * sent to the remote and thus we wouldn't really need to retrieve the
632 	 * columns used in them.  Doesn't seem worth detecting that case though.)
633 	 */
634 	fpinfo->attrs_used = NULL;
635 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
636 				   &fpinfo->attrs_used);
637 	foreach(lc, fpinfo->local_conds)
638 	{
639 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
640 
641 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
642 					   &fpinfo->attrs_used);
643 	}
644 
645 	/*
646 	 * Compute the selectivity and cost of the local_conds, so we don't have
647 	 * to do it over again for each path.  The best we can do for these
648 	 * conditions is to estimate selectivity on the basis of local statistics.
649 	 */
650 	fpinfo->local_conds_sel = clauselist_selectivity(root,
651 													 fpinfo->local_conds,
652 													 baserel->relid,
653 													 JOIN_INNER,
654 													 NULL);
655 
656 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
657 
658 	/*
659 	 * Set # of retrieved rows and cached relation costs to some negative
660 	 * value, so that we can detect when they are set to some sensible values,
661 	 * during one (usually the first) of the calls to estimate_path_cost_size.
662 	 */
663 	fpinfo->retrieved_rows = -1;
664 	fpinfo->rel_startup_cost = -1;
665 	fpinfo->rel_total_cost = -1;
666 
667 	/*
668 	 * If the table or the server is configured to use remote estimates,
669 	 * connect to the foreign server and execute EXPLAIN to estimate the
670 	 * number of rows selected by the restriction clauses, as well as the
671 	 * average row width.  Otherwise, estimate using whatever statistics we
672 	 * have locally, in a way similar to ordinary tables.
673 	 */
674 	if (fpinfo->use_remote_estimate)
675 	{
676 		/*
677 		 * Get cost/size estimates with help of remote server.  Save the
678 		 * values in fpinfo so we don't need to do it again to generate the
679 		 * basic foreign path.
680 		 */
681 		estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
682 								&fpinfo->rows, &fpinfo->width,
683 								&fpinfo->startup_cost, &fpinfo->total_cost);
684 
685 		/* Report estimated baserel size to planner. */
686 		baserel->rows = fpinfo->rows;
687 		baserel->reltarget->width = fpinfo->width;
688 	}
689 	else
690 	{
691 		/*
692 		 * If the foreign table has never been ANALYZEd, it will have relpages
693 		 * and reltuples equal to zero, which most likely has nothing to do
694 		 * with reality.  We can't do a whole lot about that if we're not
695 		 * allowed to consult the remote server, but we can use a hack similar
696 		 * to plancat.c's treatment of empty relations: use a minimum size
697 		 * estimate of 10 pages, and divide by the column-datatype-based width
698 		 * estimate to get the corresponding number of tuples.
699 		 */
700 		if (baserel->pages == 0 && baserel->tuples == 0)
701 		{
702 			baserel->pages = 10;
703 			baserel->tuples =
704 				(10 * BLCKSZ) / (baserel->reltarget->width +
705 								 MAXALIGN(SizeofHeapTupleHeader));
706 		}
707 
708 		/* Estimate baserel size as best we can with local statistics. */
709 		set_baserel_size_estimates(root, baserel);
710 
711 		/* Fill in basically-bogus cost estimates for use later. */
712 		estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
713 								&fpinfo->rows, &fpinfo->width,
714 								&fpinfo->startup_cost, &fpinfo->total_cost);
715 	}
716 
717 	/*
718 	 * fpinfo->relation_name gets the numeric rangetable index of the foreign
719 	 * table RTE.  (If this query gets EXPLAIN'd, we'll convert that to a
720 	 * human-readable string at that time.)
721 	 */
722 	fpinfo->relation_name = psprintf("%u", baserel->relid);
723 
724 	/* No outer and inner relations. */
725 	fpinfo->make_outerrel_subquery = false;
726 	fpinfo->make_innerrel_subquery = false;
727 	fpinfo->lower_subquery_rels = NULL;
728 	/* Set the relation index. */
729 	fpinfo->relation_index = baserel->relid;
730 }
731 
732 /*
733  * get_useful_ecs_for_relation
734  *		Determine which EquivalenceClasses might be involved in useful
735  *		orderings of this relation.
736  *
737  * This function is in some respects a mirror image of the core function
738  * pathkeys_useful_for_merging: for a regular table, we know what indexes
739  * we have and want to test whether any of them are useful.  For a foreign
740  * table, we don't know what indexes are present on the remote side but
741  * want to speculate about which ones we'd like to use if they existed.
742  *
743  * This function returns a list of potentially-useful equivalence classes,
744  * but it does not guarantee that an EquivalenceMember exists which contains
745  * Vars only from the given relation.  For example, given ft1 JOIN t1 ON
746  * ft1.x + t1.x = 0, this function will say that the equivalence class
747  * containing ft1.x + t1.x is potentially useful.  Supposing ft1 is remote and
748  * t1 is local (or on a different server), it will turn out that no useful
749  * ORDER BY clause can be generated.  It's not our job to figure that out
750  * here; we're only interested in identifying relevant ECs.
751  */
752 static List *
753 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
754 {
755 	List	   *useful_eclass_list = NIL;
756 	ListCell   *lc;
757 	Relids		relids;
758 
759 	/*
760 	 * First, consider whether any active EC is potentially useful for a merge
761 	 * join against this relation.
762 	 */
px_find_cipher(const char * name,PX_Cipher ** res)763 	if (rel->has_eclass_joins)
764 	{
765 		foreach(lc, root->eq_classes)
766 		{
767 			EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
768 
769 			if (eclass_useful_for_merging(root, cur_ec, rel))
770 				useful_eclass_list = lappend(useful_eclass_list, cur_ec);
771 		}
772 	}
773 
774 	/*
775 	 * Next, consider whether there are any non-EC derivable join clauses that
776 	 * are merge-joinable.  If the joininfo list is empty, we can exit
777 	 * quickly.
778 	 */
779 	if (rel->joininfo == NIL)
780 		return useful_eclass_list;
781 
782 	/* If this is a child rel, we must use the topmost parent rel to search. */
783 	if (IS_OTHER_REL(rel))
784 	{
785 		Assert(!bms_is_empty(rel->top_parent_relids));
786 		relids = rel->top_parent_relids;
787 	}
788 	else
789 		relids = rel->relids;
790 
791 	/* Check each join clause in turn. */
792 	foreach(lc, rel->joininfo)
793 	{
794 		RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
795 
796 		/* Consider only mergejoinable clauses */
797 		if (restrictinfo->mergeopfamilies == NIL)
798 			continue;
799 
800 		/* Make sure we've got canonical ECs. */
801 		update_mergeclause_eclasses(root, restrictinfo);
802 
803 		/*
804 		 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
805 		 * that left_ec and right_ec will be initialized, per comments in
806 		 * distribute_qual_to_rels.
807 		 *
808 		 * We want to identify which side of this merge-joinable clause
809 		 * contains columns from the relation produced by this RelOptInfo. We
810 		 * test for overlap, not containment, because there could be extra
811 		 * relations on either side.  For example, suppose we've got something
812 		 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
813 		 * A.y = D.y.  The input rel might be the joinrel between A and B, and
814 		 * we'll consider the join clause A.y = D.y. relids contains a
815 		 * relation not involved in the join class (B) and the equivalence
816 		 * class for the left-hand side of the clause contains a relation not
817 		 * involved in the input rel (C).  Despite the fact that we have only
818 		 * overlap and not containment in either direction, A.y is potentially
819 		 * useful as a sort column.
820 		 *
821 		 * Note that it's even possible that relids overlaps neither side of
822 		 * the join clause.  For example, consider A LEFT JOIN B ON A.x = B.x
823 		 * AND A.x = 1.  The clause A.x = 1 will appear in B's joininfo list,
824 		 * but overlaps neither side of B.  In that case, we just skip this
825 		 * join clause, since it doesn't suggest a useful sort order for this
826 		 * relation.
827 		 */
828 		if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
829 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
830 														restrictinfo->right_ec);
831 		else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
832 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
833 														restrictinfo->left_ec);
834 	}
835 
836 	return useful_eclass_list;
837 }
838 
839 /*
840  * get_useful_pathkeys_for_relation
841  *		Determine which orderings of a relation might be useful.
842  *
843  * Getting data in sorted order can be useful either because the requested
844  * order matches the final output ordering for the overall query we're
845  * planning, or because it enables an efficient merge join.  Here, we try
846  * to figure out which pathkeys to consider.
847  */
848 static List *
849 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
850 {
851 	List	   *useful_pathkeys_list = NIL;
852 	List	   *useful_eclass_list;
853 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
854 	EquivalenceClass *query_ec = NULL;
855 	ListCell   *lc;
856 
857 	/*
858 	 * Pushing the query_pathkeys to the remote server is always worth
859 	 * considering, because it might let us avoid a local sort.
860 	 */
861 	fpinfo->qp_is_pushdown_safe = false;
862 	if (root->query_pathkeys)
863 	{
864 		bool		query_pathkeys_ok = true;
865 
866 		foreach(lc, root->query_pathkeys)
867 		{
868 			PathKey    *pathkey = (PathKey *) lfirst(lc);
869 			EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
870 			Expr	   *em_expr;
871 
872 			/*
873 			 * The planner and executor don't have any clever strategy for
874 			 * taking data sorted by a prefix of the query's pathkeys and
875 			 * getting it to be sorted by all of those pathkeys. We'll just
876 			 * end up resorting the entire data set.  So, unless we can push
877 			 * down all of the query pathkeys, forget it.
878 			 *
879 			 * is_foreign_expr would detect volatile expressions as well, but
880 			 * checking ec_has_volatile here saves some cycles.
881 			 */
882 			if (pathkey_ec->ec_has_volatile ||
883 				!(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
884 				!is_foreign_expr(root, rel, em_expr))
885 			{
886 				query_pathkeys_ok = false;
887 				break;
888 			}
889 		}
890 
891 		if (query_pathkeys_ok)
892 		{
893 			useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
894 			fpinfo->qp_is_pushdown_safe = true;
895 		}
896 	}
897 
898 	/*
899 	 * Even if we're not using remote estimates, having the remote side do the
900 	 * sort generally won't be any worse than doing it locally, and it might
901 	 * be much better if the remote side can generate data in the right order
902 	 * without needing a sort at all.  However, what we're going to do next is
903 	 * try to generate pathkeys that seem promising for possible merge joins,
904 	 * and that's more speculative.  A wrong choice might hurt quite a bit, so
905 	 * bail out if we can't use remote estimates.
906 	 */
907 	if (!fpinfo->use_remote_estimate)
908 		return useful_pathkeys_list;
909 
910 	/* Get the list of interesting EquivalenceClasses. */
911 	useful_eclass_list = get_useful_ecs_for_relation(root, rel);
912 
913 	/* Extract unique EC for query, if any, so we don't consider it again. */
914 	if (list_length(root->query_pathkeys) == 1)
915 	{
916 		PathKey    *query_pathkey = linitial(root->query_pathkeys);
917 
918 		query_ec = query_pathkey->pk_eclass;
919 	}
920 
921 	/*
922 	 * As a heuristic, the only pathkeys we consider here are those of length
923 	 * one.  It's surely possible to consider more, but since each one we
924 	 * choose to consider will generate a round-trip to the remote side, we
925 	 * need to be a bit cautious here.  It would sure be nice to have a local
926 	 * cache of information about remote index definitions...
927 	 */
928 	foreach(lc, useful_eclass_list)
929 	{
930 		EquivalenceClass *cur_ec = lfirst(lc);
931 		Expr	   *em_expr;
932 		PathKey    *pathkey;
933 
934 		/* If redundant with what we did above, skip it. */
935 		if (cur_ec == query_ec)
936 			continue;
937 
938 		/* If no pushable expression for this rel, skip it. */
939 		em_expr = find_em_expr_for_rel(cur_ec, rel);
940 		if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
941 			continue;
942 
943 		/* Looks like we can generate a pathkey, so let's do it. */
944 		pathkey = make_canonical_pathkey(root, cur_ec,
945 										 linitial_oid(cur_ec->ec_opfamilies),
946 										 BTLessStrategyNumber,
947 										 false);
948 		useful_pathkeys_list = lappend(useful_pathkeys_list,
949 									   list_make1(pathkey));
950 	}
951 
952 	return useful_pathkeys_list;
953 }
954 
955 /*
956  * postgresGetForeignPaths
957  *		Create possible scan paths for a scan on the foreign table
958  */
959 static void
960 postgresGetForeignPaths(PlannerInfo *root,
961 						RelOptInfo *baserel,
962 						Oid foreigntableid)
963 {
964 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
965 	ForeignPath *path;
966 	List	   *ppi_list;
967 	ListCell   *lc;
968 
969 	/*
970 	 * Create simplest ForeignScan path node and add it to baserel.  This path
971 	 * corresponds to SeqScan path of regular tables (though depending on what
972 	 * baserestrict conditions we were able to send to remote, there might
973 	 * actually be an indexscan happening there).  We already did all the work
974 	 * to estimate cost and size of this path.
975 	 *
976 	 * Although this path uses no join clauses, it could still have required
977 	 * parameterization due to LATERAL refs in its tlist.
978 	 */
979 	path = create_foreignscan_path(root, baserel,
980 								   NULL,	/* default pathtarget */
981 								   fpinfo->rows,
982 								   fpinfo->startup_cost,
983 								   fpinfo->total_cost,
984 								   NIL, /* no pathkeys */
985 								   baserel->lateral_relids,
986 								   NULL,	/* no extra plan */
987 								   NIL);	/* no fdw_private list */
988 	add_path(baserel, (Path *) path);
989 
990 	/* Add paths with pathkeys */
991 	add_paths_with_pathkeys_for_rel(root, baserel, NULL);
992 
993 	/*
994 	 * If we're not using remote estimates, stop here.  We have no way to
995 	 * estimate whether any join clauses would be worth sending across, so
996 	 * don't bother building parameterized paths.
997 	 */
998 	if (!fpinfo->use_remote_estimate)
999 		return;
1000 
1001 	/*
1002 	 * Thumb through all join clauses for the rel to identify which outer
1003 	 * relations could supply one or more safe-to-send-to-remote join clauses.
1004 	 * We'll build a parameterized path for each such outer relation.
1005 	 *
1006 	 * It's convenient to manage this by representing each candidate outer
1007 	 * relation by the ParamPathInfo node for it.  We can then use the
1008 	 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1009 	 * interesting join clauses for that rel.  This takes care of the
1010 	 * possibility that there are multiple safe join clauses for such a rel,
1011 	 * and also ensures that we account for unsafe join clauses that we'll
1012 	 * still have to enforce locally (since the parameterized-path machinery
1013 	 * insists that we handle all movable clauses).
1014 	 */
1015 	ppi_list = NIL;
1016 	foreach(lc, baserel->joininfo)
1017 	{
1018 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1019 		Relids		required_outer;
1020 		ParamPathInfo *param_info;
1021 
1022 		/* Check if clause can be moved to this rel */
1023 		if (!join_clause_is_movable_to(rinfo, baserel))
1024 			continue;
1025 
1026 		/* See if it is safe to send to remote */
1027 		if (!is_foreign_expr(root, baserel, rinfo->clause))
1028 			continue;
1029 
1030 		/* Calculate required outer rels for the resulting path */
1031 		required_outer = bms_union(rinfo->clause_relids,
1032 								   baserel->lateral_relids);
1033 		/* We do not want the foreign rel itself listed in required_outer */
1034 		required_outer = bms_del_member(required_outer, baserel->relid);
1035 
1036 		/*
1037 		 * required_outer probably can't be empty here, but if it were, we
1038 		 * couldn't make a parameterized path.
1039 		 */
1040 		if (bms_is_empty(required_outer))
1041 			continue;
1042 
1043 		/* Get the ParamPathInfo */
1044 		param_info = get_baserel_parampathinfo(root, baserel,
1045 											   required_outer);
1046 		Assert(param_info != NULL);
1047 
1048 		/*
1049 		 * Add it to list unless we already have it.  Testing pointer equality
1050 		 * is OK since get_baserel_parampathinfo won't make duplicates.
1051 		 */
1052 		ppi_list = list_append_unique_ptr(ppi_list, param_info);
1053 	}
1054 
1055 	/*
1056 	 * The above scan examined only "generic" join clauses, not those that
1057 	 * were absorbed into EquivalenceClauses.  See if we can make anything out
1058 	 * of EquivalenceClauses.
1059 	 */
1060 	if (baserel->has_eclass_joins)
1061 	{
1062 		/*
1063 		 * We repeatedly scan the eclass list looking for column references
1064 		 * (or expressions) belonging to the foreign rel.  Each time we find
1065 		 * one, we generate a list of equivalence joinclauses for it, and then
1066 		 * see if any are safe to send to the remote.  Repeat till there are
1067 		 * no more candidate EC members.
1068 		 */
1069 		ec_member_foreign_arg arg;
1070 
1071 		arg.already_used = NIL;
1072 		for (;;)
1073 		{
1074 			List	   *clauses;
1075 
1076 			/* Make clauses, skipping any that join to lateral_referencers */
1077 			arg.current = NULL;
1078 			clauses = generate_implied_equalities_for_column(root,
1079 															 baserel,
1080 															 ec_member_matches_foreign,
1081 															 (void *) &arg,
1082 															 baserel->lateral_referencers);
1083 
1084 			/* Done if there are no more expressions in the foreign rel */
1085 			if (arg.current == NULL)
1086 			{
1087 				Assert(clauses == NIL);
1088 				break;
1089 			}
1090 
1091 			/* Scan the extracted join clauses */
1092 			foreach(lc, clauses)
1093 			{
1094 				RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1095 				Relids		required_outer;
1096 				ParamPathInfo *param_info;
1097 
1098 				/* Check if clause can be moved to this rel */
1099 				if (!join_clause_is_movable_to(rinfo, baserel))
1100 					continue;
1101 
1102 				/* See if it is safe to send to remote */
1103 				if (!is_foreign_expr(root, baserel, rinfo->clause))
1104 					continue;
1105 
1106 				/* Calculate required outer rels for the resulting path */
1107 				required_outer = bms_union(rinfo->clause_relids,
1108 										   baserel->lateral_relids);
1109 				required_outer = bms_del_member(required_outer, baserel->relid);
1110 				if (bms_is_empty(required_outer))
1111 					continue;
1112 
1113 				/* Get the ParamPathInfo */
1114 				param_info = get_baserel_parampathinfo(root, baserel,
1115 													   required_outer);
1116 				Assert(param_info != NULL);
1117 
1118 				/* Add it to list unless we already have it */
1119 				ppi_list = list_append_unique_ptr(ppi_list, param_info);
1120 			}
1121 
1122 			/* Try again, now ignoring the expression we found this time */
1123 			arg.already_used = lappend(arg.already_used, arg.current);
1124 		}
1125 	}
1126 
1127 	/*
1128 	 * Now build a path for each useful outer relation.
1129 	 */
1130 	foreach(lc, ppi_list)
1131 	{
1132 		ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1133 		double		rows;
1134 		int			width;
1135 		Cost		startup_cost;
1136 		Cost		total_cost;
1137 
1138 		/* Get a cost estimate from the remote */
1139 		estimate_path_cost_size(root, baserel,
1140 								param_info->ppi_clauses, NIL, NULL,
1141 								&rows, &width,
1142 								&startup_cost, &total_cost);
1143 
1144 		/*
1145 		 * ppi_rows currently won't get looked at by anything, but still we
1146 		 * may as well ensure that it matches our idea of the rowcount.
1147 		 */
1148 		param_info->ppi_rows = rows;
1149 
1150 		/* Make the path */
1151 		path = create_foreignscan_path(root, baserel,
1152 									   NULL,	/* default pathtarget */
1153 									   rows,
1154 									   startup_cost,
1155 									   total_cost,
1156 									   NIL, /* no pathkeys */
1157 									   param_info->ppi_req_outer,
1158 									   NULL,
1159 									   NIL);	/* no fdw_private list */
1160 		add_path(baserel, (Path *) path);
1161 	}
1162 }
1163 
1164 /*
1165  * postgresGetForeignPlan
1166  *		Create ForeignScan plan node which implements selected best path
1167  */
1168 static ForeignScan *
1169 postgresGetForeignPlan(PlannerInfo *root,
1170 					   RelOptInfo *foreignrel,
1171 					   Oid foreigntableid,
1172 					   ForeignPath *best_path,
1173 					   List *tlist,
1174 					   List *scan_clauses,
1175 					   Plan *outer_plan)
1176 {
1177 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1178 	Index		scan_relid;
1179 	List	   *fdw_private;
1180 	List	   *remote_exprs = NIL;
1181 	List	   *local_exprs = NIL;
1182 	List	   *params_list = NIL;
1183 	List	   *fdw_scan_tlist = NIL;
1184 	List	   *fdw_recheck_quals = NIL;
1185 	List	   *retrieved_attrs;
1186 	StringInfoData sql;
1187 	bool		has_final_sort = false;
1188 	bool		has_limit = false;
1189 	ListCell   *lc;
1190 
1191 	/*
1192 	 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1193 	 */
1194 	if (best_path->fdw_private)
1195 	{
1196 		has_final_sort = intVal(list_nth(best_path->fdw_private,
1197 										 FdwPathPrivateHasFinalSort));
1198 		has_limit = intVal(list_nth(best_path->fdw_private,
1199 									FdwPathPrivateHasLimit));
1200 	}
1201 
1202 	if (IS_SIMPLE_REL(foreignrel))
1203 	{
1204 		/*
1205 		 * For base relations, set scan_relid as the relid of the relation.
1206 		 */
1207 		scan_relid = foreignrel->relid;
1208 
1209 		/*
1210 		 * In a base-relation scan, we must apply the given scan_clauses.
1211 		 *
1212 		 * Separate the scan_clauses into those that can be executed remotely
1213 		 * and those that can't.  baserestrictinfo clauses that were
1214 		 * previously determined to be safe or unsafe by classifyConditions
1215 		 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1216 		 * else in the scan_clauses list will be a join clause, which we have
1217 		 * to check for remote-safety.
1218 		 *
1219 		 * Note: the join clauses we see here should be the exact same ones
1220 		 * previously examined by postgresGetForeignPaths.  Possibly it'd be
1221 		 * worth passing forward the classification work done then, rather
1222 		 * than repeating it here.
1223 		 *
1224 		 * This code must match "extract_actual_clauses(scan_clauses, false)"
1225 		 * except for the additional decision about remote versus local
1226 		 * execution.
1227 		 */
1228 		foreach(lc, scan_clauses)
1229 		{
1230 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1231 
1232 			/* Ignore any pseudoconstants, they're dealt with elsewhere */
1233 			if (rinfo->pseudoconstant)
1234 				continue;
1235 
1236 			if (list_member_ptr(fpinfo->remote_conds, rinfo))
1237 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1238 			else if (list_member_ptr(fpinfo->local_conds, rinfo))
1239 				local_exprs = lappend(local_exprs, rinfo->clause);
1240 			else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1241 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1242 			else
1243 				local_exprs = lappend(local_exprs, rinfo->clause);
1244 		}
1245 
1246 		/*
1247 		 * For a base-relation scan, we have to support EPQ recheck, which
1248 		 * should recheck all the remote quals.
1249 		 */
1250 		fdw_recheck_quals = remote_exprs;
1251 	}
1252 	else
1253 	{
1254 		/*
1255 		 * Join relation or upper relation - set scan_relid to 0.
1256 		 */
1257 		scan_relid = 0;
1258 
1259 		/*
1260 		 * For a join rel, baserestrictinfo is NIL and we are not considering
1261 		 * parameterization right now, so there should be no scan_clauses for
1262 		 * a joinrel or an upper rel either.
1263 		 */
1264 		Assert(!scan_clauses);
1265 
1266 		/*
1267 		 * Instead we get the conditions to apply from the fdw_private
1268 		 * structure.
1269 		 */
1270 		remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1271 		local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1272 
1273 		/*
1274 		 * We leave fdw_recheck_quals empty in this case, since we never need
1275 		 * to apply EPQ recheck clauses.  In the case of a joinrel, EPQ
1276 		 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1277 		 * If we're planning an upperrel (ie, remote grouping or aggregation)
1278 		 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1279 		 * allowed, and indeed we *can't* put the remote clauses into
1280 		 * fdw_recheck_quals because the unaggregated Vars won't be available
1281 		 * locally.
1282 		 */
1283 
1284 		/* Build the list of columns to be fetched from the foreign server. */
1285 		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1286 
1287 		/*
1288 		 * Ensure that the outer plan produces a tuple whose descriptor
1289 		 * matches our scan tuple slot.  Also, remove the local conditions
1290 		 * from outer plan's quals, lest they be evaluated twice, once by the
1291 		 * local plan and once by the scan.
1292 		 */
1293 		if (outer_plan)
1294 		{
1295 			ListCell   *lc;
1296 
1297 			/*
1298 			 * Right now, we only consider grouping and aggregation beyond
1299 			 * joins. Queries involving aggregates or grouping do not require
1300 			 * EPQ mechanism, hence should not have an outer plan here.
1301 			 */
1302 			Assert(!IS_UPPER_REL(foreignrel));
1303 
1304 			/*
1305 			 * First, update the plan's qual list if possible.  In some cases
1306 			 * the quals might be enforced below the topmost plan level, in
1307 			 * which case we'll fail to remove them; it's not worth working
1308 			 * harder than this.
1309 			 */
1310 			foreach(lc, local_exprs)
1311 			{
1312 				Node	   *qual = lfirst(lc);
1313 
1314 				outer_plan->qual = list_delete(outer_plan->qual, qual);
1315 
1316 				/*
1317 				 * For an inner join the local conditions of foreign scan plan
1318 				 * can be part of the joinquals as well.  (They might also be
1319 				 * in the mergequals or hashquals, but we can't touch those
1320 				 * without breaking the plan.)
1321 				 */
1322 				if (IsA(outer_plan, NestLoop) ||
1323 					IsA(outer_plan, MergeJoin) ||
1324 					IsA(outer_plan, HashJoin))
1325 				{
1326 					Join	   *join_plan = (Join *) outer_plan;
1327 
1328 					if (join_plan->jointype == JOIN_INNER)
1329 						join_plan->joinqual = list_delete(join_plan->joinqual,
1330 														  qual);
1331 				}
1332 			}
1333 
1334 			/*
1335 			 * Now fix the subplan's tlist --- this might result in inserting
1336 			 * a Result node atop the plan tree.
1337 			 */
1338 			outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1339 												best_path->path.parallel_safe);
1340 		}
1341 	}
1342 
1343 	/*
1344 	 * Build the query string to be sent for execution, and identify
1345 	 * expressions to be sent as parameters.
1346 	 */
1347 	initStringInfo(&sql);
1348 	deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1349 							remote_exprs, best_path->path.pathkeys,
1350 							has_final_sort, has_limit, false,
1351 							&retrieved_attrs, &params_list);
1352 
1353 	/* Remember remote_exprs for possible use by postgresPlanDirectModify */
1354 	fpinfo->final_remote_exprs = remote_exprs;
1355 
1356 	/*
1357 	 * Build the fdw_private list that will be available to the executor.
1358 	 * Items in the list must match order in enum FdwScanPrivateIndex.
1359 	 */
1360 	fdw_private = list_make3(makeString(sql.data),
1361 							 retrieved_attrs,
1362 							 makeInteger(fpinfo->fetch_size));
1363 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1364 		fdw_private = lappend(fdw_private,
1365 							  makeString(fpinfo->relation_name));
1366 
1367 	/*
1368 	 * Create the ForeignScan node for the given relation.
1369 	 *
1370 	 * Note that the remote parameter expressions are stored in the fdw_exprs
1371 	 * field of the finished plan node; we can't keep them in private state
1372 	 * because then they wouldn't be subject to later planner processing.
1373 	 */
1374 	return make_foreignscan(tlist,
1375 							local_exprs,
1376 							scan_relid,
1377 							params_list,
1378 							fdw_private,
1379 							fdw_scan_tlist,
1380 							fdw_recheck_quals,
1381 							outer_plan);
1382 }
1383 
1384 /*
1385  * postgresBeginForeignScan
1386  *		Initiate an executor scan of a foreign PostgreSQL table.
1387  */
1388 static void
1389 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1390 {
1391 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1392 	EState	   *estate = node->ss.ps.state;
1393 	PgFdwScanState *fsstate;
1394 	RangeTblEntry *rte;
1395 	Oid			userid;
1396 	ForeignTable *table;
1397 	UserMapping *user;
1398 	int			rtindex;
1399 	int			numParams;
1400 
1401 	/*
1402 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
1403 	 */
1404 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1405 		return;
1406 
1407 	/*
1408 	 * We'll save private state in node->fdw_state.
1409 	 */
1410 	fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1411 	node->fdw_state = (void *) fsstate;
1412 
1413 	/*
1414 	 * Identify which user to do the remote access as.  This should match what
1415 	 * ExecCheckRTEPerms() does.  In case of a join or aggregate, use the
1416 	 * lowest-numbered member RTE as a representative; we would get the same
1417 	 * result from any.
1418 	 */
1419 	if (fsplan->scan.scanrelid > 0)
1420 		rtindex = fsplan->scan.scanrelid;
1421 	else
1422 		rtindex = bms_next_member(fsplan->fs_relids, -1);
1423 	rte = exec_rt_fetch(rtindex, estate);
1424 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1425 
1426 	/* Get info about foreign table. */
1427 	table = GetForeignTable(rte->relid);
1428 	user = GetUserMapping(userid, table->serverid);
1429 
1430 	/*
1431 	 * Get connection to the foreign server.  Connection manager will
1432 	 * establish new connection if necessary.
1433 	 */
1434 	fsstate->conn = GetConnection(user, false);
1435 
1436 	/* Assign a unique ID for my cursor */
1437 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1438 	fsstate->cursor_exists = false;
1439 
1440 	/* Get private info created by planner functions. */
1441 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
1442 									 FdwScanPrivateSelectSql));
1443 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1444 												 FdwScanPrivateRetrievedAttrs);
1445 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1446 										  FdwScanPrivateFetchSize));
1447 
1448 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
1449 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1450 											   "postgres_fdw tuple data",
1451 											   ALLOCSET_DEFAULT_SIZES);
1452 	fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1453 											  "postgres_fdw temporary data",
1454 											  ALLOCSET_SMALL_SIZES);
1455 
1456 	/*
1457 	 * Get info we'll need for converting data fetched from the foreign server
1458 	 * into local representation and error reporting during that process.
1459 	 */
1460 	if (fsplan->scan.scanrelid > 0)
1461 	{
1462 		fsstate->rel = node->ss.ss_currentRelation;
1463 		fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1464 	}
1465 	else
1466 	{
1467 		fsstate->rel = NULL;
1468 		fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1469 	}
1470 
1471 	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1472 
1473 	/*
1474 	 * Prepare for processing of parameters used in remote query, if any.
1475 	 */
1476 	numParams = list_length(fsplan->fdw_exprs);
1477 	fsstate->numParams = numParams;
1478 	if (numParams > 0)
1479 		prepare_query_params((PlanState *) node,
1480 							 fsplan->fdw_exprs,
1481 							 numParams,
1482 							 &fsstate->param_flinfo,
1483 							 &fsstate->param_exprs,
1484 							 &fsstate->param_values);
1485 }
1486 
1487 /*
1488  * postgresIterateForeignScan
1489  *		Retrieve next row from the result set, or clear tuple slot to indicate
1490  *		EOF.
1491  */
1492 static TupleTableSlot *
1493 postgresIterateForeignScan(ForeignScanState *node)
1494 {
1495 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1496 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1497 
1498 	/*
1499 	 * If this is the first call after Begin or ReScan, we need to create the
1500 	 * cursor on the remote side.
1501 	 */
1502 	if (!fsstate->cursor_exists)
1503 		create_cursor(node);
1504 
1505 	/*
1506 	 * Get some more tuples, if we've run out.
1507 	 */
1508 	if (fsstate->next_tuple >= fsstate->num_tuples)
1509 	{
1510 		/* No point in another fetch if we already detected EOF, though. */
1511 		if (!fsstate->eof_reached)
1512 			fetch_more_data(node);
1513 		/* If we didn't get any tuples, must be end of data. */
1514 		if (fsstate->next_tuple >= fsstate->num_tuples)
1515 			return ExecClearTuple(slot);
1516 	}
1517 
1518 	/*
1519 	 * Return the next tuple.
1520 	 */
1521 	ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1522 					   slot,
1523 					   false);
1524 
1525 	return slot;
1526 }
1527 
1528 /*
1529  * postgresReScanForeignScan
1530  *		Restart the scan.
1531  */
1532 static void
1533 postgresReScanForeignScan(ForeignScanState *node)
1534 {
1535 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1536 	char		sql[64];
1537 	PGresult   *res;
1538 
1539 	/* If we haven't created the cursor yet, nothing to do. */
1540 	if (!fsstate->cursor_exists)
1541 		return;
1542 
1543 	/*
1544 	 * If any internal parameters affecting this node have changed, we'd
1545 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
1546 	 * be good enough.  If we've only fetched zero or one batch, we needn't
1547 	 * even rewind the cursor, just rescan what we have.
1548 	 */
1549 	if (node->ss.ps.chgParam != NULL)
1550 	{
1551 		fsstate->cursor_exists = false;
1552 		snprintf(sql, sizeof(sql), "CLOSE c%u",
1553 				 fsstate->cursor_number);
1554 	}
1555 	else if (fsstate->fetch_ct_2 > 1)
1556 	{
1557 		snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1558 				 fsstate->cursor_number);
1559 	}
1560 	else
1561 	{
1562 		/* Easy: just rescan what we already have in memory, if anything */
1563 		fsstate->next_tuple = 0;
1564 		return;
1565 	}
1566 
1567 	/*
1568 	 * We don't use a PG_TRY block here, so be careful not to throw error
1569 	 * without releasing the PGresult.
1570 	 */
1571 	res = pgfdw_exec_query(fsstate->conn, sql);
1572 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1573 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1574 	PQclear(res);
1575 
1576 	/* Now force a fresh FETCH. */
1577 	fsstate->tuples = NULL;
1578 	fsstate->num_tuples = 0;
1579 	fsstate->next_tuple = 0;
1580 	fsstate->fetch_ct_2 = 0;
1581 	fsstate->eof_reached = false;
1582 }
1583 
1584 /*
1585  * postgresEndForeignScan
1586  *		Finish scanning foreign table and dispose objects used for this scan
1587  */
1588 static void
1589 postgresEndForeignScan(ForeignScanState *node)
1590 {
1591 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1592 
1593 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1594 	if (fsstate == NULL)
1595 		return;
1596 
1597 	/* Close the cursor if open, to prevent accumulation of cursors */
1598 	if (fsstate->cursor_exists)
1599 		close_cursor(fsstate->conn, fsstate->cursor_number);
1600 
1601 	/* Release remote connection */
1602 	ReleaseConnection(fsstate->conn);
1603 	fsstate->conn = NULL;
1604 
1605 	/* MemoryContexts will be deleted automatically. */
1606 }
1607 
1608 /*
1609  * postgresAddForeignUpdateTargets
1610  *		Add resjunk column(s) needed for update/delete on a foreign table
1611  */
1612 static void
1613 postgresAddForeignUpdateTargets(Query *parsetree,
1614 								RangeTblEntry *target_rte,
1615 								Relation target_relation)
1616 {
1617 	Var		   *var;
1618 	const char *attrname;
1619 	TargetEntry *tle;
1620 
1621 	/*
1622 	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1623 	 */
1624 
1625 	/* Make a Var representing the desired value */
1626 	var = makeVar(parsetree->resultRelation,
1627 				  SelfItemPointerAttributeNumber,
1628 				  TIDOID,
1629 				  -1,
1630 				  InvalidOid,
1631 				  0);
1632 
1633 	/* Wrap it in a resjunk TLE with the right name ... */
1634 	attrname = "ctid";
1635 
1636 	tle = makeTargetEntry((Expr *) var,
1637 						  list_length(parsetree->targetList) + 1,
1638 						  pstrdup(attrname),
1639 						  true);
1640 
1641 	/* ... and add it to the query's targetlist */
1642 	parsetree->targetList = lappend(parsetree->targetList, tle);
1643 }
1644 
1645 /*
1646  * postgresPlanForeignModify
1647  *		Plan an insert/update/delete operation on a foreign table
1648  */
1649 static List *
1650 postgresPlanForeignModify(PlannerInfo *root,
1651 						  ModifyTable *plan,
1652 						  Index resultRelation,
1653 						  int subplan_index)
1654 {
1655 	CmdType		operation = plan->operation;
1656 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1657 	Relation	rel;
1658 	StringInfoData sql;
1659 	List	   *targetAttrs = NIL;
1660 	List	   *withCheckOptionList = NIL;
1661 	List	   *returningList = NIL;
1662 	List	   *retrieved_attrs = NIL;
1663 	bool		doNothing = false;
1664 
1665 	initStringInfo(&sql);
1666 
1667 	/*
1668 	 * Core code already has some lock on each rel being planned, so we can
1669 	 * use NoLock here.
1670 	 */
1671 	rel = table_open(rte->relid, NoLock);
1672 
1673 	/*
1674 	 * In an INSERT, we transmit all columns that are defined in the foreign
1675 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1676 	 * foreign table, we transmit all columns like INSERT; else we transmit
1677 	 * only columns that were explicitly targets of the UPDATE, so as to avoid
1678 	 * unnecessary data transmission.  (We can't do that for INSERT since we
1679 	 * would miss sending default values for columns not listed in the source
1680 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1681 	 * those triggers might change values for non-target columns, in which
1682 	 * case we would miss sending changed values for those columns.)
1683 	 */
1684 	if (operation == CMD_INSERT ||
1685 		(operation == CMD_UPDATE &&
1686 		 rel->trigdesc &&
1687 		 rel->trigdesc->trig_update_before_row))
1688 	{
1689 		TupleDesc	tupdesc = RelationGetDescr(rel);
1690 		int			attnum;
1691 
1692 		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1693 		{
1694 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1695 
1696 			if (!attr->attisdropped)
1697 				targetAttrs = lappend_int(targetAttrs, attnum);
1698 		}
1699 	}
1700 	else if (operation == CMD_UPDATE)
1701 	{
1702 		int			col;
1703 		Bitmapset  *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1704 
1705 		col = -1;
1706 		while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1707 		{
1708 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1709 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
1710 
1711 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
1712 				elog(ERROR, "system-column update is not supported");
1713 			targetAttrs = lappend_int(targetAttrs, attno);
1714 		}
1715 	}
1716 
1717 	/*
1718 	 * Extract the relevant WITH CHECK OPTION list if any.
1719 	 */
1720 	if (plan->withCheckOptionLists)
1721 		withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1722 												subplan_index);
1723 
1724 	/*
1725 	 * Extract the relevant RETURNING list if any.
1726 	 */
1727 	if (plan->returningLists)
1728 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
1729 
1730 	/*
1731 	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1732 	 * should have already been rejected in the optimizer, as presently there
1733 	 * is no way to recognize an arbiter index on a foreign table.  Only DO
1734 	 * NOTHING is supported without an inference specification.
1735 	 */
1736 	if (plan->onConflictAction == ONCONFLICT_NOTHING)
1737 		doNothing = true;
1738 	else if (plan->onConflictAction != ONCONFLICT_NONE)
1739 		elog(ERROR, "unexpected ON CONFLICT specification: %d",
1740 			 (int) plan->onConflictAction);
1741 
1742 	/*
1743 	 * Construct the SQL command string.
1744 	 */
1745 	switch (operation)
1746 	{
1747 		case CMD_INSERT:
1748 			deparseInsertSql(&sql, rte, resultRelation, rel,
1749 							 targetAttrs, doNothing,
1750 							 withCheckOptionList, returningList,
1751 							 &retrieved_attrs);
1752 			break;
1753 		case CMD_UPDATE:
1754 			deparseUpdateSql(&sql, rte, resultRelation, rel,
1755 							 targetAttrs,
1756 							 withCheckOptionList, returningList,
1757 							 &retrieved_attrs);
1758 			break;
1759 		case CMD_DELETE:
1760 			deparseDeleteSql(&sql, rte, resultRelation, rel,
1761 							 returningList,
1762 							 &retrieved_attrs);
1763 			break;
1764 		default:
1765 			elog(ERROR, "unexpected operation: %d", (int) operation);
1766 			break;
1767 	}
1768 
1769 	table_close(rel, NoLock);
1770 
1771 	/*
1772 	 * Build the fdw_private list that will be available to the executor.
1773 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
1774 	 */
1775 	return list_make4(makeString(sql.data),
1776 					  targetAttrs,
1777 					  makeInteger((retrieved_attrs != NIL)),
1778 					  retrieved_attrs);
1779 }
1780 
1781 /*
1782  * postgresBeginForeignModify
1783  *		Begin an insert/update/delete operation on a foreign table
1784  */
1785 static void
1786 postgresBeginForeignModify(ModifyTableState *mtstate,
1787 						   ResultRelInfo *resultRelInfo,
1788 						   List *fdw_private,
1789 						   int subplan_index,
1790 						   int eflags)
1791 {
1792 	PgFdwModifyState *fmstate;
1793 	char	   *query;
1794 	List	   *target_attrs;
1795 	bool		has_returning;
1796 	List	   *retrieved_attrs;
1797 	RangeTblEntry *rte;
1798 
1799 	/*
1800 	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
1801 	 * stays NULL.
1802 	 */
1803 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1804 		return;
1805 
1806 	/* Deconstruct fdw_private data. */
1807 	query = strVal(list_nth(fdw_private,
1808 							FdwModifyPrivateUpdateSql));
1809 	target_attrs = (List *) list_nth(fdw_private,
1810 									 FdwModifyPrivateTargetAttnums);
1811 	has_returning = intVal(list_nth(fdw_private,
1812 									FdwModifyPrivateHasReturning));
1813 	retrieved_attrs = (List *) list_nth(fdw_private,
1814 										FdwModifyPrivateRetrievedAttrs);
1815 
1816 	/* Find RTE. */
1817 	rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1818 						mtstate->ps.state);
1819 
1820 	/* Construct an execution state. */
1821 	fmstate = create_foreign_modify(mtstate->ps.state,
1822 									rte,
1823 									resultRelInfo,
1824 									mtstate->operation,
1825 									mtstate->mt_plans[subplan_index]->plan,
1826 									query,
1827 									target_attrs,
1828 									has_returning,
1829 									retrieved_attrs);
1830 
1831 	resultRelInfo->ri_FdwState = fmstate;
1832 }
1833 
1834 /*
1835  * postgresExecForeignInsert
1836  *		Insert one row into a foreign table
1837  */
1838 static TupleTableSlot *
1839 postgresExecForeignInsert(EState *estate,
1840 						  ResultRelInfo *resultRelInfo,
1841 						  TupleTableSlot *slot,
1842 						  TupleTableSlot *planSlot)
1843 {
1844 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1845 	TupleTableSlot *rslot;
1846 
1847 	/*
1848 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1849 	 * postgresBeginForeignInsert())
1850 	 */
1851 	if (fmstate->aux_fmstate)
1852 		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1853 	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1854 								   slot, planSlot);
1855 	/* Revert that change */
1856 	if (fmstate->aux_fmstate)
1857 		resultRelInfo->ri_FdwState = fmstate;
1858 
1859 	return rslot;
1860 }
1861 
1862 /*
1863  * postgresExecForeignUpdate
1864  *		Update one row in a foreign table
1865  */
1866 static TupleTableSlot *
1867 postgresExecForeignUpdate(EState *estate,
1868 						  ResultRelInfo *resultRelInfo,
1869 						  TupleTableSlot *slot,
1870 						  TupleTableSlot *planSlot)
1871 {
1872 	return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
1873 								  slot, planSlot);
1874 }
1875 
1876 /*
1877  * postgresExecForeignDelete
1878  *		Delete one row from a foreign table
1879  */
1880 static TupleTableSlot *
1881 postgresExecForeignDelete(EState *estate,
1882 						  ResultRelInfo *resultRelInfo,
1883 						  TupleTableSlot *slot,
1884 						  TupleTableSlot *planSlot)
1885 {
1886 	return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
1887 								  slot, planSlot);
1888 }
1889 
1890 /*
1891  * postgresEndForeignModify
1892  *		Finish an insert/update/delete operation on a foreign table
1893  */
1894 static void
1895 postgresEndForeignModify(EState *estate,
1896 						 ResultRelInfo *resultRelInfo)
1897 {
1898 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1899 
1900 	/* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1901 	if (fmstate == NULL)
1902 		return;
1903 
1904 	/* Destroy the execution state */
1905 	finish_foreign_modify(fmstate);
1906 }
1907 
1908 /*
1909  * postgresBeginForeignInsert
1910  *		Begin an insert operation on a foreign table
1911  */
1912 static void
1913 postgresBeginForeignInsert(ModifyTableState *mtstate,
1914 						   ResultRelInfo *resultRelInfo)
1915 {
1916 	PgFdwModifyState *fmstate;
1917 	ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1918 	EState	   *estate = mtstate->ps.state;
1919 	Index		resultRelation;
1920 	Relation	rel = resultRelInfo->ri_RelationDesc;
1921 	RangeTblEntry *rte;
1922 	TupleDesc	tupdesc = RelationGetDescr(rel);
1923 	int			attnum;
1924 	StringInfoData sql;
1925 	List	   *targetAttrs = NIL;
1926 	List	   *retrieved_attrs = NIL;
1927 	bool		doNothing = false;
1928 
1929 	/*
1930 	 * If the foreign table we are about to insert routed rows into is also an
1931 	 * UPDATE subplan result rel that will be updated later, proceeding with
1932 	 * the INSERT will result in the later UPDATE incorrectly modifying those
1933 	 * routed rows, so prevent the INSERT --- it would be nice if we could
1934 	 * handle this case; but for now, throw an error for safety.
1935 	 */
1936 	if (plan && plan->operation == CMD_UPDATE &&
1937 		(resultRelInfo->ri_usesFdwDirectModify ||
1938 		 resultRelInfo->ri_FdwState) &&
1939 		resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
1940 		ereport(ERROR,
1941 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1942 				 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
1943 						RelationGetRelationName(rel))));
1944 
1945 	initStringInfo(&sql);
1946 
1947 	/* We transmit all columns that are defined in the foreign table. */
1948 	for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1949 	{
1950 		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1951 
1952 		if (!attr->attisdropped)
1953 			targetAttrs = lappend_int(targetAttrs, attnum);
1954 	}
1955 
1956 	/* Check if we add the ON CONFLICT clause to the remote query. */
1957 	if (plan)
1958 	{
1959 		OnConflictAction onConflictAction = plan->onConflictAction;
1960 
1961 		/* We only support DO NOTHING without an inference specification. */
1962 		if (onConflictAction == ONCONFLICT_NOTHING)
1963 			doNothing = true;
1964 		else if (onConflictAction != ONCONFLICT_NONE)
1965 			elog(ERROR, "unexpected ON CONFLICT specification: %d",
1966 				 (int) onConflictAction);
1967 	}
1968 
1969 	/*
1970 	 * If the foreign table is a partition that doesn't have a corresponding
1971 	 * RTE entry, we need to create a new RTE
1972 	 * describing the foreign table for use by deparseInsertSql and
1973 	 * create_foreign_modify() below, after first copying the parent's RTE and
1974 	 * modifying some fields to describe the foreign partition to work on.
1975 	 * However, if this is invoked by UPDATE, the existing RTE may already
1976 	 * correspond to this partition if it is one of the UPDATE subplan target
1977 	 * rels; in that case, we can just use the existing RTE as-is.
1978 	 */
1979 	if (resultRelInfo->ri_RangeTableIndex == 0)
1980 	{
1981 		ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
1982 
1983 		rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
1984 		rte = copyObject(rte);
1985 		rte->relid = RelationGetRelid(rel);
1986 		rte->relkind = RELKIND_FOREIGN_TABLE;
1987 
1988 		/*
1989 		 * For UPDATE, we must use the RT index of the first subplan target
1990 		 * rel's RTE, because the core code would have built expressions for
1991 		 * the partition, such as RETURNING, using that RT index as varno of
1992 		 * Vars contained in those expressions.
1993 		 */
1994 		if (plan && plan->operation == CMD_UPDATE &&
1995 			rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
1996 			resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
1997 		else
1998 			resultRelation = rootResultRelInfo->ri_RangeTableIndex;
1999 	}
2000 	else
2001 	{
2002 		resultRelation = resultRelInfo->ri_RangeTableIndex;
2003 		rte = exec_rt_fetch(resultRelation, estate);
2004 	}
2005 
2006 	/* Construct the SQL command string. */
2007 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2008 					 resultRelInfo->ri_WithCheckOptions,
2009 					 resultRelInfo->ri_returningList,
2010 					 &retrieved_attrs);
2011 
2012 	/* Construct an execution state. */
2013 	fmstate = create_foreign_modify(mtstate->ps.state,
2014 									rte,
2015 									resultRelInfo,
2016 									CMD_INSERT,
2017 									NULL,
2018 									sql.data,
2019 									targetAttrs,
2020 									retrieved_attrs != NIL,
2021 									retrieved_attrs);
2022 
2023 	/*
2024 	 * If the given resultRelInfo already has PgFdwModifyState set, it means
2025 	 * the foreign table is an UPDATE subplan result rel; in which case, store
2026 	 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2027 	 */
2028 	if (resultRelInfo->ri_FdwState)
2029 	{
2030 		Assert(plan && plan->operation == CMD_UPDATE);
2031 		Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2032 		((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2033 	}
2034 	else
2035 		resultRelInfo->ri_FdwState = fmstate;
2036 }
2037 
2038 /*
2039  * postgresEndForeignInsert
2040  *		Finish an insert operation on a foreign table
2041  */
2042 static void
2043 postgresEndForeignInsert(EState *estate,
2044 						 ResultRelInfo *resultRelInfo)
2045 {
2046 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2047 
2048 	Assert(fmstate != NULL);
2049 
2050 	/*
2051 	 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2052 	 * postgresBeginForeignInsert())
2053 	 */
2054 	if (fmstate->aux_fmstate)
2055 		fmstate = fmstate->aux_fmstate;
2056 
2057 	/* Destroy the execution state */
2058 	finish_foreign_modify(fmstate);
2059 }
2060 
2061 /*
2062  * postgresIsForeignRelUpdatable
2063  *		Determine whether a foreign table supports INSERT, UPDATE and/or
2064  *		DELETE.
2065  */
2066 static int
2067 postgresIsForeignRelUpdatable(Relation rel)
2068 {
2069 	bool		updatable;
2070 	ForeignTable *table;
2071 	ForeignServer *server;
2072 	ListCell   *lc;
2073 
2074 	/*
2075 	 * By default, all postgres_fdw foreign tables are assumed updatable. This
2076 	 * can be overridden by a per-server setting, which in turn can be
2077 	 * overridden by a per-table setting.
2078 	 */
2079 	updatable = true;
2080 
2081 	table = GetForeignTable(RelationGetRelid(rel));
2082 	server = GetForeignServer(table->serverid);
2083 
2084 	foreach(lc, server->options)
2085 	{
2086 		DefElem    *def = (DefElem *) lfirst(lc);
2087 
2088 		if (strcmp(def->defname, "updatable") == 0)
2089 			updatable = defGetBoolean(def);
2090 	}
2091 	foreach(lc, table->options)
2092 	{
2093 		DefElem    *def = (DefElem *) lfirst(lc);
2094 
2095 		if (strcmp(def->defname, "updatable") == 0)
2096 			updatable = defGetBoolean(def);
2097 	}
2098 
2099 	/*
2100 	 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2101 	 */
2102 	return updatable ?
2103 		(1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2104 }
2105 
2106 /*
2107  * postgresRecheckForeignScan
2108  *		Execute a local join execution plan for a foreign join
2109  */
2110 static bool
2111 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2112 {
2113 	Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2114 	PlanState  *outerPlan = outerPlanState(node);
2115 	TupleTableSlot *result;
2116 
2117 	/* For base foreign relations, it suffices to set fdw_recheck_quals */
2118 	if (scanrelid > 0)
2119 		return true;
2120 
2121 	Assert(outerPlan != NULL);
2122 
2123 	/* Execute a local join execution plan */
2124 	result = ExecProcNode(outerPlan);
2125 	if (TupIsNull(result))
2126 		return false;
2127 
2128 	/* Store result in the given slot */
2129 	ExecCopySlot(slot, result);
2130 
2131 	return true;
2132 }
2133 
2134 /*
2135  * postgresPlanDirectModify
2136  *		Consider a direct foreign table modification
2137  *
2138  * Decide whether it is safe to modify a foreign table directly, and if so,
2139  * rewrite subplan accordingly.
2140  */
2141 static bool
2142 postgresPlanDirectModify(PlannerInfo *root,
2143 						 ModifyTable *plan,
2144 						 Index resultRelation,
2145 						 int subplan_index)
2146 {
2147 	CmdType		operation = plan->operation;
2148 	Plan	   *subplan;
2149 	RelOptInfo *foreignrel;
2150 	RangeTblEntry *rte;
2151 	PgFdwRelationInfo *fpinfo;
2152 	Relation	rel;
2153 	StringInfoData sql;
2154 	ForeignScan *fscan;
2155 	List	   *targetAttrs = NIL;
2156 	List	   *remote_exprs;
2157 	List	   *params_list = NIL;
2158 	List	   *returningList = NIL;
2159 	List	   *retrieved_attrs = NIL;
2160 
2161 	/*
2162 	 * Decide whether it is safe to modify a foreign table directly.
2163 	 */
2164 
2165 	/*
2166 	 * The table modification must be an UPDATE or DELETE.
2167 	 */
2168 	if (operation != CMD_UPDATE && operation != CMD_DELETE)
2169 		return false;
2170 
2171 	/*
2172 	 * It's unsafe to modify a foreign table directly if there are any local
2173 	 * joins needed.
2174 	 */
2175 	subplan = (Plan *) list_nth(plan->plans, subplan_index);
2176 	if (!IsA(subplan, ForeignScan))
2177 		return false;
2178 	fscan = (ForeignScan *) subplan;
2179 
2180 	/*
2181 	 * It's unsafe to modify a foreign table directly if there are any quals
2182 	 * that should be evaluated locally.
2183 	 */
2184 	if (subplan->qual != NIL)
2185 		return false;
2186 
2187 	/* Safe to fetch data about the target foreign rel */
2188 	if (fscan->scan.scanrelid == 0)
2189 	{
2190 		foreignrel = find_join_rel(root, fscan->fs_relids);
2191 		/* We should have a rel for this foreign join. */
2192 		Assert(foreignrel);
2193 	}
2194 	else
2195 		foreignrel = root->simple_rel_array[resultRelation];
2196 	rte = root->simple_rte_array[resultRelation];
2197 	fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2198 
2199 	/*
2200 	 * It's unsafe to update a foreign table directly, if any expressions to
2201 	 * assign to the target columns are unsafe to evaluate remotely.
2202 	 */
2203 	if (operation == CMD_UPDATE)
2204 	{
2205 		int			col;
2206 
2207 		/*
2208 		 * We transmit only columns that were explicitly targets of the
2209 		 * UPDATE, so as to avoid unnecessary data transmission.
2210 		 */
2211 		col = -1;
2212 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2213 		{
2214 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2215 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
2216 			TargetEntry *tle;
2217 
2218 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
2219 				elog(ERROR, "system-column update is not supported");
2220 
2221 			tle = get_tle_by_resno(subplan->targetlist, attno);
2222 
2223 			if (!tle)
2224 				elog(ERROR, "attribute number %d not found in subplan targetlist",
2225 					 attno);
2226 
2227 			if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2228 				return false;
2229 
2230 			targetAttrs = lappend_int(targetAttrs, attno);
2231 		}
2232 	}
2233 
2234 	/*
2235 	 * Ok, rewrite subplan so as to modify the foreign table directly.
2236 	 */
2237 	initStringInfo(&sql);
2238 
2239 	/*
2240 	 * Core code already has some lock on each rel being planned, so we can
2241 	 * use NoLock here.
2242 	 */
2243 	rel = table_open(rte->relid, NoLock);
2244 
2245 	/*
2246 	 * Recall the qual clauses that must be evaluated remotely.  (These are
2247 	 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2248 	 * doesn't care.)
2249 	 */
2250 	remote_exprs = fpinfo->final_remote_exprs;
2251 
2252 	/*
2253 	 * Extract the relevant RETURNING list if any.
2254 	 */
2255 	if (plan->returningLists)
2256 	{
2257 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
2258 
2259 		/*
2260 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2261 		 * we fetch from the foreign server any Vars specified in RETURNING
2262 		 * that refer not only to the target relation but to non-target
2263 		 * relations.  So we'll deparse them into the RETURNING clause of the
2264 		 * remote query; use a targetlist consisting of them instead, which
2265 		 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2266 		 * node below.
2267 		 */
2268 		if (fscan->scan.scanrelid == 0)
2269 			returningList = build_remote_returning(resultRelation, rel,
2270 												   returningList);
2271 	}
2272 
2273 	/*
2274 	 * Construct the SQL command string.
2275 	 */
2276 	switch (operation)
2277 	{
2278 		case CMD_UPDATE:
2279 			deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2280 								   foreignrel,
2281 								   ((Plan *) fscan)->targetlist,
2282 								   targetAttrs,
2283 								   remote_exprs, &params_list,
2284 								   returningList, &retrieved_attrs);
2285 			break;
2286 		case CMD_DELETE:
2287 			deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2288 								   foreignrel,
2289 								   remote_exprs, &params_list,
2290 								   returningList, &retrieved_attrs);
2291 			break;
2292 		default:
2293 			elog(ERROR, "unexpected operation: %d", (int) operation);
2294 			break;
2295 	}
2296 
2297 	/*
2298 	 * Update the operation info.
2299 	 */
2300 	fscan->operation = operation;
2301 
2302 	/*
2303 	 * Update the fdw_exprs list that will be available to the executor.
2304 	 */
2305 	fscan->fdw_exprs = params_list;
2306 
2307 	/*
2308 	 * Update the fdw_private list that will be available to the executor.
2309 	 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2310 	 */
2311 	fscan->fdw_private = list_make4(makeString(sql.data),
2312 									makeInteger((retrieved_attrs != NIL)),
2313 									retrieved_attrs,
2314 									makeInteger(plan->canSetTag));
2315 
2316 	/*
2317 	 * Update the foreign-join-related fields.
2318 	 */
2319 	if (fscan->scan.scanrelid == 0)
2320 	{
2321 		/* No need for the outer subplan. */
2322 		fscan->scan.plan.lefttree = NULL;
2323 
2324 		/* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2325 		if (returningList)
2326 			rebuild_fdw_scan_tlist(fscan, returningList);
2327 	}
2328 
2329 	table_close(rel, NoLock);
2330 	return true;
2331 }
2332 
2333 /*
2334  * postgresBeginDirectModify
2335  *		Prepare a direct foreign table modification
2336  */
2337 static void
2338 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2339 {
2340 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2341 	EState	   *estate = node->ss.ps.state;
2342 	PgFdwDirectModifyState *dmstate;
2343 	Index		rtindex;
2344 	RangeTblEntry *rte;
2345 	Oid			userid;
2346 	ForeignTable *table;
2347 	UserMapping *user;
2348 	int			numParams;
2349 
2350 	/*
2351 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
2352 	 */
2353 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2354 		return;
2355 
2356 	/*
2357 	 * We'll save private state in node->fdw_state.
2358 	 */
2359 	dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2360 	node->fdw_state = (void *) dmstate;
2361 
2362 	/*
2363 	 * Identify which user to do the remote access as.  This should match what
2364 	 * ExecCheckRTEPerms() does.
2365 	 */
2366 	rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2367 	rte = exec_rt_fetch(rtindex, estate);
2368 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2369 
2370 	/* Get info about foreign table. */
2371 	if (fsplan->scan.scanrelid == 0)
2372 		dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2373 	else
2374 		dmstate->rel = node->ss.ss_currentRelation;
2375 	table = GetForeignTable(RelationGetRelid(dmstate->rel));
2376 	user = GetUserMapping(userid, table->serverid);
2377 
2378 	/*
2379 	 * Get connection to the foreign server.  Connection manager will
2380 	 * establish new connection if necessary.
2381 	 */
2382 	dmstate->conn = GetConnection(user, false);
2383 
2384 	/* Update the foreign-join-related fields. */
2385 	if (fsplan->scan.scanrelid == 0)
2386 	{
2387 		/* Save info about foreign table. */
2388 		dmstate->resultRel = dmstate->rel;
2389 
2390 		/*
2391 		 * Set dmstate->rel to NULL to teach get_returning_data() and
2392 		 * make_tuple_from_result_row() that columns fetched from the remote
2393 		 * server are described by fdw_scan_tlist of the foreign-scan plan
2394 		 * node, not the tuple descriptor for the target relation.
2395 		 */
2396 		dmstate->rel = NULL;
2397 	}
2398 
2399 	/* Initialize state variable */
2400 	dmstate->num_tuples = -1;	/* -1 means not set yet */
2401 
2402 	/* Get private info created by planner functions. */
2403 	dmstate->query = strVal(list_nth(fsplan->fdw_private,
2404 									 FdwDirectModifyPrivateUpdateSql));
2405 	dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2406 											 FdwDirectModifyPrivateHasReturning));
2407 	dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2408 												 FdwDirectModifyPrivateRetrievedAttrs);
2409 	dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2410 											 FdwDirectModifyPrivateSetProcessed));
2411 
2412 	/* Create context for per-tuple temp workspace. */
2413 	dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2414 											  "postgres_fdw temporary data",
2415 											  ALLOCSET_SMALL_SIZES);
2416 
2417 	/* Prepare for input conversion of RETURNING results. */
2418 	if (dmstate->has_returning)
2419 	{
2420 		TupleDesc	tupdesc;
2421 
2422 		if (fsplan->scan.scanrelid == 0)
2423 			tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2424 		else
2425 			tupdesc = RelationGetDescr(dmstate->rel);
2426 
2427 		dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2428 
2429 		/*
2430 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2431 		 * initialize a filter to extract an updated/deleted tuple from a scan
2432 		 * tuple.
2433 		 */
2434 		if (fsplan->scan.scanrelid == 0)
2435 			init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2436 	}
2437 
2438 	/*
2439 	 * Prepare for processing of parameters used in remote query, if any.
2440 	 */
2441 	numParams = list_length(fsplan->fdw_exprs);
2442 	dmstate->numParams = numParams;
2443 	if (numParams > 0)
2444 		prepare_query_params((PlanState *) node,
2445 							 fsplan->fdw_exprs,
2446 							 numParams,
2447 							 &dmstate->param_flinfo,
2448 							 &dmstate->param_exprs,
2449 							 &dmstate->param_values);
2450 }
2451 
2452 /*
2453  * postgresIterateDirectModify
2454  *		Execute a direct foreign table modification
2455  */
2456 static TupleTableSlot *
2457 postgresIterateDirectModify(ForeignScanState *node)
2458 {
2459 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2460 	EState	   *estate = node->ss.ps.state;
2461 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2462 
2463 	/*
2464 	 * If this is the first call after Begin, execute the statement.
2465 	 */
2466 	if (dmstate->num_tuples == -1)
2467 		execute_dml_stmt(node);
2468 
2469 	/*
2470 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
2471 	 */
2472 	if (!resultRelInfo->ri_projectReturning)
2473 	{
2474 		TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2475 		Instrumentation *instr = node->ss.ps.instrument;
2476 
2477 		Assert(!dmstate->has_returning);
2478 
2479 		/* Increment the command es_processed count if necessary. */
2480 		if (dmstate->set_processed)
2481 			estate->es_processed += dmstate->num_tuples;
2482 
2483 		/* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2484 		if (instr)
2485 			instr->tuplecount += dmstate->num_tuples;
2486 
2487 		return ExecClearTuple(slot);
2488 	}
2489 
2490 	/*
2491 	 * Get the next RETURNING tuple.
2492 	 */
2493 	return get_returning_data(node);
2494 }
2495 
2496 /*
2497  * postgresEndDirectModify
2498  *		Finish a direct foreign table modification
2499  */
2500 static void
2501 postgresEndDirectModify(ForeignScanState *node)
2502 {
2503 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2504 
2505 	/* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2506 	if (dmstate == NULL)
2507 		return;
2508 
2509 	/* Release PGresult */
2510 	if (dmstate->result)
2511 		PQclear(dmstate->result);
2512 
2513 	/* Release remote connection */
2514 	ReleaseConnection(dmstate->conn);
2515 	dmstate->conn = NULL;
2516 
2517 	/* MemoryContext will be deleted automatically. */
2518 }
2519 
2520 /*
2521  * postgresExplainForeignScan
2522  *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2523  */
2524 static void
2525 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2526 {
2527 	ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2528 	List	   *fdw_private = plan->fdw_private;
2529 
2530 	/*
2531 	 * Identify foreign scans that are really joins or upper relations.  The
2532 	 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2533 	 * digit string(s), which are RT indexes, with the correct relation names.
2534 	 * We do that here, not when the plan is created, because we can't know
2535 	 * what aliases ruleutils.c will assign at plan creation time.
2536 	 */
2537 	if (list_length(fdw_private) > FdwScanPrivateRelations)
2538 	{
2539 		StringInfo	relations;
2540 		char	   *rawrelations;
2541 		char	   *ptr;
2542 		int			minrti,
2543 					rtoffset;
2544 
2545 		rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2546 
2547 		/*
2548 		 * A difficulty with using a string representation of RT indexes is
2549 		 * that setrefs.c won't update the string when flattening the
2550 		 * rangetable.  To find out what rtoffset was applied, identify the
2551 		 * minimum RT index appearing in the string and compare it to the
2552 		 * minimum member of plan->fs_relids.  (We expect all the relids in
2553 		 * the join will have been offset by the same amount; the Asserts
2554 		 * below should catch it if that ever changes.)
2555 		 */
2556 		minrti = INT_MAX;
2557 		ptr = rawrelations;
2558 		while (*ptr)
2559 		{
2560 			if (isdigit((unsigned char) *ptr))
2561 			{
2562 				int			rti = strtol(ptr, &ptr, 10);
2563 
2564 				if (rti < minrti)
2565 					minrti = rti;
2566 			}
2567 			else
2568 				ptr++;
2569 		}
2570 		rtoffset = bms_next_member(plan->fs_relids, -1) - minrti;
2571 
2572 		/* Now we can translate the string */
2573 		relations = makeStringInfo();
2574 		ptr = rawrelations;
2575 		while (*ptr)
2576 		{
2577 			if (isdigit((unsigned char) *ptr))
2578 			{
2579 				int			rti = strtol(ptr, &ptr, 10);
2580 				RangeTblEntry *rte;
2581 				char	   *relname;
2582 				char	   *refname;
2583 
2584 				rti += rtoffset;
2585 				Assert(bms_is_member(rti, plan->fs_relids));
2586 				rte = rt_fetch(rti, es->rtable);
2587 				Assert(rte->rtekind == RTE_RELATION);
2588 				/* This logic should agree with explain.c's ExplainTargetRel */
2589 				relname = get_rel_name(rte->relid);
2590 				if (es->verbose)
2591 				{
2592 					char	   *namespace;
2593 
2594 					namespace = get_namespace_name(get_rel_namespace(rte->relid));
2595 					appendStringInfo(relations, "%s.%s",
2596 									 quote_identifier(namespace),
2597 									 quote_identifier(relname));
2598 				}
2599 				else
2600 					appendStringInfo(relations, "%s",
2601 									 quote_identifier(relname));
2602 				refname = (char *) list_nth(es->rtable_names, rti - 1);
2603 				if (refname == NULL)
2604 					refname = rte->eref->aliasname;
2605 				if (strcmp(refname, relname) != 0)
2606 					appendStringInfo(relations, " %s",
2607 									 quote_identifier(refname));
2608 			}
2609 			else
2610 				appendStringInfoChar(relations, *ptr++);
2611 		}
2612 		ExplainPropertyText("Relations", relations->data, es);
2613 	}
2614 
2615 	/*
2616 	 * Add remote query, when VERBOSE option is specified.
2617 	 */
2618 	if (es->verbose)
2619 	{
2620 		char	   *sql;
2621 
2622 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2623 		ExplainPropertyText("Remote SQL", sql, es);
2624 	}
2625 }
2626 
2627 /*
2628  * postgresExplainForeignModify
2629  *		Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2630  */
2631 static void
2632 postgresExplainForeignModify(ModifyTableState *mtstate,
2633 							 ResultRelInfo *rinfo,
2634 							 List *fdw_private,
2635 							 int subplan_index,
2636 							 ExplainState *es)
2637 {
2638 	if (es->verbose)
2639 	{
2640 		char	   *sql = strVal(list_nth(fdw_private,
2641 										  FdwModifyPrivateUpdateSql));
2642 
2643 		ExplainPropertyText("Remote SQL", sql, es);
2644 	}
2645 }
2646 
2647 /*
2648  * postgresExplainDirectModify
2649  *		Produce extra output for EXPLAIN of a ForeignScan that modifies a
2650  *		foreign table directly
2651  */
2652 static void
2653 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2654 {
2655 	List	   *fdw_private;
2656 	char	   *sql;
2657 
2658 	if (es->verbose)
2659 	{
2660 		fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2661 		sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2662 		ExplainPropertyText("Remote SQL", sql, es);
2663 	}
2664 }
2665 
2666 
2667 /*
2668  * estimate_path_cost_size
2669  *		Get cost and size estimates for a foreign scan on given foreign relation
2670  *		either a base relation or a join between foreign relations or an upper
2671  *		relation containing foreign relations.
2672  *
2673  * param_join_conds are the parameterization clauses with outer relations.
2674  * pathkeys specify the expected sort order if any for given path being costed.
2675  * fpextra specifies additional post-scan/join-processing steps such as the
2676  * final sort and the LIMIT restriction.
2677  *
2678  * The function returns the cost and size estimates in p_rows, p_width,
2679  * p_startup_cost and p_total_cost variables.
2680  */
2681 static void
2682 estimate_path_cost_size(PlannerInfo *root,
2683 						RelOptInfo *foreignrel,
2684 						List *param_join_conds,
2685 						List *pathkeys,
2686 						PgFdwPathExtraData *fpextra,
2687 						double *p_rows, int *p_width,
2688 						Cost *p_startup_cost, Cost *p_total_cost)
2689 {
2690 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2691 	double		rows;
2692 	double		retrieved_rows;
2693 	int			width;
2694 	Cost		startup_cost;
2695 	Cost		total_cost;
2696 
2697 	/* Make sure the core code has set up the relation's reltarget */
2698 	Assert(foreignrel->reltarget);
2699 
2700 	/*
2701 	 * If the table or the server is configured to use remote estimates,
2702 	 * connect to the foreign server and execute EXPLAIN to estimate the
2703 	 * number of rows selected by the restriction+join clauses.  Otherwise,
2704 	 * estimate rows using whatever statistics we have locally, in a way
2705 	 * similar to ordinary tables.
2706 	 */
2707 	if (fpinfo->use_remote_estimate)
2708 	{
2709 		List	   *remote_param_join_conds;
2710 		List	   *local_param_join_conds;
2711 		StringInfoData sql;
2712 		PGconn	   *conn;
2713 		Selectivity local_sel;
2714 		QualCost	local_cost;
2715 		List	   *fdw_scan_tlist = NIL;
2716 		List	   *remote_conds;
2717 
2718 		/* Required only to be passed to deparseSelectStmtForRel */
2719 		List	   *retrieved_attrs;
2720 
2721 		/*
2722 		 * param_join_conds might contain both clauses that are safe to send
2723 		 * across, and clauses that aren't.
2724 		 */
2725 		classifyConditions(root, foreignrel, param_join_conds,
2726 						   &remote_param_join_conds, &local_param_join_conds);
2727 
2728 		/* Build the list of columns to be fetched from the foreign server. */
2729 		if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2730 			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2731 		else
2732 			fdw_scan_tlist = NIL;
2733 
2734 		/*
2735 		 * The complete list of remote conditions includes everything from
2736 		 * baserestrictinfo plus any extra join_conds relevant to this
2737 		 * particular path.
2738 		 */
2739 		remote_conds = list_concat(remote_param_join_conds,
2740 								   fpinfo->remote_conds);
2741 
2742 		/*
2743 		 * Construct EXPLAIN query including the desired SELECT, FROM, and
2744 		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2745 		 * values, so don't request params_list.
2746 		 */
2747 		initStringInfo(&sql);
2748 		appendStringInfoString(&sql, "EXPLAIN ");
2749 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2750 								remote_conds, pathkeys,
2751 								fpextra ? fpextra->has_final_sort : false,
2752 								fpextra ? fpextra->has_limit : false,
2753 								false, &retrieved_attrs, NULL);
2754 
2755 		/* Get the remote estimate */
2756 		conn = GetConnection(fpinfo->user, false);
2757 		get_remote_estimate(sql.data, conn, &rows, &width,
2758 							&startup_cost, &total_cost);
2759 		ReleaseConnection(conn);
2760 
2761 		retrieved_rows = rows;
2762 
2763 		/* Factor in the selectivity of the locally-checked quals */
2764 		local_sel = clauselist_selectivity(root,
2765 										   local_param_join_conds,
2766 										   foreignrel->relid,
2767 										   JOIN_INNER,
2768 										   NULL);
2769 		local_sel *= fpinfo->local_conds_sel;
2770 
2771 		rows = clamp_row_est(rows * local_sel);
2772 
2773 		/* Add in the eval cost of the locally-checked quals */
2774 		startup_cost += fpinfo->local_conds_cost.startup;
2775 		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2776 		cost_qual_eval(&local_cost, local_param_join_conds, root);
2777 		startup_cost += local_cost.startup;
2778 		total_cost += local_cost.per_tuple * retrieved_rows;
2779 
2780 		/*
2781 		 * Add in tlist eval cost for each output row.  In case of an
2782 		 * aggregate, some of the tlist expressions such as grouping
2783 		 * expressions will be evaluated remotely, so adjust the costs.
2784 		 */
2785 		startup_cost += foreignrel->reltarget->cost.startup;
2786 		total_cost += foreignrel->reltarget->cost.startup;
2787 		total_cost += foreignrel->reltarget->cost.per_tuple * rows;
2788 		if (IS_UPPER_REL(foreignrel))
2789 		{
2790 			QualCost	tlist_cost;
2791 
2792 			cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
2793 			startup_cost -= tlist_cost.startup;
2794 			total_cost -= tlist_cost.startup;
2795 			total_cost -= tlist_cost.per_tuple * rows;
2796 		}
2797 	}
2798 	else
2799 	{
2800 		Cost		run_cost = 0;
2801 
2802 		/*
2803 		 * We don't support join conditions in this mode (hence, no
2804 		 * parameterized paths can be made).
2805 		 */
2806 		Assert(param_join_conds == NIL);
2807 
2808 		/*
2809 		 * We will come here again and again with different set of pathkeys or
2810 		 * additional post-scan/join-processing steps that caller wants to
2811 		 * cost.  We don't need to calculate the cost/size estimates for the
2812 		 * underlying scan, join, or grouping each time.  Instead, use those
2813 		 * estimates if we have cached them already.
2814 		 */
2815 		if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
2816 		{
2817 			Assert(fpinfo->retrieved_rows >= 0);
2818 
2819 			rows = fpinfo->rows;
2820 			retrieved_rows = fpinfo->retrieved_rows;
2821 			width = fpinfo->width;
2822 			startup_cost = fpinfo->rel_startup_cost;
2823 			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2824 
2825 			/*
2826 			 * If we estimate the costs of a foreign scan or a foreign join
2827 			 * with additional post-scan/join-processing steps, the scan or
2828 			 * join costs obtained from the cache wouldn't yet contain the
2829 			 * eval costs for the final scan/join target, which would've been
2830 			 * updated by apply_scanjoin_target_to_paths(); add the eval costs
2831 			 * now.
2832 			 */
2833 			if (fpextra && !IS_UPPER_REL(foreignrel))
2834 			{
2835 				/* Shouldn't get here unless we have LIMIT */
2836 				Assert(fpextra->has_limit);
2837 				Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
2838 					   foreignrel->reloptkind == RELOPT_JOINREL);
2839 				startup_cost += foreignrel->reltarget->cost.startup;
2840 				run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2841 			}
2842 		}
2843 		else if (IS_JOIN_REL(foreignrel))
2844 		{
2845 			PgFdwRelationInfo *fpinfo_i;
2846 			PgFdwRelationInfo *fpinfo_o;
2847 			QualCost	join_cost;
2848 			QualCost	remote_conds_cost;
2849 			double		nrows;
2850 
2851 			/* Use rows/width estimates made by the core code. */
2852 			rows = foreignrel->rows;
2853 			width = foreignrel->reltarget->width;
2854 
2855 			/* For join we expect inner and outer relations set */
2856 			Assert(fpinfo->innerrel && fpinfo->outerrel);
2857 
2858 			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2859 			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2860 
2861 			/* Estimate of number of rows in cross product */
2862 			nrows = fpinfo_i->rows * fpinfo_o->rows;
2863 
2864 			/*
2865 			 * Back into an estimate of the number of retrieved rows.  Just in
2866 			 * case this is nuts, clamp to at most nrows.
2867 			 */
2868 			retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2869 			retrieved_rows = Min(retrieved_rows, nrows);
2870 
2871 			/*
2872 			 * The cost of foreign join is estimated as cost of generating
2873 			 * rows for the joining relations + cost for applying quals on the
2874 			 * rows.
2875 			 */
2876 
2877 			/*
2878 			 * Calculate the cost of clauses pushed down to the foreign server
2879 			 */
2880 			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2881 			/* Calculate the cost of applying join clauses */
2882 			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2883 
2884 			/*
2885 			 * Startup cost includes startup cost of joining relations and the
2886 			 * startup cost for join and other clauses. We do not include the
2887 			 * startup cost specific to join strategy (e.g. setting up hash
2888 			 * tables) since we do not know what strategy the foreign server
2889 			 * is going to use.
2890 			 */
2891 			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2892 			startup_cost += join_cost.startup;
2893 			startup_cost += remote_conds_cost.startup;
2894 			startup_cost += fpinfo->local_conds_cost.startup;
2895 
2896 			/*
2897 			 * Run time cost includes:
2898 			 *
2899 			 * 1. Run time cost (total_cost - startup_cost) of relations being
2900 			 * joined
2901 			 *
2902 			 * 2. Run time cost of applying join clauses on the cross product
2903 			 * of the joining relations.
2904 			 *
2905 			 * 3. Run time cost of applying pushed down other clauses on the
2906 			 * result of join
2907 			 *
2908 			 * 4. Run time cost of applying nonpushable other clauses locally
2909 			 * on the result fetched from the foreign server.
2910 			 */
2911 			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2912 			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2913 			run_cost += nrows * join_cost.per_tuple;
2914 			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2915 			run_cost += nrows * remote_conds_cost.per_tuple;
2916 			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2917 
2918 			/* Add in tlist eval cost for each output row */
2919 			startup_cost += foreignrel->reltarget->cost.startup;
2920 			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2921 		}
2922 		else if (IS_UPPER_REL(foreignrel))
2923 		{
2924 			RelOptInfo *outerrel = fpinfo->outerrel;
2925 			PgFdwRelationInfo *ofpinfo;
2926 			AggClauseCosts aggcosts;
2927 			double		input_rows;
2928 			int			numGroupCols;
2929 			double		numGroups = 1;
2930 
2931 			/* The upper relation should have its outer relation set */
2932 			Assert(outerrel);
2933 			/* and that outer relation should have its reltarget set */
2934 			Assert(outerrel->reltarget);
2935 
2936 			/*
2937 			 * This cost model is mixture of costing done for sorted and
2938 			 * hashed aggregates in cost_agg().  We are not sure which
2939 			 * strategy will be considered at remote side, thus for
2940 			 * simplicity, we put all startup related costs in startup_cost
2941 			 * and all finalization and run cost are added in total_cost.
2942 			 */
2943 
2944 			ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
2945 
2946 			/* Get rows from input rel */
2947 			input_rows = ofpinfo->rows;
2948 
2949 			/* Collect statistics about aggregates for estimating costs. */
2950 			MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2951 			if (root->parse->hasAggs)
2952 			{
2953 				get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2954 									 AGGSPLIT_SIMPLE, &aggcosts);
2955 
2956 				/*
2957 				 * The cost of aggregates in the HAVING qual will be the same
2958 				 * for each child as it is for the parent, so there's no need
2959 				 * to use a translated version of havingQual.
2960 				 */
2961 				get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2962 									 AGGSPLIT_SIMPLE, &aggcosts);
2963 			}
2964 
2965 			/* Get number of grouping columns and possible number of groups */
2966 			numGroupCols = list_length(root->parse->groupClause);
2967 			numGroups = estimate_num_groups(root,
2968 											get_sortgrouplist_exprs(root->parse->groupClause,
2969 																	fpinfo->grouped_tlist),
2970 											input_rows, NULL);
2971 
2972 			/*
2973 			 * Get the retrieved_rows and rows estimates.  If there are HAVING
2974 			 * quals, account for their selectivity.
2975 			 */
2976 			if (root->parse->havingQual)
2977 			{
2978 				/* Factor in the selectivity of the remotely-checked quals */
2979 				retrieved_rows =
2980 					clamp_row_est(numGroups *
2981 								  clauselist_selectivity(root,
2982 														 fpinfo->remote_conds,
2983 														 0,
2984 														 JOIN_INNER,
2985 														 NULL));
2986 				/* Factor in the selectivity of the locally-checked quals */
2987 				rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
2988 			}
2989 			else
2990 			{
2991 				rows = retrieved_rows = numGroups;
2992 			}
2993 
2994 			/* Use width estimate made by the core code. */
2995 			width = foreignrel->reltarget->width;
2996 
2997 			/*-----
2998 			 * Startup cost includes:
2999 			 *	  1. Startup cost for underneath input relation, adjusted for
3000 			 *	     tlist replacement by apply_scanjoin_target_to_paths()
3001 			 *	  2. Cost of performing aggregation, per cost_agg()
3002 			 *-----
3003 			 */
3004 			startup_cost = ofpinfo->rel_startup_cost;
3005 			startup_cost += outerrel->reltarget->cost.startup;
3006 			startup_cost += aggcosts.transCost.startup;
3007 			startup_cost += aggcosts.transCost.per_tuple * input_rows;
3008 			startup_cost += aggcosts.finalCost.startup;
3009 			startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3010 
3011 			/*-----
3012 			 * Run time cost includes:
3013 			 *	  1. Run time cost of underneath input relation, adjusted for
3014 			 *	     tlist replacement by apply_scanjoin_target_to_paths()
3015 			 *	  2. Run time cost of performing aggregation, per cost_agg()
3016 			 *-----
3017 			 */
3018 			run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3019 			run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3020 			run_cost += aggcosts.finalCost.per_tuple * numGroups;
3021 			run_cost += cpu_tuple_cost * numGroups;
3022 
3023 			/* Account for the eval cost of HAVING quals, if any */
3024 			if (root->parse->havingQual)
3025 			{
3026 				QualCost	remote_cost;
3027 
3028 				/* Add in the eval cost of the remotely-checked quals */
3029 				cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3030 				startup_cost += remote_cost.startup;
3031 				run_cost += remote_cost.per_tuple * numGroups;
3032 				/* Add in the eval cost of the locally-checked quals */
3033 				startup_cost += fpinfo->local_conds_cost.startup;
3034 				run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3035 			}
3036 
3037 			/* Add in tlist eval cost for each output row */
3038 			startup_cost += foreignrel->reltarget->cost.startup;
3039 			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3040 		}
3041 		else
3042 		{
3043 			Cost		cpu_per_tuple;
3044 
3045 			/* Use rows/width estimates made by set_baserel_size_estimates. */
3046 			rows = foreignrel->rows;
3047 			width = foreignrel->reltarget->width;
3048 
3049 			/*
3050 			 * Back into an estimate of the number of retrieved rows.  Just in
3051 			 * case this is nuts, clamp to at most foreignrel->tuples.
3052 			 */
3053 			retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3054 			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3055 
3056 			/*
3057 			 * Cost as though this were a seqscan, which is pessimistic.  We
3058 			 * effectively imagine the local_conds are being evaluated
3059 			 * remotely, too.
3060 			 */
3061 			startup_cost = 0;
3062 			run_cost = 0;
3063 			run_cost += seq_page_cost * foreignrel->pages;
3064 
3065 			startup_cost += foreignrel->baserestrictcost.startup;
3066 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3067 			run_cost += cpu_per_tuple * foreignrel->tuples;
3068 
3069 			/* Add in tlist eval cost for each output row */
3070 			startup_cost += foreignrel->reltarget->cost.startup;
3071 			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3072 		}
3073 
3074 		/*
3075 		 * Without remote estimates, we have no real way to estimate the cost
3076 		 * of generating sorted output.  It could be free if the query plan
3077 		 * the remote side would have chosen generates properly-sorted output
3078 		 * anyway, but in most cases it will cost something.  Estimate a value
3079 		 * high enough that we won't pick the sorted path when the ordering
3080 		 * isn't locally useful, but low enough that we'll err on the side of
3081 		 * pushing down the ORDER BY clause when it's useful to do so.
3082 		 */
3083 		if (pathkeys != NIL)
3084 		{
3085 			if (IS_UPPER_REL(foreignrel))
3086 			{
3087 				Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3088 					   fpinfo->stage == UPPERREL_GROUP_AGG);
3089 				adjust_foreign_grouping_path_cost(root, pathkeys,
3090 												  retrieved_rows, width,
3091 												  fpextra->limit_tuples,
3092 												  &startup_cost, &run_cost);
3093 			}
3094 			else
3095 			{
3096 				startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3097 				run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3098 			}
3099 		}
3100 
3101 		total_cost = startup_cost + run_cost;
3102 
3103 		/* Adjust the cost estimates if we have LIMIT */
3104 		if (fpextra && fpextra->has_limit)
3105 		{
3106 			adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3107 									fpextra->offset_est, fpextra->count_est);
3108 			retrieved_rows = rows;
3109 		}
3110 	}
3111 
3112 	/*
3113 	 * If this includes the final sort step, the given target, which will be
3114 	 * applied to the resulting path, might have different expressions from
3115 	 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3116 	 * eval costs.
3117 	 */
3118 	if (fpextra && fpextra->has_final_sort &&
3119 		fpextra->target != foreignrel->reltarget)
3120 	{
3121 		QualCost	oldcost = foreignrel->reltarget->cost;
3122 		QualCost	newcost = fpextra->target->cost;
3123 
3124 		startup_cost += newcost.startup - oldcost.startup;
3125 		total_cost += newcost.startup - oldcost.startup;
3126 		total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3127 	}
3128 
3129 	/*
3130 	 * Cache the retrieved rows and cost estimates for scans, joins, or
3131 	 * groupings without any parameterization, pathkeys, or additional
3132 	 * post-scan/join-processing steps, before adding the costs for
3133 	 * transferring data from the foreign server.  These estimates are useful
3134 	 * for costing remote joins involving this relation or costing other
3135 	 * remote operations on this relation such as remote sorts and remote
3136 	 * LIMIT restrictions, when the costs can not be obtained from the foreign
3137 	 * server.  This function will be called at least once for every foreign
3138 	 * relation without any parameterization, pathkeys, or additional
3139 	 * post-scan/join-processing steps.
3140 	 */
3141 	if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3142 	{
3143 		fpinfo->retrieved_rows = retrieved_rows;
3144 		fpinfo->rel_startup_cost = startup_cost;
3145 		fpinfo->rel_total_cost = total_cost;
3146 	}
3147 
3148 	/*
3149 	 * Add some additional cost factors to account for connection overhead
3150 	 * (fdw_startup_cost), transferring data across the network
3151 	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3152 	 * (cpu_tuple_cost per retrieved row).
3153 	 */
3154 	startup_cost += fpinfo->fdw_startup_cost;
3155 	total_cost += fpinfo->fdw_startup_cost;
3156 	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3157 	total_cost += cpu_tuple_cost * retrieved_rows;
3158 
3159 	/*
3160 	 * If we have LIMIT, we should prefer performing the restriction remotely
3161 	 * rather than locally, as the former avoids extra row fetches from the
3162 	 * remote that the latter might cause.  But since the core code doesn't
3163 	 * account for such fetches when estimating the costs of the local
3164 	 * restriction (see create_limit_path()), there would be no difference
3165 	 * between the costs of the local restriction and the costs of the remote
3166 	 * restriction estimated above if we don't use remote estimates (except
3167 	 * for the case where the foreignrel is a grouping relation, the given
3168 	 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3169 	 * accounted for in costing the remote restriction).  Tweak the costs of
3170 	 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3171 	 * one.
3172 	 */
3173 	if (!fpinfo->use_remote_estimate &&
3174 		fpextra && fpextra->has_limit &&
3175 		fpextra->limit_tuples > 0 &&
3176 		fpextra->limit_tuples < fpinfo->rows)
3177 	{
3178 		Assert(fpinfo->rows > 0);
3179 		total_cost -= (total_cost - startup_cost) * 0.05 *
3180 			(fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3181 	}
3182 
3183 	/* Return results. */
3184 	*p_rows = rows;
3185 	*p_width = width;
3186 	*p_startup_cost = startup_cost;
3187 	*p_total_cost = total_cost;
3188 }
3189 
3190 /*
3191  * Estimate costs of executing a SQL statement remotely.
3192  * The given "sql" must be an EXPLAIN command.
3193  */
3194 static void
3195 get_remote_estimate(const char *sql, PGconn *conn,
3196 					double *rows, int *width,
3197 					Cost *startup_cost, Cost *total_cost)
3198 {
3199 	PGresult   *volatile res = NULL;
3200 
3201 	/* PGresult must be released before leaving this function. */
3202 	PG_TRY();
3203 	{
3204 		char	   *line;
3205 		char	   *p;
3206 		int			n;
3207 
3208 		/*
3209 		 * Execute EXPLAIN remotely.
3210 		 */
3211 		res = pgfdw_exec_query(conn, sql);
3212 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3213 			pgfdw_report_error(ERROR, res, conn, false, sql);
3214 
3215 		/*
3216 		 * Extract cost numbers for topmost plan node.  Note we search for a
3217 		 * left paren from the end of the line to avoid being confused by
3218 		 * other uses of parentheses.
3219 		 */
3220 		line = PQgetvalue(res, 0, 0);
3221 		p = strrchr(line, '(');
3222 		if (p == NULL)
3223 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3224 		n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3225 				   startup_cost, total_cost, rows, width);
3226 		if (n != 4)
3227 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3228 	}
3229 	PG_FINALLY();
3230 	{
3231 		if (res)
3232 			PQclear(res);
3233 	}
3234 	PG_END_TRY();
3235 }
3236 
3237 /*
3238  * Adjust the cost estimates of a foreign grouping path to include the cost of
3239  * generating properly-sorted output.
3240  */
3241 static void
3242 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3243 								  List *pathkeys,
3244 								  double retrieved_rows,
3245 								  double width,
3246 								  double limit_tuples,
3247 								  Cost *p_startup_cost,
3248 								  Cost *p_run_cost)
3249 {
3250 	/*
3251 	 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3252 	 * side is unlikely to generate properly-sorted output, so it would need
3253 	 * an explicit sort; adjust the given costs with cost_sort().  Likewise,
3254 	 * if the GROUP BY clause is sort-able but isn't a superset of the given
3255 	 * pathkeys, adjust the costs with that function.  Otherwise, adjust the
3256 	 * costs by applying the same heuristic as for the scan or join case.
3257 	 */
3258 	if (!grouping_is_sortable(root->parse->groupClause) ||
3259 		!pathkeys_contained_in(pathkeys, root->group_pathkeys))
3260 	{
3261 		Path		sort_path;	/* dummy for result of cost_sort */
3262 
3263 		cost_sort(&sort_path,
3264 				  root,
3265 				  pathkeys,
3266 				  *p_startup_cost + *p_run_cost,
3267 				  retrieved_rows,
3268 				  width,
3269 				  0.0,
3270 				  work_mem,
3271 				  limit_tuples);
3272 
3273 		*p_startup_cost = sort_path.startup_cost;
3274 		*p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3275 	}
3276 	else
3277 	{
3278 		/*
3279 		 * The default extra cost seems too large for foreign-grouping cases;
3280 		 * add 1/4th of that default.
3281 		 */
3282 		double		sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3283 											 - 1.0) * 0.25;
3284 
3285 		*p_startup_cost *= sort_multiplier;
3286 		*p_run_cost *= sort_multiplier;
3287 	}
3288 }
3289 
3290 /*
3291  * Detect whether we want to process an EquivalenceClass member.
3292  *
3293  * This is a callback for use by generate_implied_equalities_for_column.
3294  */
3295 static bool
3296 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3297 						  EquivalenceClass *ec, EquivalenceMember *em,
3298 						  void *arg)
3299 {
3300 	ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3301 	Expr	   *expr = em->em_expr;
3302 
3303 	/*
3304 	 * If we've identified what we're processing in the current scan, we only
3305 	 * want to match that expression.
3306 	 */
3307 	if (state->current != NULL)
3308 		return equal(expr, state->current);
3309 
3310 	/*
3311 	 * Otherwise, ignore anything we've already processed.
3312 	 */
3313 	if (list_member(state->already_used, expr))
3314 		return false;
3315 
3316 	/* This is the new target to process. */
3317 	state->current = expr;
3318 	return true;
3319 }
3320 
3321 /*
3322  * Create cursor for node's query with current parameter values.
3323  */
3324 static void
3325 create_cursor(ForeignScanState *node)
3326 {
3327 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3328 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3329 	int			numParams = fsstate->numParams;
3330 	const char **values = fsstate->param_values;
3331 	PGconn	   *conn = fsstate->conn;
3332 	StringInfoData buf;
3333 	PGresult   *res;
3334 
3335 	/*
3336 	 * Construct array of query parameter values in text format.  We do the
3337 	 * conversions in the short-lived per-tuple context, so as not to cause a
3338 	 * memory leak over repeated scans.
3339 	 */
3340 	if (numParams > 0)
3341 	{
3342 		MemoryContext oldcontext;
3343 
3344 		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3345 
3346 		process_query_params(econtext,
3347 							 fsstate->param_flinfo,
3348 							 fsstate->param_exprs,
3349 							 values);
3350 
3351 		MemoryContextSwitchTo(oldcontext);
3352 	}
3353 
3354 	/* Construct the DECLARE CURSOR command */
3355 	initStringInfo(&buf);
3356 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3357 					 fsstate->cursor_number, fsstate->query);
3358 
3359 	/*
3360 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3361 	 * to infer types for all parameters.  Since we explicitly cast every
3362 	 * parameter (see deparse.c), the "inference" is trivial and will produce
3363 	 * the desired result.  This allows us to avoid assuming that the remote
3364 	 * server has the same OIDs we do for the parameters' types.
3365 	 */
3366 	if (!PQsendQueryParams(conn, buf.data, numParams,
3367 						   NULL, values, NULL, NULL, 0))
3368 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3369 
3370 	/*
3371 	 * Get the result, and check for success.
3372 	 *
3373 	 * We don't use a PG_TRY block here, so be careful not to throw error
3374 	 * without releasing the PGresult.
3375 	 */
3376 	res = pgfdw_get_result(conn, buf.data);
3377 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3378 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3379 	PQclear(res);
3380 
3381 	/* Mark the cursor as created, and show no tuples have been retrieved */
3382 	fsstate->cursor_exists = true;
3383 	fsstate->tuples = NULL;
3384 	fsstate->num_tuples = 0;
3385 	fsstate->next_tuple = 0;
3386 	fsstate->fetch_ct_2 = 0;
3387 	fsstate->eof_reached = false;
3388 
3389 	/* Clean up */
3390 	pfree(buf.data);
3391 }
3392 
3393 /*
3394  * Fetch some more rows from the node's cursor.
3395  */
3396 static void
3397 fetch_more_data(ForeignScanState *node)
3398 {
3399 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3400 	PGresult   *volatile res = NULL;
3401 	MemoryContext oldcontext;
3402 
3403 	/*
3404 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
3405 	 * batch.
3406 	 */
3407 	fsstate->tuples = NULL;
3408 	MemoryContextReset(fsstate->batch_cxt);
3409 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3410 
3411 	/* PGresult must be released before leaving this function. */
3412 	PG_TRY();
3413 	{
3414 		PGconn	   *conn = fsstate->conn;
3415 		char		sql[64];
3416 		int			numrows;
3417 		int			i;
3418 
3419 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3420 				 fsstate->fetch_size, fsstate->cursor_number);
3421 
3422 		res = pgfdw_exec_query(conn, sql);
3423 		/* On error, report the original query, not the FETCH. */
3424 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3425 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3426 
3427 		/* Convert the data into HeapTuples */
3428 		numrows = PQntuples(res);
3429 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3430 		fsstate->num_tuples = numrows;
3431 		fsstate->next_tuple = 0;
3432 
3433 		for (i = 0; i < numrows; i++)
3434 		{
3435 			Assert(IsA(node->ss.ps.plan, ForeignScan));
3436 
3437 			fsstate->tuples[i] =
3438 				make_tuple_from_result_row(res, i,
3439 										   fsstate->rel,
3440 										   fsstate->attinmeta,
3441 										   fsstate->retrieved_attrs,
3442 										   node,
3443 										   fsstate->temp_cxt);
3444 		}
3445 
3446 		/* Update fetch_ct_2 */
3447 		if (fsstate->fetch_ct_2 < 2)
3448 			fsstate->fetch_ct_2++;
3449 
3450 		/* Must be EOF if we didn't get as many tuples as we asked for. */
3451 		fsstate->eof_reached = (numrows < fsstate->fetch_size);
3452 	}
3453 	PG_FINALLY();
3454 	{
3455 		if (res)
3456 			PQclear(res);
3457 	}
3458 	PG_END_TRY();
3459 
3460 	MemoryContextSwitchTo(oldcontext);
3461 }
3462 
3463 /*
3464  * Force assorted GUC parameters to settings that ensure that we'll output
3465  * data values in a form that is unambiguous to the remote server.
3466  *
3467  * This is rather expensive and annoying to do once per row, but there's
3468  * little choice if we want to be sure values are transmitted accurately;
3469  * we can't leave the settings in place between rows for fear of affecting
3470  * user-visible computations.
3471  *
3472  * We use the equivalent of a function SET option to allow the settings to
3473  * persist only until the caller calls reset_transmission_modes().  If an
3474  * error is thrown in between, guc.c will take care of undoing the settings.
3475  *
3476  * The return value is the nestlevel that must be passed to
3477  * reset_transmission_modes() to undo things.
3478  */
3479 int
3480 set_transmission_modes(void)
3481 {
3482 	int			nestlevel = NewGUCNestLevel();
3483 
3484 	/*
3485 	 * The values set here should match what pg_dump does.  See also
3486 	 * configure_remote_session in connection.c.
3487 	 */
3488 	if (DateStyle != USE_ISO_DATES)
3489 		(void) set_config_option("datestyle", "ISO",
3490 								 PGC_USERSET, PGC_S_SESSION,
3491 								 GUC_ACTION_SAVE, true, 0, false);
3492 	if (IntervalStyle != INTSTYLE_POSTGRES)
3493 		(void) set_config_option("intervalstyle", "postgres",
3494 								 PGC_USERSET, PGC_S_SESSION,
3495 								 GUC_ACTION_SAVE, true, 0, false);
3496 	if (extra_float_digits < 3)
3497 		(void) set_config_option("extra_float_digits", "3",
3498 								 PGC_USERSET, PGC_S_SESSION,
3499 								 GUC_ACTION_SAVE, true, 0, false);
3500 
3501 	return nestlevel;
3502 }
3503 
3504 /*
3505  * Undo the effects of set_transmission_modes().
3506  */
3507 void
3508 reset_transmission_modes(int nestlevel)
3509 {
3510 	AtEOXact_GUC(true, nestlevel);
3511 }
3512 
3513 /*
3514  * Utility routine to close a cursor.
3515  */
3516 static void
3517 close_cursor(PGconn *conn, unsigned int cursor_number)
3518 {
3519 	char		sql[64];
3520 	PGresult   *res;
3521 
3522 	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3523 
3524 	/*
3525 	 * We don't use a PG_TRY block here, so be careful not to throw error
3526 	 * without releasing the PGresult.
3527 	 */
3528 	res = pgfdw_exec_query(conn, sql);
3529 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3530 		pgfdw_report_error(ERROR, res, conn, true, sql);
3531 	PQclear(res);
3532 }
3533 
3534 /*
3535  * create_foreign_modify
3536  *		Construct an execution state of a foreign insert/update/delete
3537  *		operation
3538  */
3539 static PgFdwModifyState *
3540 create_foreign_modify(EState *estate,
3541 					  RangeTblEntry *rte,
3542 					  ResultRelInfo *resultRelInfo,
3543 					  CmdType operation,
3544 					  Plan *subplan,
3545 					  char *query,
3546 					  List *target_attrs,
3547 					  bool has_returning,
3548 					  List *retrieved_attrs)
3549 {
3550 	PgFdwModifyState *fmstate;
3551 	Relation	rel = resultRelInfo->ri_RelationDesc;
3552 	TupleDesc	tupdesc = RelationGetDescr(rel);
3553 	Oid			userid;
3554 	ForeignTable *table;
3555 	UserMapping *user;
3556 	AttrNumber	n_params;
3557 	Oid			typefnoid;
3558 	bool		isvarlena;
3559 	ListCell   *lc;
3560 
3561 	/* Begin constructing PgFdwModifyState. */
3562 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3563 	fmstate->rel = rel;
3564 
3565 	/*
3566 	 * Identify which user to do the remote access as.  This should match what
3567 	 * ExecCheckRTEPerms() does.
3568 	 */
3569 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3570 
3571 	/* Get info about foreign table. */
3572 	table = GetForeignTable(RelationGetRelid(rel));
3573 	user = GetUserMapping(userid, table->serverid);
3574 
3575 	/* Open connection; report that we'll create a prepared statement. */
3576 	fmstate->conn = GetConnection(user, true);
3577 	fmstate->p_name = NULL;		/* prepared statement not made yet */
3578 
3579 	/* Set up remote query information. */
3580 	fmstate->query = query;
3581 	fmstate->target_attrs = target_attrs;
3582 	fmstate->has_returning = has_returning;
3583 	fmstate->retrieved_attrs = retrieved_attrs;
3584 
3585 	/* Create context for per-tuple temp workspace. */
3586 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3587 											  "postgres_fdw temporary data",
3588 											  ALLOCSET_SMALL_SIZES);
3589 
3590 	/* Prepare for input conversion of RETURNING results. */
3591 	if (fmstate->has_returning)
3592 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3593 
3594 	/* Prepare for output conversion of parameters used in prepared stmt. */
3595 	n_params = list_length(fmstate->target_attrs) + 1;
3596 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3597 	fmstate->p_nums = 0;
3598 
3599 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
3600 	{
3601 		Assert(subplan != NULL);
3602 
3603 		/* Find the ctid resjunk column in the subplan's result */
3604 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3605 														  "ctid");
3606 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
3607 			elog(ERROR, "could not find junk ctid column");
3608 
3609 		/* First transmittable parameter will be ctid */
3610 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3611 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3612 		fmstate->p_nums++;
3613 	}
3614 
3615 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
3616 	{
3617 		/* Set up for remaining transmittable parameters */
3618 		foreach(lc, fmstate->target_attrs)
3619 		{
3620 			int			attnum = lfirst_int(lc);
3621 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3622 
3623 			Assert(!attr->attisdropped);
3624 
3625 			/* Ignore generated columns; they are set to DEFAULT */
3626 			if (attr->attgenerated)
3627 				continue;
3628 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3629 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3630 			fmstate->p_nums++;
3631 		}
3632 	}
3633 
3634 	Assert(fmstate->p_nums <= n_params);
3635 
3636 	/* Initialize auxiliary state */
3637 	fmstate->aux_fmstate = NULL;
3638 
3639 	return fmstate;
3640 }
3641 
3642 /*
3643  * execute_foreign_modify
3644  *		Perform foreign-table modification as required, and fetch RETURNING
3645  *		result if any.  (This is the shared guts of postgresExecForeignInsert,
3646  *		postgresExecForeignUpdate, and postgresExecForeignDelete.)
3647  */
3648 static TupleTableSlot *
3649 execute_foreign_modify(EState *estate,
3650 					   ResultRelInfo *resultRelInfo,
3651 					   CmdType operation,
3652 					   TupleTableSlot *slot,
3653 					   TupleTableSlot *planSlot)
3654 {
3655 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
3656 	ItemPointer ctid = NULL;
3657 	const char **p_values;
3658 	PGresult   *res;
3659 	int			n_rows;
3660 
3661 	/* The operation should be INSERT, UPDATE, or DELETE */
3662 	Assert(operation == CMD_INSERT ||
3663 		   operation == CMD_UPDATE ||
3664 		   operation == CMD_DELETE);
3665 
3666 	/* Set up the prepared statement on the remote server, if we didn't yet */
3667 	if (!fmstate->p_name)
3668 		prepare_foreign_modify(fmstate);
3669 
3670 	/*
3671 	 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
3672 	 */
3673 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
3674 	{
3675 		Datum		datum;
3676 		bool		isNull;
3677 
3678 		datum = ExecGetJunkAttribute(planSlot,
3679 									 fmstate->ctidAttno,
3680 									 &isNull);
3681 		/* shouldn't ever get a null result... */
3682 		if (isNull)
3683 			elog(ERROR, "ctid is NULL");
3684 		ctid = (ItemPointer) DatumGetPointer(datum);
3685 	}
3686 
3687 	/* Convert parameters needed by prepared statement to text form */
3688 	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
3689 
3690 	/*
3691 	 * Execute the prepared statement.
3692 	 */
3693 	if (!PQsendQueryPrepared(fmstate->conn,
3694 							 fmstate->p_name,
3695 							 fmstate->p_nums,
3696 							 p_values,
3697 							 NULL,
3698 							 NULL,
3699 							 0))
3700 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3701 
3702 	/*
3703 	 * Get the result, and check for success.
3704 	 *
3705 	 * We don't use a PG_TRY block here, so be careful not to throw error
3706 	 * without releasing the PGresult.
3707 	 */
3708 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3709 	if (PQresultStatus(res) !=
3710 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3711 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3712 
3713 	/* Check number of rows affected, and fetch RETURNING tuple if any */
3714 	if (fmstate->has_returning)
3715 	{
3716 		n_rows = PQntuples(res);
3717 		if (n_rows > 0)
3718 			store_returning_result(fmstate, slot, res);
3719 	}
3720 	else
3721 		n_rows = atoi(PQcmdTuples(res));
3722 
3723 	/* And clean up */
3724 	PQclear(res);
3725 
3726 	MemoryContextReset(fmstate->temp_cxt);
3727 
3728 	/*
3729 	 * Return NULL if nothing was inserted/updated/deleted on the remote end
3730 	 */
3731 	return (n_rows > 0) ? slot : NULL;
3732 }
3733 
3734 /*
3735  * prepare_foreign_modify
3736  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3737  */
3738 static void
3739 prepare_foreign_modify(PgFdwModifyState *fmstate)
3740 {
3741 	char		prep_name[NAMEDATALEN];
3742 	char	   *p_name;
3743 	PGresult   *res;
3744 
3745 	/* Construct name we'll use for the prepared statement. */
3746 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3747 			 GetPrepStmtNumber(fmstate->conn));
3748 	p_name = pstrdup(prep_name);
3749 
3750 	/*
3751 	 * We intentionally do not specify parameter types here, but leave the
3752 	 * remote server to derive them by default.  This avoids possible problems
3753 	 * with the remote server using different type OIDs than we do.  All of
3754 	 * the prepared statements we use in this module are simple enough that
3755 	 * the remote server will make the right choices.
3756 	 */
3757 	if (!PQsendPrepare(fmstate->conn,
3758 					   p_name,
3759 					   fmstate->query,
3760 					   0,
3761 					   NULL))
3762 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3763 
3764 	/*
3765 	 * Get the result, and check for success.
3766 	 *
3767 	 * We don't use a PG_TRY block here, so be careful not to throw error
3768 	 * without releasing the PGresult.
3769 	 */
3770 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3771 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3772 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3773 	PQclear(res);
3774 
3775 	/* This action shows that the prepare has been done. */
3776 	fmstate->p_name = p_name;
3777 }
3778 
3779 /*
3780  * convert_prep_stmt_params
3781  *		Create array of text strings representing parameter values
3782  *
3783  * tupleid is ctid to send, or NULL if none
3784  * slot is slot to get remaining parameters from, or NULL if none
3785  *
3786  * Data is constructed in temp_cxt; caller should reset that after use.
3787  */
3788 static const char **
3789 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3790 						 ItemPointer tupleid,
3791 						 TupleTableSlot *slot)
3792 {
3793 	const char **p_values;
3794 	int			pindex = 0;
3795 	MemoryContext oldcontext;
3796 
3797 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3798 
3799 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3800 
3801 	/* 1st parameter should be ctid, if it's in use */
3802 	if (tupleid != NULL)
3803 	{
3804 		/* don't need set_transmission_modes for TID output */
3805 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3806 											  PointerGetDatum(tupleid));
3807 		pindex++;
3808 	}
3809 
3810 	/* get following parameters from slot */
3811 	if (slot != NULL && fmstate->target_attrs != NIL)
3812 	{
3813 		TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
3814 		int			nestlevel;
3815 		ListCell   *lc;
3816 
3817 		nestlevel = set_transmission_modes();
3818 
3819 		foreach(lc, fmstate->target_attrs)
3820 		{
3821 			int			attnum = lfirst_int(lc);
3822 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3823 			Datum		value;
3824 			bool		isnull;
3825 
3826 			/* Ignore generated columns; they are set to DEFAULT */
3827 			if (attr->attgenerated)
3828 				continue;
3829 			value = slot_getattr(slot, attnum, &isnull);
3830 			if (isnull)
3831 				p_values[pindex] = NULL;
3832 			else
3833 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3834 													  value);
3835 			pindex++;
3836 		}
3837 
3838 		reset_transmission_modes(nestlevel);
3839 	}
3840 
3841 	Assert(pindex == fmstate->p_nums);
3842 
3843 	MemoryContextSwitchTo(oldcontext);
3844 
3845 	return p_values;
3846 }
3847 
3848 /*
3849  * store_returning_result
3850  *		Store the result of a RETURNING clause
3851  *
3852  * On error, be sure to release the PGresult on the way out.  Callers do not
3853  * have PG_TRY blocks to ensure this happens.
3854  */
3855 static void
3856 store_returning_result(PgFdwModifyState *fmstate,
3857 					   TupleTableSlot *slot, PGresult *res)
3858 {
3859 	PG_TRY();
3860 	{
3861 		HeapTuple	newtup;
3862 
3863 		newtup = make_tuple_from_result_row(res, 0,
3864 											fmstate->rel,
3865 											fmstate->attinmeta,
3866 											fmstate->retrieved_attrs,
3867 											NULL,
3868 											fmstate->temp_cxt);
3869 
3870 		/*
3871 		 * The returning slot will not necessarily be suitable to store
3872 		 * heaptuples directly, so allow for conversion.
3873 		 */
3874 		ExecForceStoreHeapTuple(newtup, slot, true);
3875 	}
3876 	PG_CATCH();
3877 	{
3878 		if (res)
3879 			PQclear(res);
3880 		PG_RE_THROW();
3881 	}
3882 	PG_END_TRY();
3883 }
3884 
3885 /*
3886  * finish_foreign_modify
3887  *		Release resources for a foreign insert/update/delete operation
3888  */
3889 static void
3890 finish_foreign_modify(PgFdwModifyState *fmstate)
3891 {
3892 	Assert(fmstate != NULL);
3893 
3894 	/* If we created a prepared statement, destroy it */
3895 	if (fmstate->p_name)
3896 	{
3897 		char		sql[64];
3898 		PGresult   *res;
3899 
3900 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3901 
3902 		/*
3903 		 * We don't use a PG_TRY block here, so be careful not to throw error
3904 		 * without releasing the PGresult.
3905 		 */
3906 		res = pgfdw_exec_query(fmstate->conn, sql);
3907 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
3908 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3909 		PQclear(res);
3910 		fmstate->p_name = NULL;
3911 	}
3912 
3913 	/* Release remote connection */
3914 	ReleaseConnection(fmstate->conn);
3915 	fmstate->conn = NULL;
3916 }
3917 
3918 /*
3919  * build_remote_returning
3920  *		Build a RETURNING targetlist of a remote query for performing an
3921  *		UPDATE/DELETE .. RETURNING on a join directly
3922  */
3923 static List *
3924 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3925 {
3926 	bool		have_wholerow = false;
3927 	List	   *tlist = NIL;
3928 	List	   *vars;
3929 	ListCell   *lc;
3930 
3931 	Assert(returningList);
3932 
3933 	vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3934 
3935 	/*
3936 	 * If there's a whole-row reference to the target relation, then we'll
3937 	 * need all the columns of the relation.
3938 	 */
3939 	foreach(lc, vars)
3940 	{
3941 		Var		   *var = (Var *) lfirst(lc);
3942 
3943 		if (IsA(var, Var) &&
3944 			var->varno == rtindex &&
3945 			var->varattno == InvalidAttrNumber)
3946 		{
3947 			have_wholerow = true;
3948 			break;
3949 		}
3950 	}
3951 
3952 	if (have_wholerow)
3953 	{
3954 		TupleDesc	tupdesc = RelationGetDescr(rel);
3955 		int			i;
3956 
3957 		for (i = 1; i <= tupdesc->natts; i++)
3958 		{
3959 			Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3960 			Var		   *var;
3961 
3962 			/* Ignore dropped attributes. */
3963 			if (attr->attisdropped)
3964 				continue;
3965 
3966 			var = makeVar(rtindex,
3967 						  i,
3968 						  attr->atttypid,
3969 						  attr->atttypmod,
3970 						  attr->attcollation,
3971 						  0);
3972 
3973 			tlist = lappend(tlist,
3974 							makeTargetEntry((Expr *) var,
3975 											list_length(tlist) + 1,
3976 											NULL,
3977 											false));
3978 		}
3979 	}
3980 
3981 	/* Now add any remaining columns to tlist. */
3982 	foreach(lc, vars)
3983 	{
3984 		Var		   *var = (Var *) lfirst(lc);
3985 
3986 		/*
3987 		 * No need for whole-row references to the target relation.  We don't
3988 		 * need system columns other than ctid and oid either, since those are
3989 		 * set locally.
3990 		 */
3991 		if (IsA(var, Var) &&
3992 			var->varno == rtindex &&
3993 			var->varattno <= InvalidAttrNumber &&
3994 			var->varattno != SelfItemPointerAttributeNumber)
3995 			continue;			/* don't need it */
3996 
3997 		if (tlist_member((Expr *) var, tlist))
3998 			continue;			/* already got it */
3999 
4000 		tlist = lappend(tlist,
4001 						makeTargetEntry((Expr *) var,
4002 										list_length(tlist) + 1,
4003 										NULL,
4004 										false));
4005 	}
4006 
4007 	list_free(vars);
4008 
4009 	return tlist;
4010 }
4011 
4012 /*
4013  * rebuild_fdw_scan_tlist
4014  *		Build new fdw_scan_tlist of given foreign-scan plan node from given
4015  *		tlist
4016  *
4017  * There might be columns that the fdw_scan_tlist of the given foreign-scan
4018  * plan node contains that the given tlist doesn't.  The fdw_scan_tlist would
4019  * have contained resjunk columns such as 'ctid' of the target relation and
4020  * 'wholerow' of non-target relations, but the tlist might not contain them,
4021  * for example.  So, adjust the tlist so it contains all the columns specified
4022  * in the fdw_scan_tlist; else setrefs.c will get confused.
4023  */
4024 static void
4025 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
4026 {
4027 	List	   *new_tlist = tlist;
4028 	List	   *old_tlist = fscan->fdw_scan_tlist;
4029 	ListCell   *lc;
4030 
4031 	foreach(lc, old_tlist)
4032 	{
4033 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
4034 
4035 		if (tlist_member(tle->expr, new_tlist))
4036 			continue;			/* already got it */
4037 
4038 		new_tlist = lappend(new_tlist,
4039 							makeTargetEntry(tle->expr,
4040 											list_length(new_tlist) + 1,
4041 											NULL,
4042 											false));
4043 	}
4044 	fscan->fdw_scan_tlist = new_tlist;
4045 }
4046 
4047 /*
4048  * Execute a direct UPDATE/DELETE statement.
4049  */
4050 static void
4051 execute_dml_stmt(ForeignScanState *node)
4052 {
4053 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4054 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
4055 	int			numParams = dmstate->numParams;
4056 	const char **values = dmstate->param_values;
4057 
4058 	/*
4059 	 * Construct array of query parameter values in text format.
4060 	 */
4061 	if (numParams > 0)
4062 		process_query_params(econtext,
4063 							 dmstate->param_flinfo,
4064 							 dmstate->param_exprs,
4065 							 values);
4066 
4067 	/*
4068 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4069 	 * to infer types for all parameters.  Since we explicitly cast every
4070 	 * parameter (see deparse.c), the "inference" is trivial and will produce
4071 	 * the desired result.  This allows us to avoid assuming that the remote
4072 	 * server has the same OIDs we do for the parameters' types.
4073 	 */
4074 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4075 						   NULL, values, NULL, NULL, 0))
4076 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4077 
4078 	/*
4079 	 * Get the result, and check for success.
4080 	 *
4081 	 * We don't use a PG_TRY block here, so be careful not to throw error
4082 	 * without releasing the PGresult.
4083 	 */
4084 	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4085 	if (PQresultStatus(dmstate->result) !=
4086 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4087 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4088 						   dmstate->query);
4089 
4090 	/* Get the number of rows affected. */
4091 	if (dmstate->has_returning)
4092 		dmstate->num_tuples = PQntuples(dmstate->result);
4093 	else
4094 		dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4095 }
4096 
4097 /*
4098  * Get the result of a RETURNING clause.
4099  */
4100 static TupleTableSlot *
4101 get_returning_data(ForeignScanState *node)
4102 {
4103 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4104 	EState	   *estate = node->ss.ps.state;
4105 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
4106 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4107 	TupleTableSlot *resultSlot;
4108 
4109 	Assert(resultRelInfo->ri_projectReturning);
4110 
4111 	/* If we didn't get any tuples, must be end of data. */
4112 	if (dmstate->next_tuple >= dmstate->num_tuples)
4113 		return ExecClearTuple(slot);
4114 
4115 	/* Increment the command es_processed count if necessary. */
4116 	if (dmstate->set_processed)
4117 		estate->es_processed += 1;
4118 
4119 	/*
4120 	 * Store a RETURNING tuple.  If has_returning is false, just emit a dummy
4121 	 * tuple.  (has_returning is false when the local query is of the form
4122 	 * "UPDATE/DELETE .. RETURNING 1" for example.)
4123 	 */
4124 	if (!dmstate->has_returning)
4125 	{
4126 		ExecStoreAllNullTuple(slot);
4127 		resultSlot = slot;
4128 	}
4129 	else
4130 	{
4131 		/*
4132 		 * On error, be sure to release the PGresult on the way out.  Callers
4133 		 * do not have PG_TRY blocks to ensure this happens.
4134 		 */
4135 		PG_TRY();
4136 		{
4137 			HeapTuple	newtup;
4138 
4139 			newtup = make_tuple_from_result_row(dmstate->result,
4140 												dmstate->next_tuple,
4141 												dmstate->rel,
4142 												dmstate->attinmeta,
4143 												dmstate->retrieved_attrs,
4144 												node,
4145 												dmstate->temp_cxt);
4146 			ExecStoreHeapTuple(newtup, slot, false);
4147 		}
4148 		PG_CATCH();
4149 		{
4150 			if (dmstate->result)
4151 				PQclear(dmstate->result);
4152 			PG_RE_THROW();
4153 		}
4154 		PG_END_TRY();
4155 
4156 		/* Get the updated/deleted tuple. */
4157 		if (dmstate->rel)
4158 			resultSlot = slot;
4159 		else
4160 			resultSlot = apply_returning_filter(dmstate, slot, estate);
4161 	}
4162 	dmstate->next_tuple++;
4163 
4164 	/* Make slot available for evaluation of the local query RETURNING list. */
4165 	resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4166 		resultSlot;
4167 
4168 	return slot;
4169 }
4170 
4171 /*
4172  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4173  */
4174 static void
4175 init_returning_filter(PgFdwDirectModifyState *dmstate,
4176 					  List *fdw_scan_tlist,
4177 					  Index rtindex)
4178 {
4179 	TupleDesc	resultTupType = RelationGetDescr(dmstate->resultRel);
4180 	ListCell   *lc;
4181 	int			i;
4182 
4183 	/*
4184 	 * Calculate the mapping between the fdw_scan_tlist's entries and the
4185 	 * result tuple's attributes.
4186 	 *
4187 	 * The "map" is an array of indexes of the result tuple's attributes in
4188 	 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4189 	 * tuple.  We store zero for any attributes that don't have the
4190 	 * corresponding entries in that list, marking that a NULL is needed in
4191 	 * the result tuple.
4192 	 *
4193 	 * Also get the indexes of the entries for ctid and oid if any.
4194 	 */
4195 	dmstate->attnoMap = (AttrNumber *)
4196 		palloc0(resultTupType->natts * sizeof(AttrNumber));
4197 
4198 	dmstate->ctidAttno = dmstate->oidAttno = 0;
4199 
4200 	i = 1;
4201 	dmstate->hasSystemCols = false;
4202 	foreach(lc, fdw_scan_tlist)
4203 	{
4204 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
4205 		Var		   *var = (Var *) tle->expr;
4206 
4207 		Assert(IsA(var, Var));
4208 
4209 		/*
4210 		 * If the Var is a column of the target relation to be retrieved from
4211 		 * the foreign server, get the index of the entry.
4212 		 */
4213 		if (var->varno == rtindex &&
4214 			list_member_int(dmstate->retrieved_attrs, i))
4215 		{
4216 			int			attrno = var->varattno;
4217 
4218 			if (attrno < 0)
4219 			{
4220 				/*
4221 				 * We don't retrieve system columns other than ctid and oid.
4222 				 */
4223 				if (attrno == SelfItemPointerAttributeNumber)
4224 					dmstate->ctidAttno = i;
4225 				else
4226 					Assert(false);
4227 				dmstate->hasSystemCols = true;
4228 			}
4229 			else
4230 			{
4231 				/*
4232 				 * We don't retrieve whole-row references to the target
4233 				 * relation either.
4234 				 */
4235 				Assert(attrno > 0);
4236 
4237 				dmstate->attnoMap[attrno - 1] = i;
4238 			}
4239 		}
4240 		i++;
4241 	}
4242 }
4243 
4244 /*
4245  * Extract and return an updated/deleted tuple from a scan tuple.
4246  */
4247 static TupleTableSlot *
4248 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4249 					   TupleTableSlot *slot,
4250 					   EState *estate)
4251 {
4252 	ResultRelInfo *relInfo = estate->es_result_relation_info;
4253 	TupleDesc	resultTupType = RelationGetDescr(dmstate->resultRel);
4254 	TupleTableSlot *resultSlot;
4255 	Datum	   *values;
4256 	bool	   *isnull;
4257 	Datum	   *old_values;
4258 	bool	   *old_isnull;
4259 	int			i;
4260 
4261 	/*
4262 	 * Use the return tuple slot as a place to store the result tuple.
4263 	 */
4264 	resultSlot = ExecGetReturningSlot(estate, relInfo);
4265 
4266 	/*
4267 	 * Extract all the values of the scan tuple.
4268 	 */
4269 	slot_getallattrs(slot);
4270 	old_values = slot->tts_values;
4271 	old_isnull = slot->tts_isnull;
4272 
4273 	/*
4274 	 * Prepare to build the result tuple.
4275 	 */
4276 	ExecClearTuple(resultSlot);
4277 	values = resultSlot->tts_values;
4278 	isnull = resultSlot->tts_isnull;
4279 
4280 	/*
4281 	 * Transpose data into proper fields of the result tuple.
4282 	 */
4283 	for (i = 0; i < resultTupType->natts; i++)
4284 	{
4285 		int			j = dmstate->attnoMap[i];
4286 
4287 		if (j == 0)
4288 		{
4289 			values[i] = (Datum) 0;
4290 			isnull[i] = true;
4291 		}
4292 		else
4293 		{
4294 			values[i] = old_values[j - 1];
4295 			isnull[i] = old_isnull[j - 1];
4296 		}
4297 	}
4298 
4299 	/*
4300 	 * Build the virtual tuple.
4301 	 */
4302 	ExecStoreVirtualTuple(resultSlot);
4303 
4304 	/*
4305 	 * If we have any system columns to return, materialize a heap tuple in
4306 	 * the slot from column values set above and install system columns in
4307 	 * that tuple.
4308 	 */
4309 	if (dmstate->hasSystemCols)
4310 	{
4311 		HeapTuple	resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4312 
4313 		/* ctid */
4314 		if (dmstate->ctidAttno)
4315 		{
4316 			ItemPointer ctid = NULL;
4317 
4318 			ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4319 			resultTup->t_self = *ctid;
4320 		}
4321 
4322 		/*
4323 		 * And remaining columns
4324 		 *
4325 		 * Note: since we currently don't allow the target relation to appear
4326 		 * on the nullable side of an outer join, any system columns wouldn't
4327 		 * go to NULL.
4328 		 *
4329 		 * Note: no need to care about tableoid here because it will be
4330 		 * initialized in ExecProcessReturning().
4331 		 */
4332 		HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4333 		HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4334 		HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4335 	}
4336 
4337 	/*
4338 	 * And return the result tuple.
4339 	 */
4340 	return resultSlot;
4341 }
4342 
4343 /*
4344  * Prepare for processing of parameters used in remote query.
4345  */
4346 static void
4347 prepare_query_params(PlanState *node,
4348 					 List *fdw_exprs,
4349 					 int numParams,
4350 					 FmgrInfo **param_flinfo,
4351 					 List **param_exprs,
4352 					 const char ***param_values)
4353 {
4354 	int			i;
4355 	ListCell   *lc;
4356 
4357 	Assert(numParams > 0);
4358 
4359 	/* Prepare for output conversion of parameters used in remote query. */
4360 	*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4361 
4362 	i = 0;
4363 	foreach(lc, fdw_exprs)
4364 	{
4365 		Node	   *param_expr = (Node *) lfirst(lc);
4366 		Oid			typefnoid;
4367 		bool		isvarlena;
4368 
4369 		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4370 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
4371 		i++;
4372 	}
4373 
4374 	/*
4375 	 * Prepare remote-parameter expressions for evaluation.  (Note: in
4376 	 * practice, we expect that all these expressions will be just Params, so
4377 	 * we could possibly do something more efficient than using the full
4378 	 * expression-eval machinery for this.  But probably there would be little
4379 	 * benefit, and it'd require postgres_fdw to know more than is desirable
4380 	 * about Param evaluation.)
4381 	 */
4382 	*param_exprs = ExecInitExprList(fdw_exprs, node);
4383 
4384 	/* Allocate buffer for text form of query parameters. */
4385 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
4386 }
4387 
4388 /*
4389  * Construct array of query parameter values in text format.
4390  */
4391 static void
4392 process_query_params(ExprContext *econtext,
4393 					 FmgrInfo *param_flinfo,
4394 					 List *param_exprs,
4395 					 const char **param_values)
4396 {
4397 	int			nestlevel;
4398 	int			i;
4399 	ListCell   *lc;
4400 
4401 	nestlevel = set_transmission_modes();
4402 
4403 	i = 0;
4404 	foreach(lc, param_exprs)
4405 	{
4406 		ExprState  *expr_state = (ExprState *) lfirst(lc);
4407 		Datum		expr_value;
4408 		bool		isNull;
4409 
4410 		/* Evaluate the parameter expression */
4411 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4412 
4413 		/*
4414 		 * Get string representation of each parameter value by invoking
4415 		 * type-specific output function, unless the value is null.
4416 		 */
4417 		if (isNull)
4418 			param_values[i] = NULL;
4419 		else
4420 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4421 
4422 		i++;
4423 	}
4424 
4425 	reset_transmission_modes(nestlevel);
4426 }
4427 
4428 /*
4429  * postgresAnalyzeForeignTable
4430  *		Test whether analyzing this foreign table is supported
4431  */
4432 static bool
4433 postgresAnalyzeForeignTable(Relation relation,
4434 							AcquireSampleRowsFunc *func,
4435 							BlockNumber *totalpages)
4436 {
4437 	ForeignTable *table;
4438 	UserMapping *user;
4439 	PGconn	   *conn;
4440 	StringInfoData sql;
4441 	PGresult   *volatile res = NULL;
4442 
4443 	/* Return the row-analysis function pointer */
4444 	*func = postgresAcquireSampleRowsFunc;
4445 
4446 	/*
4447 	 * Now we have to get the number of pages.  It's annoying that the ANALYZE
4448 	 * API requires us to return that now, because it forces some duplication
4449 	 * of effort between this routine and postgresAcquireSampleRowsFunc.  But
4450 	 * it's probably not worth redefining that API at this point.
4451 	 */
4452 
4453 	/*
4454 	 * Get the connection to use.  We do the remote access as the table's
4455 	 * owner, even if the ANALYZE was started by some other user.
4456 	 */
4457 	table = GetForeignTable(RelationGetRelid(relation));
4458 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4459 	conn = GetConnection(user, false);
4460 
4461 	/*
4462 	 * Construct command to get page count for relation.
4463 	 */
4464 	initStringInfo(&sql);
4465 	deparseAnalyzeSizeSql(&sql, relation);
4466 
4467 	/* In what follows, do not risk leaking any PGresults. */
4468 	PG_TRY();
4469 	{
4470 		res = pgfdw_exec_query(conn, sql.data);
4471 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4472 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
4473 
4474 		if (PQntuples(res) != 1 || PQnfields(res) != 1)
4475 			elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4476 		*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4477 	}
4478 	PG_FINALLY();
4479 	{
4480 		if (res)
4481 			PQclear(res);
4482 	}
4483 	PG_END_TRY();
4484 
4485 	ReleaseConnection(conn);
4486 
4487 	return true;
4488 }
4489 
4490 /*
4491  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4492  *
4493  * We fetch the whole table from the remote side and pick out some sample rows.
4494  *
4495  * Selected rows are returned in the caller-allocated array rows[],
4496  * which must have at least targrows entries.
4497  * The actual number of rows selected is returned as the function result.
4498  * We also count the total number of rows in the table and return it into
4499  * *totalrows.  Note that *totaldeadrows is always set to 0.
4500  *
4501  * Note that the returned list of rows is not always in order by physical
4502  * position in the table.  Therefore, correlation estimates derived later
4503  * may be meaningless, but it's OK because we don't use the estimates
4504  * currently (the planner only pays attention to correlation for indexscans).
4505  */
4506 static int
4507 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4508 							  HeapTuple *rows, int targrows,
4509 							  double *totalrows,
4510 							  double *totaldeadrows)
4511 {
4512 	PgFdwAnalyzeState astate;
4513 	ForeignTable *table;
4514 	ForeignServer *server;
4515 	UserMapping *user;
4516 	PGconn	   *conn;
4517 	unsigned int cursor_number;
4518 	StringInfoData sql;
4519 	PGresult   *volatile res = NULL;
4520 
4521 	/* Initialize workspace state */
4522 	astate.rel = relation;
4523 	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4524 
4525 	astate.rows = rows;
4526 	astate.targrows = targrows;
4527 	astate.numrows = 0;
4528 	astate.samplerows = 0;
4529 	astate.rowstoskip = -1;		/* -1 means not set yet */
4530 	reservoir_init_selection_state(&astate.rstate, targrows);
4531 
4532 	/* Remember ANALYZE context, and create a per-tuple temp context */
4533 	astate.anl_cxt = CurrentMemoryContext;
4534 	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4535 											"postgres_fdw temporary data",
4536 											ALLOCSET_SMALL_SIZES);
4537 
4538 	/*
4539 	 * Get the connection to use.  We do the remote access as the table's
4540 	 * owner, even if the ANALYZE was started by some other user.
4541 	 */
4542 	table = GetForeignTable(RelationGetRelid(relation));
4543 	server = GetForeignServer(table->serverid);
4544 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4545 	conn = GetConnection(user, false);
4546 
4547 	/*
4548 	 * Construct cursor that retrieves whole rows from remote.
4549 	 */
4550 	cursor_number = GetCursorNumber(conn);
4551 	initStringInfo(&sql);
4552 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4553 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4554 
4555 	/* In what follows, do not risk leaking any PGresults. */
4556 	PG_TRY();
4557 	{
4558 		char		fetch_sql[64];
4559 		int			fetch_size;
4560 		ListCell   *lc;
4561 
4562 		res = pgfdw_exec_query(conn, sql.data);
4563 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
4564 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
4565 		PQclear(res);
4566 		res = NULL;
4567 
4568 		/*
4569 		 * Determine the fetch size.  The default is arbitrary, but shouldn't
4570 		 * be enormous.
4571 		 */
4572 		fetch_size = 100;
4573 		foreach(lc, server->options)
4574 		{
4575 			DefElem    *def = (DefElem *) lfirst(lc);
4576 
4577 			if (strcmp(def->defname, "fetch_size") == 0)
4578 			{
4579 				fetch_size = strtol(defGetString(def), NULL, 10);
4580 				break;
4581 			}
4582 		}
4583 		foreach(lc, table->options)
4584 		{
4585 			DefElem    *def = (DefElem *) lfirst(lc);
4586 
4587 			if (strcmp(def->defname, "fetch_size") == 0)
4588 			{
4589 				fetch_size = strtol(defGetString(def), NULL, 10);
4590 				break;
4591 			}
4592 		}
4593 
4594 		/* Construct command to fetch rows from remote. */
4595 		snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4596 				 fetch_size, cursor_number);
4597 
4598 		/* Retrieve and process rows a batch at a time. */
4599 		for (;;)
4600 		{
4601 			int			numrows;
4602 			int			i;
4603 
4604 			/* Allow users to cancel long query */
4605 			CHECK_FOR_INTERRUPTS();
4606 
4607 			/*
4608 			 * XXX possible future improvement: if rowstoskip is large, we
4609 			 * could issue a MOVE rather than physically fetching the rows,
4610 			 * then just adjust rowstoskip and samplerows appropriately.
4611 			 */
4612 
4613 			/* Fetch some rows */
4614 			res = pgfdw_exec_query(conn, fetch_sql);
4615 			/* On error, report the original query, not the FETCH. */
4616 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
4617 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
4618 
4619 			/* Process whatever we got. */
4620 			numrows = PQntuples(res);
4621 			for (i = 0; i < numrows; i++)
4622 				analyze_row_processor(res, i, &astate);
4623 
4624 			PQclear(res);
4625 			res = NULL;
4626 
4627 			/* Must be EOF if we didn't get all the rows requested. */
4628 			if (numrows < fetch_size)
4629 				break;
4630 		}
4631 
4632 		/* Close the cursor, just to be tidy. */
4633 		close_cursor(conn, cursor_number);
4634 	}
4635 	PG_CATCH();
4636 	{
4637 		if (res)
4638 			PQclear(res);
4639 		PG_RE_THROW();
4640 	}
4641 	PG_END_TRY();
4642 
4643 	ReleaseConnection(conn);
4644 
4645 	/* We assume that we have no dead tuple. */
4646 	*totaldeadrows = 0.0;
4647 
4648 	/* We've retrieved all living tuples from foreign server. */
4649 	*totalrows = astate.samplerows;
4650 
4651 	/*
4652 	 * Emit some interesting relation info
4653 	 */
4654 	ereport(elevel,
4655 			(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4656 					RelationGetRelationName(relation),
4657 					astate.samplerows, astate.numrows)));
4658 
4659 	return astate.numrows;
4660 }
4661 
4662 /*
4663  * Collect sample rows from the result of query.
4664  *	 - Use all tuples in sample until target # of samples are collected.
4665  *	 - Subsequently, replace already-sampled tuples randomly.
4666  */
4667 static void
4668 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
4669 {
4670 	int			targrows = astate->targrows;
4671 	int			pos;			/* array index to store tuple in */
4672 	MemoryContext oldcontext;
4673 
4674 	/* Always increment sample row counter. */
4675 	astate->samplerows += 1;
4676 
4677 	/*
4678 	 * Determine the slot where this sample row should be stored.  Set pos to
4679 	 * negative value to indicate the row should be skipped.
4680 	 */
4681 	if (astate->numrows < targrows)
4682 	{
4683 		/* First targrows rows are always included into the sample */
4684 		pos = astate->numrows++;
4685 	}
4686 	else
4687 	{
4688 		/*
4689 		 * Now we start replacing tuples in the sample until we reach the end
4690 		 * of the relation.  Same algorithm as in acquire_sample_rows in
4691 		 * analyze.c; see Jeff Vitter's paper.
4692 		 */
4693 		if (astate->rowstoskip < 0)
4694 			astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4695 
4696 		if (astate->rowstoskip <= 0)
4697 		{
4698 			/* Choose a random reservoir element to replace. */
4699 			pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4700 			Assert(pos >= 0 && pos < targrows);
4701 			heap_freetuple(astate->rows[pos]);
4702 		}
4703 		else
4704 		{
4705 			/* Skip this tuple. */
4706 			pos = -1;
4707 		}
4708 
4709 		astate->rowstoskip -= 1;
4710 	}
4711 
4712 	if (pos >= 0)
4713 	{
4714 		/*
4715 		 * Create sample tuple from current result row, and store it in the
4716 		 * position determined above.  The tuple has to be created in anl_cxt.
4717 		 */
4718 		oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4719 
4720 		astate->rows[pos] = make_tuple_from_result_row(res, row,
4721 													   astate->rel,
4722 													   astate->attinmeta,
4723 													   astate->retrieved_attrs,
4724 													   NULL,
4725 													   astate->temp_cxt);
4726 
4727 		MemoryContextSwitchTo(oldcontext);
4728 	}
4729 }
4730 
4731 /*
4732  * Import a foreign schema
4733  */
4734 static List *
4735 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
4736 {
4737 	List	   *commands = NIL;
4738 	bool		import_collate = true;
4739 	bool		import_default = false;
4740 	bool		import_generated = true;
4741 	bool		import_not_null = true;
4742 	ForeignServer *server;
4743 	UserMapping *mapping;
4744 	PGconn	   *conn;
4745 	StringInfoData buf;
4746 	PGresult   *volatile res = NULL;
4747 	int			numrows,
4748 				i;
4749 	ListCell   *lc;
4750 
4751 	/* Parse statement options */
4752 	foreach(lc, stmt->options)
4753 	{
4754 		DefElem    *def = (DefElem *) lfirst(lc);
4755 
4756 		if (strcmp(def->defname, "import_collate") == 0)
4757 			import_collate = defGetBoolean(def);
4758 		else if (strcmp(def->defname, "import_default") == 0)
4759 			import_default = defGetBoolean(def);
4760 		else if (strcmp(def->defname, "import_generated") == 0)
4761 			import_generated = defGetBoolean(def);
4762 		else if (strcmp(def->defname, "import_not_null") == 0)
4763 			import_not_null = defGetBoolean(def);
4764 		else
4765 			ereport(ERROR,
4766 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4767 					 errmsg("invalid option \"%s\"", def->defname)));
4768 	}
4769 
4770 	/*
4771 	 * Get connection to the foreign server.  Connection manager will
4772 	 * establish new connection if necessary.
4773 	 */
4774 	server = GetForeignServer(serverOid);
4775 	mapping = GetUserMapping(GetUserId(), server->serverid);
4776 	conn = GetConnection(mapping, false);
4777 
4778 	/* Don't attempt to import collation if remote server hasn't got it */
4779 	if (PQserverVersion(conn) < 90100)
4780 		import_collate = false;
4781 
4782 	/* Create workspace for strings */
4783 	initStringInfo(&buf);
4784 
4785 	/* In what follows, do not risk leaking any PGresults. */
4786 	PG_TRY();
4787 	{
4788 		/* Check that the schema really exists */
4789 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4790 		deparseStringLiteral(&buf, stmt->remote_schema);
4791 
4792 		res = pgfdw_exec_query(conn, buf.data);
4793 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4794 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
4795 
4796 		if (PQntuples(res) != 1)
4797 			ereport(ERROR,
4798 					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4799 					 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4800 							stmt->remote_schema, server->servername)));
4801 
4802 		PQclear(res);
4803 		res = NULL;
4804 		resetStringInfo(&buf);
4805 
4806 		/*
4807 		 * Fetch all table data from this schema, possibly restricted by
4808 		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
4809 		 * to EXCEPT/LIMIT TO here, because the core code will filter the
4810 		 * statements we return according to those lists anyway.  But it
4811 		 * should save a few cycles to not process excluded tables in the
4812 		 * first place.)
4813 		 *
4814 		 * Ignore table data for partitions and only include the definitions
4815 		 * of the root partitioned tables to allow access to the complete
4816 		 * remote data set locally in the schema imported.
4817 		 *
4818 		 * Note: because we run the connection with search_path restricted to
4819 		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
4820 		 * include a schema name for types/functions in other schemas, which
4821 		 * is what we want.
4822 		 */
4823 		appendStringInfoString(&buf,
4824 							   "SELECT relname, "
4825 							   "  attname, "
4826 							   "  format_type(atttypid, atttypmod), "
4827 							   "  attnotnull, ");
4828 
4829 		/* Generated columns are supported since Postgres 12 */
4830 		if (PQserverVersion(conn) >= 120000)
4831 			appendStringInfoString(&buf,
4832 								   "  attgenerated, "
4833 								   "  pg_get_expr(adbin, adrelid), ");
4834 		else
4835 			appendStringInfoString(&buf,
4836 								   "  NULL, "
4837 								   "  pg_get_expr(adbin, adrelid), ");
4838 
4839 		if (import_collate)
4840 			appendStringInfoString(&buf,
4841 								   "  collname, "
4842 								   "  collnsp.nspname "
4843 								   "FROM pg_class c "
4844 								   "  JOIN pg_namespace n ON "
4845 								   "    relnamespace = n.oid "
4846 								   "  LEFT JOIN pg_attribute a ON "
4847 								   "    attrelid = c.oid AND attnum > 0 "
4848 								   "      AND NOT attisdropped "
4849 								   "  LEFT JOIN pg_attrdef ad ON "
4850 								   "    adrelid = c.oid AND adnum = attnum "
4851 								   "  LEFT JOIN pg_collation coll ON "
4852 								   "    coll.oid = attcollation "
4853 								   "  LEFT JOIN pg_namespace collnsp ON "
4854 								   "    collnsp.oid = collnamespace ");
4855 		else
4856 			appendStringInfoString(&buf,
4857 								   "  NULL, NULL "
4858 								   "FROM pg_class c "
4859 								   "  JOIN pg_namespace n ON "
4860 								   "    relnamespace = n.oid "
4861 								   "  LEFT JOIN pg_attribute a ON "
4862 								   "    attrelid = c.oid AND attnum > 0 "
4863 								   "      AND NOT attisdropped "
4864 								   "  LEFT JOIN pg_attrdef ad ON "
4865 								   "    adrelid = c.oid AND adnum = attnum ");
4866 
4867 		appendStringInfoString(&buf,
4868 							   "WHERE c.relkind IN ("
4869 							   CppAsString2(RELKIND_RELATION) ","
4870 							   CppAsString2(RELKIND_VIEW) ","
4871 							   CppAsString2(RELKIND_FOREIGN_TABLE) ","
4872 							   CppAsString2(RELKIND_MATVIEW) ","
4873 							   CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4874 							   "  AND n.nspname = ");
4875 		deparseStringLiteral(&buf, stmt->remote_schema);
4876 
4877 		/* Partitions are supported since Postgres 10 */
4878 		if (PQserverVersion(conn) >= 100000)
4879 			appendStringInfoString(&buf, " AND NOT c.relispartition ");
4880 
4881 		/* Apply restrictions for LIMIT TO and EXCEPT */
4882 		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4883 			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4884 		{
4885 			bool		first_item = true;
4886 
4887 			appendStringInfoString(&buf, " AND c.relname ");
4888 			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4889 				appendStringInfoString(&buf, "NOT ");
4890 			appendStringInfoString(&buf, "IN (");
4891 
4892 			/* Append list of table names within IN clause */
4893 			foreach(lc, stmt->table_list)
4894 			{
4895 				RangeVar   *rv = (RangeVar *) lfirst(lc);
4896 
4897 				if (first_item)
4898 					first_item = false;
4899 				else
4900 					appendStringInfoString(&buf, ", ");
4901 				deparseStringLiteral(&buf, rv->relname);
4902 			}
4903 			appendStringInfoChar(&buf, ')');
4904 		}
4905 
4906 		/* Append ORDER BY at the end of query to ensure output ordering */
4907 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4908 
4909 		/* Fetch the data */
4910 		res = pgfdw_exec_query(conn, buf.data);
4911 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4912 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
4913 
4914 		/* Process results */
4915 		numrows = PQntuples(res);
4916 		/* note: incrementation of i happens in inner loop's while() test */
4917 		for (i = 0; i < numrows;)
4918 		{
4919 			char	   *tablename = PQgetvalue(res, i, 0);
4920 			bool		first_item = true;
4921 
4922 			resetStringInfo(&buf);
4923 			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4924 							 quote_identifier(tablename));
4925 
4926 			/* Scan all rows for this table */
4927 			do
4928 			{
4929 				char	   *attname;
4930 				char	   *typename;
4931 				char	   *attnotnull;
4932 				char	   *attgenerated;
4933 				char	   *attdefault;
4934 				char	   *collname;
4935 				char	   *collnamespace;
4936 
4937 				/* If table has no columns, we'll see nulls here */
4938 				if (PQgetisnull(res, i, 1))
4939 					continue;
4940 
4941 				attname = PQgetvalue(res, i, 1);
4942 				typename = PQgetvalue(res, i, 2);
4943 				attnotnull = PQgetvalue(res, i, 3);
4944 				attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL :
4945 					PQgetvalue(res, i, 4);
4946 				attdefault = PQgetisnull(res, i, 5) ? (char *) NULL :
4947 					PQgetvalue(res, i, 5);
4948 				collname = PQgetisnull(res, i, 6) ? (char *) NULL :
4949 					PQgetvalue(res, i, 6);
4950 				collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
4951 					PQgetvalue(res, i, 7);
4952 
4953 				if (first_item)
4954 					first_item = false;
4955 				else
4956 					appendStringInfoString(&buf, ",\n");
4957 
4958 				/* Print column name and type */
4959 				appendStringInfo(&buf, "  %s %s",
4960 								 quote_identifier(attname),
4961 								 typename);
4962 
4963 				/*
4964 				 * Add column_name option so that renaming the foreign table's
4965 				 * column doesn't break the association to the underlying
4966 				 * column.
4967 				 */
4968 				appendStringInfoString(&buf, " OPTIONS (column_name ");
4969 				deparseStringLiteral(&buf, attname);
4970 				appendStringInfoChar(&buf, ')');
4971 
4972 				/* Add COLLATE if needed */
4973 				if (import_collate && collname != NULL && collnamespace != NULL)
4974 					appendStringInfo(&buf, " COLLATE %s.%s",
4975 									 quote_identifier(collnamespace),
4976 									 quote_identifier(collname));
4977 
4978 				/* Add DEFAULT if needed */
4979 				if (import_default && attdefault != NULL &&
4980 					(!attgenerated || !attgenerated[0]))
4981 					appendStringInfo(&buf, " DEFAULT %s", attdefault);
4982 
4983 				/* Add GENERATED if needed */
4984 				if (import_generated && attgenerated != NULL &&
4985 					attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
4986 				{
4987 					Assert(attdefault != NULL);
4988 					appendStringInfo(&buf,
4989 									 " GENERATED ALWAYS AS (%s) STORED",
4990 									 attdefault);
4991 				}
4992 
4993 				/* Add NOT NULL if needed */
4994 				if (import_not_null && attnotnull[0] == 't')
4995 					appendStringInfoString(&buf, " NOT NULL");
4996 			}
4997 			while (++i < numrows &&
4998 				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4999 
5000 			/*
5001 			 * Add server name and table-level options.  We specify remote
5002 			 * schema and table name as options (the latter to ensure that
5003 			 * renaming the foreign table doesn't break the association).
5004 			 */
5005 			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5006 							 quote_identifier(server->servername));
5007 
5008 			appendStringInfoString(&buf, "schema_name ");
5009 			deparseStringLiteral(&buf, stmt->remote_schema);
5010 			appendStringInfoString(&buf, ", table_name ");
5011 			deparseStringLiteral(&buf, tablename);
5012 
5013 			appendStringInfoString(&buf, ");");
5014 
5015 			commands = lappend(commands, pstrdup(buf.data));
5016 		}
5017 	}
5018 	PG_FINALLY();
5019 	{
5020 		if (res)
5021 			PQclear(res);
5022 	}
5023 	PG_END_TRY();
5024 
5025 	ReleaseConnection(conn);
5026 
5027 	return commands;
5028 }
5029 
5030 /*
5031  * Assess whether the join between inner and outer relations can be pushed down
5032  * to the foreign server. As a side effect, save information we obtain in this
5033  * function to PgFdwRelationInfo passed in.
5034  */
5035 static bool
5036 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
5037 				RelOptInfo *outerrel, RelOptInfo *innerrel,
5038 				JoinPathExtraData *extra)
5039 {
5040 	PgFdwRelationInfo *fpinfo;
5041 	PgFdwRelationInfo *fpinfo_o;
5042 	PgFdwRelationInfo *fpinfo_i;
5043 	ListCell   *lc;
5044 	List	   *joinclauses;
5045 
5046 	/*
5047 	 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
5048 	 * Constructing queries representing SEMI and ANTI joins is hard, hence
5049 	 * not considered right now.
5050 	 */
5051 	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5052 		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5053 		return false;
5054 
5055 	/*
5056 	 * If either of the joining relations is marked as unsafe to pushdown, the
5057 	 * join can not be pushed down.
5058 	 */
5059 	fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5060 	fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5061 	fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5062 	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5063 		!fpinfo_i || !fpinfo_i->pushdown_safe)
5064 		return false;
5065 
5066 	/*
5067 	 * If joining relations have local conditions, those conditions are
5068 	 * required to be applied before joining the relations. Hence the join can
5069 	 * not be pushed down.
5070 	 */
5071 	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5072 		return false;
5073 
5074 	/*
5075 	 * Merge FDW options.  We might be tempted to do this after we have deemed
5076 	 * the foreign join to be OK.  But we must do this beforehand so that we
5077 	 * know which quals can be evaluated on the foreign server, which might
5078 	 * depend on shippable_extensions.
5079 	 */
5080 	fpinfo->server = fpinfo_o->server;
5081 	merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5082 
5083 	/*
5084 	 * Separate restrict list into join quals and pushed-down (other) quals.
5085 	 *
5086 	 * Join quals belonging to an outer join must all be shippable, else we
5087 	 * cannot execute the join remotely.  Add such quals to 'joinclauses'.
5088 	 *
5089 	 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5090 	 * fpinfo->local_conds.  In an inner join it's okay to execute conditions
5091 	 * either locally or remotely; the same is true for pushed-down conditions
5092 	 * at an outer join.
5093 	 *
5094 	 * Note we might return failure after having already scribbled on
5095 	 * fpinfo->remote_conds and fpinfo->local_conds.  That's okay because we
5096 	 * won't consult those lists again if we deem the join unshippable.
5097 	 */
5098 	joinclauses = NIL;
5099 	foreach(lc, extra->restrictlist)
5100 	{
5101 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5102 		bool		is_remote_clause = is_foreign_expr(root, joinrel,
5103 													   rinfo->clause);
5104 
5105 		if (IS_OUTER_JOIN(jointype) &&
5106 			!RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5107 		{
5108 			if (!is_remote_clause)
5109 				return false;
5110 			joinclauses = lappend(joinclauses, rinfo);
5111 		}
5112 		else
5113 		{
5114 			if (is_remote_clause)
5115 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5116 			else
5117 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5118 		}
5119 	}
5120 
5121 	/*
5122 	 * deparseExplicitTargetList() isn't smart enough to handle anything other
5123 	 * than a Var.  In particular, if there's some PlaceHolderVar that would
5124 	 * need to be evaluated within this join tree (because there's an upper
5125 	 * reference to a quantity that may go to NULL as a result of an outer
5126 	 * join), then we can't try to push the join down because we'll fail when
5127 	 * we get to deparseExplicitTargetList().  However, a PlaceHolderVar that
5128 	 * needs to be evaluated *at the top* of this join tree is OK, because we
5129 	 * can do that locally after fetching the results from the remote side.
5130 	 */
5131 	foreach(lc, root->placeholder_list)
5132 	{
5133 		PlaceHolderInfo *phinfo = lfirst(lc);
5134 		Relids		relids;
5135 
5136 		/* PlaceHolderInfo refers to parent relids, not child relids. */
5137 		relids = IS_OTHER_REL(joinrel) ?
5138 			joinrel->top_parent_relids : joinrel->relids;
5139 
5140 		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5141 			bms_nonempty_difference(relids, phinfo->ph_eval_at))
5142 			return false;
5143 	}
5144 
5145 	/* Save the join clauses, for later use. */
5146 	fpinfo->joinclauses = joinclauses;
5147 
5148 	fpinfo->outerrel = outerrel;
5149 	fpinfo->innerrel = innerrel;
5150 	fpinfo->jointype = jointype;
5151 
5152 	/*
5153 	 * By default, both the input relations are not required to be deparsed as
5154 	 * subqueries, but there might be some relations covered by the input
5155 	 * relations that are required to be deparsed as subqueries, so save the
5156 	 * relids of those relations for later use by the deparser.
5157 	 */
5158 	fpinfo->make_outerrel_subquery = false;
5159 	fpinfo->make_innerrel_subquery = false;
5160 	Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5161 	Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5162 	fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5163 											fpinfo_i->lower_subquery_rels);
5164 
5165 	/*
5166 	 * Pull the other remote conditions from the joining relations into join
5167 	 * clauses or other remote clauses (remote_conds) of this relation
5168 	 * wherever possible. This avoids building subqueries at every join step.
5169 	 *
5170 	 * For an inner join, clauses from both the relations are added to the
5171 	 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5172 	 * the outer side are added to remote_conds since those can be evaluated
5173 	 * after the join is evaluated. The clauses from inner side are added to
5174 	 * the joinclauses, since they need to be evaluated while constructing the
5175 	 * join.
5176 	 *
5177 	 * For a FULL OUTER JOIN, the other clauses from either relation can not
5178 	 * be added to the joinclauses or remote_conds, since each relation acts
5179 	 * as an outer relation for the other.
5180 	 *
5181 	 * The joining sides can not have local conditions, thus no need to test
5182 	 * shippability of the clauses being pulled up.
5183 	 */
5184 	switch (jointype)
5185 	{
5186 		case JOIN_INNER:
5187 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5188 											   fpinfo_i->remote_conds);
5189 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5190 											   fpinfo_o->remote_conds);
5191 			break;
5192 
5193 		case JOIN_LEFT:
5194 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5195 											  fpinfo_i->remote_conds);
5196 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5197 											   fpinfo_o->remote_conds);
5198 			break;
5199 
5200 		case JOIN_RIGHT:
5201 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5202 											  fpinfo_o->remote_conds);
5203 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5204 											   fpinfo_i->remote_conds);
5205 			break;
5206 
5207 		case JOIN_FULL:
5208 
5209 			/*
5210 			 * In this case, if any of the input relations has conditions, we
5211 			 * need to deparse that relation as a subquery so that the
5212 			 * conditions can be evaluated before the join.  Remember it in
5213 			 * the fpinfo of this relation so that the deparser can take
5214 			 * appropriate action.  Also, save the relids of base relations
5215 			 * covered by that relation for later use by the deparser.
5216 			 */
5217 			if (fpinfo_o->remote_conds)
5218 			{
5219 				fpinfo->make_outerrel_subquery = true;
5220 				fpinfo->lower_subquery_rels =
5221 					bms_add_members(fpinfo->lower_subquery_rels,
5222 									outerrel->relids);
5223 			}
5224 			if (fpinfo_i->remote_conds)
5225 			{
5226 				fpinfo->make_innerrel_subquery = true;
5227 				fpinfo->lower_subquery_rels =
5228 					bms_add_members(fpinfo->lower_subquery_rels,
5229 									innerrel->relids);
5230 			}
5231 			break;
5232 
5233 		default:
5234 			/* Should not happen, we have just checked this above */
5235 			elog(ERROR, "unsupported join type %d", jointype);
5236 	}
5237 
5238 	/*
5239 	 * For an inner join, all restrictions can be treated alike. Treating the
5240 	 * pushed down conditions as join conditions allows a top level full outer
5241 	 * join to be deparsed without requiring subqueries.
5242 	 */
5243 	if (jointype == JOIN_INNER)
5244 	{
5245 		Assert(!fpinfo->joinclauses);
5246 		fpinfo->joinclauses = fpinfo->remote_conds;
5247 		fpinfo->remote_conds = NIL;
5248 	}
5249 
5250 	/* Mark that this join can be pushed down safely */
5251 	fpinfo->pushdown_safe = true;
5252 
5253 	/* Get user mapping */
5254 	if (fpinfo->use_remote_estimate)
5255 	{
5256 		if (fpinfo_o->use_remote_estimate)
5257 			fpinfo->user = fpinfo_o->user;
5258 		else
5259 			fpinfo->user = fpinfo_i->user;
5260 	}
5261 	else
5262 		fpinfo->user = NULL;
5263 
5264 	/*
5265 	 * Set # of retrieved rows and cached relation costs to some negative
5266 	 * value, so that we can detect when they are set to some sensible values,
5267 	 * during one (usually the first) of the calls to estimate_path_cost_size.
5268 	 */
5269 	fpinfo->retrieved_rows = -1;
5270 	fpinfo->rel_startup_cost = -1;
5271 	fpinfo->rel_total_cost = -1;
5272 
5273 	/*
5274 	 * Set the string describing this join relation to be used in EXPLAIN
5275 	 * output of corresponding ForeignScan.  Note that the decoration we add
5276 	 * to the base relation names mustn't include any digits, or it'll confuse
5277 	 * postgresExplainForeignScan.
5278 	 */
5279 	fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
5280 									 fpinfo_o->relation_name,
5281 									 get_jointype_name(fpinfo->jointype),
5282 									 fpinfo_i->relation_name);
5283 
5284 	/*
5285 	 * Set the relation index.  This is defined as the position of this
5286 	 * joinrel in the join_rel_list list plus the length of the rtable list.
5287 	 * Note that since this joinrel is at the end of the join_rel_list list
5288 	 * when we are called, we can get the position by list_length.
5289 	 */
5290 	Assert(fpinfo->relation_index == 0);	/* shouldn't be set yet */
5291 	fpinfo->relation_index =
5292 		list_length(root->parse->rtable) + list_length(root->join_rel_list);
5293 
5294 	return true;
5295 }
5296 
5297 static void
5298 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5299 								Path *epq_path)
5300 {
5301 	List	   *useful_pathkeys_list = NIL; /* List of all pathkeys */
5302 	ListCell   *lc;
5303 
5304 	useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5305 
5306 	/* Create one path for each set of pathkeys we found above. */
5307 	foreach(lc, useful_pathkeys_list)
5308 	{
5309 		double		rows;
5310 		int			width;
5311 		Cost		startup_cost;
5312 		Cost		total_cost;
5313 		List	   *useful_pathkeys = lfirst(lc);
5314 		Path	   *sorted_epq_path;
5315 
5316 		estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5317 								&rows, &width, &startup_cost, &total_cost);
5318 
5319 		/*
5320 		 * The EPQ path must be at least as well sorted as the path itself, in
5321 		 * case it gets used as input to a mergejoin.
5322 		 */
5323 		sorted_epq_path = epq_path;
5324 		if (sorted_epq_path != NULL &&
5325 			!pathkeys_contained_in(useful_pathkeys,
5326 								   sorted_epq_path->pathkeys))
5327 			sorted_epq_path = (Path *)
5328 				create_sort_path(root,
5329 								 rel,
5330 								 sorted_epq_path,
5331 								 useful_pathkeys,
5332 								 -1.0);
5333 
5334 		if (IS_SIMPLE_REL(rel))
5335 			add_path(rel, (Path *)
5336 					 create_foreignscan_path(root, rel,
5337 											 NULL,
5338 											 rows,
5339 											 startup_cost,
5340 											 total_cost,
5341 											 useful_pathkeys,
5342 											 rel->lateral_relids,
5343 											 sorted_epq_path,
5344 											 NIL));
5345 		else
5346 			add_path(rel, (Path *)
5347 					 create_foreign_join_path(root, rel,
5348 											  NULL,
5349 											  rows,
5350 											  startup_cost,
5351 											  total_cost,
5352 											  useful_pathkeys,
5353 											  rel->lateral_relids,
5354 											  sorted_epq_path,
5355 											  NIL));
5356 	}
5357 }
5358 
5359 /*
5360  * Parse options from foreign server and apply them to fpinfo.
5361  *
5362  * New options might also require tweaking merge_fdw_options().
5363  */
5364 static void
5365 apply_server_options(PgFdwRelationInfo *fpinfo)
5366 {
5367 	ListCell   *lc;
5368 
5369 	foreach(lc, fpinfo->server->options)
5370 	{
5371 		DefElem    *def = (DefElem *) lfirst(lc);
5372 
5373 		if (strcmp(def->defname, "use_remote_estimate") == 0)
5374 			fpinfo->use_remote_estimate = defGetBoolean(def);
5375 		else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5376 			fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5377 		else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5378 			fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5379 		else if (strcmp(def->defname, "extensions") == 0)
5380 			fpinfo->shippable_extensions =
5381 				ExtractExtensionList(defGetString(def), false);
5382 		else if (strcmp(def->defname, "fetch_size") == 0)
5383 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5384 	}
5385 }
5386 
5387 /*
5388  * Parse options from foreign table and apply them to fpinfo.
5389  *
5390  * New options might also require tweaking merge_fdw_options().
5391  */
5392 static void
5393 apply_table_options(PgFdwRelationInfo *fpinfo)
5394 {
5395 	ListCell   *lc;
5396 
5397 	foreach(lc, fpinfo->table->options)
5398 	{
5399 		DefElem    *def = (DefElem *) lfirst(lc);
5400 
5401 		if (strcmp(def->defname, "use_remote_estimate") == 0)
5402 			fpinfo->use_remote_estimate = defGetBoolean(def);
5403 		else if (strcmp(def->defname, "fetch_size") == 0)
5404 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5405 	}
5406 }
5407 
5408 /*
5409  * Merge FDW options from input relations into a new set of options for a join
5410  * or an upper rel.
5411  *
5412  * For a join relation, FDW-specific information about the inner and outer
5413  * relations is provided using fpinfo_i and fpinfo_o.  For an upper relation,
5414  * fpinfo_o provides the information for the input relation; fpinfo_i is
5415  * expected to NULL.
5416  */
5417 static void
5418 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5419 				  const PgFdwRelationInfo *fpinfo_o,
5420 				  const PgFdwRelationInfo *fpinfo_i)
5421 {
5422 	/* We must always have fpinfo_o. */
5423 	Assert(fpinfo_o);
5424 
5425 	/* fpinfo_i may be NULL, but if present the servers must both match. */
5426 	Assert(!fpinfo_i ||
5427 		   fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5428 
5429 	/*
5430 	 * Copy the server specific FDW options.  (For a join, both relations come
5431 	 * from the same server, so the server options should have the same value
5432 	 * for both relations.)
5433 	 */
5434 	fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5435 	fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5436 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5437 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5438 	fpinfo->fetch_size = fpinfo_o->fetch_size;
5439 
5440 	/* Merge the table level options from either side of the join. */
5441 	if (fpinfo_i)
5442 	{
5443 		/*
5444 		 * We'll prefer to use remote estimates for this join if any table
5445 		 * from either side of the join is using remote estimates.  This is
5446 		 * most likely going to be preferred since they're already willing to
5447 		 * pay the price of a round trip to get the remote EXPLAIN.  In any
5448 		 * case it's not entirely clear how we might otherwise handle this
5449 		 * best.
5450 		 */
5451 		fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5452 			fpinfo_i->use_remote_estimate;
5453 
5454 		/*
5455 		 * Set fetch size to maximum of the joining sides, since we are
5456 		 * expecting the rows returned by the join to be proportional to the
5457 		 * relation sizes.
5458 		 */
5459 		fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5460 	}
5461 }
5462 
5463 /*
5464  * postgresGetForeignJoinPaths
5465  *		Add possible ForeignPath to joinrel, if join is safe to push down.
5466  */
5467 static void
5468 postgresGetForeignJoinPaths(PlannerInfo *root,
5469 							RelOptInfo *joinrel,
5470 							RelOptInfo *outerrel,
5471 							RelOptInfo *innerrel,
5472 							JoinType jointype,
5473 							JoinPathExtraData *extra)
5474 {
5475 	PgFdwRelationInfo *fpinfo;
5476 	ForeignPath *joinpath;
5477 	double		rows;
5478 	int			width;
5479 	Cost		startup_cost;
5480 	Cost		total_cost;
5481 	Path	   *epq_path;		/* Path to create plan to be executed when
5482 								 * EvalPlanQual gets triggered. */
5483 
5484 	/*
5485 	 * Skip if this join combination has been considered already.
5486 	 */
5487 	if (joinrel->fdw_private)
5488 		return;
5489 
5490 	/*
5491 	 * This code does not work for joins with lateral references, since those
5492 	 * must have parameterized paths, which we don't generate yet.
5493 	 */
5494 	if (!bms_is_empty(joinrel->lateral_relids))
5495 		return;
5496 
5497 	/*
5498 	 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5499 	 * that the join relation is already considered, so that we won't waste
5500 	 * time in judging safety of join pushdown and adding the same paths again
5501 	 * if found safe. Once we know that this join can be pushed down, we fill
5502 	 * the entry.
5503 	 */
5504 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5505 	fpinfo->pushdown_safe = false;
5506 	joinrel->fdw_private = fpinfo;
5507 	/* attrs_used is only for base relations. */
5508 	fpinfo->attrs_used = NULL;
5509 
5510 	/*
5511 	 * If there is a possibility that EvalPlanQual will be executed, we need
5512 	 * to be able to reconstruct the row using scans of the base relations.
5513 	 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5514 	 * the path list of the joinrel, if one exists.  We must be careful to
5515 	 * call it before adding any ForeignPath, since the ForeignPath might
5516 	 * dominate the only suitable local path available.  We also do it before
5517 	 * calling foreign_join_ok(), since that function updates fpinfo and marks
5518 	 * it as pushable if the join is found to be pushable.
5519 	 */
5520 	if (root->parse->commandType == CMD_DELETE ||
5521 		root->parse->commandType == CMD_UPDATE ||
5522 		root->rowMarks)
5523 	{
5524 		epq_path = GetExistingLocalJoinPath(joinrel);
5525 		if (!epq_path)
5526 		{
5527 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5528 			return;
5529 		}
5530 	}
5531 	else
5532 		epq_path = NULL;
5533 
5534 	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5535 	{
5536 		/* Free path required for EPQ if we copied one; we don't need it now */
5537 		if (epq_path)
5538 			pfree(epq_path);
5539 		return;
5540 	}
5541 
5542 	/*
5543 	 * Compute the selectivity and cost of the local_conds, so we don't have
5544 	 * to do it over again for each path. The best we can do for these
5545 	 * conditions is to estimate selectivity on the basis of local statistics.
5546 	 * The local conditions are applied after the join has been computed on
5547 	 * the remote side like quals in WHERE clause, so pass jointype as
5548 	 * JOIN_INNER.
5549 	 */
5550 	fpinfo->local_conds_sel = clauselist_selectivity(root,
5551 													 fpinfo->local_conds,
5552 													 0,
5553 													 JOIN_INNER,
5554 													 NULL);
5555 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5556 
5557 	/*
5558 	 * If we are going to estimate costs locally, estimate the join clause
5559 	 * selectivity here while we have special join info.
5560 	 */
5561 	if (!fpinfo->use_remote_estimate)
5562 		fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5563 														0, fpinfo->jointype,
5564 														extra->sjinfo);
5565 
5566 	/* Estimate costs for bare join relation */
5567 	estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
5568 							&rows, &width, &startup_cost, &total_cost);
5569 	/* Now update this information in the joinrel */
5570 	joinrel->rows = rows;
5571 	joinrel->reltarget->width = width;
5572 	fpinfo->rows = rows;
5573 	fpinfo->width = width;
5574 	fpinfo->startup_cost = startup_cost;
5575 	fpinfo->total_cost = total_cost;
5576 
5577 	/*
5578 	 * Create a new join path and add it to the joinrel which represents a
5579 	 * join between foreign tables.
5580 	 */
5581 	joinpath = create_foreign_join_path(root,
5582 										joinrel,
5583 										NULL,	/* default pathtarget */
5584 										rows,
5585 										startup_cost,
5586 										total_cost,
5587 										NIL,	/* no pathkeys */
5588 										joinrel->lateral_relids,
5589 										epq_path,
5590 										NIL);	/* no fdw_private */
5591 
5592 	/* Add generated path into joinrel by add_path(). */
5593 	add_path(joinrel, (Path *) joinpath);
5594 
5595 	/* Consider pathkeys for the join relation */
5596 	add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5597 
5598 	/* XXX Consider parameterized paths for the join relation */
5599 }
5600 
5601 /*
5602  * Assess whether the aggregation, grouping and having operations can be pushed
5603  * down to the foreign server.  As a side effect, save information we obtain in
5604  * this function to PgFdwRelationInfo of the input relation.
5605  */
5606 static bool
5607 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
5608 					Node *havingQual)
5609 {
5610 	Query	   *query = root->parse;
5611 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5612 	PathTarget *grouping_target = grouped_rel->reltarget;
5613 	PgFdwRelationInfo *ofpinfo;
5614 	ListCell   *lc;
5615 	int			i;
5616 	List	   *tlist = NIL;
5617 
5618 	/* We currently don't support pushing Grouping Sets. */
5619 	if (query->groupingSets)
5620 		return false;
5621 
5622 	/* Get the fpinfo of the underlying scan relation. */
5623 	ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5624 
5625 	/*
5626 	 * If underlying scan relation has any local conditions, those conditions
5627 	 * are required to be applied before performing aggregation.  Hence the
5628 	 * aggregate cannot be pushed down.
5629 	 */
5630 	if (ofpinfo->local_conds)
5631 		return false;
5632 
5633 	/*
5634 	 * Examine grouping expressions, as well as other expressions we'd need to
5635 	 * compute, and check whether they are safe to push down to the foreign
5636 	 * server.  All GROUP BY expressions will be part of the grouping target
5637 	 * and thus there is no need to search for them separately.  Add grouping
5638 	 * expressions into target list which will be passed to foreign server.
5639 	 *
5640 	 * A tricky fine point is that we must not put any expression into the
5641 	 * target list that is just a foreign param (that is, something that
5642 	 * deparse.c would conclude has to be sent to the foreign server).  If we
5643 	 * do, the expression will also appear in the fdw_exprs list of the plan
5644 	 * node, and setrefs.c will get confused and decide that the fdw_exprs
5645 	 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5646 	 * a broken plan.  Somewhat oddly, it's OK if the expression contains such
5647 	 * a node, as long as it's not at top level; then no match is possible.
5648 	 */
5649 	i = 0;
5650 	foreach(lc, grouping_target->exprs)
5651 	{
5652 		Expr	   *expr = (Expr *) lfirst(lc);
5653 		Index		sgref = get_pathtarget_sortgroupref(grouping_target, i);
5654 		ListCell   *l;
5655 
5656 		/* Check whether this expression is part of GROUP BY clause */
5657 		if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5658 		{
5659 			TargetEntry *tle;
5660 
5661 			/*
5662 			 * If any GROUP BY expression is not shippable, then we cannot
5663 			 * push down aggregation to the foreign server.
5664 			 */
5665 			if (!is_foreign_expr(root, grouped_rel, expr))
5666 				return false;
5667 
5668 			/*
5669 			 * If it would be a foreign param, we can't put it into the tlist,
5670 			 * so we have to fail.
5671 			 */
5672 			if (is_foreign_param(root, grouped_rel, expr))
5673 				return false;
5674 
5675 			/*
5676 			 * Pushable, so add to tlist.  We need to create a TLE for this
5677 			 * expression and apply the sortgroupref to it.  We cannot use
5678 			 * add_to_flat_tlist() here because that avoids making duplicate
5679 			 * entries in the tlist.  If there are duplicate entries with
5680 			 * distinct sortgrouprefs, we have to duplicate that situation in
5681 			 * the output tlist.
5682 			 */
5683 			tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5684 			tle->ressortgroupref = sgref;
5685 			tlist = lappend(tlist, tle);
5686 		}
5687 		else
5688 		{
5689 			/*
5690 			 * Non-grouping expression we need to compute.  Can we ship it
5691 			 * as-is to the foreign server?
5692 			 */
5693 			if (is_foreign_expr(root, grouped_rel, expr) &&
5694 				!is_foreign_param(root, grouped_rel, expr))
5695 			{
5696 				/* Yes, so add to tlist as-is; OK to suppress duplicates */
5697 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
5698 			}
5699 			else
5700 			{
5701 				/* Not pushable as a whole; extract its Vars and aggregates */
5702 				List	   *aggvars;
5703 
5704 				aggvars = pull_var_clause((Node *) expr,
5705 										  PVC_INCLUDE_AGGREGATES);
5706 
5707 				/*
5708 				 * If any aggregate expression is not shippable, then we
5709 				 * cannot push down aggregation to the foreign server.  (We
5710 				 * don't have to check is_foreign_param, since that certainly
5711 				 * won't return true for any such expression.)
5712 				 */
5713 				if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5714 					return false;
5715 
5716 				/*
5717 				 * Add aggregates, if any, into the targetlist.  Plain Vars
5718 				 * outside an aggregate can be ignored, because they should be
5719 				 * either same as some GROUP BY column or part of some GROUP
5720 				 * BY expression.  In either case, they are already part of
5721 				 * the targetlist and thus no need to add them again.  In fact
5722 				 * including plain Vars in the tlist when they do not match a
5723 				 * GROUP BY column would cause the foreign server to complain
5724 				 * that the shipped query is invalid.
5725 				 */
5726 				foreach(l, aggvars)
5727 				{
5728 					Expr	   *expr = (Expr *) lfirst(l);
5729 
5730 					if (IsA(expr, Aggref))
5731 						tlist = add_to_flat_tlist(tlist, list_make1(expr));
5732 				}
5733 			}
5734 		}
5735 
5736 		i++;
5737 	}
5738 
5739 	/*
5740 	 * Classify the pushable and non-pushable HAVING clauses and save them in
5741 	 * remote_conds and local_conds of the grouped rel's fpinfo.
5742 	 */
5743 	if (havingQual)
5744 	{
5745 		ListCell   *lc;
5746 
5747 		foreach(lc, (List *) havingQual)
5748 		{
5749 			Expr	   *expr = (Expr *) lfirst(lc);
5750 			RestrictInfo *rinfo;
5751 
5752 			/*
5753 			 * Currently, the core code doesn't wrap havingQuals in
5754 			 * RestrictInfos, so we must make our own.
5755 			 */
5756 			Assert(!IsA(expr, RestrictInfo));
5757 			rinfo = make_restrictinfo(root,
5758 									  expr,
5759 									  true,
5760 									  false,
5761 									  false,
5762 									  root->qual_security_level,
5763 									  grouped_rel->relids,
5764 									  NULL,
5765 									  NULL);
5766 			if (is_foreign_expr(root, grouped_rel, expr))
5767 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5768 			else
5769 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5770 		}
5771 	}
5772 
5773 	/*
5774 	 * If there are any local conditions, pull Vars and aggregates from it and
5775 	 * check whether they are safe to pushdown or not.
5776 	 */
5777 	if (fpinfo->local_conds)
5778 	{
5779 		List	   *aggvars = NIL;
5780 		ListCell   *lc;
5781 
5782 		foreach(lc, fpinfo->local_conds)
5783 		{
5784 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5785 
5786 			aggvars = list_concat(aggvars,
5787 								  pull_var_clause((Node *) rinfo->clause,
5788 												  PVC_INCLUDE_AGGREGATES));
5789 		}
5790 
5791 		foreach(lc, aggvars)
5792 		{
5793 			Expr	   *expr = (Expr *) lfirst(lc);
5794 
5795 			/*
5796 			 * If aggregates within local conditions are not safe to push
5797 			 * down, then we cannot push down the query.  Vars are already
5798 			 * part of GROUP BY clause which are checked above, so no need to
5799 			 * access them again here.  Again, we need not check
5800 			 * is_foreign_param for a foreign aggregate.
5801 			 */
5802 			if (IsA(expr, Aggref))
5803 			{
5804 				if (!is_foreign_expr(root, grouped_rel, expr))
5805 					return false;
5806 
5807 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
5808 			}
5809 		}
5810 	}
5811 
5812 	/* Store generated targetlist */
5813 	fpinfo->grouped_tlist = tlist;
5814 
5815 	/* Safe to pushdown */
5816 	fpinfo->pushdown_safe = true;
5817 
5818 	/*
5819 	 * Set # of retrieved rows and cached relation costs to some negative
5820 	 * value, so that we can detect when they are set to some sensible values,
5821 	 * during one (usually the first) of the calls to estimate_path_cost_size.
5822 	 */
5823 	fpinfo->retrieved_rows = -1;
5824 	fpinfo->rel_startup_cost = -1;
5825 	fpinfo->rel_total_cost = -1;
5826 
5827 	/*
5828 	 * Set the string describing this grouped relation to be used in EXPLAIN
5829 	 * output of corresponding ForeignScan.  Note that the decoration we add
5830 	 * to the base relation name mustn't include any digits, or it'll confuse
5831 	 * postgresExplainForeignScan.
5832 	 */
5833 	fpinfo->relation_name = psprintf("Aggregate on (%s)",
5834 									 ofpinfo->relation_name);
5835 
5836 	return true;
5837 }
5838 
5839 /*
5840  * postgresGetForeignUpperPaths
5841  *		Add paths for post-join operations like aggregation, grouping etc. if
5842  *		corresponding operations are safe to push down.
5843  */
5844 static void
5845 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
5846 							 RelOptInfo *input_rel, RelOptInfo *output_rel,
5847 							 void *extra)
5848 {
5849 	PgFdwRelationInfo *fpinfo;
5850 
5851 	/*
5852 	 * If input rel is not safe to pushdown, then simply return as we cannot
5853 	 * perform any post-join operations on the foreign server.
5854 	 */
5855 	if (!input_rel->fdw_private ||
5856 		!((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5857 		return;
5858 
5859 	/* Ignore stages we don't support; and skip any duplicate calls. */
5860 	if ((stage != UPPERREL_GROUP_AGG &&
5861 		 stage != UPPERREL_ORDERED &&
5862 		 stage != UPPERREL_FINAL) ||
5863 		output_rel->fdw_private)
5864 		return;
5865 
5866 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5867 	fpinfo->pushdown_safe = false;
5868 	fpinfo->stage = stage;
5869 	output_rel->fdw_private = fpinfo;
5870 
5871 	switch (stage)
5872 	{
5873 		case UPPERREL_GROUP_AGG:
5874 			add_foreign_grouping_paths(root, input_rel, output_rel,
5875 									   (GroupPathExtraData *) extra);
5876 			break;
5877 		case UPPERREL_ORDERED:
5878 			add_foreign_ordered_paths(root, input_rel, output_rel);
5879 			break;
5880 		case UPPERREL_FINAL:
5881 			add_foreign_final_paths(root, input_rel, output_rel,
5882 									(FinalPathExtraData *) extra);
5883 			break;
5884 		default:
5885 			elog(ERROR, "unexpected upper relation: %d", (int) stage);
5886 			break;
5887 	}
5888 }
5889 
5890 /*
5891  * add_foreign_grouping_paths
5892  *		Add foreign path for grouping and/or aggregation.
5893  *
5894  * Given input_rel represents the underlying scan.  The paths are added to the
5895  * given grouped_rel.
5896  */
5897 static void
5898 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
5899 						   RelOptInfo *grouped_rel,
5900 						   GroupPathExtraData *extra)
5901 {
5902 	Query	   *parse = root->parse;
5903 	PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5904 	PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5905 	ForeignPath *grouppath;
5906 	double		rows;
5907 	int			width;
5908 	Cost		startup_cost;
5909 	Cost		total_cost;
5910 
5911 	/* Nothing to be done, if there is no grouping or aggregation required. */
5912 	if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5913 		!root->hasHavingQual)
5914 		return;
5915 
5916 	Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
5917 		   extra->patype == PARTITIONWISE_AGGREGATE_FULL);
5918 
5919 	/* save the input_rel as outerrel in fpinfo */
5920 	fpinfo->outerrel = input_rel;
5921 
5922 	/*
5923 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
5924 	 * details from the input relation's fpinfo.
5925 	 */
5926 	fpinfo->table = ifpinfo->table;
5927 	fpinfo->server = ifpinfo->server;
5928 	fpinfo->user = ifpinfo->user;
5929 	merge_fdw_options(fpinfo, ifpinfo, NULL);
5930 
5931 	/*
5932 	 * Assess if it is safe to push down aggregation and grouping.
5933 	 *
5934 	 * Use HAVING qual from extra. In case of child partition, it will have
5935 	 * translated Vars.
5936 	 */
5937 	if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5938 		return;
5939 
5940 	/*
5941 	 * Compute the selectivity and cost of the local_conds, so we don't have
5942 	 * to do it over again for each path.  (Currently we create just a single
5943 	 * path here, but in future it would be possible that we build more paths
5944 	 * such as pre-sorted paths as in postgresGetForeignPaths and
5945 	 * postgresGetForeignJoinPaths.)  The best we can do for these conditions
5946 	 * is to estimate selectivity on the basis of local statistics.
5947 	 */
5948 	fpinfo->local_conds_sel = clauselist_selectivity(root,
5949 													 fpinfo->local_conds,
5950 													 0,
5951 													 JOIN_INNER,
5952 													 NULL);
5953 
5954 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5955 
5956 	/* Estimate the cost of push down */
5957 	estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
5958 							&rows, &width, &startup_cost, &total_cost);
5959 
5960 	/* Now update this information in the fpinfo */
5961 	fpinfo->rows = rows;
5962 	fpinfo->width = width;
5963 	fpinfo->startup_cost = startup_cost;
5964 	fpinfo->total_cost = total_cost;
5965 
5966 	/* Create and add foreign path to the grouping relation. */
5967 	grouppath = create_foreign_upper_path(root,
5968 										  grouped_rel,
5969 										  grouped_rel->reltarget,
5970 										  rows,
5971 										  startup_cost,
5972 										  total_cost,
5973 										  NIL,	/* no pathkeys */
5974 										  NULL,
5975 										  NIL); /* no fdw_private */
5976 
5977 	/* Add generated path into grouped_rel by add_path(). */
5978 	add_path(grouped_rel, (Path *) grouppath);
5979 }
5980 
5981 /*
5982  * add_foreign_ordered_paths
5983  *		Add foreign paths for performing the final sort remotely.
5984  *
5985  * Given input_rel contains the source-data Paths.  The paths are added to the
5986  * given ordered_rel.
5987  */
5988 static void
5989 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
5990 						  RelOptInfo *ordered_rel)
5991 {
5992 	Query	   *parse = root->parse;
5993 	PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5994 	PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
5995 	PgFdwPathExtraData *fpextra;
5996 	double		rows;
5997 	int			width;
5998 	Cost		startup_cost;
5999 	Cost		total_cost;
6000 	List	   *fdw_private;
6001 	ForeignPath *ordered_path;
6002 	ListCell   *lc;
6003 
6004 	/* Shouldn't get here unless the query has ORDER BY */
6005 	Assert(parse->sortClause);
6006 
6007 	/* We don't support cases where there are any SRFs in the targetlist */
6008 	if (parse->hasTargetSRFs)
6009 		return;
6010 
6011 	/* Save the input_rel as outerrel in fpinfo */
6012 	fpinfo->outerrel = input_rel;
6013 
6014 	/*
6015 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
6016 	 * details from the input relation's fpinfo.
6017 	 */
6018 	fpinfo->table = ifpinfo->table;
6019 	fpinfo->server = ifpinfo->server;
6020 	fpinfo->user = ifpinfo->user;
6021 	merge_fdw_options(fpinfo, ifpinfo, NULL);
6022 
6023 	/*
6024 	 * If the input_rel is a base or join relation, we would already have
6025 	 * considered pushing down the final sort to the remote server when
6026 	 * creating pre-sorted foreign paths for that relation, because the
6027 	 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6028 	 * standard_qp_callback()).
6029 	 */
6030 	if (input_rel->reloptkind == RELOPT_BASEREL ||
6031 		input_rel->reloptkind == RELOPT_JOINREL)
6032 	{
6033 		Assert(root->query_pathkeys == root->sort_pathkeys);
6034 
6035 		/* Safe to push down if the query_pathkeys is safe to push down */
6036 		fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
6037 
6038 		return;
6039 	}
6040 
6041 	/* The input_rel should be a grouping relation */
6042 	Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
6043 		   ifpinfo->stage == UPPERREL_GROUP_AGG);
6044 
6045 	/*
6046 	 * We try to create a path below by extending a simple foreign path for
6047 	 * the underlying grouping relation to perform the final sort remotely,
6048 	 * which is stored into the fdw_private list of the resulting path.
6049 	 */
6050 
6051 	/* Assess if it is safe to push down the final sort */
6052 	foreach(lc, root->sort_pathkeys)
6053 	{
6054 		PathKey    *pathkey = (PathKey *) lfirst(lc);
6055 		EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6056 		Expr	   *sort_expr;
6057 
6058 		/*
6059 		 * is_foreign_expr would detect volatile expressions as well, but
6060 		 * checking ec_has_volatile here saves some cycles.
6061 		 */
6062 		if (pathkey_ec->ec_has_volatile)
6063 			return;
6064 
6065 		/* Get the sort expression for the pathkey_ec */
6066 		sort_expr = find_em_expr_for_input_target(root,
6067 												  pathkey_ec,
6068 												  input_rel->reltarget);
6069 
6070 		/* If it's unsafe to remote, we cannot push down the final sort */
6071 		if (!is_foreign_expr(root, input_rel, sort_expr))
6072 			return;
6073 	}
6074 
6075 	/* Safe to push down */
6076 	fpinfo->pushdown_safe = true;
6077 
6078 	/* Construct PgFdwPathExtraData */
6079 	fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6080 	fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6081 	fpextra->has_final_sort = true;
6082 
6083 	/* Estimate the costs of performing the final sort remotely */
6084 	estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6085 							&rows, &width, &startup_cost, &total_cost);
6086 
6087 	/*
6088 	 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6089 	 * Items in the list must match order in enum FdwPathPrivateIndex.
6090 	 */
6091 	fdw_private = list_make2(makeInteger(true), makeInteger(false));
6092 
6093 	/* Create foreign ordering path */
6094 	ordered_path = create_foreign_upper_path(root,
6095 											 input_rel,
6096 											 root->upper_targets[UPPERREL_ORDERED],
6097 											 rows,
6098 											 startup_cost,
6099 											 total_cost,
6100 											 root->sort_pathkeys,
6101 											 NULL,	/* no extra plan */
6102 											 fdw_private);
6103 
6104 	/* and add it to the ordered_rel */
6105 	add_path(ordered_rel, (Path *) ordered_path);
6106 }
6107 
6108 /*
6109  * add_foreign_final_paths
6110  *		Add foreign paths for performing the final processing remotely.
6111  *
6112  * Given input_rel contains the source-data Paths.  The paths are added to the
6113  * given final_rel.
6114  */
6115 static void
6116 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6117 						RelOptInfo *final_rel,
6118 						FinalPathExtraData *extra)
6119 {
6120 	Query	   *parse = root->parse;
6121 	PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6122 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6123 	bool		has_final_sort = false;
6124 	List	   *pathkeys = NIL;
6125 	PgFdwPathExtraData *fpextra;
6126 	bool		save_use_remote_estimate = false;
6127 	double		rows;
6128 	int			width;
6129 	Cost		startup_cost;
6130 	Cost		total_cost;
6131 	List	   *fdw_private;
6132 	ForeignPath *final_path;
6133 
6134 	/*
6135 	 * Currently, we only support this for SELECT commands
6136 	 */
6137 	if (parse->commandType != CMD_SELECT)
6138 		return;
6139 
6140 	/*
6141 	 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6142 	 * to add a LIMIT node
6143 	 */
6144 	if (!parse->rowMarks && !extra->limit_needed)
6145 		return;
6146 
6147 	/* We don't support cases where there are any SRFs in the targetlist */
6148 	if (parse->hasTargetSRFs)
6149 		return;
6150 
6151 	/* Save the input_rel as outerrel in fpinfo */
6152 	fpinfo->outerrel = input_rel;
6153 
6154 	/*
6155 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
6156 	 * details from the input relation's fpinfo.
6157 	 */
6158 	fpinfo->table = ifpinfo->table;
6159 	fpinfo->server = ifpinfo->server;
6160 	fpinfo->user = ifpinfo->user;
6161 	merge_fdw_options(fpinfo, ifpinfo, NULL);
6162 
6163 	/*
6164 	 * If there is no need to add a LIMIT node, there might be a ForeignPath
6165 	 * in the input_rel's pathlist that implements all behavior of the query.
6166 	 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6167 	 * (if any) before we get here.
6168 	 */
6169 	if (!extra->limit_needed)
6170 	{
6171 		ListCell   *lc;
6172 
6173 		Assert(parse->rowMarks);
6174 
6175 		/*
6176 		 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6177 		 * so the input_rel should be a base, join, or ordered relation; and
6178 		 * if it's an ordered relation, its input relation should be a base or
6179 		 * join relation.
6180 		 */
6181 		Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6182 			   input_rel->reloptkind == RELOPT_JOINREL ||
6183 			   (input_rel->reloptkind == RELOPT_UPPER_REL &&
6184 				ifpinfo->stage == UPPERREL_ORDERED &&
6185 				(ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
6186 				 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
6187 
6188 		foreach(lc, input_rel->pathlist)
6189 		{
6190 			Path	   *path = (Path *) lfirst(lc);
6191 
6192 			/*
6193 			 * apply_scanjoin_target_to_paths() uses create_projection_path()
6194 			 * to adjust each of its input paths if needed, whereas
6195 			 * create_ordered_paths() uses apply_projection_to_path() to do
6196 			 * that.  So the former might have put a ProjectionPath on top of
6197 			 * the ForeignPath; look through ProjectionPath and see if the
6198 			 * path underneath it is ForeignPath.
6199 			 */
6200 			if (IsA(path, ForeignPath) ||
6201 				(IsA(path, ProjectionPath) &&
6202 				 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
6203 			{
6204 				/*
6205 				 * Create foreign final path; this gets rid of a
6206 				 * no-longer-needed outer plan (if any), which makes the
6207 				 * EXPLAIN output look cleaner
6208 				 */
6209 				final_path = create_foreign_upper_path(root,
6210 													   path->parent,
6211 													   path->pathtarget,
6212 													   path->rows,
6213 													   path->startup_cost,
6214 													   path->total_cost,
6215 													   path->pathkeys,
6216 													   NULL,	/* no extra plan */
6217 													   NULL);	/* no fdw_private */
6218 
6219 				/* and add it to the final_rel */
6220 				add_path(final_rel, (Path *) final_path);
6221 
6222 				/* Safe to push down */
6223 				fpinfo->pushdown_safe = true;
6224 
6225 				return;
6226 			}
6227 		}
6228 
6229 		/*
6230 		 * If we get here it means no ForeignPaths; since we would already
6231 		 * have considered pushing down all operations for the query to the
6232 		 * remote server, give up on it.
6233 		 */
6234 		return;
6235 	}
6236 
6237 	Assert(extra->limit_needed);
6238 
6239 	/*
6240 	 * If the input_rel is an ordered relation, replace the input_rel with its
6241 	 * input relation
6242 	 */
6243 	if (input_rel->reloptkind == RELOPT_UPPER_REL &&
6244 		ifpinfo->stage == UPPERREL_ORDERED)
6245 	{
6246 		input_rel = ifpinfo->outerrel;
6247 		ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6248 		has_final_sort = true;
6249 		pathkeys = root->sort_pathkeys;
6250 	}
6251 
6252 	/* The input_rel should be a base, join, or grouping relation */
6253 	Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6254 		   input_rel->reloptkind == RELOPT_JOINREL ||
6255 		   (input_rel->reloptkind == RELOPT_UPPER_REL &&
6256 			ifpinfo->stage == UPPERREL_GROUP_AGG));
6257 
6258 	/*
6259 	 * We try to create a path below by extending a simple foreign path for
6260 	 * the underlying base, join, or grouping relation to perform the final
6261 	 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
6262 	 * stored into the fdw_private list of the resulting path.  (We
6263 	 * re-estimate the costs of sorting the underlying relation, if
6264 	 * has_final_sort.)
6265 	 */
6266 
6267 	/*
6268 	 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
6269 	 * server
6270 	 */
6271 
6272 	/*
6273 	 * If the underlying relation has any local conditions, the LIMIT/OFFSET
6274 	 * cannot be pushed down.
6275 	 */
6276 	if (ifpinfo->local_conds)
6277 		return;
6278 
6279 	/*
6280 	 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
6281 	 * not safe to remote.
6282 	 */
6283 	if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
6284 		!is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
6285 		return;
6286 
6287 	/* Safe to push down */
6288 	fpinfo->pushdown_safe = true;
6289 
6290 	/* Construct PgFdwPathExtraData */
6291 	fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6292 	fpextra->target = root->upper_targets[UPPERREL_FINAL];
6293 	fpextra->has_final_sort = has_final_sort;
6294 	fpextra->has_limit = extra->limit_needed;
6295 	fpextra->limit_tuples = extra->limit_tuples;
6296 	fpextra->count_est = extra->count_est;
6297 	fpextra->offset_est = extra->offset_est;
6298 
6299 	/*
6300 	 * Estimate the costs of performing the final sort and the LIMIT
6301 	 * restriction remotely.  If has_final_sort is false, we wouldn't need to
6302 	 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
6303 	 * roughly estimated using the costs we already have for the underlying
6304 	 * relation, in the same way as when use_remote_estimate is false.  Since
6305 	 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
6306 	 * false in that case.
6307 	 */
6308 	if (!fpextra->has_final_sort)
6309 	{
6310 		save_use_remote_estimate = ifpinfo->use_remote_estimate;
6311 		ifpinfo->use_remote_estimate = false;
6312 	}
6313 	estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
6314 							&rows, &width, &startup_cost, &total_cost);
6315 	if (!fpextra->has_final_sort)
6316 		ifpinfo->use_remote_estimate = save_use_remote_estimate;
6317 
6318 	/*
6319 	 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6320 	 * Items in the list must match order in enum FdwPathPrivateIndex.
6321 	 */
6322 	fdw_private = list_make2(makeInteger(has_final_sort),
6323 							 makeInteger(extra->limit_needed));
6324 
6325 	/*
6326 	 * Create foreign final path; this gets rid of a no-longer-needed outer
6327 	 * plan (if any), which makes the EXPLAIN output look cleaner
6328 	 */
6329 	final_path = create_foreign_upper_path(root,
6330 										   input_rel,
6331 										   root->upper_targets[UPPERREL_FINAL],
6332 										   rows,
6333 										   startup_cost,
6334 										   total_cost,
6335 										   pathkeys,
6336 										   NULL,	/* no extra plan */
6337 										   fdw_private);
6338 
6339 	/* and add it to the final_rel */
6340 	add_path(final_rel, (Path *) final_path);
6341 }
6342 
6343 /*
6344  * Create a tuple from the specified row of the PGresult.
6345  *
6346  * rel is the local representation of the foreign table, attinmeta is
6347  * conversion data for the rel's tupdesc, and retrieved_attrs is an
6348  * integer list of the table column numbers present in the PGresult.
6349  * fsstate is the ForeignScan plan node's execution state.
6350  * temp_context is a working context that can be reset after each tuple.
6351  *
6352  * Note: either rel or fsstate, but not both, can be NULL.  rel is NULL
6353  * if we're processing a remote join, while fsstate is NULL in a non-query
6354  * context such as ANALYZE, or if we're processing a non-scan query node.
6355  */
6356 static HeapTuple
6357 make_tuple_from_result_row(PGresult *res,
6358 						   int row,
6359 						   Relation rel,
6360 						   AttInMetadata *attinmeta,
6361 						   List *retrieved_attrs,
6362 						   ForeignScanState *fsstate,
6363 						   MemoryContext temp_context)
6364 {
6365 	HeapTuple	tuple;
6366 	TupleDesc	tupdesc;
6367 	Datum	   *values;
6368 	bool	   *nulls;
6369 	ItemPointer ctid = NULL;
6370 	ConversionLocation errpos;
6371 	ErrorContextCallback errcallback;
6372 	MemoryContext oldcontext;
6373 	ListCell   *lc;
6374 	int			j;
6375 
6376 	Assert(row < PQntuples(res));
6377 
6378 	/*
6379 	 * Do the following work in a temp context that we reset after each tuple.
6380 	 * This cleans up not only the data we have direct access to, but any
6381 	 * cruft the I/O functions might leak.
6382 	 */
6383 	oldcontext = MemoryContextSwitchTo(temp_context);
6384 
6385 	/*
6386 	 * Get the tuple descriptor for the row.  Use the rel's tupdesc if rel is
6387 	 * provided, otherwise look to the scan node's ScanTupleSlot.
6388 	 */
6389 	if (rel)
6390 		tupdesc = RelationGetDescr(rel);
6391 	else
6392 	{
6393 		Assert(fsstate);
6394 		tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
6395 	}
6396 
6397 	values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
6398 	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
6399 	/* Initialize to nulls for any columns not present in result */
6400 	memset(nulls, true, tupdesc->natts * sizeof(bool));
6401 
6402 	/*
6403 	 * Set up and install callback to report where conversion error occurs.
6404 	 */
6405 	errpos.cur_attno = 0;
6406 	errpos.rel = rel;
6407 	errpos.fsstate = fsstate;
6408 	errcallback.callback = conversion_error_callback;
6409 	errcallback.arg = (void *) &errpos;
6410 	errcallback.previous = error_context_stack;
6411 	error_context_stack = &errcallback;
6412 
6413 	/*
6414 	 * i indexes columns in the relation, j indexes columns in the PGresult.
6415 	 */
6416 	j = 0;
6417 	foreach(lc, retrieved_attrs)
6418 	{
6419 		int			i = lfirst_int(lc);
6420 		char	   *valstr;
6421 
6422 		/* fetch next column's textual value */
6423 		if (PQgetisnull(res, row, j))
6424 			valstr = NULL;
6425 		else
6426 			valstr = PQgetvalue(res, row, j);
6427 
6428 		/*
6429 		 * convert value to internal representation
6430 		 *
6431 		 * Note: we ignore system columns other than ctid and oid in result
6432 		 */
6433 		errpos.cur_attno = i;
6434 		if (i > 0)
6435 		{
6436 			/* ordinary column */
6437 			Assert(i <= tupdesc->natts);
6438 			nulls[i - 1] = (valstr == NULL);
6439 			/* Apply the input function even to nulls, to support domains */
6440 			values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
6441 											  valstr,
6442 											  attinmeta->attioparams[i - 1],
6443 											  attinmeta->atttypmods[i - 1]);
6444 		}
6445 		else if (i == SelfItemPointerAttributeNumber)
6446 		{
6447 			/* ctid */
6448 			if (valstr != NULL)
6449 			{
6450 				Datum		datum;
6451 
6452 				datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
6453 				ctid = (ItemPointer) DatumGetPointer(datum);
6454 			}
6455 		}
6456 		errpos.cur_attno = 0;
6457 
6458 		j++;
6459 	}
6460 
6461 	/* Uninstall error context callback. */
6462 	error_context_stack = errcallback.previous;
6463 
6464 	/*
6465 	 * Check we got the expected number of columns.  Note: j == 0 and
6466 	 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
6467 	 */
6468 	if (j > 0 && j != PQnfields(res))
6469 		elog(ERROR, "remote query result does not match the foreign table");
6470 
6471 	/*
6472 	 * Build the result tuple in caller's memory context.
6473 	 */
6474 	MemoryContextSwitchTo(oldcontext);
6475 
6476 	tuple = heap_form_tuple(tupdesc, values, nulls);
6477 
6478 	/*
6479 	 * If we have a CTID to return, install it in both t_self and t_ctid.
6480 	 * t_self is the normal place, but if the tuple is converted to a
6481 	 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
6482 	 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
6483 	 */
6484 	if (ctid)
6485 		tuple->t_self = tuple->t_data->t_ctid = *ctid;
6486 
6487 	/*
6488 	 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
6489 	 * heap_form_tuple.  heap_form_tuple actually creates the tuple with
6490 	 * DatumTupleFields, not HeapTupleFields, but the executor expects
6491 	 * HeapTupleFields and will happily extract system columns on that
6492 	 * assumption.  If we don't do this then, for example, the tuple length
6493 	 * ends up in the xmin field, which isn't what we want.
6494 	 */
6495 	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
6496 	HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
6497 	HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
6498 
6499 	/* Clean up */
6500 	MemoryContextReset(temp_context);
6501 
6502 	return tuple;
6503 }
6504 
6505 /*
6506  * Callback function which is called when error occurs during column value
6507  * conversion.  Print names of column and relation.
6508  *
6509  * Note that this function mustn't do any catalog lookups, since we are in
6510  * an already-failed transaction.  Fortunately, we can get the needed info
6511  * from the relation or the query's rangetable instead.
6512  */
6513 static void
6514 conversion_error_callback(void *arg)
6515 {
6516 	ConversionLocation *errpos = (ConversionLocation *) arg;
6517 	Relation	rel = errpos->rel;
6518 	ForeignScanState *fsstate = errpos->fsstate;
6519 	const char *attname = NULL;
6520 	const char *relname = NULL;
6521 	bool		is_wholerow = false;
6522 
6523 	/*
6524 	 * If we're in a scan node, always use aliases from the rangetable, for
6525 	 * consistency between the simple-relation and remote-join cases.  Look at
6526 	 * the relation's tupdesc only if we're not in a scan node.
6527 	 */
6528 	if (fsstate)
6529 	{
6530 		/* ForeignScan case */
6531 		ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
6532 		int			varno = 0;
6533 		AttrNumber	colno = 0;
6534 
6535 		if (fsplan->scan.scanrelid > 0)
6536 		{
6537 			/* error occurred in a scan against a foreign table */
6538 			varno = fsplan->scan.scanrelid;
6539 			colno = errpos->cur_attno;
6540 		}
6541 		else
6542 		{
6543 			/* error occurred in a scan against a foreign join */
6544 			TargetEntry *tle;
6545 
6546 			tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
6547 								errpos->cur_attno - 1);
6548 
6549 			/*
6550 			 * Target list can have Vars and expressions.  For Vars, we can
6551 			 * get some information, however for expressions we can't.  Thus
6552 			 * for expressions, just show generic context message.
6553 			 */
6554 			if (IsA(tle->expr, Var))
6555 			{
6556 				Var		   *var = (Var *) tle->expr;
6557 
6558 				varno = var->varno;
6559 				colno = var->varattno;
6560 			}
6561 		}
6562 
6563 		if (varno > 0)
6564 		{
6565 			EState	   *estate = fsstate->ss.ps.state;
6566 			RangeTblEntry *rte = exec_rt_fetch(varno, estate);
6567 
6568 			relname = rte->eref->aliasname;
6569 
6570 			if (colno == 0)
6571 				is_wholerow = true;
6572 			else if (colno > 0 && colno <= list_length(rte->eref->colnames))
6573 				attname = strVal(list_nth(rte->eref->colnames, colno - 1));
6574 			else if (colno == SelfItemPointerAttributeNumber)
6575 				attname = "ctid";
6576 		}
6577 	}
6578 	else if (rel)
6579 	{
6580 		/* Non-ForeignScan case (we should always have a rel here) */
6581 		TupleDesc	tupdesc = RelationGetDescr(rel);
6582 
6583 		relname = RelationGetRelationName(rel);
6584 		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
6585 		{
6586 			Form_pg_attribute attr = TupleDescAttr(tupdesc,
6587 												   errpos->cur_attno - 1);
6588 
6589 			attname = NameStr(attr->attname);
6590 		}
6591 		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
6592 			attname = "ctid";
6593 	}
6594 
6595 	if (relname && is_wholerow)
6596 		errcontext("whole-row reference to foreign table \"%s\"", relname);
6597 	else if (relname && attname)
6598 		errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
6599 	else
6600 		errcontext("processing expression at position %d in select list",
6601 				   errpos->cur_attno);
6602 }
6603 
6604 /*
6605  * Find an equivalence class member expression to be computed as a sort column
6606  * in the given target.
6607  */
6608 Expr *
6609 find_em_expr_for_input_target(PlannerInfo *root,
6610 							  EquivalenceClass *ec,
6611 							  PathTarget *target)
6612 {
6613 	ListCell   *lc1;
6614 	int			i;
6615 
6616 	i = 0;
6617 	foreach(lc1, target->exprs)
6618 	{
6619 		Expr	   *expr = (Expr *) lfirst(lc1);
6620 		Index		sgref = get_pathtarget_sortgroupref(target, i);
6621 		ListCell   *lc2;
6622 
6623 		/* Ignore non-sort expressions */
6624 		if (sgref == 0 ||
6625 			get_sortgroupref_clause_noerr(sgref,
6626 										  root->parse->sortClause) == NULL)
6627 		{
6628 			i++;
6629 			continue;
6630 		}
6631 
6632 		/* We ignore binary-compatible relabeling on both ends */
6633 		while (expr && IsA(expr, RelabelType))
6634 			expr = ((RelabelType *) expr)->arg;
6635 
6636 		/* Locate an EquivalenceClass member matching this expr, if any */
6637 		foreach(lc2, ec->ec_members)
6638 		{
6639 			EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
6640 			Expr	   *em_expr;
6641 
6642 			/* Don't match constants */
6643 			if (em->em_is_const)
6644 				continue;
6645 
6646 			/* Ignore child members */
6647 			if (em->em_is_child)
6648 				continue;
6649 
6650 			/* Match if same expression (after stripping relabel) */
6651 			em_expr = em->em_expr;
6652 			while (em_expr && IsA(em_expr, RelabelType))
6653 				em_expr = ((RelabelType *) em_expr)->arg;
6654 
6655 			if (equal(em_expr, expr))
6656 				return em->em_expr;
6657 		}
6658 
6659 		i++;
6660 	}
6661 
6662 	elog(ERROR, "could not find pathkey item to sort");
6663 	return NULL;				/* keep compiler quiet */
6664 }
6665