1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  *		  Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
45 PG_MODULE_MAGIC;
46 
/*
 * Default CPU cost to start up a foreign query.  Used as the initial value
 * of fdw_startup_cost in postgresGetForeignRelSize(), before the server and
 * table options are applied.
 */
#define DEFAULT_FDW_STARTUP_COST	100.0

/*
 * Default CPU cost to process 1 row (above and beyond cpu_tuple_cost).  Used
 * as the initial value of fdw_tuple_cost in postgresGetForeignRelSize(),
 * before the server and table options are applied.
 */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
56 /*
57  * Indexes of FDW-private information stored in fdw_private lists.
58  *
59  * These items are indexed with the enum FdwScanPrivateIndex, so an item
60  * can be fetched with list_nth().  For example, to get the SELECT statement:
61  *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
62  */
/*
 * The enumerators take consecutive values starting at 0; each value is the
 * position of the corresponding item in the fdw_private list, so the order
 * here must match the order in which the list is built.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing a join, i.e. the names of the relations being joined
	 * and the types of join.  This item is added only when the scan is a
	 * join.
	 */
	FdwScanPrivateRelations
};
78 
79 /*
80  * Similarly, this enum describes what's kept in the fdw_private list for
81  * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
82  *
83  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
84  * 2) Integer list of target attribute numbers for INSERT/UPDATE
85  *	  (NIL for a DELETE)
86  * 3) Boolean flag showing if the remote query has a RETURNING clause
87  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
88  */
/*
 * Positions of the items in the fdw_private list of a ModifyTable node; the
 * enumerators take consecutive values starting at 0.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE (NIL for a DELETE) */
	FdwModifyPrivateTargetAttnums,
	/* has-returning flag (as an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING, if any */
	FdwModifyPrivateRetrievedAttrs
};
100 
101 /*
102  * Similarly, this enum describes what's kept in the fdw_private list for
103  * a ForeignScan node that modifies a foreign table directly.  We store:
104  *
105  * 1) UPDATE/DELETE statement text to be sent to the remote server
106  * 2) Boolean flag showing if the remote query has a RETURNING clause
107  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
108  * 4) Boolean flag showing if we set the command es_processed
109  */
/*
 * Positions of the items in the fdw_private list of a ForeignScan node that
 * modifies a foreign table directly; the enumerators take consecutive values
 * starting at 0.
 */
enum FdwDirectModifyPrivateIndex
{
	/* UPDATE/DELETE statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING, if any */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
121 
122 /*
123  * Execution state of a foreign scan using postgres_fdw.
124  */
typedef struct PgFdwScanState
{
	Relation	rel;			/* relcache entry for the foreign table; NULL
								 * for a foreign join scan */
	TupleDesc	tupdesc;		/* tuple descriptor of scan */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data (see enum FdwScanPrivateIndex) */
	char	   *query;			/* text of SELECT command */
	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the scan */
	unsigned int cursor_number; /* quasi-unique ID for my cursor */
	bool		cursor_exists;	/* have we created the cursor? */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
	int			num_tuples;		/* # of tuples in array */
	int			next_tuple;		/* index of next one to return */

	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
	bool		eof_reached;	/* true if last fetch reached EOF */

	/* working memory contexts */
	MemoryContext batch_cxt;	/* context holding current batch of tuples */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	/* fetch tuning */
	int			fetch_size;		/* number of tuples per fetch */
} PgFdwScanState;
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
typedef struct PgFdwModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the modify operation */
	char	   *p_name;			/* name of prepared statement, if created */

	/* extracted fdw_private data (see enum FdwModifyPrivateIndex) */
	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
	List	   *target_attrs;	/* list of target attribute numbers */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */

	/* info about parameters for prepared statement */
	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
	int			p_nums;			/* number of parameters to transmit */
	FmgrInfo   *p_flinfo;		/* output conversion functions for them */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	/* for update row movement if subplan result rel */
	struct PgFdwModifyState *aux_fmstate;	/* separate foreign-insert state,
											 * if one has been created */
} PgFdwModifyState;
191 
192 /*
193  * Execution state of a foreign scan that modifies a foreign table directly.
194  */
typedef struct PgFdwDirectModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data (see enum FdwDirectModifyPrivateIndex) */
	char	   *query;			/* text of UPDATE/DELETE command */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
	bool		set_processed;	/* do we set the command es_processed? */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the update */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	PGresult   *result;			/* result for query */
	int			num_tuples;		/* # of result tuples */
	int			next_tuple;		/* index of next one to return */
	Relation	resultRel;		/* relcache entry for the target relation */
	AttrNumber *attnoMap;		/* array of attnums of input user columns */
	AttrNumber	ctidAttno;		/* attnum of input ctid column */
	AttrNumber	oidAttno;		/* attnum of input oid column */
	bool		hasSystemCols;	/* are there system columns of resultRel? */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwDirectModifyState;
226 
227 /*
228  * Workspace for analyzing a foreign table.
229  */
typedef struct PgFdwAnalyzeState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
	List	   *retrieved_attrs;	/* attr numbers retrieved by query */

	/* collected sample rows */
	HeapTuple  *rows;			/* array of size targrows */
	int			targrows;		/* target # of sample rows */
	int			numrows;		/* # of sample rows collected so far */

	/* for random sampling */
	double		samplerows;		/* # of rows fetched */
	double		rowstoskip;		/* # of rows to skip before next sample */
	ReservoirStateData rstate;	/* state for reservoir sampling */

	/* working memory contexts */
	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwAnalyzeState;
250 
251 /*
252  * Identify the attribute where data conversion fails.
253  */
typedef struct ConversionLocation
{
	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
	Relation	rel;			/* foreign table being processed, or NULL */
	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
} ConversionLocation;
260 
/*
 * Callback argument for ec_member_matches_foreign, passed through that
 * function's "void *arg" parameter.
 */
typedef struct
{
	Expr	   *current;		/* current expr, or NULL if not yet found */
	List	   *already_used;	/* expressions already dealt with */
} ec_member_foreign_arg;
267 
268 /*
269  * SQL functions
270  */
271 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
272 
/*
 * FDW callback routines
 *
 * Forward declarations of the file-local (static) functions that
 * postgres_fdw_handler() installs into the FdwRoutine struct.
 */

/* Functions for scanning foreign tables */
static void postgresGetForeignRelSize(PlannerInfo *root,
						  RelOptInfo *baserel,
						  Oid foreigntableid);
static void postgresGetForeignPaths(PlannerInfo *root,
						RelOptInfo *baserel,
						Oid foreigntableid);
static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
					   RelOptInfo *foreignrel,
					   Oid foreigntableid,
					   ForeignPath *best_path,
					   List *tlist,
					   List *scan_clauses,
					   Plan *outer_plan);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);

/* Functions for updating foreign tables */
static void postgresAddForeignUpdateTargets(Query *parsetree,
								RangeTblEntry *target_rte,
								Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
						  ModifyTable *plan,
						  Index resultRelation,
						  int subplan_index);
static void postgresBeginForeignModify(ModifyTableState *mtstate,
						   ResultRelInfo *resultRelInfo,
						   List *fdw_private,
						   int subplan_index,
						   int eflags);
static TupleTableSlot *postgresExecForeignInsert(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignDelete(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static void postgresEndForeignModify(EState *estate,
						 ResultRelInfo *resultRelInfo);
static void postgresBeginForeignInsert(ModifyTableState *mtstate,
						   ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
						 ResultRelInfo *resultRelInfo);
static int	postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
						 ModifyTable *plan,
						 Index resultRelation,
						 int subplan_index);
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
static void postgresEndDirectModify(ForeignScanState *node);

/* Support functions for EXPLAIN */
static void postgresExplainForeignScan(ForeignScanState *node,
						   ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
							 ResultRelInfo *rinfo,
							 List *fdw_private,
							 int subplan_index,
							 ExplainState *es);
static void postgresExplainDirectModify(ForeignScanState *node,
							ExplainState *es);

/* Support function for ANALYZE */
static bool postgresAnalyzeForeignTable(Relation relation,
							AcquireSampleRowsFunc *func,
							BlockNumber *totalpages);

/* Support function for IMPORT FOREIGN SCHEMA */
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
							Oid serverOid);

/* Support function for join push-down */
static void postgresGetForeignJoinPaths(PlannerInfo *root,
							RelOptInfo *joinrel,
							RelOptInfo *outerrel,
							RelOptInfo *innerrel,
							JoinType jointype,
							JoinPathExtraData *extra);

/* Function for EvalPlanQual rechecks */
static bool postgresRecheckForeignScan(ForeignScanState *node,
						   TupleTableSlot *slot);

/* Support function for upper relation push-down */
static void postgresGetForeignUpperPaths(PlannerInfo *root,
							 UpperRelationKind stage,
							 RelOptInfo *input_rel,
							 RelOptInfo *output_rel,
							 void *extra);
358 
/*
 * Helper functions
 *
 * Forward declarations of file-local (static) helpers, so that they can be
 * referenced before their definitions.
 */

/*
 * Called by postgresGetForeignRelSize() to fill in the row count, width and
 * startup/total cost estimates kept in PgFdwRelationInfo.
 */
static void estimate_path_cost_size(PlannerInfo *root,
						RelOptInfo *foreignrel,
						List *param_join_conds,
						List *pathkeys,
						double *p_rows, int *p_width,
						Cost *p_startup_cost, Cost *p_total_cost);
static void get_remote_estimate(const char *sql,
					PGconn *conn,
					double *rows,
					int *width,
					Cost *startup_cost,
					Cost *total_cost);
/* "arg" is expected to point to an ec_member_foreign_arg */
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
						  EquivalenceClass *ec, EquivalenceMember *em,
						  void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
					  RangeTblEntry *rte,
					  ResultRelInfo *resultRelInfo,
					  CmdType operation,
					  Plan *subplan,
					  char *query,
					  List *target_attrs,
					  bool has_returning,
					  List *retrieved_attrs);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
						 ItemPointer tupleid,
						 TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
					   TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
					   List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);
static void init_returning_filter(PgFdwDirectModifyState *dmstate,
					  List *fdw_scan_tlist,
					  Index rtindex);
static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
					   TupleTableSlot *slot,
					   EState *estate);
static void prepare_query_params(PlanState *node,
					 List *fdw_exprs,
					 int numParams,
					 FmgrInfo **param_flinfo,
					 List **param_exprs,
					 const char ***param_values);
static void process_query_params(ExprContext *econtext,
					 FmgrInfo *param_flinfo,
					 List *param_exprs,
					 const char **param_values);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
							  HeapTuple *rows, int targrows,
							  double *totalrows,
							  double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
					  PgFdwAnalyzeState *astate);
static HeapTuple make_tuple_from_result_row(PGresult *res,
						   int row,
						   Relation rel,
						   AttInMetadata *attinmeta,
						   List *retrieved_attrs,
						   ForeignScanState *fsstate,
						   MemoryContext temp_context);
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
				JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
				JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
					Node *havingQual);
/* Determine which orderings of a relation might be useful. */
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
								 RelOptInfo *rel);
/* Determine which EquivalenceClasses might be involved in useful orderings. */
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
								Path *epq_path);
static void add_foreign_grouping_paths(PlannerInfo *root,
						   RelOptInfo *input_rel,
						   RelOptInfo *grouped_rel,
						   GroupPathExtraData *extra);
/*
 * Called (in this order) by postgresGetForeignRelSize() after it has stored
 * the built-in defaults in fpinfo.
 */
static void apply_server_options(PgFdwRelationInfo *fpinfo);
static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
				  const PgFdwRelationInfo *fpinfo_o,
				  const PgFdwRelationInfo *fpinfo_i);
450 
451 
452 /*
453  * Foreign-data wrapper handler function: return a struct with pointers
454  * to my callback routines.
455  */
456 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)457 postgres_fdw_handler(PG_FUNCTION_ARGS)
458 {
459 	FdwRoutine *routine = makeNode(FdwRoutine);
460 
461 	/* Functions for scanning foreign tables */
462 	routine->GetForeignRelSize = postgresGetForeignRelSize;
463 	routine->GetForeignPaths = postgresGetForeignPaths;
464 	routine->GetForeignPlan = postgresGetForeignPlan;
465 	routine->BeginForeignScan = postgresBeginForeignScan;
466 	routine->IterateForeignScan = postgresIterateForeignScan;
467 	routine->ReScanForeignScan = postgresReScanForeignScan;
468 	routine->EndForeignScan = postgresEndForeignScan;
469 
470 	/* Functions for updating foreign tables */
471 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
472 	routine->PlanForeignModify = postgresPlanForeignModify;
473 	routine->BeginForeignModify = postgresBeginForeignModify;
474 	routine->ExecForeignInsert = postgresExecForeignInsert;
475 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
476 	routine->ExecForeignDelete = postgresExecForeignDelete;
477 	routine->EndForeignModify = postgresEndForeignModify;
478 	routine->BeginForeignInsert = postgresBeginForeignInsert;
479 	routine->EndForeignInsert = postgresEndForeignInsert;
480 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
481 	routine->PlanDirectModify = postgresPlanDirectModify;
482 	routine->BeginDirectModify = postgresBeginDirectModify;
483 	routine->IterateDirectModify = postgresIterateDirectModify;
484 	routine->EndDirectModify = postgresEndDirectModify;
485 
486 	/* Function for EvalPlanQual rechecks */
487 	routine->RecheckForeignScan = postgresRecheckForeignScan;
488 	/* Support functions for EXPLAIN */
489 	routine->ExplainForeignScan = postgresExplainForeignScan;
490 	routine->ExplainForeignModify = postgresExplainForeignModify;
491 	routine->ExplainDirectModify = postgresExplainDirectModify;
492 
493 	/* Support functions for ANALYZE */
494 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
495 
496 	/* Support functions for IMPORT FOREIGN SCHEMA */
497 	routine->ImportForeignSchema = postgresImportForeignSchema;
498 
499 	/* Support functions for join push-down */
500 	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
501 
502 	/* Support functions for upper relation push-down */
503 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
504 
505 	PG_RETURN_POINTER(routine);
506 }
507 
508 /*
509  * postgresGetForeignRelSize
510  *		Estimate # of rows and width of the result of the scan
511  *
512  * We should consider the effect of all baserestrictinfo clauses here, but
513  * not any join clauses.
514  */
515 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)516 postgresGetForeignRelSize(PlannerInfo *root,
517 						  RelOptInfo *baserel,
518 						  Oid foreigntableid)
519 {
520 	PgFdwRelationInfo *fpinfo;
521 	ListCell   *lc;
522 	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
523 	const char *namespace;
524 	const char *relname;
525 	const char *refname;
526 
527 	/*
528 	 * We use PgFdwRelationInfo to pass various information to subsequent
529 	 * functions.
530 	 */
531 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
532 	baserel->fdw_private = (void *) fpinfo;
533 
534 	/* Base foreign tables need to be pushed down always. */
535 	fpinfo->pushdown_safe = true;
536 
537 	/* Look up foreign-table catalog info. */
538 	fpinfo->table = GetForeignTable(foreigntableid);
539 	fpinfo->server = GetForeignServer(fpinfo->table->serverid);
540 
541 	/*
542 	 * Extract user-settable option values.  Note that per-table settings of
543 	 * use_remote_estimate and fetch_size override per-server settings of
544 	 * them, respectively.
545 	 */
546 	fpinfo->use_remote_estimate = false;
547 	fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
548 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
549 	fpinfo->shippable_extensions = NIL;
550 	fpinfo->fetch_size = 100;
551 
552 	apply_server_options(fpinfo);
553 	apply_table_options(fpinfo);
554 
555 	/*
556 	 * If the table or the server is configured to use remote estimates,
557 	 * identify which user to do remote access as during planning.  This
558 	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
559 	 * permissions, the query would have failed at runtime anyway.
560 	 */
561 	if (fpinfo->use_remote_estimate)
562 	{
563 		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
564 
565 		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
566 	}
567 	else
568 		fpinfo->user = NULL;
569 
570 	/*
571 	 * Identify which baserestrictinfo clauses can be sent to the remote
572 	 * server and which can't.
573 	 */
574 	classifyConditions(root, baserel, baserel->baserestrictinfo,
575 					   &fpinfo->remote_conds, &fpinfo->local_conds);
576 
577 	/*
578 	 * Identify which attributes will need to be retrieved from the remote
579 	 * server.  These include all attrs needed for joins or final output, plus
580 	 * all attrs used in the local_conds.  (Note: if we end up using a
581 	 * parameterized scan, it's possible that some of the join clauses will be
582 	 * sent to the remote and thus we wouldn't really need to retrieve the
583 	 * columns used in them.  Doesn't seem worth detecting that case though.)
584 	 */
585 	fpinfo->attrs_used = NULL;
586 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
587 				   &fpinfo->attrs_used);
588 	foreach(lc, fpinfo->local_conds)
589 	{
590 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
591 
592 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
593 					   &fpinfo->attrs_used);
594 	}
595 
596 	/*
597 	 * Compute the selectivity and cost of the local_conds, so we don't have
598 	 * to do it over again for each path.  The best we can do for these
599 	 * conditions is to estimate selectivity on the basis of local statistics.
600 	 */
601 	fpinfo->local_conds_sel = clauselist_selectivity(root,
602 													 fpinfo->local_conds,
603 													 baserel->relid,
604 													 JOIN_INNER,
605 													 NULL);
606 
607 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
608 
609 	/*
610 	 * Set cached relation costs to some negative value, so that we can detect
611 	 * when they are set to some sensible costs during one (usually the first)
612 	 * of the calls to estimate_path_cost_size().
613 	 */
614 	fpinfo->rel_startup_cost = -1;
615 	fpinfo->rel_total_cost = -1;
616 
617 	/*
618 	 * If the table or the server is configured to use remote estimates,
619 	 * connect to the foreign server and execute EXPLAIN to estimate the
620 	 * number of rows selected by the restriction clauses, as well as the
621 	 * average row width.  Otherwise, estimate using whatever statistics we
622 	 * have locally, in a way similar to ordinary tables.
623 	 */
624 	if (fpinfo->use_remote_estimate)
625 	{
626 		/*
627 		 * Get cost/size estimates with help of remote server.  Save the
628 		 * values in fpinfo so we don't need to do it again to generate the
629 		 * basic foreign path.
630 		 */
631 		estimate_path_cost_size(root, baserel, NIL, NIL,
632 								&fpinfo->rows, &fpinfo->width,
633 								&fpinfo->startup_cost, &fpinfo->total_cost);
634 
635 		/* Report estimated baserel size to planner. */
636 		baserel->rows = fpinfo->rows;
637 		baserel->reltarget->width = fpinfo->width;
638 	}
639 	else
640 	{
641 		/*
642 		 * If the foreign table has never been ANALYZEd, it will have relpages
643 		 * and reltuples equal to zero, which most likely has nothing to do
644 		 * with reality.  We can't do a whole lot about that if we're not
645 		 * allowed to consult the remote server, but we can use a hack similar
646 		 * to plancat.c's treatment of empty relations: use a minimum size
647 		 * estimate of 10 pages, and divide by the column-datatype-based width
648 		 * estimate to get the corresponding number of tuples.
649 		 */
650 		if (baserel->pages == 0 && baserel->tuples == 0)
651 		{
652 			baserel->pages = 10;
653 			baserel->tuples =
654 				(10 * BLCKSZ) / (baserel->reltarget->width +
655 								 MAXALIGN(SizeofHeapTupleHeader));
656 		}
657 
658 		/* Estimate baserel size as best we can with local statistics. */
659 		set_baserel_size_estimates(root, baserel);
660 
661 		/* Fill in basically-bogus cost estimates for use later. */
662 		estimate_path_cost_size(root, baserel, NIL, NIL,
663 								&fpinfo->rows, &fpinfo->width,
664 								&fpinfo->startup_cost, &fpinfo->total_cost);
665 	}
666 
667 	/*
668 	 * Set the name of relation in fpinfo, while we are constructing it here.
669 	 * It will be used to build the string describing the join relation in
670 	 * EXPLAIN output. We can't know whether VERBOSE option is specified or
671 	 * not, so always schema-qualify the foreign table name.
672 	 */
673 	fpinfo->relation_name = makeStringInfo();
674 	namespace = get_namespace_name(get_rel_namespace(foreigntableid));
675 	relname = get_rel_name(foreigntableid);
676 	refname = rte->eref->aliasname;
677 	appendStringInfo(fpinfo->relation_name, "%s.%s",
678 					 quote_identifier(namespace),
679 					 quote_identifier(relname));
680 	if (*refname && strcmp(refname, relname) != 0)
681 		appendStringInfo(fpinfo->relation_name, " %s",
682 						 quote_identifier(rte->eref->aliasname));
683 
684 	/* No outer and inner relations. */
685 	fpinfo->make_outerrel_subquery = false;
686 	fpinfo->make_innerrel_subquery = false;
687 	fpinfo->lower_subquery_rels = NULL;
688 	/* Set the relation index. */
689 	fpinfo->relation_index = baserel->relid;
690 }
691 
692 /*
693  * get_useful_ecs_for_relation
694  *		Determine which EquivalenceClasses might be involved in useful
695  *		orderings of this relation.
696  *
697  * This function is in some respects a mirror image of the core function
698  * pathkeys_useful_for_merging: for a regular table, we know what indexes
699  * we have and want to test whether any of them are useful.  For a foreign
700  * table, we don't know what indexes are present on the remote side but
701  * want to speculate about which ones we'd like to use if they existed.
702  *
703  * This function returns a list of potentially-useful equivalence classes,
704  * but it does not guarantee that an EquivalenceMember exists which contains
705  * Vars only from the given relation.  For example, given ft1 JOIN t1 ON
706  * ft1.x + t1.x = 0, this function will say that the equivalence class
707  * containing ft1.x + t1.x is potentially useful.  Supposing ft1 is remote and
708  * t1 is local (or on a different server), it will turn out that no useful
709  * ORDER BY clause can be generated.  It's not our job to figure that out
710  * here; we're only interested in identifying relevant ECs.
711  */
712 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)713 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
714 {
715 	List	   *useful_eclass_list = NIL;
716 	ListCell   *lc;
717 	Relids		relids;
718 
719 	/*
720 	 * First, consider whether any active EC is potentially useful for a merge
721 	 * join against this relation.
722 	 */
723 	if (rel->has_eclass_joins)
724 	{
725 		foreach(lc, root->eq_classes)
726 		{
727 			EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
728 
729 			if (eclass_useful_for_merging(root, cur_ec, rel))
730 				useful_eclass_list = lappend(useful_eclass_list, cur_ec);
731 		}
732 	}
733 
734 	/*
735 	 * Next, consider whether there are any non-EC derivable join clauses that
736 	 * are merge-joinable.  If the joininfo list is empty, we can exit
737 	 * quickly.
738 	 */
739 	if (rel->joininfo == NIL)
740 		return useful_eclass_list;
741 
742 	/* If this is a child rel, we must use the topmost parent rel to search. */
743 	if (IS_OTHER_REL(rel))
744 	{
745 		Assert(!bms_is_empty(rel->top_parent_relids));
746 		relids = rel->top_parent_relids;
747 	}
748 	else
749 		relids = rel->relids;
750 
751 	/* Check each join clause in turn. */
752 	foreach(lc, rel->joininfo)
753 	{
754 		RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
755 
756 		/* Consider only mergejoinable clauses */
757 		if (restrictinfo->mergeopfamilies == NIL)
758 			continue;
759 
760 		/* Make sure we've got canonical ECs. */
761 		update_mergeclause_eclasses(root, restrictinfo);
762 
763 		/*
764 		 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
765 		 * that left_ec and right_ec will be initialized, per comments in
766 		 * distribute_qual_to_rels.
767 		 *
768 		 * We want to identify which side of this merge-joinable clause
769 		 * contains columns from the relation produced by this RelOptInfo. We
770 		 * test for overlap, not containment, because there could be extra
771 		 * relations on either side.  For example, suppose we've got something
772 		 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
773 		 * A.y = D.y.  The input rel might be the joinrel between A and B, and
774 		 * we'll consider the join clause A.y = D.y. relids contains a
775 		 * relation not involved in the join class (B) and the equivalence
776 		 * class for the left-hand side of the clause contains a relation not
777 		 * involved in the input rel (C).  Despite the fact that we have only
778 		 * overlap and not containment in either direction, A.y is potentially
779 		 * useful as a sort column.
780 		 *
781 		 * Note that it's even possible that relids overlaps neither side of
782 		 * the join clause.  For example, consider A LEFT JOIN B ON A.x = B.x
783 		 * AND A.x = 1.  The clause A.x = 1 will appear in B's joininfo list,
784 		 * but overlaps neither side of B.  In that case, we just skip this
785 		 * join clause, since it doesn't suggest a useful sort order for this
786 		 * relation.
787 		 */
788 		if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
789 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
790 														restrictinfo->right_ec);
791 		else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
792 			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
793 														restrictinfo->left_ec);
794 	}
795 
796 	return useful_eclass_list;
797 }
798 
799 /*
800  * get_useful_pathkeys_for_relation
801  *		Determine which orderings of a relation might be useful.
802  *
803  * Getting data in sorted order can be useful either because the requested
804  * order matches the final output ordering for the overall query we're
805  * planning, or because it enables an efficient merge join.  Here, we try
806  * to figure out which pathkeys to consider.
807  */
808 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)809 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
810 {
811 	List	   *useful_pathkeys_list = NIL;
812 	List	   *useful_eclass_list;
813 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
814 	EquivalenceClass *query_ec = NULL;
815 	ListCell   *lc;
816 
817 	/*
818 	 * Pushing the query_pathkeys to the remote server is always worth
819 	 * considering, because it might let us avoid a local sort.
820 	 */
821 	if (root->query_pathkeys)
822 	{
823 		bool		query_pathkeys_ok = true;
824 
825 		foreach(lc, root->query_pathkeys)
826 		{
827 			PathKey    *pathkey = (PathKey *) lfirst(lc);
828 			EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
829 			Expr	   *em_expr;
830 
831 			/*
832 			 * The planner and executor don't have any clever strategy for
833 			 * taking data sorted by a prefix of the query's pathkeys and
834 			 * getting it to be sorted by all of those pathkeys. We'll just
835 			 * end up resorting the entire data set.  So, unless we can push
836 			 * down all of the query pathkeys, forget it.
837 			 *
838 			 * is_foreign_expr would detect volatile expressions as well, but
839 			 * checking ec_has_volatile here saves some cycles.
840 			 */
841 			if (pathkey_ec->ec_has_volatile ||
842 				!(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
843 				!is_foreign_expr(root, rel, em_expr))
844 			{
845 				query_pathkeys_ok = false;
846 				break;
847 			}
848 		}
849 
850 		if (query_pathkeys_ok)
851 			useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
852 	}
853 
854 	/*
855 	 * Even if we're not using remote estimates, having the remote side do the
856 	 * sort generally won't be any worse than doing it locally, and it might
857 	 * be much better if the remote side can generate data in the right order
858 	 * without needing a sort at all.  However, what we're going to do next is
859 	 * try to generate pathkeys that seem promising for possible merge joins,
860 	 * and that's more speculative.  A wrong choice might hurt quite a bit, so
861 	 * bail out if we can't use remote estimates.
862 	 */
863 	if (!fpinfo->use_remote_estimate)
864 		return useful_pathkeys_list;
865 
866 	/* Get the list of interesting EquivalenceClasses. */
867 	useful_eclass_list = get_useful_ecs_for_relation(root, rel);
868 
869 	/* Extract unique EC for query, if any, so we don't consider it again. */
870 	if (list_length(root->query_pathkeys) == 1)
871 	{
872 		PathKey    *query_pathkey = linitial(root->query_pathkeys);
873 
874 		query_ec = query_pathkey->pk_eclass;
875 	}
876 
877 	/*
878 	 * As a heuristic, the only pathkeys we consider here are those of length
879 	 * one.  It's surely possible to consider more, but since each one we
880 	 * choose to consider will generate a round-trip to the remote side, we
881 	 * need to be a bit cautious here.  It would sure be nice to have a local
882 	 * cache of information about remote index definitions...
883 	 */
884 	foreach(lc, useful_eclass_list)
885 	{
886 		EquivalenceClass *cur_ec = lfirst(lc);
887 		Expr	   *em_expr;
888 		PathKey    *pathkey;
889 
890 		/* If redundant with what we did above, skip it. */
891 		if (cur_ec == query_ec)
892 			continue;
893 
894 		/* If no pushable expression for this rel, skip it. */
895 		em_expr = find_em_expr_for_rel(cur_ec, rel);
896 		if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
897 			continue;
898 
899 		/* Looks like we can generate a pathkey, so let's do it. */
900 		pathkey = make_canonical_pathkey(root, cur_ec,
901 										 linitial_oid(cur_ec->ec_opfamilies),
902 										 BTLessStrategyNumber,
903 										 false);
904 		useful_pathkeys_list = lappend(useful_pathkeys_list,
905 									   list_make1(pathkey));
906 	}
907 
908 	return useful_pathkeys_list;
909 }
910 
911 /*
912  * postgresGetForeignPaths
913  *		Create possible scan paths for a scan on the foreign table
914  */
915 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)916 postgresGetForeignPaths(PlannerInfo *root,
917 						RelOptInfo *baserel,
918 						Oid foreigntableid)
919 {
920 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
921 	ForeignPath *path;
922 	List	   *ppi_list;
923 	ListCell   *lc;
924 
925 	/*
926 	 * Create simplest ForeignScan path node and add it to baserel.  This path
927 	 * corresponds to SeqScan path of regular tables (though depending on what
928 	 * baserestrict conditions we were able to send to remote, there might
929 	 * actually be an indexscan happening there).  We already did all the work
930 	 * to estimate cost and size of this path.
931 	 *
932 	 * Although this path uses no join clauses, it could still have required
933 	 * parameterization due to LATERAL refs in its tlist.
934 	 */
935 	path = create_foreignscan_path(root, baserel,
936 								   NULL,	/* default pathtarget */
937 								   fpinfo->rows,
938 								   fpinfo->startup_cost,
939 								   fpinfo->total_cost,
940 								   NIL, /* no pathkeys */
941 								   baserel->lateral_relids,
942 								   NULL,	/* no extra plan */
943 								   NIL);	/* no fdw_private list */
944 	add_path(baserel, (Path *) path);
945 
946 	/* Add paths with pathkeys */
947 	add_paths_with_pathkeys_for_rel(root, baserel, NULL);
948 
949 	/*
950 	 * If we're not using remote estimates, stop here.  We have no way to
951 	 * estimate whether any join clauses would be worth sending across, so
952 	 * don't bother building parameterized paths.
953 	 */
954 	if (!fpinfo->use_remote_estimate)
955 		return;
956 
957 	/*
958 	 * Thumb through all join clauses for the rel to identify which outer
959 	 * relations could supply one or more safe-to-send-to-remote join clauses.
960 	 * We'll build a parameterized path for each such outer relation.
961 	 *
962 	 * It's convenient to manage this by representing each candidate outer
963 	 * relation by the ParamPathInfo node for it.  We can then use the
964 	 * ppi_clauses list in the ParamPathInfo node directly as a list of the
965 	 * interesting join clauses for that rel.  This takes care of the
966 	 * possibility that there are multiple safe join clauses for such a rel,
967 	 * and also ensures that we account for unsafe join clauses that we'll
968 	 * still have to enforce locally (since the parameterized-path machinery
969 	 * insists that we handle all movable clauses).
970 	 */
971 	ppi_list = NIL;
972 	foreach(lc, baserel->joininfo)
973 	{
974 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
975 		Relids		required_outer;
976 		ParamPathInfo *param_info;
977 
978 		/* Check if clause can be moved to this rel */
979 		if (!join_clause_is_movable_to(rinfo, baserel))
980 			continue;
981 
982 		/* See if it is safe to send to remote */
983 		if (!is_foreign_expr(root, baserel, rinfo->clause))
984 			continue;
985 
986 		/* Calculate required outer rels for the resulting path */
987 		required_outer = bms_union(rinfo->clause_relids,
988 								   baserel->lateral_relids);
989 		/* We do not want the foreign rel itself listed in required_outer */
990 		required_outer = bms_del_member(required_outer, baserel->relid);
991 
992 		/*
993 		 * required_outer probably can't be empty here, but if it were, we
994 		 * couldn't make a parameterized path.
995 		 */
996 		if (bms_is_empty(required_outer))
997 			continue;
998 
999 		/* Get the ParamPathInfo */
1000 		param_info = get_baserel_parampathinfo(root, baserel,
1001 											   required_outer);
1002 		Assert(param_info != NULL);
1003 
1004 		/*
1005 		 * Add it to list unless we already have it.  Testing pointer equality
1006 		 * is OK since get_baserel_parampathinfo won't make duplicates.
1007 		 */
1008 		ppi_list = list_append_unique_ptr(ppi_list, param_info);
1009 	}
1010 
1011 	/*
1012 	 * The above scan examined only "generic" join clauses, not those that
1013 	 * were absorbed into EquivalenceClauses.  See if we can make anything out
1014 	 * of EquivalenceClauses.
1015 	 */
1016 	if (baserel->has_eclass_joins)
1017 	{
1018 		/*
1019 		 * We repeatedly scan the eclass list looking for column references
1020 		 * (or expressions) belonging to the foreign rel.  Each time we find
1021 		 * one, we generate a list of equivalence joinclauses for it, and then
1022 		 * see if any are safe to send to the remote.  Repeat till there are
1023 		 * no more candidate EC members.
1024 		 */
1025 		ec_member_foreign_arg arg;
1026 
1027 		arg.already_used = NIL;
1028 		for (;;)
1029 		{
1030 			List	   *clauses;
1031 
1032 			/* Make clauses, skipping any that join to lateral_referencers */
1033 			arg.current = NULL;
1034 			clauses = generate_implied_equalities_for_column(root,
1035 															 baserel,
1036 															 ec_member_matches_foreign,
1037 															 (void *) &arg,
1038 															 baserel->lateral_referencers);
1039 
1040 			/* Done if there are no more expressions in the foreign rel */
1041 			if (arg.current == NULL)
1042 			{
1043 				Assert(clauses == NIL);
1044 				break;
1045 			}
1046 
1047 			/* Scan the extracted join clauses */
1048 			foreach(lc, clauses)
1049 			{
1050 				RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1051 				Relids		required_outer;
1052 				ParamPathInfo *param_info;
1053 
1054 				/* Check if clause can be moved to this rel */
1055 				if (!join_clause_is_movable_to(rinfo, baserel))
1056 					continue;
1057 
1058 				/* See if it is safe to send to remote */
1059 				if (!is_foreign_expr(root, baserel, rinfo->clause))
1060 					continue;
1061 
1062 				/* Calculate required outer rels for the resulting path */
1063 				required_outer = bms_union(rinfo->clause_relids,
1064 										   baserel->lateral_relids);
1065 				required_outer = bms_del_member(required_outer, baserel->relid);
1066 				if (bms_is_empty(required_outer))
1067 					continue;
1068 
1069 				/* Get the ParamPathInfo */
1070 				param_info = get_baserel_parampathinfo(root, baserel,
1071 													   required_outer);
1072 				Assert(param_info != NULL);
1073 
1074 				/* Add it to list unless we already have it */
1075 				ppi_list = list_append_unique_ptr(ppi_list, param_info);
1076 			}
1077 
1078 			/* Try again, now ignoring the expression we found this time */
1079 			arg.already_used = lappend(arg.already_used, arg.current);
1080 		}
1081 	}
1082 
1083 	/*
1084 	 * Now build a path for each useful outer relation.
1085 	 */
1086 	foreach(lc, ppi_list)
1087 	{
1088 		ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1089 		double		rows;
1090 		int			width;
1091 		Cost		startup_cost;
1092 		Cost		total_cost;
1093 
1094 		/* Get a cost estimate from the remote */
1095 		estimate_path_cost_size(root, baserel,
1096 								param_info->ppi_clauses, NIL,
1097 								&rows, &width,
1098 								&startup_cost, &total_cost);
1099 
1100 		/*
1101 		 * ppi_rows currently won't get looked at by anything, but still we
1102 		 * may as well ensure that it matches our idea of the rowcount.
1103 		 */
1104 		param_info->ppi_rows = rows;
1105 
1106 		/* Make the path */
1107 		path = create_foreignscan_path(root, baserel,
1108 									   NULL,	/* default pathtarget */
1109 									   rows,
1110 									   startup_cost,
1111 									   total_cost,
1112 									   NIL, /* no pathkeys */
1113 									   param_info->ppi_req_outer,
1114 									   NULL,
1115 									   NIL);	/* no fdw_private list */
1116 		add_path(baserel, (Path *) path);
1117 	}
1118 }
1119 
1120 /*
1121  * postgresGetForeignPlan
1122  *		Create ForeignScan plan node which implements selected best path
1123  */
1124 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1125 postgresGetForeignPlan(PlannerInfo *root,
1126 					   RelOptInfo *foreignrel,
1127 					   Oid foreigntableid,
1128 					   ForeignPath *best_path,
1129 					   List *tlist,
1130 					   List *scan_clauses,
1131 					   Plan *outer_plan)
1132 {
1133 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1134 	Index		scan_relid;
1135 	List	   *fdw_private;
1136 	List	   *remote_exprs = NIL;
1137 	List	   *local_exprs = NIL;
1138 	List	   *params_list = NIL;
1139 	List	   *fdw_scan_tlist = NIL;
1140 	List	   *fdw_recheck_quals = NIL;
1141 	List	   *retrieved_attrs;
1142 	StringInfoData sql;
1143 	ListCell   *lc;
1144 
1145 	if (IS_SIMPLE_REL(foreignrel))
1146 	{
1147 		/*
1148 		 * For base relations, set scan_relid as the relid of the relation.
1149 		 */
1150 		scan_relid = foreignrel->relid;
1151 
1152 		/*
1153 		 * In a base-relation scan, we must apply the given scan_clauses.
1154 		 *
1155 		 * Separate the scan_clauses into those that can be executed remotely
1156 		 * and those that can't.  baserestrictinfo clauses that were
1157 		 * previously determined to be safe or unsafe by classifyConditions
1158 		 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1159 		 * else in the scan_clauses list will be a join clause, which we have
1160 		 * to check for remote-safety.
1161 		 *
1162 		 * Note: the join clauses we see here should be the exact same ones
1163 		 * previously examined by postgresGetForeignPaths.  Possibly it'd be
1164 		 * worth passing forward the classification work done then, rather
1165 		 * than repeating it here.
1166 		 *
1167 		 * This code must match "extract_actual_clauses(scan_clauses, false)"
1168 		 * except for the additional decision about remote versus local
1169 		 * execution.
1170 		 */
1171 		foreach(lc, scan_clauses)
1172 		{
1173 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1174 
1175 			/* Ignore any pseudoconstants, they're dealt with elsewhere */
1176 			if (rinfo->pseudoconstant)
1177 				continue;
1178 
1179 			if (list_member_ptr(fpinfo->remote_conds, rinfo))
1180 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1181 			else if (list_member_ptr(fpinfo->local_conds, rinfo))
1182 				local_exprs = lappend(local_exprs, rinfo->clause);
1183 			else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1184 				remote_exprs = lappend(remote_exprs, rinfo->clause);
1185 			else
1186 				local_exprs = lappend(local_exprs, rinfo->clause);
1187 		}
1188 
1189 		/*
1190 		 * For a base-relation scan, we have to support EPQ recheck, which
1191 		 * should recheck all the remote quals.
1192 		 */
1193 		fdw_recheck_quals = remote_exprs;
1194 	}
1195 	else
1196 	{
1197 		/*
1198 		 * Join relation or upper relation - set scan_relid to 0.
1199 		 */
1200 		scan_relid = 0;
1201 
1202 		/*
1203 		 * For a join rel, baserestrictinfo is NIL and we are not considering
1204 		 * parameterization right now, so there should be no scan_clauses for
1205 		 * a joinrel or an upper rel either.
1206 		 */
1207 		Assert(!scan_clauses);
1208 
1209 		/*
1210 		 * Instead we get the conditions to apply from the fdw_private
1211 		 * structure.
1212 		 */
1213 		remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1214 		local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1215 
1216 		/*
1217 		 * We leave fdw_recheck_quals empty in this case, since we never need
1218 		 * to apply EPQ recheck clauses.  In the case of a joinrel, EPQ
1219 		 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1220 		 * If we're planning an upperrel (ie, remote grouping or aggregation)
1221 		 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1222 		 * allowed, and indeed we *can't* put the remote clauses into
1223 		 * fdw_recheck_quals because the unaggregated Vars won't be available
1224 		 * locally.
1225 		 */
1226 
1227 		/* Build the list of columns to be fetched from the foreign server. */
1228 		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1229 
1230 		/*
1231 		 * Ensure that the outer plan produces a tuple whose descriptor
1232 		 * matches our scan tuple slot.  Also, remove the local conditions
1233 		 * from outer plan's quals, lest they be evaluated twice, once by the
1234 		 * local plan and once by the scan.
1235 		 */
1236 		if (outer_plan)
1237 		{
1238 			ListCell   *lc;
1239 
1240 			/*
1241 			 * Right now, we only consider grouping and aggregation beyond
1242 			 * joins. Queries involving aggregates or grouping do not require
1243 			 * EPQ mechanism, hence should not have an outer plan here.
1244 			 */
1245 			Assert(!IS_UPPER_REL(foreignrel));
1246 
1247 			/*
1248 			 * First, update the plan's qual list if possible.  In some cases
1249 			 * the quals might be enforced below the topmost plan level, in
1250 			 * which case we'll fail to remove them; it's not worth working
1251 			 * harder than this.
1252 			 */
1253 			foreach(lc, local_exprs)
1254 			{
1255 				Node	   *qual = lfirst(lc);
1256 
1257 				outer_plan->qual = list_delete(outer_plan->qual, qual);
1258 
1259 				/*
1260 				 * For an inner join the local conditions of foreign scan plan
1261 				 * can be part of the joinquals as well.  (They might also be
1262 				 * in the mergequals or hashquals, but we can't touch those
1263 				 * without breaking the plan.)
1264 				 */
1265 				if (IsA(outer_plan, NestLoop) ||
1266 					IsA(outer_plan, MergeJoin) ||
1267 					IsA(outer_plan, HashJoin))
1268 				{
1269 					Join	   *join_plan = (Join *) outer_plan;
1270 
1271 					if (join_plan->jointype == JOIN_INNER)
1272 						join_plan->joinqual = list_delete(join_plan->joinqual,
1273 														  qual);
1274 				}
1275 			}
1276 
1277 			/*
1278 			 * Now fix the subplan's tlist --- this might result in inserting
1279 			 * a Result node atop the plan tree.
1280 			 */
1281 			outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1282 												best_path->path.parallel_safe);
1283 		}
1284 	}
1285 
1286 	/*
1287 	 * Build the query string to be sent for execution, and identify
1288 	 * expressions to be sent as parameters.
1289 	 */
1290 	initStringInfo(&sql);
1291 	deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1292 							remote_exprs, best_path->path.pathkeys,
1293 							false, &retrieved_attrs, &params_list);
1294 
1295 	/* Remember remote_exprs for possible use by postgresPlanDirectModify */
1296 	fpinfo->final_remote_exprs = remote_exprs;
1297 
1298 	/*
1299 	 * Build the fdw_private list that will be available to the executor.
1300 	 * Items in the list must match order in enum FdwScanPrivateIndex.
1301 	 */
1302 	fdw_private = list_make3(makeString(sql.data),
1303 							 retrieved_attrs,
1304 							 makeInteger(fpinfo->fetch_size));
1305 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1306 		fdw_private = lappend(fdw_private,
1307 							  makeString(fpinfo->relation_name->data));
1308 
1309 	/*
1310 	 * Create the ForeignScan node for the given relation.
1311 	 *
1312 	 * Note that the remote parameter expressions are stored in the fdw_exprs
1313 	 * field of the finished plan node; we can't keep them in private state
1314 	 * because then they wouldn't be subject to later planner processing.
1315 	 */
1316 	return make_foreignscan(tlist,
1317 							local_exprs,
1318 							scan_relid,
1319 							params_list,
1320 							fdw_private,
1321 							fdw_scan_tlist,
1322 							fdw_recheck_quals,
1323 							outer_plan);
1324 }
1325 
1326 /*
1327  * postgresBeginForeignScan
1328  *		Initiate an executor scan of a foreign PostgreSQL table.
1329  */
1330 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1331 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1332 {
1333 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1334 	EState	   *estate = node->ss.ps.state;
1335 	PgFdwScanState *fsstate;
1336 	RangeTblEntry *rte;
1337 	Oid			userid;
1338 	ForeignTable *table;
1339 	UserMapping *user;
1340 	int			rtindex;
1341 	int			numParams;
1342 
1343 	/*
1344 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
1345 	 */
1346 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1347 		return;
1348 
1349 	/*
1350 	 * We'll save private state in node->fdw_state.
1351 	 */
1352 	fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1353 	node->fdw_state = (void *) fsstate;
1354 
1355 	/*
1356 	 * Identify which user to do the remote access as.  This should match what
1357 	 * ExecCheckRTEPerms() does.  In case of a join or aggregate, use the
1358 	 * lowest-numbered member RTE as a representative; we would get the same
1359 	 * result from any.
1360 	 */
1361 	if (fsplan->scan.scanrelid > 0)
1362 		rtindex = fsplan->scan.scanrelid;
1363 	else
1364 		rtindex = bms_next_member(fsplan->fs_relids, -1);
1365 	rte = rt_fetch(rtindex, estate->es_range_table);
1366 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1367 
1368 	/* Get info about foreign table. */
1369 	table = GetForeignTable(rte->relid);
1370 	user = GetUserMapping(userid, table->serverid);
1371 
1372 	/*
1373 	 * Get connection to the foreign server.  Connection manager will
1374 	 * establish new connection if necessary.
1375 	 */
1376 	fsstate->conn = GetConnection(user, false);
1377 
1378 	/* Assign a unique ID for my cursor */
1379 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1380 	fsstate->cursor_exists = false;
1381 
1382 	/* Get private info created by planner functions. */
1383 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
1384 									 FdwScanPrivateSelectSql));
1385 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1386 												 FdwScanPrivateRetrievedAttrs);
1387 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1388 										  FdwScanPrivateFetchSize));
1389 
1390 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
1391 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1392 											   "postgres_fdw tuple data",
1393 											   ALLOCSET_DEFAULT_SIZES);
1394 	fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1395 											  "postgres_fdw temporary data",
1396 											  ALLOCSET_SMALL_SIZES);
1397 
1398 	/*
1399 	 * Get info we'll need for converting data fetched from the foreign server
1400 	 * into local representation and error reporting during that process.
1401 	 */
1402 	if (fsplan->scan.scanrelid > 0)
1403 	{
1404 		fsstate->rel = node->ss.ss_currentRelation;
1405 		fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1406 	}
1407 	else
1408 	{
1409 		fsstate->rel = NULL;
1410 		fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1411 	}
1412 
1413 	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1414 
1415 	/*
1416 	 * Prepare for processing of parameters used in remote query, if any.
1417 	 */
1418 	numParams = list_length(fsplan->fdw_exprs);
1419 	fsstate->numParams = numParams;
1420 	if (numParams > 0)
1421 		prepare_query_params((PlanState *) node,
1422 							 fsplan->fdw_exprs,
1423 							 numParams,
1424 							 &fsstate->param_flinfo,
1425 							 &fsstate->param_exprs,
1426 							 &fsstate->param_values);
1427 }
1428 
1429 /*
1430  * postgresIterateForeignScan
1431  *		Retrieve next row from the result set, or clear tuple slot to indicate
1432  *		EOF.
1433  */
1434 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1435 postgresIterateForeignScan(ForeignScanState *node)
1436 {
1437 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1438 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1439 
1440 	/*
1441 	 * If this is the first call after Begin or ReScan, we need to create the
1442 	 * cursor on the remote side.
1443 	 */
1444 	if (!fsstate->cursor_exists)
1445 		create_cursor(node);
1446 
1447 	/*
1448 	 * Get some more tuples, if we've run out.
1449 	 */
1450 	if (fsstate->next_tuple >= fsstate->num_tuples)
1451 	{
1452 		/* No point in another fetch if we already detected EOF, though. */
1453 		if (!fsstate->eof_reached)
1454 			fetch_more_data(node);
1455 		/* If we didn't get any tuples, must be end of data. */
1456 		if (fsstate->next_tuple >= fsstate->num_tuples)
1457 			return ExecClearTuple(slot);
1458 	}
1459 
1460 	/*
1461 	 * Return the next tuple.
1462 	 */
1463 	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1464 				   slot,
1465 				   InvalidBuffer,
1466 				   false);
1467 
1468 	return slot;
1469 }
1470 
1471 /*
1472  * postgresReScanForeignScan
1473  *		Restart the scan.
1474  */
1475 static void
postgresReScanForeignScan(ForeignScanState * node)1476 postgresReScanForeignScan(ForeignScanState *node)
1477 {
1478 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1479 	char		sql[64];
1480 	PGresult   *res;
1481 
1482 	/* If we haven't created the cursor yet, nothing to do. */
1483 	if (!fsstate->cursor_exists)
1484 		return;
1485 
1486 	/*
1487 	 * If any internal parameters affecting this node have changed, we'd
1488 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
1489 	 * be good enough.  If we've only fetched zero or one batch, we needn't
1490 	 * even rewind the cursor, just rescan what we have.
1491 	 */
1492 	if (node->ss.ps.chgParam != NULL)
1493 	{
1494 		fsstate->cursor_exists = false;
1495 		snprintf(sql, sizeof(sql), "CLOSE c%u",
1496 				 fsstate->cursor_number);
1497 	}
1498 	else if (fsstate->fetch_ct_2 > 1)
1499 	{
1500 		snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1501 				 fsstate->cursor_number);
1502 	}
1503 	else
1504 	{
1505 		/* Easy: just rescan what we already have in memory, if anything */
1506 		fsstate->next_tuple = 0;
1507 		return;
1508 	}
1509 
1510 	/*
1511 	 * We don't use a PG_TRY block here, so be careful not to throw error
1512 	 * without releasing the PGresult.
1513 	 */
1514 	res = pgfdw_exec_query(fsstate->conn, sql);
1515 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1516 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1517 	PQclear(res);
1518 
1519 	/* Now force a fresh FETCH. */
1520 	fsstate->tuples = NULL;
1521 	fsstate->num_tuples = 0;
1522 	fsstate->next_tuple = 0;
1523 	fsstate->fetch_ct_2 = 0;
1524 	fsstate->eof_reached = false;
1525 }
1526 
1527 /*
1528  * postgresEndForeignScan
1529  *		Finish scanning foreign table and dispose objects used for this scan
1530  */
1531 static void
postgresEndForeignScan(ForeignScanState * node)1532 postgresEndForeignScan(ForeignScanState *node)
1533 {
1534 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1535 
1536 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1537 	if (fsstate == NULL)
1538 		return;
1539 
1540 	/* Close the cursor if open, to prevent accumulation of cursors */
1541 	if (fsstate->cursor_exists)
1542 		close_cursor(fsstate->conn, fsstate->cursor_number);
1543 
1544 	/* Release remote connection */
1545 	ReleaseConnection(fsstate->conn);
1546 	fsstate->conn = NULL;
1547 
1548 	/* MemoryContexts will be deleted automatically. */
1549 }
1550 
1551 /*
1552  * postgresAddForeignUpdateTargets
1553  *		Add resjunk column(s) needed for update/delete on a foreign table
1554  */
1555 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1556 postgresAddForeignUpdateTargets(Query *parsetree,
1557 								RangeTblEntry *target_rte,
1558 								Relation target_relation)
1559 {
1560 	Var		   *var;
1561 	const char *attrname;
1562 	TargetEntry *tle;
1563 
1564 	/*
1565 	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1566 	 */
1567 
1568 	/* Make a Var representing the desired value */
1569 	var = makeVar(parsetree->resultRelation,
1570 				  SelfItemPointerAttributeNumber,
1571 				  TIDOID,
1572 				  -1,
1573 				  InvalidOid,
1574 				  0);
1575 
1576 	/* Wrap it in a resjunk TLE with the right name ... */
1577 	attrname = "ctid";
1578 
1579 	tle = makeTargetEntry((Expr *) var,
1580 						  list_length(parsetree->targetList) + 1,
1581 						  pstrdup(attrname),
1582 						  true);
1583 
1584 	/* ... and add it to the query's targetlist */
1585 	parsetree->targetList = lappend(parsetree->targetList, tle);
1586 }
1587 
1588 /*
1589  * postgresPlanForeignModify
1590  *		Plan an insert/update/delete operation on a foreign table
1591  */
1592 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1593 postgresPlanForeignModify(PlannerInfo *root,
1594 						  ModifyTable *plan,
1595 						  Index resultRelation,
1596 						  int subplan_index)
1597 {
1598 	CmdType		operation = plan->operation;
1599 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1600 	Relation	rel;
1601 	StringInfoData sql;
1602 	List	   *targetAttrs = NIL;
1603 	List	   *returningList = NIL;
1604 	List	   *retrieved_attrs = NIL;
1605 	bool		doNothing = false;
1606 
1607 	initStringInfo(&sql);
1608 
1609 	/*
1610 	 * Core code already has some lock on each rel being planned, so we can
1611 	 * use NoLock here.
1612 	 */
1613 	rel = heap_open(rte->relid, NoLock);
1614 
1615 	/*
1616 	 * In an INSERT, we transmit all columns that are defined in the foreign
1617 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1618 	 * foreign table, we transmit all columns like INSERT; else we transmit
1619 	 * only columns that were explicitly targets of the UPDATE, so as to avoid
1620 	 * unnecessary data transmission.  (We can't do that for INSERT since we
1621 	 * would miss sending default values for columns not listed in the source
1622 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1623 	 * those triggers might change values for non-target columns, in which
1624 	 * case we would miss sending changed values for those columns.)
1625 	 */
1626 	if (operation == CMD_INSERT ||
1627 		(operation == CMD_UPDATE &&
1628 		 rel->trigdesc &&
1629 		 rel->trigdesc->trig_update_before_row))
1630 	{
1631 		TupleDesc	tupdesc = RelationGetDescr(rel);
1632 		int			attnum;
1633 
1634 		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1635 		{
1636 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1637 
1638 			if (!attr->attisdropped)
1639 				targetAttrs = lappend_int(targetAttrs, attnum);
1640 		}
1641 	}
1642 	else if (operation == CMD_UPDATE)
1643 	{
1644 		int			col;
1645 
1646 		col = -1;
1647 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1648 		{
1649 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1650 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
1651 
1652 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
1653 				elog(ERROR, "system-column update is not supported");
1654 			targetAttrs = lappend_int(targetAttrs, attno);
1655 		}
1656 	}
1657 
1658 	/*
1659 	 * Extract the relevant RETURNING list if any.
1660 	 */
1661 	if (plan->returningLists)
1662 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
1663 
1664 	/*
1665 	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1666 	 * should have already been rejected in the optimizer, as presently there
1667 	 * is no way to recognize an arbiter index on a foreign table.  Only DO
1668 	 * NOTHING is supported without an inference specification.
1669 	 */
1670 	if (plan->onConflictAction == ONCONFLICT_NOTHING)
1671 		doNothing = true;
1672 	else if (plan->onConflictAction != ONCONFLICT_NONE)
1673 		elog(ERROR, "unexpected ON CONFLICT specification: %d",
1674 			 (int) plan->onConflictAction);
1675 
1676 	/*
1677 	 * Construct the SQL command string.
1678 	 */
1679 	switch (operation)
1680 	{
1681 		case CMD_INSERT:
1682 			deparseInsertSql(&sql, rte, resultRelation, rel,
1683 							 targetAttrs, doNothing, returningList,
1684 							 &retrieved_attrs);
1685 			break;
1686 		case CMD_UPDATE:
1687 			deparseUpdateSql(&sql, rte, resultRelation, rel,
1688 							 targetAttrs, returningList,
1689 							 &retrieved_attrs);
1690 			break;
1691 		case CMD_DELETE:
1692 			deparseDeleteSql(&sql, rte, resultRelation, rel,
1693 							 returningList,
1694 							 &retrieved_attrs);
1695 			break;
1696 		default:
1697 			elog(ERROR, "unexpected operation: %d", (int) operation);
1698 			break;
1699 	}
1700 
1701 	heap_close(rel, NoLock);
1702 
1703 	/*
1704 	 * Build the fdw_private list that will be available to the executor.
1705 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
1706 	 */
1707 	return list_make4(makeString(sql.data),
1708 					  targetAttrs,
1709 					  makeInteger((retrieved_attrs != NIL)),
1710 					  retrieved_attrs);
1711 }
1712 
1713 /*
1714  * postgresBeginForeignModify
1715  *		Begin an insert/update/delete operation on a foreign table
1716  */
1717 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1718 postgresBeginForeignModify(ModifyTableState *mtstate,
1719 						   ResultRelInfo *resultRelInfo,
1720 						   List *fdw_private,
1721 						   int subplan_index,
1722 						   int eflags)
1723 {
1724 	PgFdwModifyState *fmstate;
1725 	char	   *query;
1726 	List	   *target_attrs;
1727 	bool		has_returning;
1728 	List	   *retrieved_attrs;
1729 	RangeTblEntry *rte;
1730 
1731 	/*
1732 	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
1733 	 * stays NULL.
1734 	 */
1735 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1736 		return;
1737 
1738 	/* Deconstruct fdw_private data. */
1739 	query = strVal(list_nth(fdw_private,
1740 							FdwModifyPrivateUpdateSql));
1741 	target_attrs = (List *) list_nth(fdw_private,
1742 									 FdwModifyPrivateTargetAttnums);
1743 	has_returning = intVal(list_nth(fdw_private,
1744 									FdwModifyPrivateHasReturning));
1745 	retrieved_attrs = (List *) list_nth(fdw_private,
1746 										FdwModifyPrivateRetrievedAttrs);
1747 
1748 	/* Find RTE. */
1749 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
1750 				   mtstate->ps.state->es_range_table);
1751 
1752 	/* Construct an execution state. */
1753 	fmstate = create_foreign_modify(mtstate->ps.state,
1754 									rte,
1755 									resultRelInfo,
1756 									mtstate->operation,
1757 									mtstate->mt_plans[subplan_index]->plan,
1758 									query,
1759 									target_attrs,
1760 									has_returning,
1761 									retrieved_attrs);
1762 
1763 	resultRelInfo->ri_FdwState = fmstate;
1764 }
1765 
1766 /*
1767  * postgresExecForeignInsert
1768  *		Insert one row into a foreign table
1769  */
1770 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1771 postgresExecForeignInsert(EState *estate,
1772 						  ResultRelInfo *resultRelInfo,
1773 						  TupleTableSlot *slot,
1774 						  TupleTableSlot *planSlot)
1775 {
1776 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1777 	const char **p_values;
1778 	PGresult   *res;
1779 	int			n_rows;
1780 
1781 	/*
1782 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1783 	 * postgresBeginForeignInsert())
1784 	 */
1785 	if (fmstate->aux_fmstate)
1786 		fmstate = fmstate->aux_fmstate;
1787 
1788 	/* Set up the prepared statement on the remote server, if we didn't yet */
1789 	if (!fmstate->p_name)
1790 		prepare_foreign_modify(fmstate);
1791 
1792 	/* Convert parameters needed by prepared statement to text form */
1793 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1794 
1795 	/*
1796 	 * Execute the prepared statement.
1797 	 */
1798 	if (!PQsendQueryPrepared(fmstate->conn,
1799 							 fmstate->p_name,
1800 							 fmstate->p_nums,
1801 							 p_values,
1802 							 NULL,
1803 							 NULL,
1804 							 0))
1805 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1806 
1807 	/*
1808 	 * Get the result, and check for success.
1809 	 *
1810 	 * We don't use a PG_TRY block here, so be careful not to throw error
1811 	 * without releasing the PGresult.
1812 	 */
1813 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1814 	if (PQresultStatus(res) !=
1815 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1816 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1817 
1818 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1819 	if (fmstate->has_returning)
1820 	{
1821 		n_rows = PQntuples(res);
1822 		if (n_rows > 0)
1823 			store_returning_result(fmstate, slot, res);
1824 	}
1825 	else
1826 		n_rows = atoi(PQcmdTuples(res));
1827 
1828 	/* And clean up */
1829 	PQclear(res);
1830 
1831 	MemoryContextReset(fmstate->temp_cxt);
1832 
1833 	/* Return NULL if nothing was inserted on the remote end */
1834 	return (n_rows > 0) ? slot : NULL;
1835 }
1836 
1837 /*
1838  * postgresExecForeignUpdate
1839  *		Update one row in a foreign table
1840  */
1841 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1842 postgresExecForeignUpdate(EState *estate,
1843 						  ResultRelInfo *resultRelInfo,
1844 						  TupleTableSlot *slot,
1845 						  TupleTableSlot *planSlot)
1846 {
1847 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1848 	Datum		datum;
1849 	bool		isNull;
1850 	const char **p_values;
1851 	PGresult   *res;
1852 	int			n_rows;
1853 
1854 	/* Set up the prepared statement on the remote server, if we didn't yet */
1855 	if (!fmstate->p_name)
1856 		prepare_foreign_modify(fmstate);
1857 
1858 	/* Get the ctid that was passed up as a resjunk column */
1859 	datum = ExecGetJunkAttribute(planSlot,
1860 								 fmstate->ctidAttno,
1861 								 &isNull);
1862 	/* shouldn't ever get a null result... */
1863 	if (isNull)
1864 		elog(ERROR, "ctid is NULL");
1865 
1866 	/* Convert parameters needed by prepared statement to text form */
1867 	p_values = convert_prep_stmt_params(fmstate,
1868 										(ItemPointer) DatumGetPointer(datum),
1869 										slot);
1870 
1871 	/*
1872 	 * Execute the prepared statement.
1873 	 */
1874 	if (!PQsendQueryPrepared(fmstate->conn,
1875 							 fmstate->p_name,
1876 							 fmstate->p_nums,
1877 							 p_values,
1878 							 NULL,
1879 							 NULL,
1880 							 0))
1881 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1882 
1883 	/*
1884 	 * Get the result, and check for success.
1885 	 *
1886 	 * We don't use a PG_TRY block here, so be careful not to throw error
1887 	 * without releasing the PGresult.
1888 	 */
1889 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1890 	if (PQresultStatus(res) !=
1891 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1892 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1893 
1894 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1895 	if (fmstate->has_returning)
1896 	{
1897 		n_rows = PQntuples(res);
1898 		if (n_rows > 0)
1899 			store_returning_result(fmstate, slot, res);
1900 	}
1901 	else
1902 		n_rows = atoi(PQcmdTuples(res));
1903 
1904 	/* And clean up */
1905 	PQclear(res);
1906 
1907 	MemoryContextReset(fmstate->temp_cxt);
1908 
1909 	/* Return NULL if nothing was updated on the remote end */
1910 	return (n_rows > 0) ? slot : NULL;
1911 }
1912 
1913 /*
1914  * postgresExecForeignDelete
1915  *		Delete one row from a foreign table
1916  */
1917 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1918 postgresExecForeignDelete(EState *estate,
1919 						  ResultRelInfo *resultRelInfo,
1920 						  TupleTableSlot *slot,
1921 						  TupleTableSlot *planSlot)
1922 {
1923 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1924 	Datum		datum;
1925 	bool		isNull;
1926 	const char **p_values;
1927 	PGresult   *res;
1928 	int			n_rows;
1929 
1930 	/* Set up the prepared statement on the remote server, if we didn't yet */
1931 	if (!fmstate->p_name)
1932 		prepare_foreign_modify(fmstate);
1933 
1934 	/* Get the ctid that was passed up as a resjunk column */
1935 	datum = ExecGetJunkAttribute(planSlot,
1936 								 fmstate->ctidAttno,
1937 								 &isNull);
1938 	/* shouldn't ever get a null result... */
1939 	if (isNull)
1940 		elog(ERROR, "ctid is NULL");
1941 
1942 	/* Convert parameters needed by prepared statement to text form */
1943 	p_values = convert_prep_stmt_params(fmstate,
1944 										(ItemPointer) DatumGetPointer(datum),
1945 										NULL);
1946 
1947 	/*
1948 	 * Execute the prepared statement.
1949 	 */
1950 	if (!PQsendQueryPrepared(fmstate->conn,
1951 							 fmstate->p_name,
1952 							 fmstate->p_nums,
1953 							 p_values,
1954 							 NULL,
1955 							 NULL,
1956 							 0))
1957 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1958 
1959 	/*
1960 	 * Get the result, and check for success.
1961 	 *
1962 	 * We don't use a PG_TRY block here, so be careful not to throw error
1963 	 * without releasing the PGresult.
1964 	 */
1965 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
1966 	if (PQresultStatus(res) !=
1967 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1968 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1969 
1970 	/* Check number of rows affected, and fetch RETURNING tuple if any */
1971 	if (fmstate->has_returning)
1972 	{
1973 		n_rows = PQntuples(res);
1974 		if (n_rows > 0)
1975 			store_returning_result(fmstate, slot, res);
1976 	}
1977 	else
1978 		n_rows = atoi(PQcmdTuples(res));
1979 
1980 	/* And clean up */
1981 	PQclear(res);
1982 
1983 	MemoryContextReset(fmstate->temp_cxt);
1984 
1985 	/* Return NULL if nothing was deleted on the remote end */
1986 	return (n_rows > 0) ? slot : NULL;
1987 }
1988 
1989 /*
1990  * postgresEndForeignModify
1991  *		Finish an insert/update/delete operation on a foreign table
1992  */
1993 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1994 postgresEndForeignModify(EState *estate,
1995 						 ResultRelInfo *resultRelInfo)
1996 {
1997 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1998 
1999 	/* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2000 	if (fmstate == NULL)
2001 		return;
2002 
2003 	/* Destroy the execution state */
2004 	finish_foreign_modify(fmstate);
2005 }
2006 
2007 /*
2008  * postgresBeginForeignInsert
2009  *		Begin an insert operation on a foreign table
2010  */
2011 static void
postgresBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)2012 postgresBeginForeignInsert(ModifyTableState *mtstate,
2013 						   ResultRelInfo *resultRelInfo)
2014 {
2015 	PgFdwModifyState *fmstate;
2016 	ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2017 	EState	   *estate = mtstate->ps.state;
2018 	Index		resultRelation;
2019 	Relation	rel = resultRelInfo->ri_RelationDesc;
2020 	RangeTblEntry *rte;
2021 	TupleDesc	tupdesc = RelationGetDescr(rel);
2022 	int			attnum;
2023 	StringInfoData sql;
2024 	List	   *targetAttrs = NIL;
2025 	List	   *retrieved_attrs = NIL;
2026 	bool		doNothing = false;
2027 
2028 	/*
2029 	 * If the foreign table we are about to insert routed rows into is also
2030 	 * an UPDATE subplan result rel that will be updated later, proceeding
2031 	 * with the INSERT will result in the later UPDATE incorrectly modifying
2032 	 * those routed rows, so prevent the INSERT --- it would be nice if we
2033 	 * could handle this case; but for now, throw an error for safety.
2034 	 */
2035 	if (plan && plan->operation == CMD_UPDATE &&
2036 		(resultRelInfo->ri_usesFdwDirectModify ||
2037 		 resultRelInfo->ri_FdwState) &&
2038 		resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
2039 		ereport(ERROR,
2040 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2041 				 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2042 						RelationGetRelationName(rel))));
2043 
2044 	initStringInfo(&sql);
2045 
2046 	/* We transmit all columns that are defined in the foreign table. */
2047 	for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2048 	{
2049 		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2050 
2051 		if (!attr->attisdropped)
2052 			targetAttrs = lappend_int(targetAttrs, attnum);
2053 	}
2054 
2055 	/* Check if we add the ON CONFLICT clause to the remote query. */
2056 	if (plan)
2057 	{
2058 		OnConflictAction onConflictAction = plan->onConflictAction;
2059 
2060 		/* We only support DO NOTHING without an inference specification. */
2061 		if (onConflictAction == ONCONFLICT_NOTHING)
2062 			doNothing = true;
2063 		else if (onConflictAction != ONCONFLICT_NONE)
2064 			elog(ERROR, "unexpected ON CONFLICT specification: %d",
2065 				 (int) onConflictAction);
2066 	}
2067 
2068 	/*
2069 	 * If the foreign table is a partition that doesn't have a corresponding
2070 	 * RTE entry, we need to create a new RTE
2071 	 * describing the foreign table for use by deparseInsertSql and
2072 	 * create_foreign_modify() below, after first copying the parent's RTE and
2073 	 * modifying some fields to describe the foreign partition to work on.
2074 	 * However, if this is invoked by UPDATE, the existing RTE may already
2075 	 * correspond to this partition if it is one of the UPDATE subplan target
2076 	 * rels; in that case, we can just use the existing RTE as-is.
2077 	 */
2078 	if (resultRelInfo->ri_RangeTableIndex == 0)
2079 	{
2080 		ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2081 
2082 		rte = list_nth(estate->es_range_table, rootResultRelInfo->ri_RangeTableIndex - 1);
2083 		rte = copyObject(rte);
2084 		rte->relid = RelationGetRelid(rel);
2085 		rte->relkind = RELKIND_FOREIGN_TABLE;
2086 
2087 		/*
2088 		 * For UPDATE, we must use the RT index of the first subplan target
2089 		 * rel's RTE, because the core code would have built expressions for
2090 		 * the partition, such as RETURNING, using that RT index as varno of
2091 		 * Vars contained in those expressions.
2092 		 */
2093 		if (plan && plan->operation == CMD_UPDATE &&
2094 			rootResultRelInfo->ri_RangeTableIndex == plan->nominalRelation)
2095 			resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2096 		else
2097 			resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2098 	}
2099 	else
2100 	{
2101 		resultRelation = resultRelInfo->ri_RangeTableIndex;
2102 		rte = list_nth(estate->es_range_table, resultRelation - 1);
2103 	}
2104 
2105 	/* Construct the SQL command string. */
2106 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2107 					 resultRelInfo->ri_returningList, &retrieved_attrs);
2108 
2109 	/* Construct an execution state. */
2110 	fmstate = create_foreign_modify(mtstate->ps.state,
2111 									rte,
2112 									resultRelInfo,
2113 									CMD_INSERT,
2114 									NULL,
2115 									sql.data,
2116 									targetAttrs,
2117 									retrieved_attrs != NIL,
2118 									retrieved_attrs);
2119 
2120 	/*
2121 	 * If the given resultRelInfo already has PgFdwModifyState set, it means
2122 	 * the foreign table is an UPDATE subplan result rel; in which case, store
2123 	 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2124 	 */
2125 	if (resultRelInfo->ri_FdwState)
2126 	{
2127 		Assert(plan && plan->operation == CMD_UPDATE);
2128 		Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2129 		((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2130 	}
2131 	else
2132 		resultRelInfo->ri_FdwState = fmstate;
2133 }
2134 
2135 /*
2136  * postgresEndForeignInsert
2137  *		Finish an insert operation on a foreign table
2138  */
2139 static void
postgresEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2140 postgresEndForeignInsert(EState *estate,
2141 						 ResultRelInfo *resultRelInfo)
2142 {
2143 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2144 
2145 	Assert(fmstate != NULL);
2146 
2147 	/*
2148 	 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2149 	 * postgresBeginForeignInsert())
2150 	 */
2151 	if (fmstate->aux_fmstate)
2152 		fmstate = fmstate->aux_fmstate;
2153 
2154 	/* Destroy the execution state */
2155 	finish_foreign_modify(fmstate);
2156 }
2157 
2158 /*
2159  * postgresIsForeignRelUpdatable
2160  *		Determine whether a foreign table supports INSERT, UPDATE and/or
2161  *		DELETE.
2162  */
2163 static int
postgresIsForeignRelUpdatable(Relation rel)2164 postgresIsForeignRelUpdatable(Relation rel)
2165 {
2166 	bool		updatable;
2167 	ForeignTable *table;
2168 	ForeignServer *server;
2169 	ListCell   *lc;
2170 
2171 	/*
2172 	 * By default, all postgres_fdw foreign tables are assumed updatable. This
2173 	 * can be overridden by a per-server setting, which in turn can be
2174 	 * overridden by a per-table setting.
2175 	 */
2176 	updatable = true;
2177 
2178 	table = GetForeignTable(RelationGetRelid(rel));
2179 	server = GetForeignServer(table->serverid);
2180 
2181 	foreach(lc, server->options)
2182 	{
2183 		DefElem    *def = (DefElem *) lfirst(lc);
2184 
2185 		if (strcmp(def->defname, "updatable") == 0)
2186 			updatable = defGetBoolean(def);
2187 	}
2188 	foreach(lc, table->options)
2189 	{
2190 		DefElem    *def = (DefElem *) lfirst(lc);
2191 
2192 		if (strcmp(def->defname, "updatable") == 0)
2193 			updatable = defGetBoolean(def);
2194 	}
2195 
2196 	/*
2197 	 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2198 	 */
2199 	return updatable ?
2200 		(1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2201 }
2202 
2203 /*
2204  * postgresRecheckForeignScan
2205  *		Execute a local join execution plan for a foreign join
2206  */
2207 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2208 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2209 {
2210 	Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2211 	PlanState  *outerPlan = outerPlanState(node);
2212 	TupleTableSlot *result;
2213 
2214 	/* For base foreign relations, it suffices to set fdw_recheck_quals */
2215 	if (scanrelid > 0)
2216 		return true;
2217 
2218 	Assert(outerPlan != NULL);
2219 
2220 	/* Execute a local join execution plan */
2221 	result = ExecProcNode(outerPlan);
2222 	if (TupIsNull(result))
2223 		return false;
2224 
2225 	/* Store result in the given slot */
2226 	ExecCopySlot(slot, result);
2227 
2228 	return true;
2229 }
2230 
2231 /*
2232  * postgresPlanDirectModify
2233  *		Consider a direct foreign table modification
2234  *
2235  * Decide whether it is safe to modify a foreign table directly, and if so,
2236  * rewrite subplan accordingly.
2237  */
2238 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2239 postgresPlanDirectModify(PlannerInfo *root,
2240 						 ModifyTable *plan,
2241 						 Index resultRelation,
2242 						 int subplan_index)
2243 {
2244 	CmdType		operation = plan->operation;
2245 	Plan	   *subplan;
2246 	RelOptInfo *foreignrel;
2247 	RangeTblEntry *rte;
2248 	PgFdwRelationInfo *fpinfo;
2249 	Relation	rel;
2250 	StringInfoData sql;
2251 	ForeignScan *fscan;
2252 	List	   *targetAttrs = NIL;
2253 	List	   *remote_exprs;
2254 	List	   *params_list = NIL;
2255 	List	   *returningList = NIL;
2256 	List	   *retrieved_attrs = NIL;
2257 
2258 	/*
2259 	 * Decide whether it is safe to modify a foreign table directly.
2260 	 */
2261 
2262 	/*
2263 	 * The table modification must be an UPDATE or DELETE.
2264 	 */
2265 	if (operation != CMD_UPDATE && operation != CMD_DELETE)
2266 		return false;
2267 
2268 	/*
2269 	 * It's unsafe to modify a foreign table directly if there are any local
2270 	 * joins needed.
2271 	 */
2272 	subplan = (Plan *) list_nth(plan->plans, subplan_index);
2273 	if (!IsA(subplan, ForeignScan))
2274 		return false;
2275 	fscan = (ForeignScan *) subplan;
2276 
2277 	/*
2278 	 * It's unsafe to modify a foreign table directly if there are any quals
2279 	 * that should be evaluated locally.
2280 	 */
2281 	if (subplan->qual != NIL)
2282 		return false;
2283 
2284 	/* Safe to fetch data about the target foreign rel */
2285 	if (fscan->scan.scanrelid == 0)
2286 	{
2287 		foreignrel = find_join_rel(root, fscan->fs_relids);
2288 		/* We should have a rel for this foreign join. */
2289 		Assert(foreignrel);
2290 	}
2291 	else
2292 		foreignrel = root->simple_rel_array[resultRelation];
2293 	rte = root->simple_rte_array[resultRelation];
2294 	fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2295 
2296 	/*
2297 	 * It's unsafe to update a foreign table directly, if any expressions to
2298 	 * assign to the target columns are unsafe to evaluate remotely.
2299 	 */
2300 	if (operation == CMD_UPDATE)
2301 	{
2302 		int			col;
2303 
2304 		/*
2305 		 * We transmit only columns that were explicitly targets of the
2306 		 * UPDATE, so as to avoid unnecessary data transmission.
2307 		 */
2308 		col = -1;
2309 		while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2310 		{
2311 			/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2312 			AttrNumber	attno = col + FirstLowInvalidHeapAttributeNumber;
2313 			TargetEntry *tle;
2314 
2315 			if (attno <= InvalidAttrNumber) /* shouldn't happen */
2316 				elog(ERROR, "system-column update is not supported");
2317 
2318 			tle = get_tle_by_resno(subplan->targetlist, attno);
2319 
2320 			if (!tle)
2321 				elog(ERROR, "attribute number %d not found in subplan targetlist",
2322 					 attno);
2323 
2324 			if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2325 				return false;
2326 
2327 			targetAttrs = lappend_int(targetAttrs, attno);
2328 		}
2329 	}
2330 
2331 	/*
2332 	 * Ok, rewrite subplan so as to modify the foreign table directly.
2333 	 */
2334 	initStringInfo(&sql);
2335 
2336 	/*
2337 	 * Core code already has some lock on each rel being planned, so we can
2338 	 * use NoLock here.
2339 	 */
2340 	rel = heap_open(rte->relid, NoLock);
2341 
2342 	/*
2343 	 * Recall the qual clauses that must be evaluated remotely.  (These are
2344 	 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2345 	 * doesn't care.)
2346 	 */
2347 	remote_exprs = fpinfo->final_remote_exprs;
2348 
2349 	/*
2350 	 * Extract the relevant RETURNING list if any.
2351 	 */
2352 	if (plan->returningLists)
2353 	{
2354 		returningList = (List *) list_nth(plan->returningLists, subplan_index);
2355 
2356 		/*
2357 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2358 		 * we fetch from the foreign server any Vars specified in RETURNING
2359 		 * that refer not only to the target relation but to non-target
2360 		 * relations.  So we'll deparse them into the RETURNING clause of the
2361 		 * remote query; use a targetlist consisting of them instead, which
2362 		 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2363 		 * node below.
2364 		 */
2365 		if (fscan->scan.scanrelid == 0)
2366 			returningList = build_remote_returning(resultRelation, rel,
2367 												   returningList);
2368 	}
2369 
2370 	/*
2371 	 * Construct the SQL command string.
2372 	 */
2373 	switch (operation)
2374 	{
2375 		case CMD_UPDATE:
2376 			deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2377 								   foreignrel,
2378 								   ((Plan *) fscan)->targetlist,
2379 								   targetAttrs,
2380 								   remote_exprs, &params_list,
2381 								   returningList, &retrieved_attrs);
2382 			break;
2383 		case CMD_DELETE:
2384 			deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2385 								   foreignrel,
2386 								   remote_exprs, &params_list,
2387 								   returningList, &retrieved_attrs);
2388 			break;
2389 		default:
2390 			elog(ERROR, "unexpected operation: %d", (int) operation);
2391 			break;
2392 	}
2393 
2394 	/*
2395 	 * Update the operation info.
2396 	 */
2397 	fscan->operation = operation;
2398 
2399 	/*
2400 	 * Update the fdw_exprs list that will be available to the executor.
2401 	 */
2402 	fscan->fdw_exprs = params_list;
2403 
2404 	/*
2405 	 * Update the fdw_private list that will be available to the executor.
2406 	 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2407 	 */
2408 	fscan->fdw_private = list_make4(makeString(sql.data),
2409 									makeInteger((retrieved_attrs != NIL)),
2410 									retrieved_attrs,
2411 									makeInteger(plan->canSetTag));
2412 
2413 	/*
2414 	 * Update the foreign-join-related fields.
2415 	 */
2416 	if (fscan->scan.scanrelid == 0)
2417 	{
2418 		/* No need for the outer subplan. */
2419 		fscan->scan.plan.lefttree = NULL;
2420 
2421 		/* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2422 		if (returningList)
2423 			rebuild_fdw_scan_tlist(fscan, returningList);
2424 	}
2425 
2426 	heap_close(rel, NoLock);
2427 	return true;
2428 }
2429 
2430 /*
2431  * postgresBeginDirectModify
2432  *		Prepare a direct foreign table modification
2433  */
2434 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2435 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2436 {
2437 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2438 	EState	   *estate = node->ss.ps.state;
2439 	PgFdwDirectModifyState *dmstate;
2440 	Index		rtindex;
2441 	RangeTblEntry *rte;
2442 	Oid			userid;
2443 	ForeignTable *table;
2444 	UserMapping *user;
2445 	int			numParams;
2446 
2447 	/*
2448 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
2449 	 */
2450 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2451 		return;
2452 
2453 	/*
2454 	 * We'll save private state in node->fdw_state.
2455 	 */
2456 	dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2457 	node->fdw_state = (void *) dmstate;
2458 
2459 	/*
2460 	 * Identify which user to do the remote access as.  This should match what
2461 	 * ExecCheckRTEPerms() does.
2462 	 */
2463 	rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2464 	rte = rt_fetch(rtindex, estate->es_range_table);
2465 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2466 
2467 	/* Get info about foreign table. */
2468 	if (fsplan->scan.scanrelid == 0)
2469 		dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2470 	else
2471 		dmstate->rel = node->ss.ss_currentRelation;
2472 	table = GetForeignTable(RelationGetRelid(dmstate->rel));
2473 	user = GetUserMapping(userid, table->serverid);
2474 
2475 	/*
2476 	 * Get connection to the foreign server.  Connection manager will
2477 	 * establish new connection if necessary.
2478 	 */
2479 	dmstate->conn = GetConnection(user, false);
2480 
2481 	/* Update the foreign-join-related fields. */
2482 	if (fsplan->scan.scanrelid == 0)
2483 	{
2484 		/* Save info about foreign table. */
2485 		dmstate->resultRel = dmstate->rel;
2486 
2487 		/*
2488 		 * Set dmstate->rel to NULL to teach get_returning_data() and
2489 		 * make_tuple_from_result_row() that columns fetched from the remote
2490 		 * server are described by fdw_scan_tlist of the foreign-scan plan
2491 		 * node, not the tuple descriptor for the target relation.
2492 		 */
2493 		dmstate->rel = NULL;
2494 	}
2495 
2496 	/* Initialize state variable */
2497 	dmstate->num_tuples = -1;	/* -1 means not set yet */
2498 
2499 	/* Get private info created by planner functions. */
2500 	dmstate->query = strVal(list_nth(fsplan->fdw_private,
2501 									 FdwDirectModifyPrivateUpdateSql));
2502 	dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2503 											 FdwDirectModifyPrivateHasReturning));
2504 	dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2505 												 FdwDirectModifyPrivateRetrievedAttrs);
2506 	dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2507 											 FdwDirectModifyPrivateSetProcessed));
2508 
2509 	/* Create context for per-tuple temp workspace. */
2510 	dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2511 											  "postgres_fdw temporary data",
2512 											  ALLOCSET_SMALL_SIZES);
2513 
2514 	/* Prepare for input conversion of RETURNING results. */
2515 	if (dmstate->has_returning)
2516 	{
2517 		TupleDesc	tupdesc;
2518 
2519 		if (fsplan->scan.scanrelid == 0)
2520 			tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2521 		else
2522 			tupdesc = RelationGetDescr(dmstate->rel);
2523 
2524 		dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2525 
2526 		/*
2527 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2528 		 * initialize a filter to extract an updated/deleted tuple from a scan
2529 		 * tuple.
2530 		 */
2531 		if (fsplan->scan.scanrelid == 0)
2532 			init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2533 	}
2534 
2535 	/*
2536 	 * Prepare for processing of parameters used in remote query, if any.
2537 	 */
2538 	numParams = list_length(fsplan->fdw_exprs);
2539 	dmstate->numParams = numParams;
2540 	if (numParams > 0)
2541 		prepare_query_params((PlanState *) node,
2542 							 fsplan->fdw_exprs,
2543 							 numParams,
2544 							 &dmstate->param_flinfo,
2545 							 &dmstate->param_exprs,
2546 							 &dmstate->param_values);
2547 }
2548 
2549 /*
2550  * postgresIterateDirectModify
2551  *		Execute a direct foreign table modification
2552  */
2553 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2554 postgresIterateDirectModify(ForeignScanState *node)
2555 {
2556 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2557 	EState	   *estate = node->ss.ps.state;
2558 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2559 
2560 	/*
2561 	 * If this is the first call after Begin, execute the statement.
2562 	 */
2563 	if (dmstate->num_tuples == -1)
2564 		execute_dml_stmt(node);
2565 
2566 	/*
2567 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
2568 	 */
2569 	if (!resultRelInfo->ri_projectReturning)
2570 	{
2571 		TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2572 		Instrumentation *instr = node->ss.ps.instrument;
2573 
2574 		Assert(!dmstate->has_returning);
2575 
2576 		/* Increment the command es_processed count if necessary. */
2577 		if (dmstate->set_processed)
2578 			estate->es_processed += dmstate->num_tuples;
2579 
2580 		/* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2581 		if (instr)
2582 			instr->tuplecount += dmstate->num_tuples;
2583 
2584 		return ExecClearTuple(slot);
2585 	}
2586 
2587 	/*
2588 	 * Get the next RETURNING tuple.
2589 	 */
2590 	return get_returning_data(node);
2591 }
2592 
2593 /*
2594  * postgresEndDirectModify
2595  *		Finish a direct foreign table modification
2596  */
2597 static void
postgresEndDirectModify(ForeignScanState * node)2598 postgresEndDirectModify(ForeignScanState *node)
2599 {
2600 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2601 
2602 	/* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2603 	if (dmstate == NULL)
2604 		return;
2605 
2606 	/* Release PGresult */
2607 	if (dmstate->result)
2608 		PQclear(dmstate->result);
2609 
2610 	/* Release remote connection */
2611 	ReleaseConnection(dmstate->conn);
2612 	dmstate->conn = NULL;
2613 
2614 	/* close the target relation. */
2615 	if (dmstate->resultRel)
2616 		ExecCloseScanRelation(dmstate->resultRel);
2617 
2618 	/* MemoryContext will be deleted automatically. */
2619 }
2620 
2621 /*
2622  * postgresExplainForeignScan
2623  *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2624  */
2625 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2626 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2627 {
2628 	List	   *fdw_private;
2629 	char	   *sql;
2630 	char	   *relations;
2631 
2632 	fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2633 
2634 	/*
2635 	 * Add names of relation handled by the foreign scan when the scan is a
2636 	 * join
2637 	 */
2638 	if (list_length(fdw_private) > FdwScanPrivateRelations)
2639 	{
2640 		relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2641 		ExplainPropertyText("Relations", relations, es);
2642 	}
2643 
2644 	/*
2645 	 * Add remote query, when VERBOSE option is specified.
2646 	 */
2647 	if (es->verbose)
2648 	{
2649 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2650 		ExplainPropertyText("Remote SQL", sql, es);
2651 	}
2652 }
2653 
2654 /*
2655  * postgresExplainForeignModify
2656  *		Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2657  */
2658 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2659 postgresExplainForeignModify(ModifyTableState *mtstate,
2660 							 ResultRelInfo *rinfo,
2661 							 List *fdw_private,
2662 							 int subplan_index,
2663 							 ExplainState *es)
2664 {
2665 	if (es->verbose)
2666 	{
2667 		char	   *sql = strVal(list_nth(fdw_private,
2668 										  FdwModifyPrivateUpdateSql));
2669 
2670 		ExplainPropertyText("Remote SQL", sql, es);
2671 	}
2672 }
2673 
2674 /*
2675  * postgresExplainDirectModify
2676  *		Produce extra output for EXPLAIN of a ForeignScan that modifies a
2677  *		foreign table directly
2678  */
2679 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2680 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2681 {
2682 	List	   *fdw_private;
2683 	char	   *sql;
2684 
2685 	if (es->verbose)
2686 	{
2687 		fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2688 		sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2689 		ExplainPropertyText("Remote SQL", sql, es);
2690 	}
2691 }
2692 
2693 
2694 /*
2695  * estimate_path_cost_size
2696  *		Get cost and size estimates for a foreign scan on given foreign relation
2697  *		either a base relation or a join between foreign relations or an upper
2698  *		relation containing foreign relations.
2699  *
2700  * param_join_conds are the parameterization clauses with outer relations.
2701  * pathkeys specify the expected sort order if any for given path being costed.
2702  *
2703  * The function returns the cost and size estimates in p_rows, p_width,
2704  * p_startup_cost and p_total_cost variables.
2705  */
2706 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)2707 estimate_path_cost_size(PlannerInfo *root,
2708 						RelOptInfo *foreignrel,
2709 						List *param_join_conds,
2710 						List *pathkeys,
2711 						double *p_rows, int *p_width,
2712 						Cost *p_startup_cost, Cost *p_total_cost)
2713 {
2714 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2715 	double		rows;
2716 	double		retrieved_rows;
2717 	int			width;
2718 	Cost		startup_cost;
2719 	Cost		total_cost;
2720 	Cost		cpu_per_tuple;
2721 
2722 	/*
2723 	 * If the table or the server is configured to use remote estimates,
2724 	 * connect to the foreign server and execute EXPLAIN to estimate the
2725 	 * number of rows selected by the restriction+join clauses.  Otherwise,
2726 	 * estimate rows using whatever statistics we have locally, in a way
2727 	 * similar to ordinary tables.
2728 	 */
2729 	if (fpinfo->use_remote_estimate)
2730 	{
2731 		List	   *remote_param_join_conds;
2732 		List	   *local_param_join_conds;
2733 		StringInfoData sql;
2734 		PGconn	   *conn;
2735 		Selectivity local_sel;
2736 		QualCost	local_cost;
2737 		List	   *fdw_scan_tlist = NIL;
2738 		List	   *remote_conds;
2739 
2740 		/* Required only to be passed to deparseSelectStmtForRel */
2741 		List	   *retrieved_attrs;
2742 
2743 		/*
2744 		 * param_join_conds might contain both clauses that are safe to send
2745 		 * across, and clauses that aren't.
2746 		 */
2747 		classifyConditions(root, foreignrel, param_join_conds,
2748 						   &remote_param_join_conds, &local_param_join_conds);
2749 
2750 		/* Build the list of columns to be fetched from the foreign server. */
2751 		if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2752 			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2753 		else
2754 			fdw_scan_tlist = NIL;
2755 
2756 		/*
2757 		 * The complete list of remote conditions includes everything from
2758 		 * baserestrictinfo plus any extra join_conds relevant to this
2759 		 * particular path.
2760 		 */
2761 		remote_conds = list_concat(list_copy(remote_param_join_conds),
2762 								   fpinfo->remote_conds);
2763 
2764 		/*
2765 		 * Construct EXPLAIN query including the desired SELECT, FROM, and
2766 		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2767 		 * values, so don't request params_list.
2768 		 */
2769 		initStringInfo(&sql);
2770 		appendStringInfoString(&sql, "EXPLAIN ");
2771 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2772 								remote_conds, pathkeys, false,
2773 								&retrieved_attrs, NULL);
2774 
2775 		/* Get the remote estimate */
2776 		conn = GetConnection(fpinfo->user, false);
2777 		get_remote_estimate(sql.data, conn, &rows, &width,
2778 							&startup_cost, &total_cost);
2779 		ReleaseConnection(conn);
2780 
2781 		retrieved_rows = rows;
2782 
2783 		/* Factor in the selectivity of the locally-checked quals */
2784 		local_sel = clauselist_selectivity(root,
2785 										   local_param_join_conds,
2786 										   foreignrel->relid,
2787 										   JOIN_INNER,
2788 										   NULL);
2789 		local_sel *= fpinfo->local_conds_sel;
2790 
2791 		rows = clamp_row_est(rows * local_sel);
2792 
2793 		/* Add in the eval cost of the locally-checked quals */
2794 		startup_cost += fpinfo->local_conds_cost.startup;
2795 		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2796 		cost_qual_eval(&local_cost, local_param_join_conds, root);
2797 		startup_cost += local_cost.startup;
2798 		total_cost += local_cost.per_tuple * retrieved_rows;
2799 	}
2800 	else
2801 	{
2802 		Cost		run_cost = 0;
2803 
2804 		/*
2805 		 * We don't support join conditions in this mode (hence, no
2806 		 * parameterized paths can be made).
2807 		 */
2808 		Assert(param_join_conds == NIL);
2809 
2810 		/*
2811 		 * Use rows/width estimates made by set_baserel_size_estimates() for
2812 		 * base foreign relations and set_joinrel_size_estimates() for join
2813 		 * between foreign relations.
2814 		 */
2815 		rows = foreignrel->rows;
2816 		width = foreignrel->reltarget->width;
2817 
2818 		/* Back into an estimate of the number of retrieved rows. */
2819 		retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2820 
2821 		/*
2822 		 * We will come here again and again with different set of pathkeys
2823 		 * that caller wants to cost. We don't need to calculate the cost of
2824 		 * bare scan each time. Instead, use the costs if we have cached them
2825 		 * already.
2826 		 */
2827 		if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2828 		{
2829 			startup_cost = fpinfo->rel_startup_cost;
2830 			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2831 		}
2832 		else if (IS_JOIN_REL(foreignrel))
2833 		{
2834 			PgFdwRelationInfo *fpinfo_i;
2835 			PgFdwRelationInfo *fpinfo_o;
2836 			QualCost	join_cost;
2837 			QualCost	remote_conds_cost;
2838 			double		nrows;
2839 
2840 			/* For join we expect inner and outer relations set */
2841 			Assert(fpinfo->innerrel && fpinfo->outerrel);
2842 
2843 			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2844 			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2845 
2846 			/* Estimate of number of rows in cross product */
2847 			nrows = fpinfo_i->rows * fpinfo_o->rows;
2848 			/* Clamp retrieved rows estimate to at most size of cross product */
2849 			retrieved_rows = Min(retrieved_rows, nrows);
2850 
2851 			/*
2852 			 * The cost of foreign join is estimated as cost of generating
2853 			 * rows for the joining relations + cost for applying quals on the
2854 			 * rows.
2855 			 */
2856 
2857 			/*
2858 			 * Calculate the cost of clauses pushed down to the foreign server
2859 			 */
2860 			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2861 			/* Calculate the cost of applying join clauses */
2862 			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2863 
2864 			/*
2865 			 * Startup cost includes startup cost of joining relations and the
2866 			 * startup cost for join and other clauses. We do not include the
2867 			 * startup cost specific to join strategy (e.g. setting up hash
2868 			 * tables) since we do not know what strategy the foreign server
2869 			 * is going to use.
2870 			 */
2871 			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2872 			startup_cost += join_cost.startup;
2873 			startup_cost += remote_conds_cost.startup;
2874 			startup_cost += fpinfo->local_conds_cost.startup;
2875 
2876 			/*
2877 			 * Run time cost includes:
2878 			 *
2879 			 * 1. Run time cost (total_cost - startup_cost) of relations being
2880 			 * joined
2881 			 *
2882 			 * 2. Run time cost of applying join clauses on the cross product
2883 			 * of the joining relations.
2884 			 *
2885 			 * 3. Run time cost of applying pushed down other clauses on the
2886 			 * result of join
2887 			 *
2888 			 * 4. Run time cost of applying nonpushable other clauses locally
2889 			 * on the result fetched from the foreign server.
2890 			 */
2891 			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2892 			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2893 			run_cost += nrows * join_cost.per_tuple;
2894 			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2895 			run_cost += nrows * remote_conds_cost.per_tuple;
2896 			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2897 		}
2898 		else if (IS_UPPER_REL(foreignrel))
2899 		{
2900 			PgFdwRelationInfo *ofpinfo;
2901 			PathTarget *ptarget = foreignrel->reltarget;
2902 			AggClauseCosts aggcosts;
2903 			double		input_rows;
2904 			int			numGroupCols;
2905 			double		numGroups = 1;
2906 
2907 			/* Make sure the core code set the pathtarget. */
2908 			Assert(ptarget != NULL);
2909 
2910 			/*
2911 			 * This cost model is mixture of costing done for sorted and
2912 			 * hashed aggregates in cost_agg().  We are not sure which
2913 			 * strategy will be considered at remote side, thus for
2914 			 * simplicity, we put all startup related costs in startup_cost
2915 			 * and all finalization and run cost are added in total_cost.
2916 			 *
2917 			 * Also, core does not care about costing HAVING expressions and
2918 			 * adding that to the costs.  So similarly, here too we are not
2919 			 * considering remote and local conditions for costing.
2920 			 */
2921 
2922 			ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2923 
2924 			/* Get rows and width from input rel */
2925 			input_rows = ofpinfo->rows;
2926 			width = ofpinfo->width;
2927 
2928 			/* Collect statistics about aggregates for estimating costs. */
2929 			MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2930 			if (root->parse->hasAggs)
2931 			{
2932 				get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2933 									 AGGSPLIT_SIMPLE, &aggcosts);
2934 
2935 				/*
2936 				 * The cost of aggregates in the HAVING qual will be the same
2937 				 * for each child as it is for the parent, so there's no need
2938 				 * to use a translated version of havingQual.
2939 				 */
2940 				get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2941 									 AGGSPLIT_SIMPLE, &aggcosts);
2942 			}
2943 
2944 			/* Get number of grouping columns and possible number of groups */
2945 			numGroupCols = list_length(root->parse->groupClause);
2946 			numGroups = estimate_num_groups(root,
2947 											get_sortgrouplist_exprs(root->parse->groupClause,
2948 																	fpinfo->grouped_tlist),
2949 											input_rows, NULL);
2950 
2951 			/*
2952 			 * Number of rows expected from foreign server will be same as
2953 			 * that of number of groups.
2954 			 */
2955 			rows = retrieved_rows = numGroups;
2956 
2957 			/*-----
2958 			 * Startup cost includes:
2959 			 *	  1. Startup cost for underneath input relation
2960 			 *	  2. Cost of performing aggregation, per cost_agg()
2961 			 *	  3. Startup cost for PathTarget eval
2962 			 *-----
2963 			 */
2964 			startup_cost = ofpinfo->rel_startup_cost;
2965 			startup_cost += aggcosts.transCost.startup;
2966 			startup_cost += aggcosts.transCost.per_tuple * input_rows;
2967 			startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2968 			startup_cost += ptarget->cost.startup;
2969 
2970 			/*-----
2971 			 * Run time cost includes:
2972 			 *	  1. Run time cost of underneath input relation
2973 			 *	  2. Run time cost of performing aggregation, per cost_agg()
2974 			 *	  3. PathTarget eval cost for each output row
2975 			 *-----
2976 			 */
2977 			run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2978 			run_cost += aggcosts.finalCost * numGroups;
2979 			run_cost += cpu_tuple_cost * numGroups;
2980 			run_cost += ptarget->cost.per_tuple * numGroups;
2981 		}
2982 		else
2983 		{
2984 			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2985 			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2986 
2987 			/*
2988 			 * Cost as though this were a seqscan, which is pessimistic.  We
2989 			 * effectively imagine the local_conds are being evaluated
2990 			 * remotely, too.
2991 			 */
2992 			startup_cost = 0;
2993 			run_cost = 0;
2994 			run_cost += seq_page_cost * foreignrel->pages;
2995 
2996 			startup_cost += foreignrel->baserestrictcost.startup;
2997 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2998 			run_cost += cpu_per_tuple * foreignrel->tuples;
2999 		}
3000 
3001 		/*
3002 		 * Without remote estimates, we have no real way to estimate the cost
3003 		 * of generating sorted output.  It could be free if the query plan
3004 		 * the remote side would have chosen generates properly-sorted output
3005 		 * anyway, but in most cases it will cost something.  Estimate a value
3006 		 * high enough that we won't pick the sorted path when the ordering
3007 		 * isn't locally useful, but low enough that we'll err on the side of
3008 		 * pushing down the ORDER BY clause when it's useful to do so.
3009 		 */
3010 		if (pathkeys != NIL)
3011 		{
3012 			startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3013 			run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3014 		}
3015 
3016 		total_cost = startup_cost + run_cost;
3017 	}
3018 
3019 	/*
3020 	 * Cache the costs for scans without any pathkeys or parameterization
3021 	 * before adding the costs for transferring data from the foreign server.
3022 	 * These costs are useful for costing the join between this relation and
3023 	 * another foreign relation or to calculate the costs of paths with
3024 	 * pathkeys for this relation, when the costs can not be obtained from the
3025 	 * foreign server. This function will be called at least once for every
3026 	 * foreign relation without pathkeys and parameterization.
3027 	 */
3028 	if (pathkeys == NIL && param_join_conds == NIL)
3029 	{
3030 		fpinfo->rel_startup_cost = startup_cost;
3031 		fpinfo->rel_total_cost = total_cost;
3032 	}
3033 
3034 	/*
3035 	 * Add some additional cost factors to account for connection overhead
3036 	 * (fdw_startup_cost), transferring data across the network
3037 	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3038 	 * (cpu_tuple_cost per retrieved row).
3039 	 */
3040 	startup_cost += fpinfo->fdw_startup_cost;
3041 	total_cost += fpinfo->fdw_startup_cost;
3042 	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3043 	total_cost += cpu_tuple_cost * retrieved_rows;
3044 
3045 	/* Return results. */
3046 	*p_rows = rows;
3047 	*p_width = width;
3048 	*p_startup_cost = startup_cost;
3049 	*p_total_cost = total_cost;
3050 }
3051 
3052 /*
3053  * Estimate costs of executing a SQL statement remotely.
3054  * The given "sql" must be an EXPLAIN command.
3055  */
3056 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)3057 get_remote_estimate(const char *sql, PGconn *conn,
3058 					double *rows, int *width,
3059 					Cost *startup_cost, Cost *total_cost)
3060 {
3061 	PGresult   *volatile res = NULL;
3062 
3063 	/* PGresult must be released before leaving this function. */
3064 	PG_TRY();
3065 	{
3066 		char	   *line;
3067 		char	   *p;
3068 		int			n;
3069 
3070 		/*
3071 		 * Execute EXPLAIN remotely.
3072 		 */
3073 		res = pgfdw_exec_query(conn, sql);
3074 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3075 			pgfdw_report_error(ERROR, res, conn, false, sql);
3076 
3077 		/*
3078 		 * Extract cost numbers for topmost plan node.  Note we search for a
3079 		 * left paren from the end of the line to avoid being confused by
3080 		 * other uses of parentheses.
3081 		 */
3082 		line = PQgetvalue(res, 0, 0);
3083 		p = strrchr(line, '(');
3084 		if (p == NULL)
3085 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3086 		n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3087 				   startup_cost, total_cost, rows, width);
3088 		if (n != 4)
3089 			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3090 
3091 		PQclear(res);
3092 		res = NULL;
3093 	}
3094 	PG_CATCH();
3095 	{
3096 		if (res)
3097 			PQclear(res);
3098 		PG_RE_THROW();
3099 	}
3100 	PG_END_TRY();
3101 }
3102 
3103 /*
3104  * Detect whether we want to process an EquivalenceClass member.
3105  *
3106  * This is a callback for use by generate_implied_equalities_for_column.
3107  */
3108 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)3109 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3110 						  EquivalenceClass *ec, EquivalenceMember *em,
3111 						  void *arg)
3112 {
3113 	ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3114 	Expr	   *expr = em->em_expr;
3115 
3116 	/*
3117 	 * If we've identified what we're processing in the current scan, we only
3118 	 * want to match that expression.
3119 	 */
3120 	if (state->current != NULL)
3121 		return equal(expr, state->current);
3122 
3123 	/*
3124 	 * Otherwise, ignore anything we've already processed.
3125 	 */
3126 	if (list_member(state->already_used, expr))
3127 		return false;
3128 
3129 	/* This is the new target to process. */
3130 	state->current = expr;
3131 	return true;
3132 }
3133 
3134 /*
3135  * Create cursor for node's query with current parameter values.
3136  */
3137 static void
create_cursor(ForeignScanState * node)3138 create_cursor(ForeignScanState *node)
3139 {
3140 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3141 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3142 	int			numParams = fsstate->numParams;
3143 	const char **values = fsstate->param_values;
3144 	PGconn	   *conn = fsstate->conn;
3145 	StringInfoData buf;
3146 	PGresult   *res;
3147 
3148 	/*
3149 	 * Construct array of query parameter values in text format.  We do the
3150 	 * conversions in the short-lived per-tuple context, so as not to cause a
3151 	 * memory leak over repeated scans.
3152 	 */
3153 	if (numParams > 0)
3154 	{
3155 		MemoryContext oldcontext;
3156 
3157 		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3158 
3159 		process_query_params(econtext,
3160 							 fsstate->param_flinfo,
3161 							 fsstate->param_exprs,
3162 							 values);
3163 
3164 		MemoryContextSwitchTo(oldcontext);
3165 	}
3166 
3167 	/* Construct the DECLARE CURSOR command */
3168 	initStringInfo(&buf);
3169 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3170 					 fsstate->cursor_number, fsstate->query);
3171 
3172 	/*
3173 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3174 	 * to infer types for all parameters.  Since we explicitly cast every
3175 	 * parameter (see deparse.c), the "inference" is trivial and will produce
3176 	 * the desired result.  This allows us to avoid assuming that the remote
3177 	 * server has the same OIDs we do for the parameters' types.
3178 	 */
3179 	if (!PQsendQueryParams(conn, buf.data, numParams,
3180 						   NULL, values, NULL, NULL, 0))
3181 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3182 
3183 	/*
3184 	 * Get the result, and check for success.
3185 	 *
3186 	 * We don't use a PG_TRY block here, so be careful not to throw error
3187 	 * without releasing the PGresult.
3188 	 */
3189 	res = pgfdw_get_result(conn, buf.data);
3190 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3191 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3192 	PQclear(res);
3193 
3194 	/* Mark the cursor as created, and show no tuples have been retrieved */
3195 	fsstate->cursor_exists = true;
3196 	fsstate->tuples = NULL;
3197 	fsstate->num_tuples = 0;
3198 	fsstate->next_tuple = 0;
3199 	fsstate->fetch_ct_2 = 0;
3200 	fsstate->eof_reached = false;
3201 
3202 	/* Clean up */
3203 	pfree(buf.data);
3204 }
3205 
3206 /*
3207  * Fetch some more rows from the node's cursor.
3208  */
3209 static void
fetch_more_data(ForeignScanState * node)3210 fetch_more_data(ForeignScanState *node)
3211 {
3212 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3213 	PGresult   *volatile res = NULL;
3214 	MemoryContext oldcontext;
3215 
3216 	/*
3217 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
3218 	 * batch.
3219 	 */
3220 	fsstate->tuples = NULL;
3221 	MemoryContextReset(fsstate->batch_cxt);
3222 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3223 
3224 	/* PGresult must be released before leaving this function. */
3225 	PG_TRY();
3226 	{
3227 		PGconn	   *conn = fsstate->conn;
3228 		char		sql[64];
3229 		int			numrows;
3230 		int			i;
3231 
3232 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3233 				 fsstate->fetch_size, fsstate->cursor_number);
3234 
3235 		res = pgfdw_exec_query(conn, sql);
3236 		/* On error, report the original query, not the FETCH. */
3237 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3238 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3239 
3240 		/* Convert the data into HeapTuples */
3241 		numrows = PQntuples(res);
3242 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3243 		fsstate->num_tuples = numrows;
3244 		fsstate->next_tuple = 0;
3245 
3246 		for (i = 0; i < numrows; i++)
3247 		{
3248 			Assert(IsA(node->ss.ps.plan, ForeignScan));
3249 
3250 			fsstate->tuples[i] =
3251 				make_tuple_from_result_row(res, i,
3252 										   fsstate->rel,
3253 										   fsstate->attinmeta,
3254 										   fsstate->retrieved_attrs,
3255 										   node,
3256 										   fsstate->temp_cxt);
3257 		}
3258 
3259 		/* Update fetch_ct_2 */
3260 		if (fsstate->fetch_ct_2 < 2)
3261 			fsstate->fetch_ct_2++;
3262 
3263 		/* Must be EOF if we didn't get as many tuples as we asked for. */
3264 		fsstate->eof_reached = (numrows < fsstate->fetch_size);
3265 
3266 		PQclear(res);
3267 		res = NULL;
3268 	}
3269 	PG_CATCH();
3270 	{
3271 		if (res)
3272 			PQclear(res);
3273 		PG_RE_THROW();
3274 	}
3275 	PG_END_TRY();
3276 
3277 	MemoryContextSwitchTo(oldcontext);
3278 }
3279 
3280 /*
3281  * Force assorted GUC parameters to settings that ensure that we'll output
3282  * data values in a form that is unambiguous to the remote server.
3283  *
3284  * This is rather expensive and annoying to do once per row, but there's
3285  * little choice if we want to be sure values are transmitted accurately;
3286  * we can't leave the settings in place between rows for fear of affecting
3287  * user-visible computations.
3288  *
3289  * We use the equivalent of a function SET option to allow the settings to
3290  * persist only until the caller calls reset_transmission_modes().  If an
3291  * error is thrown in between, guc.c will take care of undoing the settings.
3292  *
3293  * The return value is the nestlevel that must be passed to
3294  * reset_transmission_modes() to undo things.
3295  */
3296 int
set_transmission_modes(void)3297 set_transmission_modes(void)
3298 {
3299 	int			nestlevel = NewGUCNestLevel();
3300 
3301 	/*
3302 	 * The values set here should match what pg_dump does.  See also
3303 	 * configure_remote_session in connection.c.
3304 	 */
3305 	if (DateStyle != USE_ISO_DATES)
3306 		(void) set_config_option("datestyle", "ISO",
3307 								 PGC_USERSET, PGC_S_SESSION,
3308 								 GUC_ACTION_SAVE, true, 0, false);
3309 	if (IntervalStyle != INTSTYLE_POSTGRES)
3310 		(void) set_config_option("intervalstyle", "postgres",
3311 								 PGC_USERSET, PGC_S_SESSION,
3312 								 GUC_ACTION_SAVE, true, 0, false);
3313 	if (extra_float_digits < 3)
3314 		(void) set_config_option("extra_float_digits", "3",
3315 								 PGC_USERSET, PGC_S_SESSION,
3316 								 GUC_ACTION_SAVE, true, 0, false);
3317 
3318 	return nestlevel;
3319 }
3320 
3321 /*
3322  * Undo the effects of set_transmission_modes().
3323  */
3324 void
reset_transmission_modes(int nestlevel)3325 reset_transmission_modes(int nestlevel)
3326 {
3327 	AtEOXact_GUC(true, nestlevel);
3328 }
3329 
3330 /*
3331  * Utility routine to close a cursor.
3332  */
3333 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3334 close_cursor(PGconn *conn, unsigned int cursor_number)
3335 {
3336 	char		sql[64];
3337 	PGresult   *res;
3338 
3339 	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3340 
3341 	/*
3342 	 * We don't use a PG_TRY block here, so be careful not to throw error
3343 	 * without releasing the PGresult.
3344 	 */
3345 	res = pgfdw_exec_query(conn, sql);
3346 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3347 		pgfdw_report_error(ERROR, res, conn, true, sql);
3348 	PQclear(res);
3349 }
3350 
3351 /*
3352  * create_foreign_modify
3353  *		Construct an execution state of a foreign insert/update/delete
3354  *		operation
3355  */
3356 static PgFdwModifyState *
create_foreign_modify(EState * estate,RangeTblEntry * rte,ResultRelInfo * resultRelInfo,CmdType operation,Plan * subplan,char * query,List * target_attrs,bool has_returning,List * retrieved_attrs)3357 create_foreign_modify(EState *estate,
3358 					  RangeTblEntry *rte,
3359 					  ResultRelInfo *resultRelInfo,
3360 					  CmdType operation,
3361 					  Plan *subplan,
3362 					  char *query,
3363 					  List *target_attrs,
3364 					  bool has_returning,
3365 					  List *retrieved_attrs)
3366 {
3367 	PgFdwModifyState *fmstate;
3368 	Relation	rel = resultRelInfo->ri_RelationDesc;
3369 	TupleDesc	tupdesc = RelationGetDescr(rel);
3370 	Oid			userid;
3371 	ForeignTable *table;
3372 	UserMapping *user;
3373 	AttrNumber	n_params;
3374 	Oid			typefnoid;
3375 	bool		isvarlena;
3376 	ListCell   *lc;
3377 
3378 	/* Begin constructing PgFdwModifyState. */
3379 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3380 	fmstate->rel = rel;
3381 
3382 	/*
3383 	 * Identify which user to do the remote access as.  This should match what
3384 	 * ExecCheckRTEPerms() does.
3385 	 */
3386 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3387 
3388 	/* Get info about foreign table. */
3389 	table = GetForeignTable(RelationGetRelid(rel));
3390 	user = GetUserMapping(userid, table->serverid);
3391 
3392 	/* Open connection; report that we'll create a prepared statement. */
3393 	fmstate->conn = GetConnection(user, true);
3394 	fmstate->p_name = NULL;		/* prepared statement not made yet */
3395 
3396 	/* Set up remote query information. */
3397 	fmstate->query = query;
3398 	fmstate->target_attrs = target_attrs;
3399 	fmstate->has_returning = has_returning;
3400 	fmstate->retrieved_attrs = retrieved_attrs;
3401 
3402 	/* Create context for per-tuple temp workspace. */
3403 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3404 											  "postgres_fdw temporary data",
3405 											  ALLOCSET_SMALL_SIZES);
3406 
3407 	/* Prepare for input conversion of RETURNING results. */
3408 	if (fmstate->has_returning)
3409 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3410 
3411 	/* Prepare for output conversion of parameters used in prepared stmt. */
3412 	n_params = list_length(fmstate->target_attrs) + 1;
3413 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3414 	fmstate->p_nums = 0;
3415 
3416 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
3417 	{
3418 		Assert(subplan != NULL);
3419 
3420 		/* Find the ctid resjunk column in the subplan's result */
3421 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3422 														  "ctid");
3423 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
3424 			elog(ERROR, "could not find junk ctid column");
3425 
3426 		/* First transmittable parameter will be ctid */
3427 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3428 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3429 		fmstate->p_nums++;
3430 	}
3431 
3432 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
3433 	{
3434 		/* Set up for remaining transmittable parameters */
3435 		foreach(lc, fmstate->target_attrs)
3436 		{
3437 			int			attnum = lfirst_int(lc);
3438 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3439 
3440 			Assert(!attr->attisdropped);
3441 
3442 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3443 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3444 			fmstate->p_nums++;
3445 		}
3446 	}
3447 
3448 	Assert(fmstate->p_nums <= n_params);
3449 
3450 	/* Initialize auxiliary state */
3451 	fmstate->aux_fmstate = NULL;
3452 
3453 	return fmstate;
3454 }
3455 
3456 /*
3457  * prepare_foreign_modify
3458  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3459  */
3460 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3461 prepare_foreign_modify(PgFdwModifyState *fmstate)
3462 {
3463 	char		prep_name[NAMEDATALEN];
3464 	char	   *p_name;
3465 	PGresult   *res;
3466 
3467 	/* Construct name we'll use for the prepared statement. */
3468 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3469 			 GetPrepStmtNumber(fmstate->conn));
3470 	p_name = pstrdup(prep_name);
3471 
3472 	/*
3473 	 * We intentionally do not specify parameter types here, but leave the
3474 	 * remote server to derive them by default.  This avoids possible problems
3475 	 * with the remote server using different type OIDs than we do.  All of
3476 	 * the prepared statements we use in this module are simple enough that
3477 	 * the remote server will make the right choices.
3478 	 */
3479 	if (!PQsendPrepare(fmstate->conn,
3480 					   p_name,
3481 					   fmstate->query,
3482 					   0,
3483 					   NULL))
3484 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3485 
3486 	/*
3487 	 * Get the result, and check for success.
3488 	 *
3489 	 * We don't use a PG_TRY block here, so be careful not to throw error
3490 	 * without releasing the PGresult.
3491 	 */
3492 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
3493 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3494 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3495 	PQclear(res);
3496 
3497 	/* This action shows that the prepare has been done. */
3498 	fmstate->p_name = p_name;
3499 }
3500 
3501 /*
3502  * convert_prep_stmt_params
3503  *		Create array of text strings representing parameter values
3504  *
3505  * tupleid is ctid to send, or NULL if none
3506  * slot is slot to get remaining parameters from, or NULL if none
3507  *
3508  * Data is constructed in temp_cxt; caller should reset that after use.
3509  */
3510 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3511 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3512 						 ItemPointer tupleid,
3513 						 TupleTableSlot *slot)
3514 {
3515 	const char **p_values;
3516 	int			pindex = 0;
3517 	MemoryContext oldcontext;
3518 
3519 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3520 
3521 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3522 
3523 	/* 1st parameter should be ctid, if it's in use */
3524 	if (tupleid != NULL)
3525 	{
3526 		/* don't need set_transmission_modes for TID output */
3527 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3528 											  PointerGetDatum(tupleid));
3529 		pindex++;
3530 	}
3531 
3532 	/* get following parameters from slot */
3533 	if (slot != NULL && fmstate->target_attrs != NIL)
3534 	{
3535 		int			nestlevel;
3536 		ListCell   *lc;
3537 
3538 		nestlevel = set_transmission_modes();
3539 
3540 		foreach(lc, fmstate->target_attrs)
3541 		{
3542 			int			attnum = lfirst_int(lc);
3543 			Datum		value;
3544 			bool		isnull;
3545 
3546 			value = slot_getattr(slot, attnum, &isnull);
3547 			if (isnull)
3548 				p_values[pindex] = NULL;
3549 			else
3550 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3551 													  value);
3552 			pindex++;
3553 		}
3554 
3555 		reset_transmission_modes(nestlevel);
3556 	}
3557 
3558 	Assert(pindex == fmstate->p_nums);
3559 
3560 	MemoryContextSwitchTo(oldcontext);
3561 
3562 	return p_values;
3563 }
3564 
3565 /*
3566  * store_returning_result
3567  *		Store the result of a RETURNING clause
3568  *
3569  * On error, be sure to release the PGresult on the way out.  Callers do not
3570  * have PG_TRY blocks to ensure this happens.
3571  */
3572 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3573 store_returning_result(PgFdwModifyState *fmstate,
3574 					   TupleTableSlot *slot, PGresult *res)
3575 {
3576 	PG_TRY();
3577 	{
3578 		HeapTuple	newtup;
3579 
3580 		newtup = make_tuple_from_result_row(res, 0,
3581 											fmstate->rel,
3582 											fmstate->attinmeta,
3583 											fmstate->retrieved_attrs,
3584 											NULL,
3585 											fmstate->temp_cxt);
3586 		/* tuple will be deleted when it is cleared from the slot */
3587 		ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3588 	}
3589 	PG_CATCH();
3590 	{
3591 		if (res)
3592 			PQclear(res);
3593 		PG_RE_THROW();
3594 	}
3595 	PG_END_TRY();
3596 }
3597 
3598 /*
3599  * finish_foreign_modify
3600  *		Release resources for a foreign insert/update/delete operation
3601  */
3602 static void
finish_foreign_modify(PgFdwModifyState * fmstate)3603 finish_foreign_modify(PgFdwModifyState *fmstate)
3604 {
3605 	Assert(fmstate != NULL);
3606 
3607 	/* If we created a prepared statement, destroy it */
3608 	if (fmstate->p_name)
3609 	{
3610 		char		sql[64];
3611 		PGresult   *res;
3612 
3613 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3614 
3615 		/*
3616 		 * We don't use a PG_TRY block here, so be careful not to throw error
3617 		 * without releasing the PGresult.
3618 		 */
3619 		res = pgfdw_exec_query(fmstate->conn, sql);
3620 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
3621 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3622 		PQclear(res);
3623 		fmstate->p_name = NULL;
3624 	}
3625 
3626 	/* Release remote connection */
3627 	ReleaseConnection(fmstate->conn);
3628 	fmstate->conn = NULL;
3629 }
3630 
3631 /*
3632  * build_remote_returning
3633  *		Build a RETURNING targetlist of a remote query for performing an
3634  *		UPDATE/DELETE .. RETURNING on a join directly
3635  */
3636 static List *
build_remote_returning(Index rtindex,Relation rel,List * returningList)3637 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3638 {
3639 	bool		have_wholerow = false;
3640 	List	   *tlist = NIL;
3641 	List	   *vars;
3642 	ListCell   *lc;
3643 
3644 	Assert(returningList);
3645 
3646 	vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3647 
3648 	/*
3649 	 * If there's a whole-row reference to the target relation, then we'll
3650 	 * need all the columns of the relation.
3651 	 */
3652 	foreach(lc, vars)
3653 	{
3654 		Var		   *var = (Var *) lfirst(lc);
3655 
3656 		if (IsA(var, Var) &&
3657 			var->varno == rtindex &&
3658 			var->varattno == InvalidAttrNumber)
3659 		{
3660 			have_wholerow = true;
3661 			break;
3662 		}
3663 	}
3664 
3665 	if (have_wholerow)
3666 	{
3667 		TupleDesc	tupdesc = RelationGetDescr(rel);
3668 		int			i;
3669 
3670 		for (i = 1; i <= tupdesc->natts; i++)
3671 		{
3672 			Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3673 			Var		   *var;
3674 
3675 			/* Ignore dropped attributes. */
3676 			if (attr->attisdropped)
3677 				continue;
3678 
3679 			var = makeVar(rtindex,
3680 						  i,
3681 						  attr->atttypid,
3682 						  attr->atttypmod,
3683 						  attr->attcollation,
3684 						  0);
3685 
3686 			tlist = lappend(tlist,
3687 							makeTargetEntry((Expr *) var,
3688 											list_length(tlist) + 1,
3689 											NULL,
3690 											false));
3691 		}
3692 	}
3693 
3694 	/* Now add any remaining columns to tlist. */
3695 	foreach(lc, vars)
3696 	{
3697 		Var		   *var = (Var *) lfirst(lc);
3698 
3699 		/*
3700 		 * No need for whole-row references to the target relation.  We don't
3701 		 * need system columns other than ctid and oid either, since those are
3702 		 * set locally.
3703 		 */
3704 		if (IsA(var, Var) &&
3705 			var->varno == rtindex &&
3706 			var->varattno <= InvalidAttrNumber &&
3707 			var->varattno != SelfItemPointerAttributeNumber &&
3708 			var->varattno != ObjectIdAttributeNumber)
3709 			continue;			/* don't need it */
3710 
3711 		if (tlist_member((Expr *) var, tlist))
3712 			continue;			/* already got it */
3713 
3714 		tlist = lappend(tlist,
3715 						makeTargetEntry((Expr *) var,
3716 										list_length(tlist) + 1,
3717 										NULL,
3718 										false));
3719 	}
3720 
3721 	list_free(vars);
3722 
3723 	return tlist;
3724 }
3725 
3726 /*
3727  * rebuild_fdw_scan_tlist
3728  *		Build new fdw_scan_tlist of given foreign-scan plan node from given
3729  *		tlist
3730  *
3731  * There might be columns that the fdw_scan_tlist of the given foreign-scan
3732  * plan node contains that the given tlist doesn't.  The fdw_scan_tlist would
3733  * have contained resjunk columns such as 'ctid' of the target relation and
3734  * 'wholerow' of non-target relations, but the tlist might not contain them,
3735  * for example.  So, adjust the tlist so it contains all the columns specified
3736  * in the fdw_scan_tlist; else setrefs.c will get confused.
3737  */
3738 static void
rebuild_fdw_scan_tlist(ForeignScan * fscan,List * tlist)3739 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
3740 {
3741 	List	   *new_tlist = tlist;
3742 	List	   *old_tlist = fscan->fdw_scan_tlist;
3743 	ListCell   *lc;
3744 
3745 	foreach(lc, old_tlist)
3746 	{
3747 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
3748 
3749 		if (tlist_member(tle->expr, new_tlist))
3750 			continue;			/* already got it */
3751 
3752 		new_tlist = lappend(new_tlist,
3753 							makeTargetEntry(tle->expr,
3754 											list_length(new_tlist) + 1,
3755 											NULL,
3756 											false));
3757 	}
3758 	fscan->fdw_scan_tlist = new_tlist;
3759 }
3760 
3761 /*
3762  * Execute a direct UPDATE/DELETE statement.
3763  */
3764 static void
execute_dml_stmt(ForeignScanState * node)3765 execute_dml_stmt(ForeignScanState *node)
3766 {
3767 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3768 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3769 	int			numParams = dmstate->numParams;
3770 	const char **values = dmstate->param_values;
3771 
3772 	/*
3773 	 * Construct array of query parameter values in text format.
3774 	 */
3775 	if (numParams > 0)
3776 		process_query_params(econtext,
3777 							 dmstate->param_flinfo,
3778 							 dmstate->param_exprs,
3779 							 values);
3780 
3781 	/*
3782 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3783 	 * to infer types for all parameters.  Since we explicitly cast every
3784 	 * parameter (see deparse.c), the "inference" is trivial and will produce
3785 	 * the desired result.  This allows us to avoid assuming that the remote
3786 	 * server has the same OIDs we do for the parameters' types.
3787 	 */
3788 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3789 						   NULL, values, NULL, NULL, 0))
3790 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3791 
3792 	/*
3793 	 * Get the result, and check for success.
3794 	 *
3795 	 * We don't use a PG_TRY block here, so be careful not to throw error
3796 	 * without releasing the PGresult.
3797 	 */
3798 	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3799 	if (PQresultStatus(dmstate->result) !=
3800 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3801 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3802 						   dmstate->query);
3803 
3804 	/* Get the number of rows affected. */
3805 	if (dmstate->has_returning)
3806 		dmstate->num_tuples = PQntuples(dmstate->result);
3807 	else
3808 		dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3809 }
3810 
3811 /*
3812  * Get the result of a RETURNING clause.
3813  */
3814 static TupleTableSlot *
get_returning_data(ForeignScanState * node)3815 get_returning_data(ForeignScanState *node)
3816 {
3817 	PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3818 	EState	   *estate = node->ss.ps.state;
3819 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3820 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3821 	TupleTableSlot *resultSlot;
3822 
3823 	Assert(resultRelInfo->ri_projectReturning);
3824 
3825 	/* If we didn't get any tuples, must be end of data. */
3826 	if (dmstate->next_tuple >= dmstate->num_tuples)
3827 		return ExecClearTuple(slot);
3828 
3829 	/* Increment the command es_processed count if necessary. */
3830 	if (dmstate->set_processed)
3831 		estate->es_processed += 1;
3832 
3833 	/*
3834 	 * Store a RETURNING tuple.  If has_returning is false, just emit a dummy
3835 	 * tuple.  (has_returning is false when the local query is of the form
3836 	 * "UPDATE/DELETE .. RETURNING 1" for example.)
3837 	 */
3838 	if (!dmstate->has_returning)
3839 	{
3840 		ExecStoreAllNullTuple(slot);
3841 		resultSlot = slot;
3842 	}
3843 	else
3844 	{
3845 		/*
3846 		 * On error, be sure to release the PGresult on the way out.  Callers
3847 		 * do not have PG_TRY blocks to ensure this happens.
3848 		 */
3849 		PG_TRY();
3850 		{
3851 			HeapTuple	newtup;
3852 
3853 			newtup = make_tuple_from_result_row(dmstate->result,
3854 												dmstate->next_tuple,
3855 												dmstate->rel,
3856 												dmstate->attinmeta,
3857 												dmstate->retrieved_attrs,
3858 												node,
3859 												dmstate->temp_cxt);
3860 			ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3861 		}
3862 		PG_CATCH();
3863 		{
3864 			if (dmstate->result)
3865 				PQclear(dmstate->result);
3866 			PG_RE_THROW();
3867 		}
3868 		PG_END_TRY();
3869 
3870 		/* Get the updated/deleted tuple. */
3871 		if (dmstate->rel)
3872 			resultSlot = slot;
3873 		else
3874 			resultSlot = apply_returning_filter(dmstate, slot, estate);
3875 	}
3876 	dmstate->next_tuple++;
3877 
3878 	/* Make slot available for evaluation of the local query RETURNING list. */
3879 	resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
3880 		resultSlot;
3881 
3882 	return slot;
3883 }
3884 
3885 /*
3886  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
3887  */
3888 static void
init_returning_filter(PgFdwDirectModifyState * dmstate,List * fdw_scan_tlist,Index rtindex)3889 init_returning_filter(PgFdwDirectModifyState *dmstate,
3890 					  List *fdw_scan_tlist,
3891 					  Index rtindex)
3892 {
3893 	TupleDesc	resultTupType = RelationGetDescr(dmstate->resultRel);
3894 	ListCell   *lc;
3895 	int			i;
3896 
3897 	/*
3898 	 * Calculate the mapping between the fdw_scan_tlist's entries and the
3899 	 * result tuple's attributes.
3900 	 *
3901 	 * The "map" is an array of indexes of the result tuple's attributes in
3902 	 * fdw_scan_tlist, i.e., one entry for every attribute of the result
3903 	 * tuple.  We store zero for any attributes that don't have the
3904 	 * corresponding entries in that list, marking that a NULL is needed in
3905 	 * the result tuple.
3906 	 *
3907 	 * Also get the indexes of the entries for ctid and oid if any.
3908 	 */
3909 	dmstate->attnoMap = (AttrNumber *)
3910 		palloc0(resultTupType->natts * sizeof(AttrNumber));
3911 
3912 	dmstate->ctidAttno = dmstate->oidAttno = 0;
3913 
3914 	i = 1;
3915 	dmstate->hasSystemCols = false;
3916 	foreach(lc, fdw_scan_tlist)
3917 	{
3918 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
3919 		Var		   *var = (Var *) tle->expr;
3920 
3921 		Assert(IsA(var, Var));
3922 
3923 		/*
3924 		 * If the Var is a column of the target relation to be retrieved from
3925 		 * the foreign server, get the index of the entry.
3926 		 */
3927 		if (var->varno == rtindex &&
3928 			list_member_int(dmstate->retrieved_attrs, i))
3929 		{
3930 			int			attrno = var->varattno;
3931 
3932 			if (attrno < 0)
3933 			{
3934 				/*
3935 				 * We don't retrieve system columns other than ctid and oid.
3936 				 */
3937 				if (attrno == SelfItemPointerAttributeNumber)
3938 					dmstate->ctidAttno = i;
3939 				else if (attrno == ObjectIdAttributeNumber)
3940 					dmstate->oidAttno = i;
3941 				else
3942 					Assert(false);
3943 				dmstate->hasSystemCols = true;
3944 			}
3945 			else
3946 			{
3947 				/*
3948 				 * We don't retrieve whole-row references to the target
3949 				 * relation either.
3950 				 */
3951 				Assert(attrno > 0);
3952 
3953 				dmstate->attnoMap[attrno - 1] = i;
3954 			}
3955 		}
3956 		i++;
3957 	}
3958 }
3959 
3960 /*
3961  * Extract and return an updated/deleted tuple from a scan tuple.
3962  */
3963 static TupleTableSlot *
apply_returning_filter(PgFdwDirectModifyState * dmstate,TupleTableSlot * slot,EState * estate)3964 apply_returning_filter(PgFdwDirectModifyState *dmstate,
3965 					   TupleTableSlot *slot,
3966 					   EState *estate)
3967 {
3968 	TupleDesc	resultTupType = RelationGetDescr(dmstate->resultRel);
3969 	TupleTableSlot *resultSlot;
3970 	Datum	   *values;
3971 	bool	   *isnull;
3972 	Datum	   *old_values;
3973 	bool	   *old_isnull;
3974 	int			i;
3975 
3976 	/*
3977 	 * Use the trigger tuple slot as a place to store the result tuple.
3978 	 */
3979 	resultSlot = estate->es_trig_tuple_slot;
3980 	if (resultSlot->tts_tupleDescriptor != resultTupType)
3981 		ExecSetSlotDescriptor(resultSlot, resultTupType);
3982 
3983 	/*
3984 	 * Extract all the values of the scan tuple.
3985 	 */
3986 	slot_getallattrs(slot);
3987 	old_values = slot->tts_values;
3988 	old_isnull = slot->tts_isnull;
3989 
3990 	/*
3991 	 * Prepare to build the result tuple.
3992 	 */
3993 	ExecClearTuple(resultSlot);
3994 	values = resultSlot->tts_values;
3995 	isnull = resultSlot->tts_isnull;
3996 
3997 	/*
3998 	 * Transpose data into proper fields of the result tuple.
3999 	 */
4000 	for (i = 0; i < resultTupType->natts; i++)
4001 	{
4002 		int			j = dmstate->attnoMap[i];
4003 
4004 		if (j == 0)
4005 		{
4006 			values[i] = (Datum) 0;
4007 			isnull[i] = true;
4008 		}
4009 		else
4010 		{
4011 			values[i] = old_values[j - 1];
4012 			isnull[i] = old_isnull[j - 1];
4013 		}
4014 	}
4015 
4016 	/*
4017 	 * Build the virtual tuple.
4018 	 */
4019 	ExecStoreVirtualTuple(resultSlot);
4020 
4021 	/*
4022 	 * If we have any system columns to return, install them.
4023 	 */
4024 	if (dmstate->hasSystemCols)
4025 	{
4026 		HeapTuple	resultTup = ExecMaterializeSlot(resultSlot);
4027 
4028 		/* ctid */
4029 		if (dmstate->ctidAttno)
4030 		{
4031 			ItemPointer ctid = NULL;
4032 
4033 			ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4034 			resultTup->t_self = *ctid;
4035 		}
4036 
4037 		/* oid */
4038 		if (dmstate->oidAttno)
4039 		{
4040 			Oid			oid = InvalidOid;
4041 
4042 			oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
4043 			HeapTupleSetOid(resultTup, oid);
4044 		}
4045 
4046 		/*
4047 		 * And remaining columns
4048 		 *
4049 		 * Note: since we currently don't allow the target relation to appear
4050 		 * on the nullable side of an outer join, any system columns wouldn't
4051 		 * go to NULL.
4052 		 *
4053 		 * Note: no need to care about tableoid here because it will be
4054 		 * initialized in ExecProcessReturning().
4055 		 */
4056 		HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4057 		HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4058 		HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4059 	}
4060 
4061 	/*
4062 	 * And return the result tuple.
4063 	 */
4064 	return resultSlot;
4065 }
4066 
4067 /*
4068  * Prepare for processing of parameters used in remote query.
4069  */
4070 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)4071 prepare_query_params(PlanState *node,
4072 					 List *fdw_exprs,
4073 					 int numParams,
4074 					 FmgrInfo **param_flinfo,
4075 					 List **param_exprs,
4076 					 const char ***param_values)
4077 {
4078 	int			i;
4079 	ListCell   *lc;
4080 
4081 	Assert(numParams > 0);
4082 
4083 	/* Prepare for output conversion of parameters used in remote query. */
4084 	*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4085 
4086 	i = 0;
4087 	foreach(lc, fdw_exprs)
4088 	{
4089 		Node	   *param_expr = (Node *) lfirst(lc);
4090 		Oid			typefnoid;
4091 		bool		isvarlena;
4092 
4093 		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4094 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
4095 		i++;
4096 	}
4097 
4098 	/*
4099 	 * Prepare remote-parameter expressions for evaluation.  (Note: in
4100 	 * practice, we expect that all these expressions will be just Params, so
4101 	 * we could possibly do something more efficient than using the full
4102 	 * expression-eval machinery for this.  But probably there would be little
4103 	 * benefit, and it'd require postgres_fdw to know more than is desirable
4104 	 * about Param evaluation.)
4105 	 */
4106 	*param_exprs = ExecInitExprList(fdw_exprs, node);
4107 
4108 	/* Allocate buffer for text form of query parameters. */
4109 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
4110 }
4111 
4112 /*
4113  * Construct array of query parameter values in text format.
4114  */
4115 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)4116 process_query_params(ExprContext *econtext,
4117 					 FmgrInfo *param_flinfo,
4118 					 List *param_exprs,
4119 					 const char **param_values)
4120 {
4121 	int			nestlevel;
4122 	int			i;
4123 	ListCell   *lc;
4124 
4125 	nestlevel = set_transmission_modes();
4126 
4127 	i = 0;
4128 	foreach(lc, param_exprs)
4129 	{
4130 		ExprState  *expr_state = (ExprState *) lfirst(lc);
4131 		Datum		expr_value;
4132 		bool		isNull;
4133 
4134 		/* Evaluate the parameter expression */
4135 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4136 
4137 		/*
4138 		 * Get string representation of each parameter value by invoking
4139 		 * type-specific output function, unless the value is null.
4140 		 */
4141 		if (isNull)
4142 			param_values[i] = NULL;
4143 		else
4144 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4145 
4146 		i++;
4147 	}
4148 
4149 	reset_transmission_modes(nestlevel);
4150 }
4151 
4152 /*
4153  * postgresAnalyzeForeignTable
4154  *		Test whether analyzing this foreign table is supported
4155  */
4156 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)4157 postgresAnalyzeForeignTable(Relation relation,
4158 							AcquireSampleRowsFunc *func,
4159 							BlockNumber *totalpages)
4160 {
4161 	ForeignTable *table;
4162 	UserMapping *user;
4163 	PGconn	   *conn;
4164 	StringInfoData sql;
4165 	PGresult   *volatile res = NULL;
4166 
4167 	/* Return the row-analysis function pointer */
4168 	*func = postgresAcquireSampleRowsFunc;
4169 
4170 	/*
4171 	 * Now we have to get the number of pages.  It's annoying that the ANALYZE
4172 	 * API requires us to return that now, because it forces some duplication
4173 	 * of effort between this routine and postgresAcquireSampleRowsFunc.  But
4174 	 * it's probably not worth redefining that API at this point.
4175 	 */
4176 
4177 	/*
4178 	 * Get the connection to use.  We do the remote access as the table's
4179 	 * owner, even if the ANALYZE was started by some other user.
4180 	 */
4181 	table = GetForeignTable(RelationGetRelid(relation));
4182 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4183 	conn = GetConnection(user, false);
4184 
4185 	/*
4186 	 * Construct command to get page count for relation.
4187 	 */
4188 	initStringInfo(&sql);
4189 	deparseAnalyzeSizeSql(&sql, relation);
4190 
4191 	/* In what follows, do not risk leaking any PGresults. */
4192 	PG_TRY();
4193 	{
4194 		res = pgfdw_exec_query(conn, sql.data);
4195 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4196 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
4197 
4198 		if (PQntuples(res) != 1 || PQnfields(res) != 1)
4199 			elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4200 		*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4201 
4202 		PQclear(res);
4203 		res = NULL;
4204 	}
4205 	PG_CATCH();
4206 	{
4207 		if (res)
4208 			PQclear(res);
4209 		PG_RE_THROW();
4210 	}
4211 	PG_END_TRY();
4212 
4213 	ReleaseConnection(conn);
4214 
4215 	return true;
4216 }
4217 
4218 /*
4219  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4220  *
4221  * We fetch the whole table from the remote side and pick out some sample rows.
4222  *
4223  * Selected rows are returned in the caller-allocated array rows[],
4224  * which must have at least targrows entries.
4225  * The actual number of rows selected is returned as the function result.
4226  * We also count the total number of rows in the table and return it into
4227  * *totalrows.  Note that *totaldeadrows is always set to 0.
4228  *
4229  * Note that the returned list of rows is not always in order by physical
4230  * position in the table.  Therefore, correlation estimates derived later
4231  * may be meaningless, but it's OK because we don't use the estimates
4232  * currently (the planner only pays attention to correlation for indexscans).
4233  */
4234 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)4235 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4236 							  HeapTuple *rows, int targrows,
4237 							  double *totalrows,
4238 							  double *totaldeadrows)
4239 {
4240 	PgFdwAnalyzeState astate;
4241 	ForeignTable *table;
4242 	ForeignServer *server;
4243 	UserMapping *user;
4244 	PGconn	   *conn;
4245 	unsigned int cursor_number;
4246 	StringInfoData sql;
4247 	PGresult   *volatile res = NULL;
4248 
4249 	/* Initialize workspace state */
4250 	astate.rel = relation;
4251 	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4252 
4253 	astate.rows = rows;
4254 	astate.targrows = targrows;
4255 	astate.numrows = 0;
4256 	astate.samplerows = 0;
4257 	astate.rowstoskip = -1;		/* -1 means not set yet */
4258 	reservoir_init_selection_state(&astate.rstate, targrows);
4259 
4260 	/* Remember ANALYZE context, and create a per-tuple temp context */
4261 	astate.anl_cxt = CurrentMemoryContext;
4262 	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4263 											"postgres_fdw temporary data",
4264 											ALLOCSET_SMALL_SIZES);
4265 
4266 	/*
4267 	 * Get the connection to use.  We do the remote access as the table's
4268 	 * owner, even if the ANALYZE was started by some other user.
4269 	 */
4270 	table = GetForeignTable(RelationGetRelid(relation));
4271 	server = GetForeignServer(table->serverid);
4272 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4273 	conn = GetConnection(user, false);
4274 
4275 	/*
4276 	 * Construct cursor that retrieves whole rows from remote.
4277 	 */
4278 	cursor_number = GetCursorNumber(conn);
4279 	initStringInfo(&sql);
4280 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4281 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4282 
4283 	/* In what follows, do not risk leaking any PGresults. */
4284 	PG_TRY();
4285 	{
4286 		res = pgfdw_exec_query(conn, sql.data);
4287 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
4288 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
4289 		PQclear(res);
4290 		res = NULL;
4291 
4292 		/* Retrieve and process rows a batch at a time. */
4293 		for (;;)
4294 		{
4295 			char		fetch_sql[64];
4296 			int			fetch_size;
4297 			int			numrows;
4298 			int			i;
4299 			ListCell   *lc;
4300 
4301 			/* Allow users to cancel long query */
4302 			CHECK_FOR_INTERRUPTS();
4303 
4304 			/*
4305 			 * XXX possible future improvement: if rowstoskip is large, we
4306 			 * could issue a MOVE rather than physically fetching the rows,
4307 			 * then just adjust rowstoskip and samplerows appropriately.
4308 			 */
4309 
4310 			/* The fetch size is arbitrary, but shouldn't be enormous. */
4311 			fetch_size = 100;
4312 			foreach(lc, server->options)
4313 			{
4314 				DefElem    *def = (DefElem *) lfirst(lc);
4315 
4316 				if (strcmp(def->defname, "fetch_size") == 0)
4317 				{
4318 					fetch_size = strtol(defGetString(def), NULL, 10);
4319 					break;
4320 				}
4321 			}
4322 			foreach(lc, table->options)
4323 			{
4324 				DefElem    *def = (DefElem *) lfirst(lc);
4325 
4326 				if (strcmp(def->defname, "fetch_size") == 0)
4327 				{
4328 					fetch_size = strtol(defGetString(def), NULL, 10);
4329 					break;
4330 				}
4331 			}
4332 
4333 			/* Fetch some rows */
4334 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4335 					 fetch_size, cursor_number);
4336 
4337 			res = pgfdw_exec_query(conn, fetch_sql);
4338 			/* On error, report the original query, not the FETCH. */
4339 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
4340 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
4341 
4342 			/* Process whatever we got. */
4343 			numrows = PQntuples(res);
4344 			for (i = 0; i < numrows; i++)
4345 				analyze_row_processor(res, i, &astate);
4346 
4347 			PQclear(res);
4348 			res = NULL;
4349 
4350 			/* Must be EOF if we didn't get all the rows requested. */
4351 			if (numrows < fetch_size)
4352 				break;
4353 		}
4354 
4355 		/* Close the cursor, just to be tidy. */
4356 		close_cursor(conn, cursor_number);
4357 	}
4358 	PG_CATCH();
4359 	{
4360 		if (res)
4361 			PQclear(res);
4362 		PG_RE_THROW();
4363 	}
4364 	PG_END_TRY();
4365 
4366 	ReleaseConnection(conn);
4367 
4368 	/* We assume that we have no dead tuple. */
4369 	*totaldeadrows = 0.0;
4370 
4371 	/* We've retrieved all living tuples from foreign server. */
4372 	*totalrows = astate.samplerows;
4373 
4374 	/*
4375 	 * Emit some interesting relation info
4376 	 */
4377 	ereport(elevel,
4378 			(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4379 					RelationGetRelationName(relation),
4380 					astate.samplerows, astate.numrows)));
4381 
4382 	return astate.numrows;
4383 }
4384 
4385 /*
4386  * Collect sample rows from the result of query.
4387  *	 - Use all tuples in sample until target # of samples are collected.
4388  *	 - Subsequently, replace already-sampled tuples randomly.
4389  */
4390 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)4391 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
4392 {
4393 	int			targrows = astate->targrows;
4394 	int			pos;			/* array index to store tuple in */
4395 	MemoryContext oldcontext;
4396 
4397 	/* Always increment sample row counter. */
4398 	astate->samplerows += 1;
4399 
4400 	/*
4401 	 * Determine the slot where this sample row should be stored.  Set pos to
4402 	 * negative value to indicate the row should be skipped.
4403 	 */
4404 	if (astate->numrows < targrows)
4405 	{
4406 		/* First targrows rows are always included into the sample */
4407 		pos = astate->numrows++;
4408 	}
4409 	else
4410 	{
4411 		/*
4412 		 * Now we start replacing tuples in the sample until we reach the end
4413 		 * of the relation.  Same algorithm as in acquire_sample_rows in
4414 		 * analyze.c; see Jeff Vitter's paper.
4415 		 */
4416 		if (astate->rowstoskip < 0)
4417 			astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4418 
4419 		if (astate->rowstoskip <= 0)
4420 		{
4421 			/* Choose a random reservoir element to replace. */
4422 			pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4423 			Assert(pos >= 0 && pos < targrows);
4424 			heap_freetuple(astate->rows[pos]);
4425 		}
4426 		else
4427 		{
4428 			/* Skip this tuple. */
4429 			pos = -1;
4430 		}
4431 
4432 		astate->rowstoskip -= 1;
4433 	}
4434 
4435 	if (pos >= 0)
4436 	{
4437 		/*
4438 		 * Create sample tuple from current result row, and store it in the
4439 		 * position determined above.  The tuple has to be created in anl_cxt.
4440 		 */
4441 		oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4442 
4443 		astate->rows[pos] = make_tuple_from_result_row(res, row,
4444 													   astate->rel,
4445 													   astate->attinmeta,
4446 													   astate->retrieved_attrs,
4447 													   NULL,
4448 													   astate->temp_cxt);
4449 
4450 		MemoryContextSwitchTo(oldcontext);
4451 	}
4452 }
4453 
4454 /*
4455  * Import a foreign schema
4456  */
4457 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)4458 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
4459 {
4460 	List	   *commands = NIL;
4461 	bool		import_collate = true;
4462 	bool		import_default = false;
4463 	bool		import_not_null = true;
4464 	ForeignServer *server;
4465 	UserMapping *mapping;
4466 	PGconn	   *conn;
4467 	StringInfoData buf;
4468 	PGresult   *volatile res = NULL;
4469 	int			numrows,
4470 				i;
4471 	ListCell   *lc;
4472 
4473 	/* Parse statement options */
4474 	foreach(lc, stmt->options)
4475 	{
4476 		DefElem    *def = (DefElem *) lfirst(lc);
4477 
4478 		if (strcmp(def->defname, "import_collate") == 0)
4479 			import_collate = defGetBoolean(def);
4480 		else if (strcmp(def->defname, "import_default") == 0)
4481 			import_default = defGetBoolean(def);
4482 		else if (strcmp(def->defname, "import_not_null") == 0)
4483 			import_not_null = defGetBoolean(def);
4484 		else
4485 			ereport(ERROR,
4486 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4487 					 errmsg("invalid option \"%s\"", def->defname)));
4488 	}
4489 
4490 	/*
4491 	 * Get connection to the foreign server.  Connection manager will
4492 	 * establish new connection if necessary.
4493 	 */
4494 	server = GetForeignServer(serverOid);
4495 	mapping = GetUserMapping(GetUserId(), server->serverid);
4496 	conn = GetConnection(mapping, false);
4497 
4498 	/* Don't attempt to import collation if remote server hasn't got it */
4499 	if (PQserverVersion(conn) < 90100)
4500 		import_collate = false;
4501 
4502 	/* Create workspace for strings */
4503 	initStringInfo(&buf);
4504 
4505 	/* In what follows, do not risk leaking any PGresults. */
4506 	PG_TRY();
4507 	{
4508 		/* Check that the schema really exists */
4509 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4510 		deparseStringLiteral(&buf, stmt->remote_schema);
4511 
4512 		res = pgfdw_exec_query(conn, buf.data);
4513 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4514 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
4515 
4516 		if (PQntuples(res) != 1)
4517 			ereport(ERROR,
4518 					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4519 					 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4520 							stmt->remote_schema, server->servername)));
4521 
4522 		PQclear(res);
4523 		res = NULL;
4524 		resetStringInfo(&buf);
4525 
4526 		/*
4527 		 * Fetch all table data from this schema, possibly restricted by
4528 		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
4529 		 * to EXCEPT/LIMIT TO here, because the core code will filter the
4530 		 * statements we return according to those lists anyway.  But it
4531 		 * should save a few cycles to not process excluded tables in the
4532 		 * first place.)
4533 		 *
4534 		 * Ignore table data for partitions and only include the definitions
4535 		 * of the root partitioned tables to allow access to the complete
4536 		 * remote data set locally in the schema imported.
4537 		 *
4538 		 * Note: because we run the connection with search_path restricted to
4539 		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
4540 		 * include a schema name for types/functions in other schemas, which
4541 		 * is what we want.
4542 		 */
4543 		if (import_collate)
4544 			appendStringInfoString(&buf,
4545 								   "SELECT relname, "
4546 								   "  attname, "
4547 								   "  format_type(atttypid, atttypmod), "
4548 								   "  attnotnull, "
4549 								   "  pg_get_expr(adbin, adrelid), "
4550 								   "  collname, "
4551 								   "  collnsp.nspname "
4552 								   "FROM pg_class c "
4553 								   "  JOIN pg_namespace n ON "
4554 								   "    relnamespace = n.oid "
4555 								   "  LEFT JOIN pg_attribute a ON "
4556 								   "    attrelid = c.oid AND attnum > 0 "
4557 								   "      AND NOT attisdropped "
4558 								   "  LEFT JOIN pg_attrdef ad ON "
4559 								   "    adrelid = c.oid AND adnum = attnum "
4560 								   "  LEFT JOIN pg_collation coll ON "
4561 								   "    coll.oid = attcollation "
4562 								   "  LEFT JOIN pg_namespace collnsp ON "
4563 								   "    collnsp.oid = collnamespace ");
4564 		else
4565 			appendStringInfoString(&buf,
4566 								   "SELECT relname, "
4567 								   "  attname, "
4568 								   "  format_type(atttypid, atttypmod), "
4569 								   "  attnotnull, "
4570 								   "  pg_get_expr(adbin, adrelid), "
4571 								   "  NULL, NULL "
4572 								   "FROM pg_class c "
4573 								   "  JOIN pg_namespace n ON "
4574 								   "    relnamespace = n.oid "
4575 								   "  LEFT JOIN pg_attribute a ON "
4576 								   "    attrelid = c.oid AND attnum > 0 "
4577 								   "      AND NOT attisdropped "
4578 								   "  LEFT JOIN pg_attrdef ad ON "
4579 								   "    adrelid = c.oid AND adnum = attnum ");
4580 
4581 		appendStringInfoString(&buf,
4582 							   "WHERE c.relkind IN ("
4583 							   CppAsString2(RELKIND_RELATION) ","
4584 							   CppAsString2(RELKIND_VIEW) ","
4585 							   CppAsString2(RELKIND_FOREIGN_TABLE) ","
4586 							   CppAsString2(RELKIND_MATVIEW) ","
4587 							   CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4588 							   "  AND n.nspname = ");
4589 		deparseStringLiteral(&buf, stmt->remote_schema);
4590 
4591 		/* Partitions are supported since Postgres 10 */
4592 		if (PQserverVersion(conn) >= 100000)
4593 			appendStringInfoString(&buf, " AND NOT c.relispartition ");
4594 
4595 		/* Apply restrictions for LIMIT TO and EXCEPT */
4596 		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4597 			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4598 		{
4599 			bool		first_item = true;
4600 
4601 			appendStringInfoString(&buf, " AND c.relname ");
4602 			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4603 				appendStringInfoString(&buf, "NOT ");
4604 			appendStringInfoString(&buf, "IN (");
4605 
4606 			/* Append list of table names within IN clause */
4607 			foreach(lc, stmt->table_list)
4608 			{
4609 				RangeVar   *rv = (RangeVar *) lfirst(lc);
4610 
4611 				if (first_item)
4612 					first_item = false;
4613 				else
4614 					appendStringInfoString(&buf, ", ");
4615 				deparseStringLiteral(&buf, rv->relname);
4616 			}
4617 			appendStringInfoChar(&buf, ')');
4618 		}
4619 
4620 		/* Append ORDER BY at the end of query to ensure output ordering */
4621 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4622 
4623 		/* Fetch the data */
4624 		res = pgfdw_exec_query(conn, buf.data);
4625 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4626 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
4627 
4628 		/* Process results */
4629 		numrows = PQntuples(res);
4630 		/* note: incrementation of i happens in inner loop's while() test */
4631 		for (i = 0; i < numrows;)
4632 		{
4633 			char	   *tablename = PQgetvalue(res, i, 0);
4634 			bool		first_item = true;
4635 
4636 			resetStringInfo(&buf);
4637 			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4638 							 quote_identifier(tablename));
4639 
4640 			/* Scan all rows for this table */
4641 			do
4642 			{
4643 				char	   *attname;
4644 				char	   *typename;
4645 				char	   *attnotnull;
4646 				char	   *attdefault;
4647 				char	   *collname;
4648 				char	   *collnamespace;
4649 
4650 				/* If table has no columns, we'll see nulls here */
4651 				if (PQgetisnull(res, i, 1))
4652 					continue;
4653 
4654 				attname = PQgetvalue(res, i, 1);
4655 				typename = PQgetvalue(res, i, 2);
4656 				attnotnull = PQgetvalue(res, i, 3);
4657 				attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4658 					PQgetvalue(res, i, 4);
4659 				collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4660 					PQgetvalue(res, i, 5);
4661 				collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4662 					PQgetvalue(res, i, 6);
4663 
4664 				if (first_item)
4665 					first_item = false;
4666 				else
4667 					appendStringInfoString(&buf, ",\n");
4668 
4669 				/* Print column name and type */
4670 				appendStringInfo(&buf, "  %s %s",
4671 								 quote_identifier(attname),
4672 								 typename);
4673 
4674 				/*
4675 				 * Add column_name option so that renaming the foreign table's
4676 				 * column doesn't break the association to the underlying
4677 				 * column.
4678 				 */
4679 				appendStringInfoString(&buf, " OPTIONS (column_name ");
4680 				deparseStringLiteral(&buf, attname);
4681 				appendStringInfoChar(&buf, ')');
4682 
4683 				/* Add COLLATE if needed */
4684 				if (import_collate && collname != NULL && collnamespace != NULL)
4685 					appendStringInfo(&buf, " COLLATE %s.%s",
4686 									 quote_identifier(collnamespace),
4687 									 quote_identifier(collname));
4688 
4689 				/* Add DEFAULT if needed */
4690 				if (import_default && attdefault != NULL)
4691 					appendStringInfo(&buf, " DEFAULT %s", attdefault);
4692 
4693 				/* Add NOT NULL if needed */
4694 				if (import_not_null && attnotnull[0] == 't')
4695 					appendStringInfoString(&buf, " NOT NULL");
4696 			}
4697 			while (++i < numrows &&
4698 				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4699 
4700 			/*
4701 			 * Add server name and table-level options.  We specify remote
4702 			 * schema and table name as options (the latter to ensure that
4703 			 * renaming the foreign table doesn't break the association).
4704 			 */
4705 			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4706 							 quote_identifier(server->servername));
4707 
4708 			appendStringInfoString(&buf, "schema_name ");
4709 			deparseStringLiteral(&buf, stmt->remote_schema);
4710 			appendStringInfoString(&buf, ", table_name ");
4711 			deparseStringLiteral(&buf, tablename);
4712 
4713 			appendStringInfoString(&buf, ");");
4714 
4715 			commands = lappend(commands, pstrdup(buf.data));
4716 		}
4717 
4718 		/* Clean up */
4719 		PQclear(res);
4720 		res = NULL;
4721 	}
4722 	PG_CATCH();
4723 	{
4724 		if (res)
4725 			PQclear(res);
4726 		PG_RE_THROW();
4727 	}
4728 	PG_END_TRY();
4729 
4730 	ReleaseConnection(conn);
4731 
4732 	return commands;
4733 }
4734 
4735 /*
4736  * Assess whether the join between inner and outer relations can be pushed down
4737  * to the foreign server. As a side effect, save information we obtain in this
4738  * function to PgFdwRelationInfo passed in.
4739  */
4740 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)4741 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
4742 				RelOptInfo *outerrel, RelOptInfo *innerrel,
4743 				JoinPathExtraData *extra)
4744 {
4745 	PgFdwRelationInfo *fpinfo;
4746 	PgFdwRelationInfo *fpinfo_o;
4747 	PgFdwRelationInfo *fpinfo_i;
4748 	ListCell   *lc;
4749 	List	   *joinclauses;
4750 
4751 	/*
4752 	 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4753 	 * Constructing queries representing SEMI and ANTI joins is hard, hence
4754 	 * not considered right now.
4755 	 */
4756 	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4757 		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4758 		return false;
4759 
4760 	/*
4761 	 * If either of the joining relations is marked as unsafe to pushdown, the
4762 	 * join can not be pushed down.
4763 	 */
4764 	fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4765 	fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4766 	fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4767 	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4768 		!fpinfo_i || !fpinfo_i->pushdown_safe)
4769 		return false;
4770 
4771 	/*
4772 	 * If joining relations have local conditions, those conditions are
4773 	 * required to be applied before joining the relations. Hence the join can
4774 	 * not be pushed down.
4775 	 */
4776 	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4777 		return false;
4778 
4779 	/*
4780 	 * Merge FDW options.  We might be tempted to do this after we have deemed
4781 	 * the foreign join to be OK.  But we must do this beforehand so that we
4782 	 * know which quals can be evaluated on the foreign server, which might
4783 	 * depend on shippable_extensions.
4784 	 */
4785 	fpinfo->server = fpinfo_o->server;
4786 	merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4787 
4788 	/*
4789 	 * Separate restrict list into join quals and pushed-down (other) quals.
4790 	 *
4791 	 * Join quals belonging to an outer join must all be shippable, else we
4792 	 * cannot execute the join remotely.  Add such quals to 'joinclauses'.
4793 	 *
4794 	 * Add other quals to fpinfo->remote_conds if they are shippable, else to
4795 	 * fpinfo->local_conds.  In an inner join it's okay to execute conditions
4796 	 * either locally or remotely; the same is true for pushed-down conditions
4797 	 * at an outer join.
4798 	 *
4799 	 * Note we might return failure after having already scribbled on
4800 	 * fpinfo->remote_conds and fpinfo->local_conds.  That's okay because we
4801 	 * won't consult those lists again if we deem the join unshippable.
4802 	 */
4803 	joinclauses = NIL;
4804 	foreach(lc, extra->restrictlist)
4805 	{
4806 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4807 		bool		is_remote_clause = is_foreign_expr(root, joinrel,
4808 													   rinfo->clause);
4809 
4810 		if (IS_OUTER_JOIN(jointype) &&
4811 			!RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
4812 		{
4813 			if (!is_remote_clause)
4814 				return false;
4815 			joinclauses = lappend(joinclauses, rinfo);
4816 		}
4817 		else
4818 		{
4819 			if (is_remote_clause)
4820 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4821 			else
4822 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4823 		}
4824 	}
4825 
4826 	/*
4827 	 * deparseExplicitTargetList() isn't smart enough to handle anything other
4828 	 * than a Var.  In particular, if there's some PlaceHolderVar that would
4829 	 * need to be evaluated within this join tree (because there's an upper
4830 	 * reference to a quantity that may go to NULL as a result of an outer
4831 	 * join), then we can't try to push the join down because we'll fail when
4832 	 * we get to deparseExplicitTargetList().  However, a PlaceHolderVar that
4833 	 * needs to be evaluated *at the top* of this join tree is OK, because we
4834 	 * can do that locally after fetching the results from the remote side.
4835 	 */
4836 	foreach(lc, root->placeholder_list)
4837 	{
4838 		PlaceHolderInfo *phinfo = lfirst(lc);
4839 		Relids		relids;
4840 
4841 		/* PlaceHolderInfo refers to parent relids, not child relids. */
4842 		relids = IS_OTHER_REL(joinrel) ?
4843 			joinrel->top_parent_relids : joinrel->relids;
4844 
4845 		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4846 			bms_nonempty_difference(relids, phinfo->ph_eval_at))
4847 			return false;
4848 	}
4849 
4850 	/* Save the join clauses, for later use. */
4851 	fpinfo->joinclauses = joinclauses;
4852 
4853 	fpinfo->outerrel = outerrel;
4854 	fpinfo->innerrel = innerrel;
4855 	fpinfo->jointype = jointype;
4856 
4857 	/*
4858 	 * By default, both the input relations are not required to be deparsed as
4859 	 * subqueries, but there might be some relations covered by the input
4860 	 * relations that are required to be deparsed as subqueries, so save the
4861 	 * relids of those relations for later use by the deparser.
4862 	 */
4863 	fpinfo->make_outerrel_subquery = false;
4864 	fpinfo->make_innerrel_subquery = false;
4865 	Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4866 	Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4867 	fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
4868 											fpinfo_i->lower_subquery_rels);
4869 
4870 	/*
4871 	 * Pull the other remote conditions from the joining relations into join
4872 	 * clauses or other remote clauses (remote_conds) of this relation
4873 	 * wherever possible. This avoids building subqueries at every join step.
4874 	 *
4875 	 * For an inner join, clauses from both the relations are added to the
4876 	 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4877 	 * the outer side are added to remote_conds since those can be evaluated
4878 	 * after the join is evaluated. The clauses from inner side are added to
4879 	 * the joinclauses, since they need to be evaluated while constructing the
4880 	 * join.
4881 	 *
4882 	 * For a FULL OUTER JOIN, the other clauses from either relation can not
4883 	 * be added to the joinclauses or remote_conds, since each relation acts
4884 	 * as an outer relation for the other.
4885 	 *
4886 	 * The joining sides can not have local conditions, thus no need to test
4887 	 * shippability of the clauses being pulled up.
4888 	 */
4889 	switch (jointype)
4890 	{
4891 		case JOIN_INNER:
4892 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4893 											   list_copy(fpinfo_i->remote_conds));
4894 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4895 											   list_copy(fpinfo_o->remote_conds));
4896 			break;
4897 
4898 		case JOIN_LEFT:
4899 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4900 											  list_copy(fpinfo_i->remote_conds));
4901 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4902 											   list_copy(fpinfo_o->remote_conds));
4903 			break;
4904 
4905 		case JOIN_RIGHT:
4906 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4907 											  list_copy(fpinfo_o->remote_conds));
4908 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4909 											   list_copy(fpinfo_i->remote_conds));
4910 			break;
4911 
4912 		case JOIN_FULL:
4913 
4914 			/*
4915 			 * In this case, if any of the input relations has conditions, we
4916 			 * need to deparse that relation as a subquery so that the
4917 			 * conditions can be evaluated before the join.  Remember it in
4918 			 * the fpinfo of this relation so that the deparser can take
4919 			 * appropriate action.  Also, save the relids of base relations
4920 			 * covered by that relation for later use by the deparser.
4921 			 */
4922 			if (fpinfo_o->remote_conds)
4923 			{
4924 				fpinfo->make_outerrel_subquery = true;
4925 				fpinfo->lower_subquery_rels =
4926 					bms_add_members(fpinfo->lower_subquery_rels,
4927 									outerrel->relids);
4928 			}
4929 			if (fpinfo_i->remote_conds)
4930 			{
4931 				fpinfo->make_innerrel_subquery = true;
4932 				fpinfo->lower_subquery_rels =
4933 					bms_add_members(fpinfo->lower_subquery_rels,
4934 									innerrel->relids);
4935 			}
4936 			break;
4937 
4938 		default:
4939 			/* Should not happen, we have just checked this above */
4940 			elog(ERROR, "unsupported join type %d", jointype);
4941 	}
4942 
4943 	/*
4944 	 * For an inner join, all restrictions can be treated alike. Treating the
4945 	 * pushed down conditions as join conditions allows a top level full outer
4946 	 * join to be deparsed without requiring subqueries.
4947 	 */
4948 	if (jointype == JOIN_INNER)
4949 	{
4950 		Assert(!fpinfo->joinclauses);
4951 		fpinfo->joinclauses = fpinfo->remote_conds;
4952 		fpinfo->remote_conds = NIL;
4953 	}
4954 
4955 	/* Mark that this join can be pushed down safely */
4956 	fpinfo->pushdown_safe = true;
4957 
4958 	/* Get user mapping */
4959 	if (fpinfo->use_remote_estimate)
4960 	{
4961 		if (fpinfo_o->use_remote_estimate)
4962 			fpinfo->user = fpinfo_o->user;
4963 		else
4964 			fpinfo->user = fpinfo_i->user;
4965 	}
4966 	else
4967 		fpinfo->user = NULL;
4968 
4969 	/*
4970 	 * Set cached relation costs to some negative value, so that we can detect
4971 	 * when they are set to some sensible costs, during one (usually the
4972 	 * first) of the calls to estimate_path_cost_size().
4973 	 */
4974 	fpinfo->rel_startup_cost = -1;
4975 	fpinfo->rel_total_cost = -1;
4976 
4977 	/*
4978 	 * Set the string describing this join relation to be used in EXPLAIN
4979 	 * output of corresponding ForeignScan.
4980 	 */
4981 	fpinfo->relation_name = makeStringInfo();
4982 	appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4983 					 fpinfo_o->relation_name->data,
4984 					 get_jointype_name(fpinfo->jointype),
4985 					 fpinfo_i->relation_name->data);
4986 
4987 	/*
4988 	 * Set the relation index.  This is defined as the position of this
4989 	 * joinrel in the join_rel_list list plus the length of the rtable list.
4990 	 * Note that since this joinrel is at the end of the join_rel_list list
4991 	 * when we are called, we can get the position by list_length.
4992 	 */
4993 	Assert(fpinfo->relation_index == 0);	/* shouldn't be set yet */
4994 	fpinfo->relation_index =
4995 		list_length(root->parse->rtable) + list_length(root->join_rel_list);
4996 
4997 	return true;
4998 }
4999 
5000 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)5001 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5002 								Path *epq_path)
5003 {
5004 	List	   *useful_pathkeys_list = NIL; /* List of all pathkeys */
5005 	ListCell   *lc;
5006 
5007 	useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5008 
5009 	/* Create one path for each set of pathkeys we found above. */
5010 	foreach(lc, useful_pathkeys_list)
5011 	{
5012 		double		rows;
5013 		int			width;
5014 		Cost		startup_cost;
5015 		Cost		total_cost;
5016 		List	   *useful_pathkeys = lfirst(lc);
5017 		Path	   *sorted_epq_path;
5018 
5019 		estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
5020 								&rows, &width, &startup_cost, &total_cost);
5021 
5022 		/*
5023 		 * The EPQ path must be at least as well sorted as the path itself, in
5024 		 * case it gets used as input to a mergejoin.
5025 		 */
5026 		sorted_epq_path = epq_path;
5027 		if (sorted_epq_path != NULL &&
5028 			!pathkeys_contained_in(useful_pathkeys,
5029 								   sorted_epq_path->pathkeys))
5030 			sorted_epq_path = (Path *)
5031 				create_sort_path(root,
5032 								 rel,
5033 								 sorted_epq_path,
5034 								 useful_pathkeys,
5035 								 -1.0);
5036 
5037 		add_path(rel, (Path *)
5038 				 create_foreignscan_path(root, rel,
5039 										 NULL,
5040 										 rows,
5041 										 startup_cost,
5042 										 total_cost,
5043 										 useful_pathkeys,
5044 										 rel->lateral_relids,
5045 										 sorted_epq_path,
5046 										 NIL));
5047 	}
5048 }
5049 
5050 /*
5051  * Parse options from foreign server and apply them to fpinfo.
5052  *
5053  * New options might also require tweaking merge_fdw_options().
5054  */
5055 static void
apply_server_options(PgFdwRelationInfo * fpinfo)5056 apply_server_options(PgFdwRelationInfo *fpinfo)
5057 {
5058 	ListCell   *lc;
5059 
5060 	foreach(lc, fpinfo->server->options)
5061 	{
5062 		DefElem    *def = (DefElem *) lfirst(lc);
5063 
5064 		if (strcmp(def->defname, "use_remote_estimate") == 0)
5065 			fpinfo->use_remote_estimate = defGetBoolean(def);
5066 		else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5067 			fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5068 		else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5069 			fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5070 		else if (strcmp(def->defname, "extensions") == 0)
5071 			fpinfo->shippable_extensions =
5072 				ExtractExtensionList(defGetString(def), false);
5073 		else if (strcmp(def->defname, "fetch_size") == 0)
5074 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5075 	}
5076 }
5077 
5078 /*
5079  * Parse options from foreign table and apply them to fpinfo.
5080  *
5081  * New options might also require tweaking merge_fdw_options().
5082  */
5083 static void
apply_table_options(PgFdwRelationInfo * fpinfo)5084 apply_table_options(PgFdwRelationInfo *fpinfo)
5085 {
5086 	ListCell   *lc;
5087 
5088 	foreach(lc, fpinfo->table->options)
5089 	{
5090 		DefElem    *def = (DefElem *) lfirst(lc);
5091 
5092 		if (strcmp(def->defname, "use_remote_estimate") == 0)
5093 			fpinfo->use_remote_estimate = defGetBoolean(def);
5094 		else if (strcmp(def->defname, "fetch_size") == 0)
5095 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5096 	}
5097 }
5098 
5099 /*
5100  * Merge FDW options from input relations into a new set of options for a join
5101  * or an upper rel.
5102  *
5103  * For a join relation, FDW-specific information about the inner and outer
5104  * relations is provided using fpinfo_i and fpinfo_o.  For an upper relation,
5105  * fpinfo_o provides the information for the input relation; fpinfo_i is
5106  * expected to NULL.
5107  */
5108 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)5109 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5110 				  const PgFdwRelationInfo *fpinfo_o,
5111 				  const PgFdwRelationInfo *fpinfo_i)
5112 {
5113 	/* We must always have fpinfo_o. */
5114 	Assert(fpinfo_o);
5115 
5116 	/* fpinfo_i may be NULL, but if present the servers must both match. */
5117 	Assert(!fpinfo_i ||
5118 		   fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5119 
5120 	/*
5121 	 * Copy the server specific FDW options.  (For a join, both relations come
5122 	 * from the same server, so the server options should have the same value
5123 	 * for both relations.)
5124 	 */
5125 	fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5126 	fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5127 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5128 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5129 	fpinfo->fetch_size = fpinfo_o->fetch_size;
5130 
5131 	/* Merge the table level options from either side of the join. */
5132 	if (fpinfo_i)
5133 	{
5134 		/*
5135 		 * We'll prefer to use remote estimates for this join if any table
5136 		 * from either side of the join is using remote estimates.  This is
5137 		 * most likely going to be preferred since they're already willing to
5138 		 * pay the price of a round trip to get the remote EXPLAIN.  In any
5139 		 * case it's not entirely clear how we might otherwise handle this
5140 		 * best.
5141 		 */
5142 		fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5143 			fpinfo_i->use_remote_estimate;
5144 
5145 		/*
5146 		 * Set fetch size to maximum of the joining sides, since we are
5147 		 * expecting the rows returned by the join to be proportional to the
5148 		 * relation sizes.
5149 		 */
5150 		fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5151 	}
5152 }
5153 
5154 /*
5155  * postgresGetForeignJoinPaths
5156  *		Add possible ForeignPath to joinrel, if join is safe to push down.
5157  */
5158 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)5159 postgresGetForeignJoinPaths(PlannerInfo *root,
5160 							RelOptInfo *joinrel,
5161 							RelOptInfo *outerrel,
5162 							RelOptInfo *innerrel,
5163 							JoinType jointype,
5164 							JoinPathExtraData *extra)
5165 {
5166 	PgFdwRelationInfo *fpinfo;
5167 	ForeignPath *joinpath;
5168 	double		rows;
5169 	int			width;
5170 	Cost		startup_cost;
5171 	Cost		total_cost;
5172 	Path	   *epq_path;		/* Path to create plan to be executed when
5173 								 * EvalPlanQual gets triggered. */
5174 
5175 	/*
5176 	 * Skip if this join combination has been considered already.
5177 	 */
5178 	if (joinrel->fdw_private)
5179 		return;
5180 
5181 	/*
5182 	 * This code does not work for joins with lateral references, since those
5183 	 * must have parameterized paths, which we don't generate yet.
5184 	 */
5185 	if (!bms_is_empty(joinrel->lateral_relids))
5186 		return;
5187 
5188 	/*
5189 	 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5190 	 * that the join relation is already considered, so that we won't waste
5191 	 * time in judging safety of join pushdown and adding the same paths again
5192 	 * if found safe. Once we know that this join can be pushed down, we fill
5193 	 * the entry.
5194 	 */
5195 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5196 	fpinfo->pushdown_safe = false;
5197 	joinrel->fdw_private = fpinfo;
5198 	/* attrs_used is only for base relations. */
5199 	fpinfo->attrs_used = NULL;
5200 
5201 	/*
5202 	 * If there is a possibility that EvalPlanQual will be executed, we need
5203 	 * to be able to reconstruct the row using scans of the base relations.
5204 	 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5205 	 * the path list of the joinrel, if one exists.  We must be careful to
5206 	 * call it before adding any ForeignPath, since the ForeignPath might
5207 	 * dominate the only suitable local path available.  We also do it before
5208 	 * calling foreign_join_ok(), since that function updates fpinfo and marks
5209 	 * it as pushable if the join is found to be pushable.
5210 	 */
5211 	if (root->parse->commandType == CMD_DELETE ||
5212 		root->parse->commandType == CMD_UPDATE ||
5213 		root->rowMarks)
5214 	{
5215 		epq_path = GetExistingLocalJoinPath(joinrel);
5216 		if (!epq_path)
5217 		{
5218 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5219 			return;
5220 		}
5221 	}
5222 	else
5223 		epq_path = NULL;
5224 
5225 	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5226 	{
5227 		/* Free path required for EPQ if we copied one; we don't need it now */
5228 		if (epq_path)
5229 			pfree(epq_path);
5230 		return;
5231 	}
5232 
5233 	/*
5234 	 * Compute the selectivity and cost of the local_conds, so we don't have
5235 	 * to do it over again for each path. The best we can do for these
5236 	 * conditions is to estimate selectivity on the basis of local statistics.
5237 	 * The local conditions are applied after the join has been computed on
5238 	 * the remote side like quals in WHERE clause, so pass jointype as
5239 	 * JOIN_INNER.
5240 	 */
5241 	fpinfo->local_conds_sel = clauselist_selectivity(root,
5242 													 fpinfo->local_conds,
5243 													 0,
5244 													 JOIN_INNER,
5245 													 NULL);
5246 	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5247 
5248 	/*
5249 	 * If we are going to estimate costs locally, estimate the join clause
5250 	 * selectivity here while we have special join info.
5251 	 */
5252 	if (!fpinfo->use_remote_estimate)
5253 		fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5254 														0, fpinfo->jointype,
5255 														extra->sjinfo);
5256 
5257 	/* Estimate costs for bare join relation */
5258 	estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
5259 							&width, &startup_cost, &total_cost);
5260 	/* Now update this information in the joinrel */
5261 	joinrel->rows = rows;
5262 	joinrel->reltarget->width = width;
5263 	fpinfo->rows = rows;
5264 	fpinfo->width = width;
5265 	fpinfo->startup_cost = startup_cost;
5266 	fpinfo->total_cost = total_cost;
5267 
5268 	/*
5269 	 * Create a new join path and add it to the joinrel which represents a
5270 	 * join between foreign tables.
5271 	 */
5272 	joinpath = create_foreignscan_path(root,
5273 									   joinrel,
5274 									   NULL,	/* default pathtarget */
5275 									   rows,
5276 									   startup_cost,
5277 									   total_cost,
5278 									   NIL, /* no pathkeys */
5279 									   joinrel->lateral_relids,
5280 									   epq_path,
5281 									   NIL);	/* no fdw_private */
5282 
5283 	/* Add generated path into joinrel by add_path(). */
5284 	add_path(joinrel, (Path *) joinpath);
5285 
5286 	/* Consider pathkeys for the join relation */
5287 	add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5288 
5289 	/* XXX Consider parameterized paths for the join relation */
5290 }
5291 
5292 /*
5293  * Assess whether the aggregation, grouping and having operations can be pushed
5294  * down to the foreign server.  As a side effect, save information we obtain in
5295  * this function to PgFdwRelationInfo of the input relation.
5296  */
5297 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel,Node * havingQual)5298 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
5299 					Node *havingQual)
5300 {
5301 	Query	   *query = root->parse;
5302 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5303 	PathTarget *grouping_target = grouped_rel->reltarget;
5304 	PgFdwRelationInfo *ofpinfo;
5305 	ListCell   *lc;
5306 	int			i;
5307 	List	   *tlist = NIL;
5308 
5309 	/* We currently don't support pushing Grouping Sets. */
5310 	if (query->groupingSets)
5311 		return false;
5312 
5313 	/* Get the fpinfo of the underlying scan relation. */
5314 	ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5315 
5316 	/*
5317 	 * If underlying scan relation has any local conditions, those conditions
5318 	 * are required to be applied before performing aggregation.  Hence the
5319 	 * aggregate cannot be pushed down.
5320 	 */
5321 	if (ofpinfo->local_conds)
5322 		return false;
5323 
5324 	/*
5325 	 * Examine grouping expressions, as well as other expressions we'd need to
5326 	 * compute, and check whether they are safe to push down to the foreign
5327 	 * server.  All GROUP BY expressions will be part of the grouping target
5328 	 * and thus there is no need to search for them separately.  Add grouping
5329 	 * expressions into target list which will be passed to foreign server.
5330 	 *
5331 	 * A tricky fine point is that we must not put any expression into the
5332 	 * target list that is just a foreign param (that is, something that
5333 	 * deparse.c would conclude has to be sent to the foreign server).  If we
5334 	 * do, the expression will also appear in the fdw_exprs list of the plan
5335 	 * node, and setrefs.c will get confused and decide that the fdw_exprs
5336 	 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5337 	 * a broken plan.  Somewhat oddly, it's OK if the expression contains such
5338 	 * a node, as long as it's not at top level; then no match is possible.
5339 	 */
5340 	i = 0;
5341 	foreach(lc, grouping_target->exprs)
5342 	{
5343 		Expr	   *expr = (Expr *) lfirst(lc);
5344 		Index		sgref = get_pathtarget_sortgroupref(grouping_target, i);
5345 		ListCell   *l;
5346 
5347 		/* Check whether this expression is part of GROUP BY clause */
5348 		if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5349 		{
5350 			TargetEntry *tle;
5351 
5352 			/*
5353 			 * If any GROUP BY expression is not shippable, then we cannot
5354 			 * push down aggregation to the foreign server.
5355 			 */
5356 			if (!is_foreign_expr(root, grouped_rel, expr))
5357 				return false;
5358 
5359 			/*
5360 			 * If it would be a foreign param, we can't put it into the tlist,
5361 			 * so we have to fail.
5362 			 */
5363 			if (is_foreign_param(root, grouped_rel, expr))
5364 				return false;
5365 
5366 			/*
5367 			 * Pushable, so add to tlist.  We need to create a TLE for this
5368 			 * expression and apply the sortgroupref to it.  We cannot use
5369 			 * add_to_flat_tlist() here because that avoids making duplicate
5370 			 * entries in the tlist.  If there are duplicate entries with
5371 			 * distinct sortgrouprefs, we have to duplicate that situation in
5372 			 * the output tlist.
5373 			 */
5374 			tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5375 			tle->ressortgroupref = sgref;
5376 			tlist = lappend(tlist, tle);
5377 		}
5378 		else
5379 		{
5380 			/*
5381 			 * Non-grouping expression we need to compute.  Can we ship it
5382 			 * as-is to the foreign server?
5383 			 */
5384 			if (is_foreign_expr(root, grouped_rel, expr) &&
5385 				!is_foreign_param(root, grouped_rel, expr))
5386 			{
5387 				/* Yes, so add to tlist as-is; OK to suppress duplicates */
5388 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
5389 			}
5390 			else
5391 			{
5392 				/* Not pushable as a whole; extract its Vars and aggregates */
5393 				List	   *aggvars;
5394 
5395 				aggvars = pull_var_clause((Node *) expr,
5396 										  PVC_INCLUDE_AGGREGATES);
5397 
5398 				/*
5399 				 * If any aggregate expression is not shippable, then we
5400 				 * cannot push down aggregation to the foreign server.  (We
5401 				 * don't have to check is_foreign_param, since that certainly
5402 				 * won't return true for any such expression.)
5403 				 */
5404 				if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5405 					return false;
5406 
5407 				/*
5408 				 * Add aggregates, if any, into the targetlist.  Plain Vars
5409 				 * outside an aggregate can be ignored, because they should be
5410 				 * either same as some GROUP BY column or part of some GROUP
5411 				 * BY expression.  In either case, they are already part of
5412 				 * the targetlist and thus no need to add them again.  In fact
5413 				 * including plain Vars in the tlist when they do not match a
5414 				 * GROUP BY column would cause the foreign server to complain
5415 				 * that the shipped query is invalid.
5416 				 */
5417 				foreach(l, aggvars)
5418 				{
5419 					Expr	   *expr = (Expr *) lfirst(l);
5420 
5421 					if (IsA(expr, Aggref))
5422 						tlist = add_to_flat_tlist(tlist, list_make1(expr));
5423 				}
5424 			}
5425 		}
5426 
5427 		i++;
5428 	}
5429 
5430 	/*
5431 	 * Classify the pushable and non-pushable HAVING clauses and save them in
5432 	 * remote_conds and local_conds of the grouped rel's fpinfo.
5433 	 */
5434 	if (havingQual)
5435 	{
5436 		ListCell   *lc;
5437 
5438 		foreach(lc, (List *) havingQual)
5439 		{
5440 			Expr	   *expr = (Expr *) lfirst(lc);
5441 			RestrictInfo *rinfo;
5442 
5443 			/*
5444 			 * Currently, the core code doesn't wrap havingQuals in
5445 			 * RestrictInfos, so we must make our own.
5446 			 */
5447 			Assert(!IsA(expr, RestrictInfo));
5448 			rinfo = make_restrictinfo(expr,
5449 									  true,
5450 									  false,
5451 									  false,
5452 									  root->qual_security_level,
5453 									  grouped_rel->relids,
5454 									  NULL,
5455 									  NULL);
5456 			if (is_foreign_expr(root, grouped_rel, expr))
5457 				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5458 			else
5459 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5460 		}
5461 	}
5462 
5463 	/*
5464 	 * If there are any local conditions, pull Vars and aggregates from it and
5465 	 * check whether they are safe to pushdown or not.
5466 	 */
5467 	if (fpinfo->local_conds)
5468 	{
5469 		List	   *aggvars = NIL;
5470 		ListCell   *lc;
5471 
5472 		foreach(lc, fpinfo->local_conds)
5473 		{
5474 			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5475 
5476 			aggvars = list_concat(aggvars,
5477 								  pull_var_clause((Node *) rinfo->clause,
5478 												  PVC_INCLUDE_AGGREGATES));
5479 		}
5480 
5481 		foreach(lc, aggvars)
5482 		{
5483 			Expr	   *expr = (Expr *) lfirst(lc);
5484 
5485 			/*
5486 			 * If aggregates within local conditions are not safe to push
5487 			 * down, then we cannot push down the query.  Vars are already
5488 			 * part of GROUP BY clause which are checked above, so no need to
5489 			 * access them again here.  Again, we need not check
5490 			 * is_foreign_param for a foreign aggregate.
5491 			 */
5492 			if (IsA(expr, Aggref))
5493 			{
5494 				if (!is_foreign_expr(root, grouped_rel, expr))
5495 					return false;
5496 
5497 				tlist = add_to_flat_tlist(tlist, list_make1(expr));
5498 			}
5499 		}
5500 	}
5501 
5502 	/* Store generated targetlist */
5503 	fpinfo->grouped_tlist = tlist;
5504 
5505 	/* Safe to pushdown */
5506 	fpinfo->pushdown_safe = true;
5507 
5508 	/*
5509 	 * Set cached relation costs to some negative value, so that we can detect
5510 	 * when they are set to some sensible costs, during one (usually the
5511 	 * first) of the calls to estimate_path_cost_size().
5512 	 */
5513 	fpinfo->rel_startup_cost = -1;
5514 	fpinfo->rel_total_cost = -1;
5515 
5516 	/*
5517 	 * Set the string describing this grouped relation to be used in EXPLAIN
5518 	 * output of corresponding ForeignScan.
5519 	 */
5520 	fpinfo->relation_name = makeStringInfo();
5521 	appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5522 					 ofpinfo->relation_name->data);
5523 
5524 	return true;
5525 }
5526 
5527 /*
5528  * postgresGetForeignUpperPaths
5529  *		Add paths for post-join operations like aggregation, grouping etc. if
5530  *		corresponding operations are safe to push down.
5531  *
5532  * Right now, we only support aggregate, grouping and having clause pushdown.
5533  */
5534 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel,void * extra)5535 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
5536 							 RelOptInfo *input_rel, RelOptInfo *output_rel,
5537 							 void *extra)
5538 {
5539 	PgFdwRelationInfo *fpinfo;
5540 
5541 	/*
5542 	 * If input rel is not safe to pushdown, then simply return as we cannot
5543 	 * perform any post-join operations on the foreign server.
5544 	 */
5545 	if (!input_rel->fdw_private ||
5546 		!((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5547 		return;
5548 
5549 	/* Ignore stages we don't support; and skip any duplicate calls. */
5550 	if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
5551 		return;
5552 
5553 	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5554 	fpinfo->pushdown_safe = false;
5555 	output_rel->fdw_private = fpinfo;
5556 
5557 	add_foreign_grouping_paths(root, input_rel, output_rel,
5558 							   (GroupPathExtraData *) extra);
5559 }
5560 
5561 /*
5562  * add_foreign_grouping_paths
5563  *		Add foreign path for grouping and/or aggregation.
5564  *
5565  * Given input_rel represents the underlying scan.  The paths are added to the
5566  * given grouped_rel.
5567  */
5568 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel,GroupPathExtraData * extra)5569 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
5570 						   RelOptInfo *grouped_rel,
5571 						   GroupPathExtraData *extra)
5572 {
5573 	Query	   *parse = root->parse;
5574 	PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5575 	PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5576 	ForeignPath *grouppath;
5577 	double		rows;
5578 	int			width;
5579 	Cost		startup_cost;
5580 	Cost		total_cost;
5581 
5582 	/* Nothing to be done, if there is no grouping or aggregation required. */
5583 	if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5584 		!root->hasHavingQual)
5585 		return;
5586 
5587 	Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
5588 		   extra->patype == PARTITIONWISE_AGGREGATE_FULL);
5589 
5590 	/* save the input_rel as outerrel in fpinfo */
5591 	fpinfo->outerrel = input_rel;
5592 
5593 	/*
5594 	 * Copy foreign table, foreign server, user mapping, FDW options etc.
5595 	 * details from the input relation's fpinfo.
5596 	 */
5597 	fpinfo->table = ifpinfo->table;
5598 	fpinfo->server = ifpinfo->server;
5599 	fpinfo->user = ifpinfo->user;
5600 	merge_fdw_options(fpinfo, ifpinfo, NULL);
5601 
5602 	/*
5603 	 * Assess if it is safe to push down aggregation and grouping.
5604 	 *
5605 	 * Use HAVING qual from extra. In case of child partition, it will have
5606 	 * translated Vars.
5607 	 */
5608 	if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5609 		return;
5610 
5611 	/* Estimate the cost of push down */
5612 	estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
5613 							&width, &startup_cost, &total_cost);
5614 
5615 	/* Now update this information in the fpinfo */
5616 	fpinfo->rows = rows;
5617 	fpinfo->width = width;
5618 	fpinfo->startup_cost = startup_cost;
5619 	fpinfo->total_cost = total_cost;
5620 
5621 	/* Create and add foreign path to the grouping relation. */
5622 	grouppath = create_foreignscan_path(root,
5623 										grouped_rel,
5624 										grouped_rel->reltarget,
5625 										rows,
5626 										startup_cost,
5627 										total_cost,
5628 										NIL,	/* no pathkeys */
5629 										grouped_rel->lateral_relids,
5630 										NULL,
5631 										NIL);	/* no fdw_private */
5632 
5633 	/* Add generated path into grouped_rel by add_path(). */
5634 	add_path(grouped_rel, (Path *) grouppath);
5635 }
5636 
5637 /*
5638  * Create a tuple from the specified row of the PGresult.
5639  *
5640  * rel is the local representation of the foreign table, attinmeta is
5641  * conversion data for the rel's tupdesc, and retrieved_attrs is an
5642  * integer list of the table column numbers present in the PGresult.
5643  * fsstate is the ForeignScan plan node's execution state.
5644  * temp_context is a working context that can be reset after each tuple.
5645  *
5646  * Note: either rel or fsstate, but not both, can be NULL.  rel is NULL
5647  * if we're processing a remote join, while fsstate is NULL in a non-query
5648  * context such as ANALYZE, or if we're processing a non-scan query node.
5649  */
5650 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)5651 make_tuple_from_result_row(PGresult *res,
5652 						   int row,
5653 						   Relation rel,
5654 						   AttInMetadata *attinmeta,
5655 						   List *retrieved_attrs,
5656 						   ForeignScanState *fsstate,
5657 						   MemoryContext temp_context)
5658 {
5659 	HeapTuple	tuple;
5660 	TupleDesc	tupdesc;
5661 	Datum	   *values;
5662 	bool	   *nulls;
5663 	ItemPointer ctid = NULL;
5664 	Oid			oid = InvalidOid;
5665 	ConversionLocation errpos;
5666 	ErrorContextCallback errcallback;
5667 	MemoryContext oldcontext;
5668 	ListCell   *lc;
5669 	int			j;
5670 
5671 	Assert(row < PQntuples(res));
5672 
5673 	/*
5674 	 * Do the following work in a temp context that we reset after each tuple.
5675 	 * This cleans up not only the data we have direct access to, but any
5676 	 * cruft the I/O functions might leak.
5677 	 */
5678 	oldcontext = MemoryContextSwitchTo(temp_context);
5679 
5680 	/*
5681 	 * Get the tuple descriptor for the row.  Use the rel's tupdesc if rel is
5682 	 * provided, otherwise look to the scan node's ScanTupleSlot.
5683 	 */
5684 	if (rel)
5685 		tupdesc = RelationGetDescr(rel);
5686 	else
5687 	{
5688 		Assert(fsstate);
5689 		tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
5690 	}
5691 
5692 	values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5693 	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5694 	/* Initialize to nulls for any columns not present in result */
5695 	memset(nulls, true, tupdesc->natts * sizeof(bool));
5696 
5697 	/*
5698 	 * Set up and install callback to report where conversion error occurs.
5699 	 */
5700 	errpos.cur_attno = 0;
5701 	errpos.rel = rel;
5702 	errpos.fsstate = fsstate;
5703 	errcallback.callback = conversion_error_callback;
5704 	errcallback.arg = (void *) &errpos;
5705 	errcallback.previous = error_context_stack;
5706 	error_context_stack = &errcallback;
5707 
5708 	/*
5709 	 * i indexes columns in the relation, j indexes columns in the PGresult.
5710 	 */
5711 	j = 0;
5712 	foreach(lc, retrieved_attrs)
5713 	{
5714 		int			i = lfirst_int(lc);
5715 		char	   *valstr;
5716 
5717 		/* fetch next column's textual value */
5718 		if (PQgetisnull(res, row, j))
5719 			valstr = NULL;
5720 		else
5721 			valstr = PQgetvalue(res, row, j);
5722 
5723 		/*
5724 		 * convert value to internal representation
5725 		 *
5726 		 * Note: we ignore system columns other than ctid and oid in result
5727 		 */
5728 		errpos.cur_attno = i;
5729 		if (i > 0)
5730 		{
5731 			/* ordinary column */
5732 			Assert(i <= tupdesc->natts);
5733 			nulls[i - 1] = (valstr == NULL);
5734 			/* Apply the input function even to nulls, to support domains */
5735 			values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5736 											  valstr,
5737 											  attinmeta->attioparams[i - 1],
5738 											  attinmeta->atttypmods[i - 1]);
5739 		}
5740 		else if (i == SelfItemPointerAttributeNumber)
5741 		{
5742 			/* ctid */
5743 			if (valstr != NULL)
5744 			{
5745 				Datum		datum;
5746 
5747 				datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5748 				ctid = (ItemPointer) DatumGetPointer(datum);
5749 			}
5750 		}
5751 		else if (i == ObjectIdAttributeNumber)
5752 		{
5753 			/* oid */
5754 			if (valstr != NULL)
5755 			{
5756 				Datum		datum;
5757 
5758 				datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5759 				oid = DatumGetObjectId(datum);
5760 			}
5761 		}
5762 		errpos.cur_attno = 0;
5763 
5764 		j++;
5765 	}
5766 
5767 	/* Uninstall error context callback. */
5768 	error_context_stack = errcallback.previous;
5769 
5770 	/*
5771 	 * Check we got the expected number of columns.  Note: j == 0 and
5772 	 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5773 	 */
5774 	if (j > 0 && j != PQnfields(res))
5775 		elog(ERROR, "remote query result does not match the foreign table");
5776 
5777 	/*
5778 	 * Build the result tuple in caller's memory context.
5779 	 */
5780 	MemoryContextSwitchTo(oldcontext);
5781 
5782 	tuple = heap_form_tuple(tupdesc, values, nulls);
5783 
5784 	/*
5785 	 * If we have a CTID to return, install it in both t_self and t_ctid.
5786 	 * t_self is the normal place, but if the tuple is converted to a
5787 	 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5788 	 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5789 	 */
5790 	if (ctid)
5791 		tuple->t_self = tuple->t_data->t_ctid = *ctid;
5792 
5793 	/*
5794 	 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5795 	 * heap_form_tuple.  heap_form_tuple actually creates the tuple with
5796 	 * DatumTupleFields, not HeapTupleFields, but the executor expects
5797 	 * HeapTupleFields and will happily extract system columns on that
5798 	 * assumption.  If we don't do this then, for example, the tuple length
5799 	 * ends up in the xmin field, which isn't what we want.
5800 	 */
5801 	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
5802 	HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
5803 	HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
5804 
5805 	/*
5806 	 * If we have an OID to return, install it.
5807 	 */
5808 	if (OidIsValid(oid))
5809 		HeapTupleSetOid(tuple, oid);
5810 
5811 	/* Clean up */
5812 	MemoryContextReset(temp_context);
5813 
5814 	return tuple;
5815 }
5816 
5817 /*
5818  * Callback function which is called when error occurs during column value
5819  * conversion.  Print names of column and relation.
5820  *
5821  * Note that this function mustn't do any catalog lookups, since we are in
5822  * an already-failed transaction.  Fortunately, we can get the needed info
5823  * from the relation or the query's rangetable instead.
5824  */
5825 static void
conversion_error_callback(void * arg)5826 conversion_error_callback(void *arg)
5827 {
5828 	ConversionLocation *errpos = (ConversionLocation *) arg;
5829 	Relation	rel = errpos->rel;
5830 	ForeignScanState *fsstate = errpos->fsstate;
5831 	const char *attname = NULL;
5832 	const char *relname = NULL;
5833 	bool		is_wholerow = false;
5834 
5835 	/*
5836 	 * If we're in a scan node, always use aliases from the rangetable, for
5837 	 * consistency between the simple-relation and remote-join cases.  Look at
5838 	 * the relation's tupdesc only if we're not in a scan node.
5839 	 */
5840 	if (fsstate)
5841 	{
5842 		/* ForeignScan case */
5843 		ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5844 		int			varno = 0;
5845 		AttrNumber	colno = 0;
5846 
5847 		if (fsplan->scan.scanrelid > 0)
5848 		{
5849 			/* error occurred in a scan against a foreign table */
5850 			varno = fsplan->scan.scanrelid;
5851 			colno = errpos->cur_attno;
5852 		}
5853 		else
5854 		{
5855 			/* error occurred in a scan against a foreign join */
5856 			TargetEntry *tle;
5857 
5858 			tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
5859 								errpos->cur_attno - 1);
5860 
5861 			/*
5862 			 * Target list can have Vars and expressions.  For Vars, we can
5863 			 * get some information, however for expressions we can't.  Thus
5864 			 * for expressions, just show generic context message.
5865 			 */
5866 			if (IsA(tle->expr, Var))
5867 			{
5868 				Var		   *var = (Var *) tle->expr;
5869 
5870 				varno = var->varno;
5871 				colno = var->varattno;
5872 			}
5873 		}
5874 
5875 		if (varno > 0)
5876 		{
5877 			EState	   *estate = fsstate->ss.ps.state;
5878 			RangeTblEntry *rte = rt_fetch(varno, estate->es_range_table);
5879 
5880 			relname = rte->eref->aliasname;
5881 
5882 			if (colno == 0)
5883 				is_wholerow = true;
5884 			else if (colno > 0 && colno <= list_length(rte->eref->colnames))
5885 				attname = strVal(list_nth(rte->eref->colnames, colno - 1));
5886 			else if (colno == SelfItemPointerAttributeNumber)
5887 				attname = "ctid";
5888 			else if (colno == ObjectIdAttributeNumber)
5889 				attname = "oid";
5890 		}
5891 	}
5892 	else if (rel)
5893 	{
5894 		/* Non-ForeignScan case (we should always have a rel here) */
5895 		TupleDesc	tupdesc = RelationGetDescr(rel);
5896 
5897 		relname = RelationGetRelationName(rel);
5898 		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5899 		{
5900 			Form_pg_attribute attr = TupleDescAttr(tupdesc,
5901 												   errpos->cur_attno - 1);
5902 
5903 			attname = NameStr(attr->attname);
5904 		}
5905 		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5906 			attname = "ctid";
5907 	}
5908 
5909 	if (relname && is_wholerow)
5910 		errcontext("whole-row reference to foreign table \"%s\"", relname);
5911 	else if (relname && attname)
5912 		errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5913 	else
5914 		errcontext("processing expression at position %d in select list",
5915 				   errpos->cur_attno);
5916 }
5917 
5918 /*
5919  * Find an equivalence class member expression, all of whose Vars, come from
5920  * the indicated relation.
5921  */
5922 Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)5923 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
5924 {
5925 	ListCell   *lc_em;
5926 
5927 	foreach(lc_em, ec->ec_members)
5928 	{
5929 		EquivalenceMember *em = lfirst(lc_em);
5930 
5931 		if (bms_is_subset(em->em_relids, rel->relids) &&
5932 			!bms_is_empty(em->em_relids))
5933 		{
5934 			/*
5935 			 * If there is more than one equivalence member whose Vars are
5936 			 * taken entirely from this relation, we'll be content to choose
5937 			 * any one of those.
5938 			 */
5939 			return em->em_expr;
5940 		}
5941 	}
5942 
5943 	/* We didn't find any suitable equivalence class expression */
5944 	return NULL;
5945 }
5946