1 /*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2020, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include <limits.h>
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "foreign/fdwapi.h"
25 #include "funcapi.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "nodes/nodeFuncs.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/optimizer.h"
32 #include "optimizer/pathnode.h"
33 #include "optimizer/paths.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "optimizer/tlist.h"
37 #include "parser/parsetree.h"
38 #include "postgres_fdw.h"
39 #include "utils/builtins.h"
40 #include "utils/float.h"
41 #include "utils/guc.h"
42 #include "utils/lsyscache.h"
43 #include "utils/memutils.h"
44 #include "utils/rel.h"
45 #include "utils/sampling.h"
46 #include "utils/selfuncs.h"
47
/*
 * Source-code-compatibility hack for the pull_varnos() API change: every
 * make_restrictinfo() call in this file is redirected to
 * make_restrictinfo_new(), with all nine arguments forwarded unchanged.
 */
#define make_restrictinfo(p1,p2,p3,p4,p5,p6,p7,p8,p9) \
	make_restrictinfo_new(p1,p2,p3,p4,p5,p6,p7,p8,p9)

PG_MODULE_MAGIC;
52
/* CPU cost charged by default for starting up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost per processed row, on top of cpu_tuple_cost. */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* Without remote estimates, a sort is assumed to cost 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER	1.2
61
/*
 * Positions of the FDW-private items kept in fdw_private lists.
 *
 * Each item sits at the list position given by its FdwScanPrivateIndex
 * value, so it can be retrieved with list_nth().  For instance, the SELECT
 * statement is obtained with:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
	/* Text of the SQL statement to run remotely (a String node) */
	FdwScanPrivateSelectSql = 0,

	/* Integer list: attribute numbers the SELECT retrieves */
	FdwScanPrivateRetrievedAttrs,

	/* Integer: the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing the join, i.e. the names of the joined relations and
	 * the join types; added only when the scan is a join
	 */
	FdwScanPrivateRelations
};
84
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a postgres_fdw foreign table.  The list holds, in order:
 *
 * 1) Text of the INSERT/UPDATE/DELETE statement sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Boolean flag: does the remote query have a RETURNING clause?
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* Text of the SQL statement to run remotely (a String node) */
	FdwModifyPrivateUpdateSql = 0,

	/* Integer list: target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,

	/* has-returning flag (an integer Value node) */
	FdwModifyPrivateHasReturning,

	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
106
/*
 * Positions of the items kept in the fdw_private list of a ForeignScan node
 * that modifies a foreign table directly.  The list holds, in order:
 *
 * 1) Text of the UPDATE/DELETE statement sent to the remote server
 * 2) Boolean flag: does the remote query have a RETURNING clause?
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag: do we set the command es_processed?
 */
enum FdwDirectModifyPrivateIndex
{
	/* Text of the SQL statement to run remotely (a String node) */
	FdwDirectModifyPrivateUpdateSql = 0,

	/* has-returning flag (an integer Value node) */
	FdwDirectModifyPrivateHasReturning,

	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,

	/* set-processed flag (an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
127
128 /*
129 * Execution state of a foreign scan using postgres_fdw.
130 */
131 typedef struct PgFdwScanState
132 {
133 Relation rel; /* relcache entry for the foreign table. NULL
134 * for a foreign join scan. */
135 TupleDesc tupdesc; /* tuple descriptor of scan */
136 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
137
digest_reset(PX_MD * h)138 /* extracted fdw_private data */
139 char *query; /* text of SELECT command */
140 List *retrieved_attrs; /* list of retrieved attribute numbers */
141
142 /* for remote query execution */
143 PGconn *conn; /* connection for the scan */
144 unsigned int cursor_number; /* quasi-unique ID for my cursor */
145 bool cursor_exists; /* have we created the cursor? */
146 int numParams; /* number of parameters passed to query */
digest_update(PX_MD * h,const uint8 * data,unsigned dlen)147 FmgrInfo *param_flinfo; /* output conversion functions for them */
148 List *param_exprs; /* executable expressions for param values */
149 const char **param_values; /* textual values of query parameters */
150
151 /* for storing result tuples */
152 HeapTuple *tuples; /* array of currently-retrieved tuples */
153 int num_tuples; /* # of tuples in array */
154 int next_tuple; /* index of next one to return */
155
digest_finish(PX_MD * h,uint8 * dst)156 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
157 int fetch_ct_2; /* Min(# of fetches done, 2) */
158 bool eof_reached; /* true if last fetch reached EOF */
159
160 /* working memory contexts */
161 MemoryContext batch_cxt; /* context holding current batch of tuples */
162 MemoryContext temp_cxt; /* context for per-tuple temporary data */
163
164 int fetch_size; /* number of tuples per fetch */
digest_free(PX_MD * h)165 } PgFdwScanState;
166
167 /*
168 * Execution state of a foreign insert/update/delete operation.
169 */
170 typedef struct PgFdwModifyState
171 {
172 Relation rel; /* relcache entry for the foreign table */
173 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
174
175 /* for remote query execution */
176 PGconn *conn; /* connection for the scan */
177 char *p_name; /* name of prepared statement, if created */
px_find_digest(const char * name,PX_MD ** res)178
179 /* extracted fdw_private data */
180 char *query; /* text of INSERT/UPDATE/DELETE command */
181 List *target_attrs; /* list of target attribute numbers */
182 bool has_returning; /* is there a RETURNING clause? */
183 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
184
185 /* info about parameters for prepared statement */
186 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
187 int p_nums; /* number of parameters to transmit */
188 FmgrInfo *p_flinfo; /* output conversion functions for them */
189
190 /* working memory context */
191 MemoryContext temp_cxt; /* context for per-tuple temporary data */
192
193 /* for update row movement if subplan result rel */
194 struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
195 * created */
196 } PgFdwModifyState;
197
198 /*
199 * Execution state of a foreign scan that modifies a foreign table directly.
200 */
201 typedef struct PgFdwDirectModifyState
202 {
203 Relation rel; /* relcache entry for the foreign table */
204 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
205
206 /* extracted fdw_private data */
207 char *query; /* text of UPDATE/DELETE command */
208 bool has_returning; /* is there a RETURNING clause? */
209 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
210 bool set_processed; /* do we set the command es_processed? */
211
212 /* for remote query execution */
213 PGconn *conn; /* connection for the update */
214 int numParams; /* number of parameters passed to query */
215 FmgrInfo *param_flinfo; /* output conversion functions for them */
216 List *param_exprs; /* executable expressions for param values */
217 const char **param_values; /* textual values of query parameters */
218
219 /* for storing result tuples */
220 PGresult *result; /* result for query */
221 int num_tuples; /* # of result tuples */
222 int next_tuple; /* index of next one to return */
223 Relation resultRel; /* relcache entry for the target relation */
224 AttrNumber *attnoMap; /* array of attnums of input user columns */
225 AttrNumber ctidAttno; /* attnum of input ctid column */
226 AttrNumber oidAttno; /* attnum of input oid column */
227 bool hasSystemCols; /* are there system columns of resultRel? */
228
229 /* working memory context */
230 MemoryContext temp_cxt; /* context for per-tuple temporary data */
231 } PgFdwDirectModifyState;
232
233 /*
234 * Workspace for analyzing a foreign table.
235 */
236 typedef struct PgFdwAnalyzeState
237 {
238 Relation rel; /* relcache entry for the foreign table */
239 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
240 List *retrieved_attrs; /* attr numbers retrieved by query */
241
242 /* collected sample rows */
243 HeapTuple *rows; /* array of size targrows */
244 int targrows; /* target # of sample rows */
245 int numrows; /* # of sample rows collected */
246
247 /* for random sampling */
248 double samplerows; /* # of rows fetched */
249 double rowstoskip; /* # of rows to skip before next sample */
250 ReservoirStateData rstate; /* state for reservoir sampling */
251
252 /* working memory contexts */
253 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
254 MemoryContext temp_cxt; /* context for per-tuple temporary data */
255 } PgFdwAnalyzeState;
256
/*
 * Positions of the items kept in the fdw_private list of a ForeignPath.
 * The list holds, in order:
 *
 * 1) Boolean flag: does the remote query include the final sort?
 * 2) Boolean flag: does the remote query include the LIMIT clause?
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (an integer Value node) */
	FdwPathPrivateHasFinalSort = 0,

	/* has-limit flag (an integer Value node) */
	FdwPathPrivateHasLimit
};
271
272 /* Struct for extra information passed to estimate_path_cost_size() */
273 typedef struct
274 {
275 PathTarget *target;
276 bool has_final_sort;
277 bool has_limit;
278 double limit_tuples;
279 int64 count_est;
280 int64 offset_est;
281 } PgFdwPathExtraData;
282
283 /*
284 * Identify the attribute where data conversion fails.
285 */
286 typedef struct ConversionLocation
287 {
288 AttrNumber cur_attno; /* attribute number being processed, or 0 */
289 Relation rel; /* foreign table being processed, or NULL */
290 ForeignScanState *fsstate; /* plan node being processed, or NULL */
291 } ConversionLocation;
free_openssl_cipher(OSSLCipher * od)292
293 /* Callback argument for ec_member_matches_foreign */
294 typedef struct
295 {
296 Expr *current; /* current expr, or NULL if not yet found */
297 List *already_used; /* expressions already dealt with */
298 } ec_member_foreign_arg;
299
300 /*
301 * SQL functions
302 */
303 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
304
305 /*
306 * FDW callback routines
307 */
cipher_free_callback(ResourceReleasePhase phase,bool isCommit,bool isTopLevel,void * arg)308 static void postgresGetForeignRelSize(PlannerInfo *root,
309 RelOptInfo *baserel,
310 Oid foreigntableid);
311 static void postgresGetForeignPaths(PlannerInfo *root,
312 RelOptInfo *baserel,
313 Oid foreigntableid);
314 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
315 RelOptInfo *foreignrel,
316 Oid foreigntableid,
317 ForeignPath *best_path,
318 List *tlist,
319 List *scan_clauses,
320 Plan *outer_plan);
321 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
322 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
323 static void postgresReScanForeignScan(ForeignScanState *node);
324 static void postgresEndForeignScan(ForeignScanState *node);
325 static void postgresAddForeignUpdateTargets(Query *parsetree,
326 RangeTblEntry *target_rte,
327 Relation target_relation);
328 static List *postgresPlanForeignModify(PlannerInfo *root,
329 ModifyTable *plan,
330 Index resultRelation,
331 int subplan_index);
332 static void postgresBeginForeignModify(ModifyTableState *mtstate,
333 ResultRelInfo *resultRelInfo,
334 List *fdw_private,
335 int subplan_index,
336 int eflags);
gen_ossl_block_size(PX_Cipher * c)337 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
338 ResultRelInfo *resultRelInfo,
339 TupleTableSlot *slot,
340 TupleTableSlot *planSlot);
341 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
342 ResultRelInfo *resultRelInfo,
343 TupleTableSlot *slot,
344 TupleTableSlot *planSlot);
gen_ossl_key_size(PX_Cipher * c)345 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
346 ResultRelInfo *resultRelInfo,
347 TupleTableSlot *slot,
348 TupleTableSlot *planSlot);
349 static void postgresEndForeignModify(EState *estate,
350 ResultRelInfo *resultRelInfo);
351 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
352 ResultRelInfo *resultRelInfo);
gen_ossl_iv_size(PX_Cipher * c)353 static void postgresEndForeignInsert(EState *estate,
354 ResultRelInfo *resultRelInfo);
355 static int postgresIsForeignRelUpdatable(Relation rel);
356 static bool postgresPlanDirectModify(PlannerInfo *root,
357 ModifyTable *plan,
358 Index resultRelation,
359 int subplan_index);
360 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
361 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
362 static void postgresEndDirectModify(ForeignScanState *node);
gen_ossl_free(PX_Cipher * c)363 static void postgresExplainForeignScan(ForeignScanState *node,
364 ExplainState *es);
365 static void postgresExplainForeignModify(ModifyTableState *mtstate,
366 ResultRelInfo *rinfo,
367 List *fdw_private,
368 int subplan_index,
369 ExplainState *es);
370 static void postgresExplainDirectModify(ForeignScanState *node,
371 ExplainState *es);
gen_ossl_decrypt(PX_Cipher * c,const uint8 * data,unsigned dlen,uint8 * res)372 static bool postgresAnalyzeForeignTable(Relation relation,
373 AcquireSampleRowsFunc *func,
374 BlockNumber *totalpages);
375 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
376 Oid serverOid);
377 static void postgresGetForeignJoinPaths(PlannerInfo *root,
378 RelOptInfo *joinrel,
379 RelOptInfo *outerrel,
380 RelOptInfo *innerrel,
381 JoinType jointype,
382 JoinPathExtraData *extra);
383 static bool postgresRecheckForeignScan(ForeignScanState *node,
384 TupleTableSlot *slot);
385 static void postgresGetForeignUpperPaths(PlannerInfo *root,
386 UpperRelationKind stage,
387 RelOptInfo *input_rel,
388 RelOptInfo *output_rel,
389 void *extra);
390
391 /*
392 * Helper functions
393 */
394 static void estimate_path_cost_size(PlannerInfo *root,
395 RelOptInfo *foreignrel,
396 List *param_join_conds,
397 List *pathkeys,
gen_ossl_encrypt(PX_Cipher * c,const uint8 * data,unsigned dlen,uint8 * res)398 PgFdwPathExtraData *fpextra,
399 double *p_rows, int *p_width,
400 Cost *p_startup_cost, Cost *p_total_cost);
401 static void get_remote_estimate(const char *sql,
402 PGconn *conn,
403 double *rows,
404 int *width,
405 Cost *startup_cost,
406 Cost *total_cost);
407 static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
408 List *pathkeys,
409 double retrieved_rows,
410 double width,
411 double limit_tuples,
412 Cost *p_startup_cost,
413 Cost *p_run_cost);
414 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
415 EquivalenceClass *ec, EquivalenceMember *em,
416 void *arg);
417 static void create_cursor(ForeignScanState *node);
418 static void fetch_more_data(ForeignScanState *node);
419 static void close_cursor(PGconn *conn, unsigned int cursor_number);
420 static PgFdwModifyState *create_foreign_modify(EState *estate,
421 RangeTblEntry *rte,
422 ResultRelInfo *resultRelInfo,
423 CmdType operation,
424 Plan *subplan,
425 char *query,
426 List *target_attrs,
427 bool has_returning,
428 List *retrieved_attrs);
429 static TupleTableSlot *execute_foreign_modify(EState *estate,
430 ResultRelInfo *resultRelInfo,
bf_check_supported_key_len(void)431 CmdType operation,
432 TupleTableSlot *slot,
433 TupleTableSlot *planSlot);
434 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
435 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
436 ItemPointer tupleid,
437 TupleTableSlot *slot);
438 static void store_returning_result(PgFdwModifyState *fmstate,
439 TupleTableSlot *slot, PGresult *res);
440 static void finish_foreign_modify(PgFdwModifyState *fmstate);
441 static List *build_remote_returning(Index rtindex, Relation rel,
442 List *returningList);
443 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
444 static void execute_dml_stmt(ForeignScanState *node);
445 static TupleTableSlot *get_returning_data(ForeignScanState *node);
446 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
447 List *fdw_scan_tlist,
448 Index rtindex);
449 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
450 TupleTableSlot *slot,
451 EState *estate);
452 static void prepare_query_params(PlanState *node,
453 List *fdw_exprs,
454 int numParams,
455 FmgrInfo **param_flinfo,
456 List **param_exprs,
457 const char ***param_values);
458 static void process_query_params(ExprContext *econtext,
459 FmgrInfo *param_flinfo,
460 List *param_exprs,
461 const char **param_values);
462 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
463 HeapTuple *rows, int targrows,
464 double *totalrows,
465 double *totaldeadrows);
466 static void analyze_row_processor(PGresult *res, int row,
467 PgFdwAnalyzeState *astate);
468 static HeapTuple make_tuple_from_result_row(PGresult *res,
469 int row,
470 Relation rel,
471 AttInMetadata *attinmeta,
472 List *retrieved_attrs,
473 ForeignScanState *fsstate,
bf_init(PX_Cipher * c,const uint8 * key,unsigned klen,const uint8 * iv)474 MemoryContext temp_context);
475 static void conversion_error_callback(void *arg);
476 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
477 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
478 JoinPathExtraData *extra);
479 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
480 Node *havingQual);
481 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
482 RelOptInfo *rel);
483 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
484 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
485 Path *epq_path);
486 static void add_foreign_grouping_paths(PlannerInfo *root,
487 RelOptInfo *input_rel,
488 RelOptInfo *grouped_rel,
489 GroupPathExtraData *extra);
490 static void add_foreign_ordered_paths(PlannerInfo *root,
491 RelOptInfo *input_rel,
492 RelOptInfo *ordered_rel);
493 static void add_foreign_final_paths(PlannerInfo *root,
494 RelOptInfo *input_rel,
495 RelOptInfo *final_rel,
496 FinalPathExtraData *extra);
497 static void apply_server_options(PgFdwRelationInfo *fpinfo);
498 static void apply_table_options(PgFdwRelationInfo *fpinfo);
499 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
500 const PgFdwRelationInfo *fpinfo_o,
501 const PgFdwRelationInfo *fpinfo_i);
502
503
504 /*
505 * Foreign-data wrapper handler function: return a struct with pointers
ossl_des_init(PX_Cipher * c,const uint8 * key,unsigned klen,const uint8 * iv)506 * to my callback routines.
507 */
508 Datum
509 postgres_fdw_handler(PG_FUNCTION_ARGS)
510 {
511 FdwRoutine *routine = makeNode(FdwRoutine);
512
513 /* Functions for scanning foreign tables */
514 routine->GetForeignRelSize = postgresGetForeignRelSize;
515 routine->GetForeignPaths = postgresGetForeignPaths;
516 routine->GetForeignPlan = postgresGetForeignPlan;
517 routine->BeginForeignScan = postgresBeginForeignScan;
518 routine->IterateForeignScan = postgresIterateForeignScan;
519 routine->ReScanForeignScan = postgresReScanForeignScan;
520 routine->EndForeignScan = postgresEndForeignScan;
521
522 /* Functions for updating foreign tables */
523 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
524 routine->PlanForeignModify = postgresPlanForeignModify;
525 routine->BeginForeignModify = postgresBeginForeignModify;
526 routine->ExecForeignInsert = postgresExecForeignInsert;
527 routine->ExecForeignUpdate = postgresExecForeignUpdate;
528 routine->ExecForeignDelete = postgresExecForeignDelete;
529 routine->EndForeignModify = postgresEndForeignModify;
530 routine->BeginForeignInsert = postgresBeginForeignInsert;
531 routine->EndForeignInsert = postgresEndForeignInsert;
532 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
533 routine->PlanDirectModify = postgresPlanDirectModify;
534 routine->BeginDirectModify = postgresBeginDirectModify;
535 routine->IterateDirectModify = postgresIterateDirectModify;
536 routine->EndDirectModify = postgresEndDirectModify;
537
538 /* Function for EvalPlanQual rechecks */
539 routine->RecheckForeignScan = postgresRecheckForeignScan;
540 /* Support functions for EXPLAIN */
541 routine->ExplainForeignScan = postgresExplainForeignScan;
542 routine->ExplainForeignModify = postgresExplainForeignModify;
543 routine->ExplainDirectModify = postgresExplainDirectModify;
544
545 /* Support functions for ANALYZE */
546 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
547
548 /* Support functions for IMPORT FOREIGN SCHEMA */
549 routine->ImportForeignSchema = postgresImportForeignSchema;
550
551 /* Support functions for join push-down */
552 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
553
554 /* Support functions for upper relation push-down */
555 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
556
557 PG_RETURN_POINTER(routine);
558 }
559
560 /*
561 * postgresGetForeignRelSize
ossl_aes_init(PX_Cipher * c,const uint8 * key,unsigned klen,const uint8 * iv)562 * Estimate # of rows and width of the result of the scan
563 *
564 * We should consider the effect of all baserestrictinfo clauses here, but
565 * not any join clauses.
566 */
567 static void
568 postgresGetForeignRelSize(PlannerInfo *root,
569 RelOptInfo *baserel,
570 Oid foreigntableid)
571 {
572 PgFdwRelationInfo *fpinfo;
573 ListCell *lc;
574 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
575
576 /*
577 * We use PgFdwRelationInfo to pass various information to subsequent
578 * functions.
579 */
580 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
581 baserel->fdw_private = (void *) fpinfo;
582
583 /* Base foreign tables need to be pushed down always. */
584 fpinfo->pushdown_safe = true;
585
586 /* Look up foreign-table catalog info. */
587 fpinfo->table = GetForeignTable(foreigntableid);
588 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
589
590 /*
591 * Extract user-settable option values. Note that per-table settings of
592 * use_remote_estimate and fetch_size override per-server settings of
593 * them, respectively.
594 */
595 fpinfo->use_remote_estimate = false;
596 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
597 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
598 fpinfo->shippable_extensions = NIL;
599 fpinfo->fetch_size = 100;
600
601 apply_server_options(fpinfo);
602 apply_table_options(fpinfo);
603
604 /*
605 * If the table or the server is configured to use remote estimates,
606 * identify which user to do remote access as during planning. This
607 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
608 * permissions, the query would have failed at runtime anyway.
609 */
610 if (fpinfo->use_remote_estimate)
611 {
612 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
613
614 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
615 }
616 else
617 fpinfo->user = NULL;
618
619 /*
620 * Identify which baserestrictinfo clauses can be sent to the remote
621 * server and which can't.
622 */
623 classifyConditions(root, baserel, baserel->baserestrictinfo,
624 &fpinfo->remote_conds, &fpinfo->local_conds);
625
626 /*
627 * Identify which attributes will need to be retrieved from the remote
628 * server. These include all attrs needed for joins or final output, plus
629 * all attrs used in the local_conds. (Note: if we end up using a
630 * parameterized scan, it's possible that some of the join clauses will be
631 * sent to the remote and thus we wouldn't really need to retrieve the
632 * columns used in them. Doesn't seem worth detecting that case though.)
633 */
634 fpinfo->attrs_used = NULL;
635 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
636 &fpinfo->attrs_used);
637 foreach(lc, fpinfo->local_conds)
638 {
639 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
640
641 pull_varattnos((Node *) rinfo->clause, baserel->relid,
642 &fpinfo->attrs_used);
643 }
644
645 /*
646 * Compute the selectivity and cost of the local_conds, so we don't have
647 * to do it over again for each path. The best we can do for these
648 * conditions is to estimate selectivity on the basis of local statistics.
649 */
650 fpinfo->local_conds_sel = clauselist_selectivity(root,
651 fpinfo->local_conds,
652 baserel->relid,
653 JOIN_INNER,
654 NULL);
655
656 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
657
658 /*
659 * Set # of retrieved rows and cached relation costs to some negative
660 * value, so that we can detect when they are set to some sensible values,
661 * during one (usually the first) of the calls to estimate_path_cost_size.
662 */
663 fpinfo->retrieved_rows = -1;
664 fpinfo->rel_startup_cost = -1;
665 fpinfo->rel_total_cost = -1;
666
667 /*
668 * If the table or the server is configured to use remote estimates,
669 * connect to the foreign server and execute EXPLAIN to estimate the
670 * number of rows selected by the restriction clauses, as well as the
671 * average row width. Otherwise, estimate using whatever statistics we
672 * have locally, in a way similar to ordinary tables.
673 */
674 if (fpinfo->use_remote_estimate)
675 {
676 /*
677 * Get cost/size estimates with help of remote server. Save the
678 * values in fpinfo so we don't need to do it again to generate the
679 * basic foreign path.
680 */
681 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
682 &fpinfo->rows, &fpinfo->width,
683 &fpinfo->startup_cost, &fpinfo->total_cost);
684
685 /* Report estimated baserel size to planner. */
686 baserel->rows = fpinfo->rows;
687 baserel->reltarget->width = fpinfo->width;
688 }
689 else
690 {
691 /*
692 * If the foreign table has never been ANALYZEd, it will have relpages
693 * and reltuples equal to zero, which most likely has nothing to do
694 * with reality. We can't do a whole lot about that if we're not
695 * allowed to consult the remote server, but we can use a hack similar
696 * to plancat.c's treatment of empty relations: use a minimum size
697 * estimate of 10 pages, and divide by the column-datatype-based width
698 * estimate to get the corresponding number of tuples.
699 */
700 if (baserel->pages == 0 && baserel->tuples == 0)
701 {
702 baserel->pages = 10;
703 baserel->tuples =
704 (10 * BLCKSZ) / (baserel->reltarget->width +
705 MAXALIGN(SizeofHeapTupleHeader));
706 }
707
708 /* Estimate baserel size as best we can with local statistics. */
709 set_baserel_size_estimates(root, baserel);
710
711 /* Fill in basically-bogus cost estimates for use later. */
712 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
713 &fpinfo->rows, &fpinfo->width,
714 &fpinfo->startup_cost, &fpinfo->total_cost);
715 }
716
717 /*
718 * fpinfo->relation_name gets the numeric rangetable index of the foreign
719 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
720 * human-readable string at that time.)
721 */
722 fpinfo->relation_name = psprintf("%u", baserel->relid);
723
724 /* No outer and inner relations. */
725 fpinfo->make_outerrel_subquery = false;
726 fpinfo->make_innerrel_subquery = false;
727 fpinfo->lower_subquery_rels = NULL;
728 /* Set the relation index. */
729 fpinfo->relation_index = baserel->relid;
730 }
731
732 /*
733 * get_useful_ecs_for_relation
734 * Determine which EquivalenceClasses might be involved in useful
735 * orderings of this relation.
736 *
737 * This function is in some respects a mirror image of the core function
738 * pathkeys_useful_for_merging: for a regular table, we know what indexes
739 * we have and want to test whether any of them are useful. For a foreign
740 * table, we don't know what indexes are present on the remote side but
741 * want to speculate about which ones we'd like to use if they existed.
742 *
743 * This function returns a list of potentially-useful equivalence classes,
744 * but it does not guarantee that an EquivalenceMember exists which contains
745 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
746 * ft1.x + t1.x = 0, this function will say that the equivalence class
747 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
748 * t1 is local (or on a different server), it will turn out that no useful
749 * ORDER BY clause can be generated. It's not our job to figure that out
750 * here; we're only interested in identifying relevant ECs.
751 */
752 static List *
753 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
754 {
755 List *useful_eclass_list = NIL;
756 ListCell *lc;
757 Relids relids;
758
759 /*
760 * First, consider whether any active EC is potentially useful for a merge
761 * join against this relation.
762 */
px_find_cipher(const char * name,PX_Cipher ** res)763 if (rel->has_eclass_joins)
764 {
765 foreach(lc, root->eq_classes)
766 {
767 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
768
769 if (eclass_useful_for_merging(root, cur_ec, rel))
770 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
771 }
772 }
773
774 /*
775 * Next, consider whether there are any non-EC derivable join clauses that
776 * are merge-joinable. If the joininfo list is empty, we can exit
777 * quickly.
778 */
779 if (rel->joininfo == NIL)
780 return useful_eclass_list;
781
782 /* If this is a child rel, we must use the topmost parent rel to search. */
783 if (IS_OTHER_REL(rel))
784 {
785 Assert(!bms_is_empty(rel->top_parent_relids));
786 relids = rel->top_parent_relids;
787 }
788 else
789 relids = rel->relids;
790
791 /* Check each join clause in turn. */
792 foreach(lc, rel->joininfo)
793 {
794 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
795
796 /* Consider only mergejoinable clauses */
797 if (restrictinfo->mergeopfamilies == NIL)
798 continue;
799
800 /* Make sure we've got canonical ECs. */
801 update_mergeclause_eclasses(root, restrictinfo);
802
803 /*
804 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
805 * that left_ec and right_ec will be initialized, per comments in
806 * distribute_qual_to_rels.
807 *
808 * We want to identify which side of this merge-joinable clause
809 * contains columns from the relation produced by this RelOptInfo. We
810 * test for overlap, not containment, because there could be extra
811 * relations on either side. For example, suppose we've got something
812 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
813 * A.y = D.y. The input rel might be the joinrel between A and B, and
814 * we'll consider the join clause A.y = D.y. relids contains a
815 * relation not involved in the join class (B) and the equivalence
816 * class for the left-hand side of the clause contains a relation not
817 * involved in the input rel (C). Despite the fact that we have only
818 * overlap and not containment in either direction, A.y is potentially
819 * useful as a sort column.
820 *
821 * Note that it's even possible that relids overlaps neither side of
822 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
823 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
824 * but overlaps neither side of B. In that case, we just skip this
825 * join clause, since it doesn't suggest a useful sort order for this
826 * relation.
827 */
828 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
829 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
830 restrictinfo->right_ec);
831 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
832 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
833 restrictinfo->left_ec);
834 }
835
836 return useful_eclass_list;
837 }
838
839 /*
840 * get_useful_pathkeys_for_relation
841 * Determine which orderings of a relation might be useful.
842 *
843 * Getting data in sorted order can be useful either because the requested
844 * order matches the final output ordering for the overall query we're
845 * planning, or because it enables an efficient merge join. Here, we try
846 * to figure out which pathkeys to consider.
847 */
848 static List *
849 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
850 {
851 List *useful_pathkeys_list = NIL;
852 List *useful_eclass_list;
853 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
854 EquivalenceClass *query_ec = NULL;
855 ListCell *lc;
856
857 /*
858 * Pushing the query_pathkeys to the remote server is always worth
859 * considering, because it might let us avoid a local sort.
860 */
861 fpinfo->qp_is_pushdown_safe = false;
862 if (root->query_pathkeys)
863 {
864 bool query_pathkeys_ok = true;
865
866 foreach(lc, root->query_pathkeys)
867 {
868 PathKey *pathkey = (PathKey *) lfirst(lc);
869 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
870 Expr *em_expr;
871
872 /*
873 * The planner and executor don't have any clever strategy for
874 * taking data sorted by a prefix of the query's pathkeys and
875 * getting it to be sorted by all of those pathkeys. We'll just
876 * end up resorting the entire data set. So, unless we can push
877 * down all of the query pathkeys, forget it.
878 *
879 * is_foreign_expr would detect volatile expressions as well, but
880 * checking ec_has_volatile here saves some cycles.
881 */
882 if (pathkey_ec->ec_has_volatile ||
883 !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
884 !is_foreign_expr(root, rel, em_expr))
885 {
886 query_pathkeys_ok = false;
887 break;
888 }
889 }
890
891 if (query_pathkeys_ok)
892 {
893 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
894 fpinfo->qp_is_pushdown_safe = true;
895 }
896 }
897
898 /*
899 * Even if we're not using remote estimates, having the remote side do the
900 * sort generally won't be any worse than doing it locally, and it might
901 * be much better if the remote side can generate data in the right order
902 * without needing a sort at all. However, what we're going to do next is
903 * try to generate pathkeys that seem promising for possible merge joins,
904 * and that's more speculative. A wrong choice might hurt quite a bit, so
905 * bail out if we can't use remote estimates.
906 */
907 if (!fpinfo->use_remote_estimate)
908 return useful_pathkeys_list;
909
910 /* Get the list of interesting EquivalenceClasses. */
911 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
912
913 /* Extract unique EC for query, if any, so we don't consider it again. */
914 if (list_length(root->query_pathkeys) == 1)
915 {
916 PathKey *query_pathkey = linitial(root->query_pathkeys);
917
918 query_ec = query_pathkey->pk_eclass;
919 }
920
921 /*
922 * As a heuristic, the only pathkeys we consider here are those of length
923 * one. It's surely possible to consider more, but since each one we
924 * choose to consider will generate a round-trip to the remote side, we
925 * need to be a bit cautious here. It would sure be nice to have a local
926 * cache of information about remote index definitions...
927 */
928 foreach(lc, useful_eclass_list)
929 {
930 EquivalenceClass *cur_ec = lfirst(lc);
931 Expr *em_expr;
932 PathKey *pathkey;
933
934 /* If redundant with what we did above, skip it. */
935 if (cur_ec == query_ec)
936 continue;
937
938 /* If no pushable expression for this rel, skip it. */
939 em_expr = find_em_expr_for_rel(cur_ec, rel);
940 if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
941 continue;
942
943 /* Looks like we can generate a pathkey, so let's do it. */
944 pathkey = make_canonical_pathkey(root, cur_ec,
945 linitial_oid(cur_ec->ec_opfamilies),
946 BTLessStrategyNumber,
947 false);
948 useful_pathkeys_list = lappend(useful_pathkeys_list,
949 list_make1(pathkey));
950 }
951
952 return useful_pathkeys_list;
953 }
954
955 /*
956 * postgresGetForeignPaths
957 * Create possible scan paths for a scan on the foreign table
958 */
959 static void
960 postgresGetForeignPaths(PlannerInfo *root,
961 RelOptInfo *baserel,
962 Oid foreigntableid)
963 {
964 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
965 ForeignPath *path;
966 List *ppi_list;
967 ListCell *lc;
968
969 /*
970 * Create simplest ForeignScan path node and add it to baserel. This path
971 * corresponds to SeqScan path of regular tables (though depending on what
972 * baserestrict conditions we were able to send to remote, there might
973 * actually be an indexscan happening there). We already did all the work
974 * to estimate cost and size of this path.
975 *
976 * Although this path uses no join clauses, it could still have required
977 * parameterization due to LATERAL refs in its tlist.
978 */
979 path = create_foreignscan_path(root, baserel,
980 NULL, /* default pathtarget */
981 fpinfo->rows,
982 fpinfo->startup_cost,
983 fpinfo->total_cost,
984 NIL, /* no pathkeys */
985 baserel->lateral_relids,
986 NULL, /* no extra plan */
987 NIL); /* no fdw_private list */
988 add_path(baserel, (Path *) path);
989
990 /* Add paths with pathkeys */
991 add_paths_with_pathkeys_for_rel(root, baserel, NULL);
992
993 /*
994 * If we're not using remote estimates, stop here. We have no way to
995 * estimate whether any join clauses would be worth sending across, so
996 * don't bother building parameterized paths.
997 */
998 if (!fpinfo->use_remote_estimate)
999 return;
1000
1001 /*
1002 * Thumb through all join clauses for the rel to identify which outer
1003 * relations could supply one or more safe-to-send-to-remote join clauses.
1004 * We'll build a parameterized path for each such outer relation.
1005 *
1006 * It's convenient to manage this by representing each candidate outer
1007 * relation by the ParamPathInfo node for it. We can then use the
1008 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1009 * interesting join clauses for that rel. This takes care of the
1010 * possibility that there are multiple safe join clauses for such a rel,
1011 * and also ensures that we account for unsafe join clauses that we'll
1012 * still have to enforce locally (since the parameterized-path machinery
1013 * insists that we handle all movable clauses).
1014 */
1015 ppi_list = NIL;
1016 foreach(lc, baserel->joininfo)
1017 {
1018 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1019 Relids required_outer;
1020 ParamPathInfo *param_info;
1021
1022 /* Check if clause can be moved to this rel */
1023 if (!join_clause_is_movable_to(rinfo, baserel))
1024 continue;
1025
1026 /* See if it is safe to send to remote */
1027 if (!is_foreign_expr(root, baserel, rinfo->clause))
1028 continue;
1029
1030 /* Calculate required outer rels for the resulting path */
1031 required_outer = bms_union(rinfo->clause_relids,
1032 baserel->lateral_relids);
1033 /* We do not want the foreign rel itself listed in required_outer */
1034 required_outer = bms_del_member(required_outer, baserel->relid);
1035
1036 /*
1037 * required_outer probably can't be empty here, but if it were, we
1038 * couldn't make a parameterized path.
1039 */
1040 if (bms_is_empty(required_outer))
1041 continue;
1042
1043 /* Get the ParamPathInfo */
1044 param_info = get_baserel_parampathinfo(root, baserel,
1045 required_outer);
1046 Assert(param_info != NULL);
1047
1048 /*
1049 * Add it to list unless we already have it. Testing pointer equality
1050 * is OK since get_baserel_parampathinfo won't make duplicates.
1051 */
1052 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1053 }
1054
1055 /*
1056 * The above scan examined only "generic" join clauses, not those that
1057 * were absorbed into EquivalenceClauses. See if we can make anything out
1058 * of EquivalenceClauses.
1059 */
1060 if (baserel->has_eclass_joins)
1061 {
1062 /*
1063 * We repeatedly scan the eclass list looking for column references
1064 * (or expressions) belonging to the foreign rel. Each time we find
1065 * one, we generate a list of equivalence joinclauses for it, and then
1066 * see if any are safe to send to the remote. Repeat till there are
1067 * no more candidate EC members.
1068 */
1069 ec_member_foreign_arg arg;
1070
1071 arg.already_used = NIL;
1072 for (;;)
1073 {
1074 List *clauses;
1075
1076 /* Make clauses, skipping any that join to lateral_referencers */
1077 arg.current = NULL;
1078 clauses = generate_implied_equalities_for_column(root,
1079 baserel,
1080 ec_member_matches_foreign,
1081 (void *) &arg,
1082 baserel->lateral_referencers);
1083
1084 /* Done if there are no more expressions in the foreign rel */
1085 if (arg.current == NULL)
1086 {
1087 Assert(clauses == NIL);
1088 break;
1089 }
1090
1091 /* Scan the extracted join clauses */
1092 foreach(lc, clauses)
1093 {
1094 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1095 Relids required_outer;
1096 ParamPathInfo *param_info;
1097
1098 /* Check if clause can be moved to this rel */
1099 if (!join_clause_is_movable_to(rinfo, baserel))
1100 continue;
1101
1102 /* See if it is safe to send to remote */
1103 if (!is_foreign_expr(root, baserel, rinfo->clause))
1104 continue;
1105
1106 /* Calculate required outer rels for the resulting path */
1107 required_outer = bms_union(rinfo->clause_relids,
1108 baserel->lateral_relids);
1109 required_outer = bms_del_member(required_outer, baserel->relid);
1110 if (bms_is_empty(required_outer))
1111 continue;
1112
1113 /* Get the ParamPathInfo */
1114 param_info = get_baserel_parampathinfo(root, baserel,
1115 required_outer);
1116 Assert(param_info != NULL);
1117
1118 /* Add it to list unless we already have it */
1119 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1120 }
1121
1122 /* Try again, now ignoring the expression we found this time */
1123 arg.already_used = lappend(arg.already_used, arg.current);
1124 }
1125 }
1126
1127 /*
1128 * Now build a path for each useful outer relation.
1129 */
1130 foreach(lc, ppi_list)
1131 {
1132 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1133 double rows;
1134 int width;
1135 Cost startup_cost;
1136 Cost total_cost;
1137
1138 /* Get a cost estimate from the remote */
1139 estimate_path_cost_size(root, baserel,
1140 param_info->ppi_clauses, NIL, NULL,
1141 &rows, &width,
1142 &startup_cost, &total_cost);
1143
1144 /*
1145 * ppi_rows currently won't get looked at by anything, but still we
1146 * may as well ensure that it matches our idea of the rowcount.
1147 */
1148 param_info->ppi_rows = rows;
1149
1150 /* Make the path */
1151 path = create_foreignscan_path(root, baserel,
1152 NULL, /* default pathtarget */
1153 rows,
1154 startup_cost,
1155 total_cost,
1156 NIL, /* no pathkeys */
1157 param_info->ppi_req_outer,
1158 NULL,
1159 NIL); /* no fdw_private list */
1160 add_path(baserel, (Path *) path);
1161 }
1162 }
1163
1164 /*
1165 * postgresGetForeignPlan
1166 * Create ForeignScan plan node which implements selected best path
1167 */
1168 static ForeignScan *
1169 postgresGetForeignPlan(PlannerInfo *root,
1170 RelOptInfo *foreignrel,
1171 Oid foreigntableid,
1172 ForeignPath *best_path,
1173 List *tlist,
1174 List *scan_clauses,
1175 Plan *outer_plan)
1176 {
1177 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1178 Index scan_relid;
1179 List *fdw_private;
1180 List *remote_exprs = NIL;
1181 List *local_exprs = NIL;
1182 List *params_list = NIL;
1183 List *fdw_scan_tlist = NIL;
1184 List *fdw_recheck_quals = NIL;
1185 List *retrieved_attrs;
1186 StringInfoData sql;
1187 bool has_final_sort = false;
1188 bool has_limit = false;
1189 ListCell *lc;
1190
1191 /*
1192 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1193 */
1194 if (best_path->fdw_private)
1195 {
1196 has_final_sort = intVal(list_nth(best_path->fdw_private,
1197 FdwPathPrivateHasFinalSort));
1198 has_limit = intVal(list_nth(best_path->fdw_private,
1199 FdwPathPrivateHasLimit));
1200 }
1201
1202 if (IS_SIMPLE_REL(foreignrel))
1203 {
1204 /*
1205 * For base relations, set scan_relid as the relid of the relation.
1206 */
1207 scan_relid = foreignrel->relid;
1208
1209 /*
1210 * In a base-relation scan, we must apply the given scan_clauses.
1211 *
1212 * Separate the scan_clauses into those that can be executed remotely
1213 * and those that can't. baserestrictinfo clauses that were
1214 * previously determined to be safe or unsafe by classifyConditions
1215 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1216 * else in the scan_clauses list will be a join clause, which we have
1217 * to check for remote-safety.
1218 *
1219 * Note: the join clauses we see here should be the exact same ones
1220 * previously examined by postgresGetForeignPaths. Possibly it'd be
1221 * worth passing forward the classification work done then, rather
1222 * than repeating it here.
1223 *
1224 * This code must match "extract_actual_clauses(scan_clauses, false)"
1225 * except for the additional decision about remote versus local
1226 * execution.
1227 */
1228 foreach(lc, scan_clauses)
1229 {
1230 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1231
1232 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1233 if (rinfo->pseudoconstant)
1234 continue;
1235
1236 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1237 remote_exprs = lappend(remote_exprs, rinfo->clause);
1238 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1239 local_exprs = lappend(local_exprs, rinfo->clause);
1240 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1241 remote_exprs = lappend(remote_exprs, rinfo->clause);
1242 else
1243 local_exprs = lappend(local_exprs, rinfo->clause);
1244 }
1245
1246 /*
1247 * For a base-relation scan, we have to support EPQ recheck, which
1248 * should recheck all the remote quals.
1249 */
1250 fdw_recheck_quals = remote_exprs;
1251 }
1252 else
1253 {
1254 /*
1255 * Join relation or upper relation - set scan_relid to 0.
1256 */
1257 scan_relid = 0;
1258
1259 /*
1260 * For a join rel, baserestrictinfo is NIL and we are not considering
1261 * parameterization right now, so there should be no scan_clauses for
1262 * a joinrel or an upper rel either.
1263 */
1264 Assert(!scan_clauses);
1265
1266 /*
1267 * Instead we get the conditions to apply from the fdw_private
1268 * structure.
1269 */
1270 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1271 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1272
1273 /*
1274 * We leave fdw_recheck_quals empty in this case, since we never need
1275 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1276 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1277 * If we're planning an upperrel (ie, remote grouping or aggregation)
1278 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1279 * allowed, and indeed we *can't* put the remote clauses into
1280 * fdw_recheck_quals because the unaggregated Vars won't be available
1281 * locally.
1282 */
1283
1284 /* Build the list of columns to be fetched from the foreign server. */
1285 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1286
1287 /*
1288 * Ensure that the outer plan produces a tuple whose descriptor
1289 * matches our scan tuple slot. Also, remove the local conditions
1290 * from outer plan's quals, lest they be evaluated twice, once by the
1291 * local plan and once by the scan.
1292 */
1293 if (outer_plan)
1294 {
1295 ListCell *lc;
1296
1297 /*
1298 * Right now, we only consider grouping and aggregation beyond
1299 * joins. Queries involving aggregates or grouping do not require
1300 * EPQ mechanism, hence should not have an outer plan here.
1301 */
1302 Assert(!IS_UPPER_REL(foreignrel));
1303
1304 /*
1305 * First, update the plan's qual list if possible. In some cases
1306 * the quals might be enforced below the topmost plan level, in
1307 * which case we'll fail to remove them; it's not worth working
1308 * harder than this.
1309 */
1310 foreach(lc, local_exprs)
1311 {
1312 Node *qual = lfirst(lc);
1313
1314 outer_plan->qual = list_delete(outer_plan->qual, qual);
1315
1316 /*
1317 * For an inner join the local conditions of foreign scan plan
1318 * can be part of the joinquals as well. (They might also be
1319 * in the mergequals or hashquals, but we can't touch those
1320 * without breaking the plan.)
1321 */
1322 if (IsA(outer_plan, NestLoop) ||
1323 IsA(outer_plan, MergeJoin) ||
1324 IsA(outer_plan, HashJoin))
1325 {
1326 Join *join_plan = (Join *) outer_plan;
1327
1328 if (join_plan->jointype == JOIN_INNER)
1329 join_plan->joinqual = list_delete(join_plan->joinqual,
1330 qual);
1331 }
1332 }
1333
1334 /*
1335 * Now fix the subplan's tlist --- this might result in inserting
1336 * a Result node atop the plan tree.
1337 */
1338 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1339 best_path->path.parallel_safe);
1340 }
1341 }
1342
1343 /*
1344 * Build the query string to be sent for execution, and identify
1345 * expressions to be sent as parameters.
1346 */
1347 initStringInfo(&sql);
1348 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1349 remote_exprs, best_path->path.pathkeys,
1350 has_final_sort, has_limit, false,
1351 &retrieved_attrs, ¶ms_list);
1352
1353 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1354 fpinfo->final_remote_exprs = remote_exprs;
1355
1356 /*
1357 * Build the fdw_private list that will be available to the executor.
1358 * Items in the list must match order in enum FdwScanPrivateIndex.
1359 */
1360 fdw_private = list_make3(makeString(sql.data),
1361 retrieved_attrs,
1362 makeInteger(fpinfo->fetch_size));
1363 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1364 fdw_private = lappend(fdw_private,
1365 makeString(fpinfo->relation_name));
1366
1367 /*
1368 * Create the ForeignScan node for the given relation.
1369 *
1370 * Note that the remote parameter expressions are stored in the fdw_exprs
1371 * field of the finished plan node; we can't keep them in private state
1372 * because then they wouldn't be subject to later planner processing.
1373 */
1374 return make_foreignscan(tlist,
1375 local_exprs,
1376 scan_relid,
1377 params_list,
1378 fdw_private,
1379 fdw_scan_tlist,
1380 fdw_recheck_quals,
1381 outer_plan);
1382 }
1383
1384 /*
1385 * postgresBeginForeignScan
1386 * Initiate an executor scan of a foreign PostgreSQL table.
1387 */
1388 static void
1389 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1390 {
1391 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1392 EState *estate = node->ss.ps.state;
1393 PgFdwScanState *fsstate;
1394 RangeTblEntry *rte;
1395 Oid userid;
1396 ForeignTable *table;
1397 UserMapping *user;
1398 int rtindex;
1399 int numParams;
1400
1401 /*
1402 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1403 */
1404 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1405 return;
1406
1407 /*
1408 * We'll save private state in node->fdw_state.
1409 */
1410 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1411 node->fdw_state = (void *) fsstate;
1412
1413 /*
1414 * Identify which user to do the remote access as. This should match what
1415 * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1416 * lowest-numbered member RTE as a representative; we would get the same
1417 * result from any.
1418 */
1419 if (fsplan->scan.scanrelid > 0)
1420 rtindex = fsplan->scan.scanrelid;
1421 else
1422 rtindex = bms_next_member(fsplan->fs_relids, -1);
1423 rte = exec_rt_fetch(rtindex, estate);
1424 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1425
1426 /* Get info about foreign table. */
1427 table = GetForeignTable(rte->relid);
1428 user = GetUserMapping(userid, table->serverid);
1429
1430 /*
1431 * Get connection to the foreign server. Connection manager will
1432 * establish new connection if necessary.
1433 */
1434 fsstate->conn = GetConnection(user, false);
1435
1436 /* Assign a unique ID for my cursor */
1437 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1438 fsstate->cursor_exists = false;
1439
1440 /* Get private info created by planner functions. */
1441 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1442 FdwScanPrivateSelectSql));
1443 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1444 FdwScanPrivateRetrievedAttrs);
1445 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1446 FdwScanPrivateFetchSize));
1447
1448 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1449 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1450 "postgres_fdw tuple data",
1451 ALLOCSET_DEFAULT_SIZES);
1452 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1453 "postgres_fdw temporary data",
1454 ALLOCSET_SMALL_SIZES);
1455
1456 /*
1457 * Get info we'll need for converting data fetched from the foreign server
1458 * into local representation and error reporting during that process.
1459 */
1460 if (fsplan->scan.scanrelid > 0)
1461 {
1462 fsstate->rel = node->ss.ss_currentRelation;
1463 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1464 }
1465 else
1466 {
1467 fsstate->rel = NULL;
1468 fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1469 }
1470
1471 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1472
1473 /*
1474 * Prepare for processing of parameters used in remote query, if any.
1475 */
1476 numParams = list_length(fsplan->fdw_exprs);
1477 fsstate->numParams = numParams;
1478 if (numParams > 0)
1479 prepare_query_params((PlanState *) node,
1480 fsplan->fdw_exprs,
1481 numParams,
1482 &fsstate->param_flinfo,
1483 &fsstate->param_exprs,
1484 &fsstate->param_values);
1485 }
1486
1487 /*
1488 * postgresIterateForeignScan
1489 * Retrieve next row from the result set, or clear tuple slot to indicate
1490 * EOF.
1491 */
1492 static TupleTableSlot *
1493 postgresIterateForeignScan(ForeignScanState *node)
1494 {
1495 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1496 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1497
1498 /*
1499 * If this is the first call after Begin or ReScan, we need to create the
1500 * cursor on the remote side.
1501 */
1502 if (!fsstate->cursor_exists)
1503 create_cursor(node);
1504
1505 /*
1506 * Get some more tuples, if we've run out.
1507 */
1508 if (fsstate->next_tuple >= fsstate->num_tuples)
1509 {
1510 /* No point in another fetch if we already detected EOF, though. */
1511 if (!fsstate->eof_reached)
1512 fetch_more_data(node);
1513 /* If we didn't get any tuples, must be end of data. */
1514 if (fsstate->next_tuple >= fsstate->num_tuples)
1515 return ExecClearTuple(slot);
1516 }
1517
1518 /*
1519 * Return the next tuple.
1520 */
1521 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1522 slot,
1523 false);
1524
1525 return slot;
1526 }
1527
1528 /*
1529 * postgresReScanForeignScan
1530 * Restart the scan.
1531 */
1532 static void
1533 postgresReScanForeignScan(ForeignScanState *node)
1534 {
1535 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1536 char sql[64];
1537 PGresult *res;
1538
1539 /* If we haven't created the cursor yet, nothing to do. */
1540 if (!fsstate->cursor_exists)
1541 return;
1542
1543 /*
1544 * If any internal parameters affecting this node have changed, we'd
1545 * better destroy and recreate the cursor. Otherwise, rewinding it should
1546 * be good enough. If we've only fetched zero or one batch, we needn't
1547 * even rewind the cursor, just rescan what we have.
1548 */
1549 if (node->ss.ps.chgParam != NULL)
1550 {
1551 fsstate->cursor_exists = false;
1552 snprintf(sql, sizeof(sql), "CLOSE c%u",
1553 fsstate->cursor_number);
1554 }
1555 else if (fsstate->fetch_ct_2 > 1)
1556 {
1557 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1558 fsstate->cursor_number);
1559 }
1560 else
1561 {
1562 /* Easy: just rescan what we already have in memory, if anything */
1563 fsstate->next_tuple = 0;
1564 return;
1565 }
1566
1567 /*
1568 * We don't use a PG_TRY block here, so be careful not to throw error
1569 * without releasing the PGresult.
1570 */
1571 res = pgfdw_exec_query(fsstate->conn, sql);
1572 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1573 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1574 PQclear(res);
1575
1576 /* Now force a fresh FETCH. */
1577 fsstate->tuples = NULL;
1578 fsstate->num_tuples = 0;
1579 fsstate->next_tuple = 0;
1580 fsstate->fetch_ct_2 = 0;
1581 fsstate->eof_reached = false;
1582 }
1583
1584 /*
1585 * postgresEndForeignScan
1586 * Finish scanning foreign table and dispose objects used for this scan
1587 */
1588 static void
1589 postgresEndForeignScan(ForeignScanState *node)
1590 {
1591 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1592
1593 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1594 if (fsstate == NULL)
1595 return;
1596
1597 /* Close the cursor if open, to prevent accumulation of cursors */
1598 if (fsstate->cursor_exists)
1599 close_cursor(fsstate->conn, fsstate->cursor_number);
1600
1601 /* Release remote connection */
1602 ReleaseConnection(fsstate->conn);
1603 fsstate->conn = NULL;
1604
1605 /* MemoryContexts will be deleted automatically. */
1606 }
1607
1608 /*
1609 * postgresAddForeignUpdateTargets
1610 * Add resjunk column(s) needed for update/delete on a foreign table
1611 */
1612 static void
1613 postgresAddForeignUpdateTargets(Query *parsetree,
1614 RangeTblEntry *target_rte,
1615 Relation target_relation)
1616 {
1617 Var *var;
1618 const char *attrname;
1619 TargetEntry *tle;
1620
1621 /*
1622 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1623 */
1624
1625 /* Make a Var representing the desired value */
1626 var = makeVar(parsetree->resultRelation,
1627 SelfItemPointerAttributeNumber,
1628 TIDOID,
1629 -1,
1630 InvalidOid,
1631 0);
1632
1633 /* Wrap it in a resjunk TLE with the right name ... */
1634 attrname = "ctid";
1635
1636 tle = makeTargetEntry((Expr *) var,
1637 list_length(parsetree->targetList) + 1,
1638 pstrdup(attrname),
1639 true);
1640
1641 /* ... and add it to the query's targetlist */
1642 parsetree->targetList = lappend(parsetree->targetList, tle);
1643 }
1644
1645 /*
1646 * postgresPlanForeignModify
1647 * Plan an insert/update/delete operation on a foreign table
1648 */
1649 static List *
1650 postgresPlanForeignModify(PlannerInfo *root,
1651 ModifyTable *plan,
1652 Index resultRelation,
1653 int subplan_index)
1654 {
1655 CmdType operation = plan->operation;
1656 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1657 Relation rel;
1658 StringInfoData sql;
1659 List *targetAttrs = NIL;
1660 List *withCheckOptionList = NIL;
1661 List *returningList = NIL;
1662 List *retrieved_attrs = NIL;
1663 bool doNothing = false;
1664
1665 initStringInfo(&sql);
1666
1667 /*
1668 * Core code already has some lock on each rel being planned, so we can
1669 * use NoLock here.
1670 */
1671 rel = table_open(rte->relid, NoLock);
1672
1673 /*
1674 * In an INSERT, we transmit all columns that are defined in the foreign
1675 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1676 * foreign table, we transmit all columns like INSERT; else we transmit
1677 * only columns that were explicitly targets of the UPDATE, so as to avoid
1678 * unnecessary data transmission. (We can't do that for INSERT since we
1679 * would miss sending default values for columns not listed in the source
1680 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1681 * those triggers might change values for non-target columns, in which
1682 * case we would miss sending changed values for those columns.)
1683 */
1684 if (operation == CMD_INSERT ||
1685 (operation == CMD_UPDATE &&
1686 rel->trigdesc &&
1687 rel->trigdesc->trig_update_before_row))
1688 {
1689 TupleDesc tupdesc = RelationGetDescr(rel);
1690 int attnum;
1691
1692 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1693 {
1694 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1695
1696 if (!attr->attisdropped)
1697 targetAttrs = lappend_int(targetAttrs, attnum);
1698 }
1699 }
1700 else if (operation == CMD_UPDATE)
1701 {
1702 int col;
1703 Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1704
1705 col = -1;
1706 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1707 {
1708 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1709 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1710
1711 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1712 elog(ERROR, "system-column update is not supported");
1713 targetAttrs = lappend_int(targetAttrs, attno);
1714 }
1715 }
1716
1717 /*
1718 * Extract the relevant WITH CHECK OPTION list if any.
1719 */
1720 if (plan->withCheckOptionLists)
1721 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1722 subplan_index);
1723
1724 /*
1725 * Extract the relevant RETURNING list if any.
1726 */
1727 if (plan->returningLists)
1728 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1729
1730 /*
1731 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1732 * should have already been rejected in the optimizer, as presently there
1733 * is no way to recognize an arbiter index on a foreign table. Only DO
1734 * NOTHING is supported without an inference specification.
1735 */
1736 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1737 doNothing = true;
1738 else if (plan->onConflictAction != ONCONFLICT_NONE)
1739 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1740 (int) plan->onConflictAction);
1741
1742 /*
1743 * Construct the SQL command string.
1744 */
1745 switch (operation)
1746 {
1747 case CMD_INSERT:
1748 deparseInsertSql(&sql, rte, resultRelation, rel,
1749 targetAttrs, doNothing,
1750 withCheckOptionList, returningList,
1751 &retrieved_attrs);
1752 break;
1753 case CMD_UPDATE:
1754 deparseUpdateSql(&sql, rte, resultRelation, rel,
1755 targetAttrs,
1756 withCheckOptionList, returningList,
1757 &retrieved_attrs);
1758 break;
1759 case CMD_DELETE:
1760 deparseDeleteSql(&sql, rte, resultRelation, rel,
1761 returningList,
1762 &retrieved_attrs);
1763 break;
1764 default:
1765 elog(ERROR, "unexpected operation: %d", (int) operation);
1766 break;
1767 }
1768
1769 table_close(rel, NoLock);
1770
1771 /*
1772 * Build the fdw_private list that will be available to the executor.
1773 * Items in the list must match enum FdwModifyPrivateIndex, above.
1774 */
1775 return list_make4(makeString(sql.data),
1776 targetAttrs,
1777 makeInteger((retrieved_attrs != NIL)),
1778 retrieved_attrs);
1779 }
1780
1781 /*
1782 * postgresBeginForeignModify
1783 * Begin an insert/update/delete operation on a foreign table
1784 */
1785 static void
1786 postgresBeginForeignModify(ModifyTableState *mtstate,
1787 ResultRelInfo *resultRelInfo,
1788 List *fdw_private,
1789 int subplan_index,
1790 int eflags)
1791 {
1792 PgFdwModifyState *fmstate;
1793 char *query;
1794 List *target_attrs;
1795 bool has_returning;
1796 List *retrieved_attrs;
1797 RangeTblEntry *rte;
1798
1799 /*
1800 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1801 * stays NULL.
1802 */
1803 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1804 return;
1805
1806 /* Deconstruct fdw_private data. */
1807 query = strVal(list_nth(fdw_private,
1808 FdwModifyPrivateUpdateSql));
1809 target_attrs = (List *) list_nth(fdw_private,
1810 FdwModifyPrivateTargetAttnums);
1811 has_returning = intVal(list_nth(fdw_private,
1812 FdwModifyPrivateHasReturning));
1813 retrieved_attrs = (List *) list_nth(fdw_private,
1814 FdwModifyPrivateRetrievedAttrs);
1815
1816 /* Find RTE. */
1817 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1818 mtstate->ps.state);
1819
1820 /* Construct an execution state. */
1821 fmstate = create_foreign_modify(mtstate->ps.state,
1822 rte,
1823 resultRelInfo,
1824 mtstate->operation,
1825 mtstate->mt_plans[subplan_index]->plan,
1826 query,
1827 target_attrs,
1828 has_returning,
1829 retrieved_attrs);
1830
1831 resultRelInfo->ri_FdwState = fmstate;
1832 }
1833
1834 /*
1835 * postgresExecForeignInsert
1836 * Insert one row into a foreign table
1837 */
1838 static TupleTableSlot *
1839 postgresExecForeignInsert(EState *estate,
1840 ResultRelInfo *resultRelInfo,
1841 TupleTableSlot *slot,
1842 TupleTableSlot *planSlot)
1843 {
1844 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1845 TupleTableSlot *rslot;
1846
1847 /*
1848 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1849 * postgresBeginForeignInsert())
1850 */
1851 if (fmstate->aux_fmstate)
1852 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1853 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1854 slot, planSlot);
1855 /* Revert that change */
1856 if (fmstate->aux_fmstate)
1857 resultRelInfo->ri_FdwState = fmstate;
1858
1859 return rslot;
1860 }
1861
1862 /*
1863 * postgresExecForeignUpdate
1864 * Update one row in a foreign table
1865 */
1866 static TupleTableSlot *
1867 postgresExecForeignUpdate(EState *estate,
1868 ResultRelInfo *resultRelInfo,
1869 TupleTableSlot *slot,
1870 TupleTableSlot *planSlot)
1871 {
1872 return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
1873 slot, planSlot);
1874 }
1875
1876 /*
1877 * postgresExecForeignDelete
1878 * Delete one row from a foreign table
1879 */
1880 static TupleTableSlot *
1881 postgresExecForeignDelete(EState *estate,
1882 ResultRelInfo *resultRelInfo,
1883 TupleTableSlot *slot,
1884 TupleTableSlot *planSlot)
1885 {
1886 return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
1887 slot, planSlot);
1888 }
1889
1890 /*
1891 * postgresEndForeignModify
1892 * Finish an insert/update/delete operation on a foreign table
1893 */
1894 static void
1895 postgresEndForeignModify(EState *estate,
1896 ResultRelInfo *resultRelInfo)
1897 {
1898 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1899
1900 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1901 if (fmstate == NULL)
1902 return;
1903
1904 /* Destroy the execution state */
1905 finish_foreign_modify(fmstate);
1906 }
1907
1908 /*
1909 * postgresBeginForeignInsert
1910 * Begin an insert operation on a foreign table
1911 */
1912 static void
1913 postgresBeginForeignInsert(ModifyTableState *mtstate,
1914 ResultRelInfo *resultRelInfo)
1915 {
1916 PgFdwModifyState *fmstate;
1917 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1918 EState *estate = mtstate->ps.state;
1919 Index resultRelation;
1920 Relation rel = resultRelInfo->ri_RelationDesc;
1921 RangeTblEntry *rte;
1922 TupleDesc tupdesc = RelationGetDescr(rel);
1923 int attnum;
1924 StringInfoData sql;
1925 List *targetAttrs = NIL;
1926 List *retrieved_attrs = NIL;
1927 bool doNothing = false;
1928
1929 /*
1930 * If the foreign table we are about to insert routed rows into is also an
1931 * UPDATE subplan result rel that will be updated later, proceeding with
1932 * the INSERT will result in the later UPDATE incorrectly modifying those
1933 * routed rows, so prevent the INSERT --- it would be nice if we could
1934 * handle this case; but for now, throw an error for safety.
1935 */
1936 if (plan && plan->operation == CMD_UPDATE &&
1937 (resultRelInfo->ri_usesFdwDirectModify ||
1938 resultRelInfo->ri_FdwState) &&
1939 resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
1940 ereport(ERROR,
1941 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1942 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
1943 RelationGetRelationName(rel))));
1944
1945 initStringInfo(&sql);
1946
1947 /* We transmit all columns that are defined in the foreign table. */
1948 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1949 {
1950 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1951
1952 if (!attr->attisdropped)
1953 targetAttrs = lappend_int(targetAttrs, attnum);
1954 }
1955
1956 /* Check if we add the ON CONFLICT clause to the remote query. */
1957 if (plan)
1958 {
1959 OnConflictAction onConflictAction = plan->onConflictAction;
1960
1961 /* We only support DO NOTHING without an inference specification. */
1962 if (onConflictAction == ONCONFLICT_NOTHING)
1963 doNothing = true;
1964 else if (onConflictAction != ONCONFLICT_NONE)
1965 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1966 (int) onConflictAction);
1967 }
1968
1969 /*
1970 * If the foreign table is a partition that doesn't have a corresponding
1971 * RTE entry, we need to create a new RTE
1972 * describing the foreign table for use by deparseInsertSql and
1973 * create_foreign_modify() below, after first copying the parent's RTE and
1974 * modifying some fields to describe the foreign partition to work on.
1975 * However, if this is invoked by UPDATE, the existing RTE may already
1976 * correspond to this partition if it is one of the UPDATE subplan target
1977 * rels; in that case, we can just use the existing RTE as-is.
1978 */
1979 if (resultRelInfo->ri_RangeTableIndex == 0)
1980 {
1981 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
1982
1983 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
1984 rte = copyObject(rte);
1985 rte->relid = RelationGetRelid(rel);
1986 rte->relkind = RELKIND_FOREIGN_TABLE;
1987
1988 /*
1989 * For UPDATE, we must use the RT index of the first subplan target
1990 * rel's RTE, because the core code would have built expressions for
1991 * the partition, such as RETURNING, using that RT index as varno of
1992 * Vars contained in those expressions.
1993 */
1994 if (plan && plan->operation == CMD_UPDATE &&
1995 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
1996 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
1997 else
1998 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
1999 }
2000 else
2001 {
2002 resultRelation = resultRelInfo->ri_RangeTableIndex;
2003 rte = exec_rt_fetch(resultRelation, estate);
2004 }
2005
2006 /* Construct the SQL command string. */
2007 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2008 resultRelInfo->ri_WithCheckOptions,
2009 resultRelInfo->ri_returningList,
2010 &retrieved_attrs);
2011
2012 /* Construct an execution state. */
2013 fmstate = create_foreign_modify(mtstate->ps.state,
2014 rte,
2015 resultRelInfo,
2016 CMD_INSERT,
2017 NULL,
2018 sql.data,
2019 targetAttrs,
2020 retrieved_attrs != NIL,
2021 retrieved_attrs);
2022
2023 /*
2024 * If the given resultRelInfo already has PgFdwModifyState set, it means
2025 * the foreign table is an UPDATE subplan result rel; in which case, store
2026 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2027 */
2028 if (resultRelInfo->ri_FdwState)
2029 {
2030 Assert(plan && plan->operation == CMD_UPDATE);
2031 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2032 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2033 }
2034 else
2035 resultRelInfo->ri_FdwState = fmstate;
2036 }
2037
2038 /*
2039 * postgresEndForeignInsert
2040 * Finish an insert operation on a foreign table
2041 */
2042 static void
2043 postgresEndForeignInsert(EState *estate,
2044 ResultRelInfo *resultRelInfo)
2045 {
2046 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2047
2048 Assert(fmstate != NULL);
2049
2050 /*
2051 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2052 * postgresBeginForeignInsert())
2053 */
2054 if (fmstate->aux_fmstate)
2055 fmstate = fmstate->aux_fmstate;
2056
2057 /* Destroy the execution state */
2058 finish_foreign_modify(fmstate);
2059 }
2060
2061 /*
2062 * postgresIsForeignRelUpdatable
2063 * Determine whether a foreign table supports INSERT, UPDATE and/or
2064 * DELETE.
2065 */
2066 static int
2067 postgresIsForeignRelUpdatable(Relation rel)
2068 {
2069 bool updatable;
2070 ForeignTable *table;
2071 ForeignServer *server;
2072 ListCell *lc;
2073
2074 /*
2075 * By default, all postgres_fdw foreign tables are assumed updatable. This
2076 * can be overridden by a per-server setting, which in turn can be
2077 * overridden by a per-table setting.
2078 */
2079 updatable = true;
2080
2081 table = GetForeignTable(RelationGetRelid(rel));
2082 server = GetForeignServer(table->serverid);
2083
2084 foreach(lc, server->options)
2085 {
2086 DefElem *def = (DefElem *) lfirst(lc);
2087
2088 if (strcmp(def->defname, "updatable") == 0)
2089 updatable = defGetBoolean(def);
2090 }
2091 foreach(lc, table->options)
2092 {
2093 DefElem *def = (DefElem *) lfirst(lc);
2094
2095 if (strcmp(def->defname, "updatable") == 0)
2096 updatable = defGetBoolean(def);
2097 }
2098
2099 /*
2100 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2101 */
2102 return updatable ?
2103 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2104 }
2105
2106 /*
2107 * postgresRecheckForeignScan
2108 * Execute a local join execution plan for a foreign join
2109 */
2110 static bool
2111 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2112 {
2113 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2114 PlanState *outerPlan = outerPlanState(node);
2115 TupleTableSlot *result;
2116
2117 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2118 if (scanrelid > 0)
2119 return true;
2120
2121 Assert(outerPlan != NULL);
2122
2123 /* Execute a local join execution plan */
2124 result = ExecProcNode(outerPlan);
2125 if (TupIsNull(result))
2126 return false;
2127
2128 /* Store result in the given slot */
2129 ExecCopySlot(slot, result);
2130
2131 return true;
2132 }
2133
2134 /*
2135 * postgresPlanDirectModify
2136 * Consider a direct foreign table modification
2137 *
2138 * Decide whether it is safe to modify a foreign table directly, and if so,
2139 * rewrite subplan accordingly.
2140 */
2141 static bool
2142 postgresPlanDirectModify(PlannerInfo *root,
2143 ModifyTable *plan,
2144 Index resultRelation,
2145 int subplan_index)
2146 {
2147 CmdType operation = plan->operation;
2148 Plan *subplan;
2149 RelOptInfo *foreignrel;
2150 RangeTblEntry *rte;
2151 PgFdwRelationInfo *fpinfo;
2152 Relation rel;
2153 StringInfoData sql;
2154 ForeignScan *fscan;
2155 List *targetAttrs = NIL;
2156 List *remote_exprs;
2157 List *params_list = NIL;
2158 List *returningList = NIL;
2159 List *retrieved_attrs = NIL;
2160
2161 /*
2162 * Decide whether it is safe to modify a foreign table directly.
2163 */
2164
2165 /*
2166 * The table modification must be an UPDATE or DELETE.
2167 */
2168 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2169 return false;
2170
2171 /*
2172 * It's unsafe to modify a foreign table directly if there are any local
2173 * joins needed.
2174 */
2175 subplan = (Plan *) list_nth(plan->plans, subplan_index);
2176 if (!IsA(subplan, ForeignScan))
2177 return false;
2178 fscan = (ForeignScan *) subplan;
2179
2180 /*
2181 * It's unsafe to modify a foreign table directly if there are any quals
2182 * that should be evaluated locally.
2183 */
2184 if (subplan->qual != NIL)
2185 return false;
2186
2187 /* Safe to fetch data about the target foreign rel */
2188 if (fscan->scan.scanrelid == 0)
2189 {
2190 foreignrel = find_join_rel(root, fscan->fs_relids);
2191 /* We should have a rel for this foreign join. */
2192 Assert(foreignrel);
2193 }
2194 else
2195 foreignrel = root->simple_rel_array[resultRelation];
2196 rte = root->simple_rte_array[resultRelation];
2197 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2198
2199 /*
2200 * It's unsafe to update a foreign table directly, if any expressions to
2201 * assign to the target columns are unsafe to evaluate remotely.
2202 */
2203 if (operation == CMD_UPDATE)
2204 {
2205 int col;
2206
2207 /*
2208 * We transmit only columns that were explicitly targets of the
2209 * UPDATE, so as to avoid unnecessary data transmission.
2210 */
2211 col = -1;
2212 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2213 {
2214 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2215 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
2216 TargetEntry *tle;
2217
2218 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2219 elog(ERROR, "system-column update is not supported");
2220
2221 tle = get_tle_by_resno(subplan->targetlist, attno);
2222
2223 if (!tle)
2224 elog(ERROR, "attribute number %d not found in subplan targetlist",
2225 attno);
2226
2227 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2228 return false;
2229
2230 targetAttrs = lappend_int(targetAttrs, attno);
2231 }
2232 }
2233
2234 /*
2235 * Ok, rewrite subplan so as to modify the foreign table directly.
2236 */
2237 initStringInfo(&sql);
2238
2239 /*
2240 * Core code already has some lock on each rel being planned, so we can
2241 * use NoLock here.
2242 */
2243 rel = table_open(rte->relid, NoLock);
2244
2245 /*
2246 * Recall the qual clauses that must be evaluated remotely. (These are
2247 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2248 * doesn't care.)
2249 */
2250 remote_exprs = fpinfo->final_remote_exprs;
2251
2252 /*
2253 * Extract the relevant RETURNING list if any.
2254 */
2255 if (plan->returningLists)
2256 {
2257 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2258
2259 /*
2260 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2261 * we fetch from the foreign server any Vars specified in RETURNING
2262 * that refer not only to the target relation but to non-target
2263 * relations. So we'll deparse them into the RETURNING clause of the
2264 * remote query; use a targetlist consisting of them instead, which
2265 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2266 * node below.
2267 */
2268 if (fscan->scan.scanrelid == 0)
2269 returningList = build_remote_returning(resultRelation, rel,
2270 returningList);
2271 }
2272
2273 /*
2274 * Construct the SQL command string.
2275 */
2276 switch (operation)
2277 {
2278 case CMD_UPDATE:
2279 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2280 foreignrel,
2281 ((Plan *) fscan)->targetlist,
2282 targetAttrs,
2283 remote_exprs, ¶ms_list,
2284 returningList, &retrieved_attrs);
2285 break;
2286 case CMD_DELETE:
2287 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2288 foreignrel,
2289 remote_exprs, ¶ms_list,
2290 returningList, &retrieved_attrs);
2291 break;
2292 default:
2293 elog(ERROR, "unexpected operation: %d", (int) operation);
2294 break;
2295 }
2296
2297 /*
2298 * Update the operation info.
2299 */
2300 fscan->operation = operation;
2301
2302 /*
2303 * Update the fdw_exprs list that will be available to the executor.
2304 */
2305 fscan->fdw_exprs = params_list;
2306
2307 /*
2308 * Update the fdw_private list that will be available to the executor.
2309 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2310 */
2311 fscan->fdw_private = list_make4(makeString(sql.data),
2312 makeInteger((retrieved_attrs != NIL)),
2313 retrieved_attrs,
2314 makeInteger(plan->canSetTag));
2315
2316 /*
2317 * Update the foreign-join-related fields.
2318 */
2319 if (fscan->scan.scanrelid == 0)
2320 {
2321 /* No need for the outer subplan. */
2322 fscan->scan.plan.lefttree = NULL;
2323
2324 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2325 if (returningList)
2326 rebuild_fdw_scan_tlist(fscan, returningList);
2327 }
2328
2329 table_close(rel, NoLock);
2330 return true;
2331 }
2332
2333 /*
2334 * postgresBeginDirectModify
2335 * Prepare a direct foreign table modification
2336 */
2337 static void
2338 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2339 {
2340 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2341 EState *estate = node->ss.ps.state;
2342 PgFdwDirectModifyState *dmstate;
2343 Index rtindex;
2344 RangeTblEntry *rte;
2345 Oid userid;
2346 ForeignTable *table;
2347 UserMapping *user;
2348 int numParams;
2349
2350 /*
2351 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2352 */
2353 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2354 return;
2355
2356 /*
2357 * We'll save private state in node->fdw_state.
2358 */
2359 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2360 node->fdw_state = (void *) dmstate;
2361
2362 /*
2363 * Identify which user to do the remote access as. This should match what
2364 * ExecCheckRTEPerms() does.
2365 */
2366 rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2367 rte = exec_rt_fetch(rtindex, estate);
2368 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2369
2370 /* Get info about foreign table. */
2371 if (fsplan->scan.scanrelid == 0)
2372 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2373 else
2374 dmstate->rel = node->ss.ss_currentRelation;
2375 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2376 user = GetUserMapping(userid, table->serverid);
2377
2378 /*
2379 * Get connection to the foreign server. Connection manager will
2380 * establish new connection if necessary.
2381 */
2382 dmstate->conn = GetConnection(user, false);
2383
2384 /* Update the foreign-join-related fields. */
2385 if (fsplan->scan.scanrelid == 0)
2386 {
2387 /* Save info about foreign table. */
2388 dmstate->resultRel = dmstate->rel;
2389
2390 /*
2391 * Set dmstate->rel to NULL to teach get_returning_data() and
2392 * make_tuple_from_result_row() that columns fetched from the remote
2393 * server are described by fdw_scan_tlist of the foreign-scan plan
2394 * node, not the tuple descriptor for the target relation.
2395 */
2396 dmstate->rel = NULL;
2397 }
2398
2399 /* Initialize state variable */
2400 dmstate->num_tuples = -1; /* -1 means not set yet */
2401
2402 /* Get private info created by planner functions. */
2403 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2404 FdwDirectModifyPrivateUpdateSql));
2405 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2406 FdwDirectModifyPrivateHasReturning));
2407 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2408 FdwDirectModifyPrivateRetrievedAttrs);
2409 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2410 FdwDirectModifyPrivateSetProcessed));
2411
2412 /* Create context for per-tuple temp workspace. */
2413 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2414 "postgres_fdw temporary data",
2415 ALLOCSET_SMALL_SIZES);
2416
2417 /* Prepare for input conversion of RETURNING results. */
2418 if (dmstate->has_returning)
2419 {
2420 TupleDesc tupdesc;
2421
2422 if (fsplan->scan.scanrelid == 0)
2423 tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2424 else
2425 tupdesc = RelationGetDescr(dmstate->rel);
2426
2427 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2428
2429 /*
2430 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2431 * initialize a filter to extract an updated/deleted tuple from a scan
2432 * tuple.
2433 */
2434 if (fsplan->scan.scanrelid == 0)
2435 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2436 }
2437
2438 /*
2439 * Prepare for processing of parameters used in remote query, if any.
2440 */
2441 numParams = list_length(fsplan->fdw_exprs);
2442 dmstate->numParams = numParams;
2443 if (numParams > 0)
2444 prepare_query_params((PlanState *) node,
2445 fsplan->fdw_exprs,
2446 numParams,
2447 &dmstate->param_flinfo,
2448 &dmstate->param_exprs,
2449 &dmstate->param_values);
2450 }
2451
2452 /*
2453 * postgresIterateDirectModify
2454 * Execute a direct foreign table modification
2455 */
2456 static TupleTableSlot *
2457 postgresIterateDirectModify(ForeignScanState *node)
2458 {
2459 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2460 EState *estate = node->ss.ps.state;
2461 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2462
2463 /*
2464 * If this is the first call after Begin, execute the statement.
2465 */
2466 if (dmstate->num_tuples == -1)
2467 execute_dml_stmt(node);
2468
2469 /*
2470 * If the local query doesn't specify RETURNING, just clear tuple slot.
2471 */
2472 if (!resultRelInfo->ri_projectReturning)
2473 {
2474 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2475 Instrumentation *instr = node->ss.ps.instrument;
2476
2477 Assert(!dmstate->has_returning);
2478
2479 /* Increment the command es_processed count if necessary. */
2480 if (dmstate->set_processed)
2481 estate->es_processed += dmstate->num_tuples;
2482
2483 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2484 if (instr)
2485 instr->tuplecount += dmstate->num_tuples;
2486
2487 return ExecClearTuple(slot);
2488 }
2489
2490 /*
2491 * Get the next RETURNING tuple.
2492 */
2493 return get_returning_data(node);
2494 }
2495
2496 /*
2497 * postgresEndDirectModify
2498 * Finish a direct foreign table modification
2499 */
2500 static void
2501 postgresEndDirectModify(ForeignScanState *node)
2502 {
2503 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2504
2505 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2506 if (dmstate == NULL)
2507 return;
2508
2509 /* Release PGresult */
2510 if (dmstate->result)
2511 PQclear(dmstate->result);
2512
2513 /* Release remote connection */
2514 ReleaseConnection(dmstate->conn);
2515 dmstate->conn = NULL;
2516
2517 /* MemoryContext will be deleted automatically. */
2518 }
2519
2520 /*
2521 * postgresExplainForeignScan
2522 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2523 */
2524 static void
2525 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2526 {
2527 ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2528 List *fdw_private = plan->fdw_private;
2529
2530 /*
2531 * Identify foreign scans that are really joins or upper relations. The
2532 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2533 * digit string(s), which are RT indexes, with the correct relation names.
2534 * We do that here, not when the plan is created, because we can't know
2535 * what aliases ruleutils.c will assign at plan creation time.
2536 */
2537 if (list_length(fdw_private) > FdwScanPrivateRelations)
2538 {
2539 StringInfo relations;
2540 char *rawrelations;
2541 char *ptr;
2542 int minrti,
2543 rtoffset;
2544
2545 rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2546
2547 /*
2548 * A difficulty with using a string representation of RT indexes is
2549 * that setrefs.c won't update the string when flattening the
2550 * rangetable. To find out what rtoffset was applied, identify the
2551 * minimum RT index appearing in the string and compare it to the
2552 * minimum member of plan->fs_relids. (We expect all the relids in
2553 * the join will have been offset by the same amount; the Asserts
2554 * below should catch it if that ever changes.)
2555 */
2556 minrti = INT_MAX;
2557 ptr = rawrelations;
2558 while (*ptr)
2559 {
2560 if (isdigit((unsigned char) *ptr))
2561 {
2562 int rti = strtol(ptr, &ptr, 10);
2563
2564 if (rti < minrti)
2565 minrti = rti;
2566 }
2567 else
2568 ptr++;
2569 }
2570 rtoffset = bms_next_member(plan->fs_relids, -1) - minrti;
2571
2572 /* Now we can translate the string */
2573 relations = makeStringInfo();
2574 ptr = rawrelations;
2575 while (*ptr)
2576 {
2577 if (isdigit((unsigned char) *ptr))
2578 {
2579 int rti = strtol(ptr, &ptr, 10);
2580 RangeTblEntry *rte;
2581 char *relname;
2582 char *refname;
2583
2584 rti += rtoffset;
2585 Assert(bms_is_member(rti, plan->fs_relids));
2586 rte = rt_fetch(rti, es->rtable);
2587 Assert(rte->rtekind == RTE_RELATION);
2588 /* This logic should agree with explain.c's ExplainTargetRel */
2589 relname = get_rel_name(rte->relid);
2590 if (es->verbose)
2591 {
2592 char *namespace;
2593
2594 namespace = get_namespace_name(get_rel_namespace(rte->relid));
2595 appendStringInfo(relations, "%s.%s",
2596 quote_identifier(namespace),
2597 quote_identifier(relname));
2598 }
2599 else
2600 appendStringInfo(relations, "%s",
2601 quote_identifier(relname));
2602 refname = (char *) list_nth(es->rtable_names, rti - 1);
2603 if (refname == NULL)
2604 refname = rte->eref->aliasname;
2605 if (strcmp(refname, relname) != 0)
2606 appendStringInfo(relations, " %s",
2607 quote_identifier(refname));
2608 }
2609 else
2610 appendStringInfoChar(relations, *ptr++);
2611 }
2612 ExplainPropertyText("Relations", relations->data, es);
2613 }
2614
2615 /*
2616 * Add remote query, when VERBOSE option is specified.
2617 */
2618 if (es->verbose)
2619 {
2620 char *sql;
2621
2622 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2623 ExplainPropertyText("Remote SQL", sql, es);
2624 }
2625 }
2626
2627 /*
2628 * postgresExplainForeignModify
2629 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2630 */
2631 static void
2632 postgresExplainForeignModify(ModifyTableState *mtstate,
2633 ResultRelInfo *rinfo,
2634 List *fdw_private,
2635 int subplan_index,
2636 ExplainState *es)
2637 {
2638 if (es->verbose)
2639 {
2640 char *sql = strVal(list_nth(fdw_private,
2641 FdwModifyPrivateUpdateSql));
2642
2643 ExplainPropertyText("Remote SQL", sql, es);
2644 }
2645 }
2646
2647 /*
2648 * postgresExplainDirectModify
2649 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2650 * foreign table directly
2651 */
2652 static void
2653 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2654 {
2655 List *fdw_private;
2656 char *sql;
2657
2658 if (es->verbose)
2659 {
2660 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2661 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2662 ExplainPropertyText("Remote SQL", sql, es);
2663 }
2664 }
2665
2666
2667 /*
2668 * estimate_path_cost_size
2669 * Get cost and size estimates for a foreign scan on given foreign relation
2670 * either a base relation or a join between foreign relations or an upper
2671 * relation containing foreign relations.
2672 *
2673 * param_join_conds are the parameterization clauses with outer relations.
2674 * pathkeys specify the expected sort order if any for given path being costed.
2675 * fpextra specifies additional post-scan/join-processing steps such as the
2676 * final sort and the LIMIT restriction.
2677 *
2678 * The function returns the cost and size estimates in p_rows, p_width,
2679 * p_startup_cost and p_total_cost variables.
2680 */
2681 static void
2682 estimate_path_cost_size(PlannerInfo *root,
2683 RelOptInfo *foreignrel,
2684 List *param_join_conds,
2685 List *pathkeys,
2686 PgFdwPathExtraData *fpextra,
2687 double *p_rows, int *p_width,
2688 Cost *p_startup_cost, Cost *p_total_cost)
2689 {
2690 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2691 double rows;
2692 double retrieved_rows;
2693 int width;
2694 Cost startup_cost;
2695 Cost total_cost;
2696
2697 /* Make sure the core code has set up the relation's reltarget */
2698 Assert(foreignrel->reltarget);
2699
2700 /*
2701 * If the table or the server is configured to use remote estimates,
2702 * connect to the foreign server and execute EXPLAIN to estimate the
2703 * number of rows selected by the restriction+join clauses. Otherwise,
2704 * estimate rows using whatever statistics we have locally, in a way
2705 * similar to ordinary tables.
2706 */
2707 if (fpinfo->use_remote_estimate)
2708 {
2709 List *remote_param_join_conds;
2710 List *local_param_join_conds;
2711 StringInfoData sql;
2712 PGconn *conn;
2713 Selectivity local_sel;
2714 QualCost local_cost;
2715 List *fdw_scan_tlist = NIL;
2716 List *remote_conds;
2717
2718 /* Required only to be passed to deparseSelectStmtForRel */
2719 List *retrieved_attrs;
2720
2721 /*
2722 * param_join_conds might contain both clauses that are safe to send
2723 * across, and clauses that aren't.
2724 */
2725 classifyConditions(root, foreignrel, param_join_conds,
2726 &remote_param_join_conds, &local_param_join_conds);
2727
2728 /* Build the list of columns to be fetched from the foreign server. */
2729 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2730 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2731 else
2732 fdw_scan_tlist = NIL;
2733
2734 /*
2735 * The complete list of remote conditions includes everything from
2736 * baserestrictinfo plus any extra join_conds relevant to this
2737 * particular path.
2738 */
2739 remote_conds = list_concat(remote_param_join_conds,
2740 fpinfo->remote_conds);
2741
2742 /*
2743 * Construct EXPLAIN query including the desired SELECT, FROM, and
2744 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2745 * values, so don't request params_list.
2746 */
2747 initStringInfo(&sql);
2748 appendStringInfoString(&sql, "EXPLAIN ");
2749 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2750 remote_conds, pathkeys,
2751 fpextra ? fpextra->has_final_sort : false,
2752 fpextra ? fpextra->has_limit : false,
2753 false, &retrieved_attrs, NULL);
2754
2755 /* Get the remote estimate */
2756 conn = GetConnection(fpinfo->user, false);
2757 get_remote_estimate(sql.data, conn, &rows, &width,
2758 &startup_cost, &total_cost);
2759 ReleaseConnection(conn);
2760
2761 retrieved_rows = rows;
2762
2763 /* Factor in the selectivity of the locally-checked quals */
2764 local_sel = clauselist_selectivity(root,
2765 local_param_join_conds,
2766 foreignrel->relid,
2767 JOIN_INNER,
2768 NULL);
2769 local_sel *= fpinfo->local_conds_sel;
2770
2771 rows = clamp_row_est(rows * local_sel);
2772
2773 /* Add in the eval cost of the locally-checked quals */
2774 startup_cost += fpinfo->local_conds_cost.startup;
2775 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2776 cost_qual_eval(&local_cost, local_param_join_conds, root);
2777 startup_cost += local_cost.startup;
2778 total_cost += local_cost.per_tuple * retrieved_rows;
2779
2780 /*
2781 * Add in tlist eval cost for each output row. In case of an
2782 * aggregate, some of the tlist expressions such as grouping
2783 * expressions will be evaluated remotely, so adjust the costs.
2784 */
2785 startup_cost += foreignrel->reltarget->cost.startup;
2786 total_cost += foreignrel->reltarget->cost.startup;
2787 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
2788 if (IS_UPPER_REL(foreignrel))
2789 {
2790 QualCost tlist_cost;
2791
2792 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
2793 startup_cost -= tlist_cost.startup;
2794 total_cost -= tlist_cost.startup;
2795 total_cost -= tlist_cost.per_tuple * rows;
2796 }
2797 }
2798 else
2799 {
2800 Cost run_cost = 0;
2801
2802 /*
2803 * We don't support join conditions in this mode (hence, no
2804 * parameterized paths can be made).
2805 */
2806 Assert(param_join_conds == NIL);
2807
2808 /*
2809 * We will come here again and again with different set of pathkeys or
2810 * additional post-scan/join-processing steps that caller wants to
2811 * cost. We don't need to calculate the cost/size estimates for the
2812 * underlying scan, join, or grouping each time. Instead, use those
2813 * estimates if we have cached them already.
2814 */
2815 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
2816 {
2817 Assert(fpinfo->retrieved_rows >= 0);
2818
2819 rows = fpinfo->rows;
2820 retrieved_rows = fpinfo->retrieved_rows;
2821 width = fpinfo->width;
2822 startup_cost = fpinfo->rel_startup_cost;
2823 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2824
2825 /*
2826 * If we estimate the costs of a foreign scan or a foreign join
2827 * with additional post-scan/join-processing steps, the scan or
2828 * join costs obtained from the cache wouldn't yet contain the
2829 * eval costs for the final scan/join target, which would've been
2830 * updated by apply_scanjoin_target_to_paths(); add the eval costs
2831 * now.
2832 */
2833 if (fpextra && !IS_UPPER_REL(foreignrel))
2834 {
2835 /* Shouldn't get here unless we have LIMIT */
2836 Assert(fpextra->has_limit);
2837 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
2838 foreignrel->reloptkind == RELOPT_JOINREL);
2839 startup_cost += foreignrel->reltarget->cost.startup;
2840 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2841 }
2842 }
2843 else if (IS_JOIN_REL(foreignrel))
2844 {
2845 PgFdwRelationInfo *fpinfo_i;
2846 PgFdwRelationInfo *fpinfo_o;
2847 QualCost join_cost;
2848 QualCost remote_conds_cost;
2849 double nrows;
2850
2851 /* Use rows/width estimates made by the core code. */
2852 rows = foreignrel->rows;
2853 width = foreignrel->reltarget->width;
2854
2855 /* For join we expect inner and outer relations set */
2856 Assert(fpinfo->innerrel && fpinfo->outerrel);
2857
2858 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2859 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2860
2861 /* Estimate of number of rows in cross product */
2862 nrows = fpinfo_i->rows * fpinfo_o->rows;
2863
2864 /*
2865 * Back into an estimate of the number of retrieved rows. Just in
2866 * case this is nuts, clamp to at most nrows.
2867 */
2868 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2869 retrieved_rows = Min(retrieved_rows, nrows);
2870
2871 /*
2872 * The cost of foreign join is estimated as cost of generating
2873 * rows for the joining relations + cost for applying quals on the
2874 * rows.
2875 */
2876
2877 /*
2878 * Calculate the cost of clauses pushed down to the foreign server
2879 */
2880 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2881 /* Calculate the cost of applying join clauses */
2882 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2883
2884 /*
2885 * Startup cost includes startup cost of joining relations and the
2886 * startup cost for join and other clauses. We do not include the
2887 * startup cost specific to join strategy (e.g. setting up hash
2888 * tables) since we do not know what strategy the foreign server
2889 * is going to use.
2890 */
2891 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2892 startup_cost += join_cost.startup;
2893 startup_cost += remote_conds_cost.startup;
2894 startup_cost += fpinfo->local_conds_cost.startup;
2895
2896 /*
2897 * Run time cost includes:
2898 *
2899 * 1. Run time cost (total_cost - startup_cost) of relations being
2900 * joined
2901 *
2902 * 2. Run time cost of applying join clauses on the cross product
2903 * of the joining relations.
2904 *
2905 * 3. Run time cost of applying pushed down other clauses on the
2906 * result of join
2907 *
2908 * 4. Run time cost of applying nonpushable other clauses locally
2909 * on the result fetched from the foreign server.
2910 */
2911 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2912 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2913 run_cost += nrows * join_cost.per_tuple;
2914 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2915 run_cost += nrows * remote_conds_cost.per_tuple;
2916 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2917
2918 /* Add in tlist eval cost for each output row */
2919 startup_cost += foreignrel->reltarget->cost.startup;
2920 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2921 }
2922 else if (IS_UPPER_REL(foreignrel))
2923 {
2924 RelOptInfo *outerrel = fpinfo->outerrel;
2925 PgFdwRelationInfo *ofpinfo;
2926 AggClauseCosts aggcosts;
2927 double input_rows;
2928 int numGroupCols;
2929 double numGroups = 1;
2930
2931 /* The upper relation should have its outer relation set */
2932 Assert(outerrel);
2933 /* and that outer relation should have its reltarget set */
2934 Assert(outerrel->reltarget);
2935
2936 /*
2937 * This cost model is mixture of costing done for sorted and
2938 * hashed aggregates in cost_agg(). We are not sure which
2939 * strategy will be considered at remote side, thus for
2940 * simplicity, we put all startup related costs in startup_cost
2941 * and all finalization and run cost are added in total_cost.
2942 */
2943
2944 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
2945
2946 /* Get rows from input rel */
2947 input_rows = ofpinfo->rows;
2948
2949 /* Collect statistics about aggregates for estimating costs. */
2950 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2951 if (root->parse->hasAggs)
2952 {
2953 get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2954 AGGSPLIT_SIMPLE, &aggcosts);
2955
2956 /*
2957 * The cost of aggregates in the HAVING qual will be the same
2958 * for each child as it is for the parent, so there's no need
2959 * to use a translated version of havingQual.
2960 */
2961 get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2962 AGGSPLIT_SIMPLE, &aggcosts);
2963 }
2964
2965 /* Get number of grouping columns and possible number of groups */
2966 numGroupCols = list_length(root->parse->groupClause);
2967 numGroups = estimate_num_groups(root,
2968 get_sortgrouplist_exprs(root->parse->groupClause,
2969 fpinfo->grouped_tlist),
2970 input_rows, NULL);
2971
2972 /*
2973 * Get the retrieved_rows and rows estimates. If there are HAVING
2974 * quals, account for their selectivity.
2975 */
2976 if (root->parse->havingQual)
2977 {
2978 /* Factor in the selectivity of the remotely-checked quals */
2979 retrieved_rows =
2980 clamp_row_est(numGroups *
2981 clauselist_selectivity(root,
2982 fpinfo->remote_conds,
2983 0,
2984 JOIN_INNER,
2985 NULL));
2986 /* Factor in the selectivity of the locally-checked quals */
2987 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
2988 }
2989 else
2990 {
2991 rows = retrieved_rows = numGroups;
2992 }
2993
2994 /* Use width estimate made by the core code. */
2995 width = foreignrel->reltarget->width;
2996
2997 /*-----
2998 * Startup cost includes:
2999 * 1. Startup cost for underneath input relation, adjusted for
3000 * tlist replacement by apply_scanjoin_target_to_paths()
3001 * 2. Cost of performing aggregation, per cost_agg()
3002 *-----
3003 */
3004 startup_cost = ofpinfo->rel_startup_cost;
3005 startup_cost += outerrel->reltarget->cost.startup;
3006 startup_cost += aggcosts.transCost.startup;
3007 startup_cost += aggcosts.transCost.per_tuple * input_rows;
3008 startup_cost += aggcosts.finalCost.startup;
3009 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3010
3011 /*-----
3012 * Run time cost includes:
3013 * 1. Run time cost of underneath input relation, adjusted for
3014 * tlist replacement by apply_scanjoin_target_to_paths()
3015 * 2. Run time cost of performing aggregation, per cost_agg()
3016 *-----
3017 */
3018 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3019 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3020 run_cost += aggcosts.finalCost.per_tuple * numGroups;
3021 run_cost += cpu_tuple_cost * numGroups;
3022
3023 /* Account for the eval cost of HAVING quals, if any */
3024 if (root->parse->havingQual)
3025 {
3026 QualCost remote_cost;
3027
3028 /* Add in the eval cost of the remotely-checked quals */
3029 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3030 startup_cost += remote_cost.startup;
3031 run_cost += remote_cost.per_tuple * numGroups;
3032 /* Add in the eval cost of the locally-checked quals */
3033 startup_cost += fpinfo->local_conds_cost.startup;
3034 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3035 }
3036
3037 /* Add in tlist eval cost for each output row */
3038 startup_cost += foreignrel->reltarget->cost.startup;
3039 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3040 }
3041 else
3042 {
3043 Cost cpu_per_tuple;
3044
3045 /* Use rows/width estimates made by set_baserel_size_estimates. */
3046 rows = foreignrel->rows;
3047 width = foreignrel->reltarget->width;
3048
3049 /*
3050 * Back into an estimate of the number of retrieved rows. Just in
3051 * case this is nuts, clamp to at most foreignrel->tuples.
3052 */
3053 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3054 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3055
3056 /*
3057 * Cost as though this were a seqscan, which is pessimistic. We
3058 * effectively imagine the local_conds are being evaluated
3059 * remotely, too.
3060 */
3061 startup_cost = 0;
3062 run_cost = 0;
3063 run_cost += seq_page_cost * foreignrel->pages;
3064
3065 startup_cost += foreignrel->baserestrictcost.startup;
3066 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3067 run_cost += cpu_per_tuple * foreignrel->tuples;
3068
3069 /* Add in tlist eval cost for each output row */
3070 startup_cost += foreignrel->reltarget->cost.startup;
3071 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3072 }
3073
3074 /*
3075 * Without remote estimates, we have no real way to estimate the cost
3076 * of generating sorted output. It could be free if the query plan
3077 * the remote side would have chosen generates properly-sorted output
3078 * anyway, but in most cases it will cost something. Estimate a value
3079 * high enough that we won't pick the sorted path when the ordering
3080 * isn't locally useful, but low enough that we'll err on the side of
3081 * pushing down the ORDER BY clause when it's useful to do so.
3082 */
3083 if (pathkeys != NIL)
3084 {
3085 if (IS_UPPER_REL(foreignrel))
3086 {
3087 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3088 fpinfo->stage == UPPERREL_GROUP_AGG);
3089 adjust_foreign_grouping_path_cost(root, pathkeys,
3090 retrieved_rows, width,
3091 fpextra->limit_tuples,
3092 &startup_cost, &run_cost);
3093 }
3094 else
3095 {
3096 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3097 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3098 }
3099 }
3100
3101 total_cost = startup_cost + run_cost;
3102
3103 /* Adjust the cost estimates if we have LIMIT */
3104 if (fpextra && fpextra->has_limit)
3105 {
3106 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3107 fpextra->offset_est, fpextra->count_est);
3108 retrieved_rows = rows;
3109 }
3110 }
3111
3112 /*
3113 * If this includes the final sort step, the given target, which will be
3114 * applied to the resulting path, might have different expressions from
3115 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3116 * eval costs.
3117 */
3118 if (fpextra && fpextra->has_final_sort &&
3119 fpextra->target != foreignrel->reltarget)
3120 {
3121 QualCost oldcost = foreignrel->reltarget->cost;
3122 QualCost newcost = fpextra->target->cost;
3123
3124 startup_cost += newcost.startup - oldcost.startup;
3125 total_cost += newcost.startup - oldcost.startup;
3126 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3127 }
3128
3129 /*
3130 * Cache the retrieved rows and cost estimates for scans, joins, or
3131 * groupings without any parameterization, pathkeys, or additional
3132 * post-scan/join-processing steps, before adding the costs for
3133 * transferring data from the foreign server. These estimates are useful
3134 * for costing remote joins involving this relation or costing other
3135 * remote operations on this relation such as remote sorts and remote
3136 * LIMIT restrictions, when the costs can not be obtained from the foreign
3137 * server. This function will be called at least once for every foreign
3138 * relation without any parameterization, pathkeys, or additional
3139 * post-scan/join-processing steps.
3140 */
3141 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3142 {
3143 fpinfo->retrieved_rows = retrieved_rows;
3144 fpinfo->rel_startup_cost = startup_cost;
3145 fpinfo->rel_total_cost = total_cost;
3146 }
3147
3148 /*
3149 * Add some additional cost factors to account for connection overhead
3150 * (fdw_startup_cost), transferring data across the network
3151 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3152 * (cpu_tuple_cost per retrieved row).
3153 */
3154 startup_cost += fpinfo->fdw_startup_cost;
3155 total_cost += fpinfo->fdw_startup_cost;
3156 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3157 total_cost += cpu_tuple_cost * retrieved_rows;
3158
3159 /*
3160 * If we have LIMIT, we should prefer performing the restriction remotely
3161 * rather than locally, as the former avoids extra row fetches from the
3162 * remote that the latter might cause. But since the core code doesn't
3163 * account for such fetches when estimating the costs of the local
3164 * restriction (see create_limit_path()), there would be no difference
3165 * between the costs of the local restriction and the costs of the remote
3166 * restriction estimated above if we don't use remote estimates (except
3167 * for the case where the foreignrel is a grouping relation, the given
3168 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3169 * accounted for in costing the remote restriction). Tweak the costs of
3170 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3171 * one.
3172 */
3173 if (!fpinfo->use_remote_estimate &&
3174 fpextra && fpextra->has_limit &&
3175 fpextra->limit_tuples > 0 &&
3176 fpextra->limit_tuples < fpinfo->rows)
3177 {
3178 Assert(fpinfo->rows > 0);
3179 total_cost -= (total_cost - startup_cost) * 0.05 *
3180 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3181 }
3182
3183 /* Return results. */
3184 *p_rows = rows;
3185 *p_width = width;
3186 *p_startup_cost = startup_cost;
3187 *p_total_cost = total_cost;
3188 }
3189
3190 /*
3191 * Estimate costs of executing a SQL statement remotely.
3192 * The given "sql" must be an EXPLAIN command.
3193 */
3194 static void
3195 get_remote_estimate(const char *sql, PGconn *conn,
3196 double *rows, int *width,
3197 Cost *startup_cost, Cost *total_cost)
3198 {
3199 PGresult *volatile res = NULL;
3200
3201 /* PGresult must be released before leaving this function. */
3202 PG_TRY();
3203 {
3204 char *line;
3205 char *p;
3206 int n;
3207
3208 /*
3209 * Execute EXPLAIN remotely.
3210 */
3211 res = pgfdw_exec_query(conn, sql);
3212 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3213 pgfdw_report_error(ERROR, res, conn, false, sql);
3214
3215 /*
3216 * Extract cost numbers for topmost plan node. Note we search for a
3217 * left paren from the end of the line to avoid being confused by
3218 * other uses of parentheses.
3219 */
3220 line = PQgetvalue(res, 0, 0);
3221 p = strrchr(line, '(');
3222 if (p == NULL)
3223 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3224 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3225 startup_cost, total_cost, rows, width);
3226 if (n != 4)
3227 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3228 }
3229 PG_FINALLY();
3230 {
3231 if (res)
3232 PQclear(res);
3233 }
3234 PG_END_TRY();
3235 }
3236
3237 /*
3238 * Adjust the cost estimates of a foreign grouping path to include the cost of
3239 * generating properly-sorted output.
3240 */
3241 static void
3242 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3243 List *pathkeys,
3244 double retrieved_rows,
3245 double width,
3246 double limit_tuples,
3247 Cost *p_startup_cost,
3248 Cost *p_run_cost)
3249 {
3250 /*
3251 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3252 * side is unlikely to generate properly-sorted output, so it would need
3253 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3254 * if the GROUP BY clause is sort-able but isn't a superset of the given
3255 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3256 * costs by applying the same heuristic as for the scan or join case.
3257 */
3258 if (!grouping_is_sortable(root->parse->groupClause) ||
3259 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3260 {
3261 Path sort_path; /* dummy for result of cost_sort */
3262
3263 cost_sort(&sort_path,
3264 root,
3265 pathkeys,
3266 *p_startup_cost + *p_run_cost,
3267 retrieved_rows,
3268 width,
3269 0.0,
3270 work_mem,
3271 limit_tuples);
3272
3273 *p_startup_cost = sort_path.startup_cost;
3274 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3275 }
3276 else
3277 {
3278 /*
3279 * The default extra cost seems too large for foreign-grouping cases;
3280 * add 1/4th of that default.
3281 */
3282 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3283 - 1.0) * 0.25;
3284
3285 *p_startup_cost *= sort_multiplier;
3286 *p_run_cost *= sort_multiplier;
3287 }
3288 }
3289
3290 /*
3291 * Detect whether we want to process an EquivalenceClass member.
3292 *
3293 * This is a callback for use by generate_implied_equalities_for_column.
3294 */
3295 static bool
3296 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3297 EquivalenceClass *ec, EquivalenceMember *em,
3298 void *arg)
3299 {
3300 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3301 Expr *expr = em->em_expr;
3302
3303 /*
3304 * If we've identified what we're processing in the current scan, we only
3305 * want to match that expression.
3306 */
3307 if (state->current != NULL)
3308 return equal(expr, state->current);
3309
3310 /*
3311 * Otherwise, ignore anything we've already processed.
3312 */
3313 if (list_member(state->already_used, expr))
3314 return false;
3315
3316 /* This is the new target to process. */
3317 state->current = expr;
3318 return true;
3319 }
3320
3321 /*
3322 * Create cursor for node's query with current parameter values.
3323 */
3324 static void
3325 create_cursor(ForeignScanState *node)
3326 {
3327 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3328 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3329 int numParams = fsstate->numParams;
3330 const char **values = fsstate->param_values;
3331 PGconn *conn = fsstate->conn;
3332 StringInfoData buf;
3333 PGresult *res;
3334
3335 /*
3336 * Construct array of query parameter values in text format. We do the
3337 * conversions in the short-lived per-tuple context, so as not to cause a
3338 * memory leak over repeated scans.
3339 */
3340 if (numParams > 0)
3341 {
3342 MemoryContext oldcontext;
3343
3344 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3345
3346 process_query_params(econtext,
3347 fsstate->param_flinfo,
3348 fsstate->param_exprs,
3349 values);
3350
3351 MemoryContextSwitchTo(oldcontext);
3352 }
3353
3354 /* Construct the DECLARE CURSOR command */
3355 initStringInfo(&buf);
3356 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3357 fsstate->cursor_number, fsstate->query);
3358
3359 /*
3360 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3361 * to infer types for all parameters. Since we explicitly cast every
3362 * parameter (see deparse.c), the "inference" is trivial and will produce
3363 * the desired result. This allows us to avoid assuming that the remote
3364 * server has the same OIDs we do for the parameters' types.
3365 */
3366 if (!PQsendQueryParams(conn, buf.data, numParams,
3367 NULL, values, NULL, NULL, 0))
3368 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3369
3370 /*
3371 * Get the result, and check for success.
3372 *
3373 * We don't use a PG_TRY block here, so be careful not to throw error
3374 * without releasing the PGresult.
3375 */
3376 res = pgfdw_get_result(conn, buf.data);
3377 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3378 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3379 PQclear(res);
3380
3381 /* Mark the cursor as created, and show no tuples have been retrieved */
3382 fsstate->cursor_exists = true;
3383 fsstate->tuples = NULL;
3384 fsstate->num_tuples = 0;
3385 fsstate->next_tuple = 0;
3386 fsstate->fetch_ct_2 = 0;
3387 fsstate->eof_reached = false;
3388
3389 /* Clean up */
3390 pfree(buf.data);
3391 }
3392
3393 /*
3394 * Fetch some more rows from the node's cursor.
3395 */
3396 static void
3397 fetch_more_data(ForeignScanState *node)
3398 {
3399 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3400 PGresult *volatile res = NULL;
3401 MemoryContext oldcontext;
3402
3403 /*
3404 * We'll store the tuples in the batch_cxt. First, flush the previous
3405 * batch.
3406 */
3407 fsstate->tuples = NULL;
3408 MemoryContextReset(fsstate->batch_cxt);
3409 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3410
3411 /* PGresult must be released before leaving this function. */
3412 PG_TRY();
3413 {
3414 PGconn *conn = fsstate->conn;
3415 char sql[64];
3416 int numrows;
3417 int i;
3418
3419 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3420 fsstate->fetch_size, fsstate->cursor_number);
3421
3422 res = pgfdw_exec_query(conn, sql);
3423 /* On error, report the original query, not the FETCH. */
3424 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3425 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3426
3427 /* Convert the data into HeapTuples */
3428 numrows = PQntuples(res);
3429 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3430 fsstate->num_tuples = numrows;
3431 fsstate->next_tuple = 0;
3432
3433 for (i = 0; i < numrows; i++)
3434 {
3435 Assert(IsA(node->ss.ps.plan, ForeignScan));
3436
3437 fsstate->tuples[i] =
3438 make_tuple_from_result_row(res, i,
3439 fsstate->rel,
3440 fsstate->attinmeta,
3441 fsstate->retrieved_attrs,
3442 node,
3443 fsstate->temp_cxt);
3444 }
3445
3446 /* Update fetch_ct_2 */
3447 if (fsstate->fetch_ct_2 < 2)
3448 fsstate->fetch_ct_2++;
3449
3450 /* Must be EOF if we didn't get as many tuples as we asked for. */
3451 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3452 }
3453 PG_FINALLY();
3454 {
3455 if (res)
3456 PQclear(res);
3457 }
3458 PG_END_TRY();
3459
3460 MemoryContextSwitchTo(oldcontext);
3461 }
3462
3463 /*
3464 * Force assorted GUC parameters to settings that ensure that we'll output
3465 * data values in a form that is unambiguous to the remote server.
3466 *
3467 * This is rather expensive and annoying to do once per row, but there's
3468 * little choice if we want to be sure values are transmitted accurately;
3469 * we can't leave the settings in place between rows for fear of affecting
3470 * user-visible computations.
3471 *
3472 * We use the equivalent of a function SET option to allow the settings to
3473 * persist only until the caller calls reset_transmission_modes(). If an
3474 * error is thrown in between, guc.c will take care of undoing the settings.
3475 *
3476 * The return value is the nestlevel that must be passed to
3477 * reset_transmission_modes() to undo things.
3478 */
3479 int
3480 set_transmission_modes(void)
3481 {
3482 int nestlevel = NewGUCNestLevel();
3483
3484 /*
3485 * The values set here should match what pg_dump does. See also
3486 * configure_remote_session in connection.c.
3487 */
3488 if (DateStyle != USE_ISO_DATES)
3489 (void) set_config_option("datestyle", "ISO",
3490 PGC_USERSET, PGC_S_SESSION,
3491 GUC_ACTION_SAVE, true, 0, false);
3492 if (IntervalStyle != INTSTYLE_POSTGRES)
3493 (void) set_config_option("intervalstyle", "postgres",
3494 PGC_USERSET, PGC_S_SESSION,
3495 GUC_ACTION_SAVE, true, 0, false);
3496 if (extra_float_digits < 3)
3497 (void) set_config_option("extra_float_digits", "3",
3498 PGC_USERSET, PGC_S_SESSION,
3499 GUC_ACTION_SAVE, true, 0, false);
3500
3501 return nestlevel;
3502 }
3503
3504 /*
3505 * Undo the effects of set_transmission_modes().
3506 */
3507 void
3508 reset_transmission_modes(int nestlevel)
3509 {
3510 AtEOXact_GUC(true, nestlevel);
3511 }
3512
3513 /*
3514 * Utility routine to close a cursor.
3515 */
3516 static void
3517 close_cursor(PGconn *conn, unsigned int cursor_number)
3518 {
3519 char sql[64];
3520 PGresult *res;
3521
3522 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3523
3524 /*
3525 * We don't use a PG_TRY block here, so be careful not to throw error
3526 * without releasing the PGresult.
3527 */
3528 res = pgfdw_exec_query(conn, sql);
3529 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3530 pgfdw_report_error(ERROR, res, conn, true, sql);
3531 PQclear(res);
3532 }
3533
3534 /*
3535 * create_foreign_modify
3536 * Construct an execution state of a foreign insert/update/delete
3537 * operation
3538 */
3539 static PgFdwModifyState *
3540 create_foreign_modify(EState *estate,
3541 RangeTblEntry *rte,
3542 ResultRelInfo *resultRelInfo,
3543 CmdType operation,
3544 Plan *subplan,
3545 char *query,
3546 List *target_attrs,
3547 bool has_returning,
3548 List *retrieved_attrs)
3549 {
3550 PgFdwModifyState *fmstate;
3551 Relation rel = resultRelInfo->ri_RelationDesc;
3552 TupleDesc tupdesc = RelationGetDescr(rel);
3553 Oid userid;
3554 ForeignTable *table;
3555 UserMapping *user;
3556 AttrNumber n_params;
3557 Oid typefnoid;
3558 bool isvarlena;
3559 ListCell *lc;
3560
3561 /* Begin constructing PgFdwModifyState. */
3562 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3563 fmstate->rel = rel;
3564
3565 /*
3566 * Identify which user to do the remote access as. This should match what
3567 * ExecCheckRTEPerms() does.
3568 */
3569 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3570
3571 /* Get info about foreign table. */
3572 table = GetForeignTable(RelationGetRelid(rel));
3573 user = GetUserMapping(userid, table->serverid);
3574
3575 /* Open connection; report that we'll create a prepared statement. */
3576 fmstate->conn = GetConnection(user, true);
3577 fmstate->p_name = NULL; /* prepared statement not made yet */
3578
3579 /* Set up remote query information. */
3580 fmstate->query = query;
3581 fmstate->target_attrs = target_attrs;
3582 fmstate->has_returning = has_returning;
3583 fmstate->retrieved_attrs = retrieved_attrs;
3584
3585 /* Create context for per-tuple temp workspace. */
3586 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3587 "postgres_fdw temporary data",
3588 ALLOCSET_SMALL_SIZES);
3589
3590 /* Prepare for input conversion of RETURNING results. */
3591 if (fmstate->has_returning)
3592 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3593
3594 /* Prepare for output conversion of parameters used in prepared stmt. */
3595 n_params = list_length(fmstate->target_attrs) + 1;
3596 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3597 fmstate->p_nums = 0;
3598
3599 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3600 {
3601 Assert(subplan != NULL);
3602
3603 /* Find the ctid resjunk column in the subplan's result */
3604 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3605 "ctid");
3606 if (!AttributeNumberIsValid(fmstate->ctidAttno))
3607 elog(ERROR, "could not find junk ctid column");
3608
3609 /* First transmittable parameter will be ctid */
3610 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3611 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3612 fmstate->p_nums++;
3613 }
3614
3615 if (operation == CMD_INSERT || operation == CMD_UPDATE)
3616 {
3617 /* Set up for remaining transmittable parameters */
3618 foreach(lc, fmstate->target_attrs)
3619 {
3620 int attnum = lfirst_int(lc);
3621 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3622
3623 Assert(!attr->attisdropped);
3624
3625 /* Ignore generated columns; they are set to DEFAULT */
3626 if (attr->attgenerated)
3627 continue;
3628 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3629 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3630 fmstate->p_nums++;
3631 }
3632 }
3633
3634 Assert(fmstate->p_nums <= n_params);
3635
3636 /* Initialize auxiliary state */
3637 fmstate->aux_fmstate = NULL;
3638
3639 return fmstate;
3640 }
3641
3642 /*
3643 * execute_foreign_modify
3644 * Perform foreign-table modification as required, and fetch RETURNING
3645 * result if any. (This is the shared guts of postgresExecForeignInsert,
3646 * postgresExecForeignUpdate, and postgresExecForeignDelete.)
3647 */
3648 static TupleTableSlot *
3649 execute_foreign_modify(EState *estate,
3650 ResultRelInfo *resultRelInfo,
3651 CmdType operation,
3652 TupleTableSlot *slot,
3653 TupleTableSlot *planSlot)
3654 {
3655 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
3656 ItemPointer ctid = NULL;
3657 const char **p_values;
3658 PGresult *res;
3659 int n_rows;
3660
3661 /* The operation should be INSERT, UPDATE, or DELETE */
3662 Assert(operation == CMD_INSERT ||
3663 operation == CMD_UPDATE ||
3664 operation == CMD_DELETE);
3665
3666 /* Set up the prepared statement on the remote server, if we didn't yet */
3667 if (!fmstate->p_name)
3668 prepare_foreign_modify(fmstate);
3669
3670 /*
3671 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
3672 */
3673 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3674 {
3675 Datum datum;
3676 bool isNull;
3677
3678 datum = ExecGetJunkAttribute(planSlot,
3679 fmstate->ctidAttno,
3680 &isNull);
3681 /* shouldn't ever get a null result... */
3682 if (isNull)
3683 elog(ERROR, "ctid is NULL");
3684 ctid = (ItemPointer) DatumGetPointer(datum);
3685 }
3686
3687 /* Convert parameters needed by prepared statement to text form */
3688 p_values = convert_prep_stmt_params(fmstate, ctid, slot);
3689
3690 /*
3691 * Execute the prepared statement.
3692 */
3693 if (!PQsendQueryPrepared(fmstate->conn,
3694 fmstate->p_name,
3695 fmstate->p_nums,
3696 p_values,
3697 NULL,
3698 NULL,
3699 0))
3700 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3701
3702 /*
3703 * Get the result, and check for success.
3704 *
3705 * We don't use a PG_TRY block here, so be careful not to throw error
3706 * without releasing the PGresult.
3707 */
3708 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3709 if (PQresultStatus(res) !=
3710 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3711 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3712
3713 /* Check number of rows affected, and fetch RETURNING tuple if any */
3714 if (fmstate->has_returning)
3715 {
3716 n_rows = PQntuples(res);
3717 if (n_rows > 0)
3718 store_returning_result(fmstate, slot, res);
3719 }
3720 else
3721 n_rows = atoi(PQcmdTuples(res));
3722
3723 /* And clean up */
3724 PQclear(res);
3725
3726 MemoryContextReset(fmstate->temp_cxt);
3727
3728 /*
3729 * Return NULL if nothing was inserted/updated/deleted on the remote end
3730 */
3731 return (n_rows > 0) ? slot : NULL;
3732 }
3733
3734 /*
3735 * prepare_foreign_modify
3736 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3737 */
3738 static void
3739 prepare_foreign_modify(PgFdwModifyState *fmstate)
3740 {
3741 char prep_name[NAMEDATALEN];
3742 char *p_name;
3743 PGresult *res;
3744
3745 /* Construct name we'll use for the prepared statement. */
3746 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3747 GetPrepStmtNumber(fmstate->conn));
3748 p_name = pstrdup(prep_name);
3749
3750 /*
3751 * We intentionally do not specify parameter types here, but leave the
3752 * remote server to derive them by default. This avoids possible problems
3753 * with the remote server using different type OIDs than we do. All of
3754 * the prepared statements we use in this module are simple enough that
3755 * the remote server will make the right choices.
3756 */
3757 if (!PQsendPrepare(fmstate->conn,
3758 p_name,
3759 fmstate->query,
3760 0,
3761 NULL))
3762 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3763
3764 /*
3765 * Get the result, and check for success.
3766 *
3767 * We don't use a PG_TRY block here, so be careful not to throw error
3768 * without releasing the PGresult.
3769 */
3770 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3771 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3772 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3773 PQclear(res);
3774
3775 /* This action shows that the prepare has been done. */
3776 fmstate->p_name = p_name;
3777 }
3778
3779 /*
3780 * convert_prep_stmt_params
3781 * Create array of text strings representing parameter values
3782 *
3783 * tupleid is ctid to send, or NULL if none
3784 * slot is slot to get remaining parameters from, or NULL if none
3785 *
3786 * Data is constructed in temp_cxt; caller should reset that after use.
3787 */
3788 static const char **
3789 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3790 ItemPointer tupleid,
3791 TupleTableSlot *slot)
3792 {
3793 const char **p_values;
3794 int pindex = 0;
3795 MemoryContext oldcontext;
3796
3797 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3798
3799 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3800
3801 /* 1st parameter should be ctid, if it's in use */
3802 if (tupleid != NULL)
3803 {
3804 /* don't need set_transmission_modes for TID output */
3805 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3806 PointerGetDatum(tupleid));
3807 pindex++;
3808 }
3809
3810 /* get following parameters from slot */
3811 if (slot != NULL && fmstate->target_attrs != NIL)
3812 {
3813 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
3814 int nestlevel;
3815 ListCell *lc;
3816
3817 nestlevel = set_transmission_modes();
3818
3819 foreach(lc, fmstate->target_attrs)
3820 {
3821 int attnum = lfirst_int(lc);
3822 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3823 Datum value;
3824 bool isnull;
3825
3826 /* Ignore generated columns; they are set to DEFAULT */
3827 if (attr->attgenerated)
3828 continue;
3829 value = slot_getattr(slot, attnum, &isnull);
3830 if (isnull)
3831 p_values[pindex] = NULL;
3832 else
3833 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3834 value);
3835 pindex++;
3836 }
3837
3838 reset_transmission_modes(nestlevel);
3839 }
3840
3841 Assert(pindex == fmstate->p_nums);
3842
3843 MemoryContextSwitchTo(oldcontext);
3844
3845 return p_values;
3846 }
3847
3848 /*
3849 * store_returning_result
3850 * Store the result of a RETURNING clause
3851 *
3852 * On error, be sure to release the PGresult on the way out. Callers do not
3853 * have PG_TRY blocks to ensure this happens.
3854 */
3855 static void
3856 store_returning_result(PgFdwModifyState *fmstate,
3857 TupleTableSlot *slot, PGresult *res)
3858 {
3859 PG_TRY();
3860 {
3861 HeapTuple newtup;
3862
3863 newtup = make_tuple_from_result_row(res, 0,
3864 fmstate->rel,
3865 fmstate->attinmeta,
3866 fmstate->retrieved_attrs,
3867 NULL,
3868 fmstate->temp_cxt);
3869
3870 /*
3871 * The returning slot will not necessarily be suitable to store
3872 * heaptuples directly, so allow for conversion.
3873 */
3874 ExecForceStoreHeapTuple(newtup, slot, true);
3875 }
3876 PG_CATCH();
3877 {
3878 if (res)
3879 PQclear(res);
3880 PG_RE_THROW();
3881 }
3882 PG_END_TRY();
3883 }
3884
3885 /*
3886 * finish_foreign_modify
3887 * Release resources for a foreign insert/update/delete operation
3888 */
3889 static void
3890 finish_foreign_modify(PgFdwModifyState *fmstate)
3891 {
3892 Assert(fmstate != NULL);
3893
3894 /* If we created a prepared statement, destroy it */
3895 if (fmstate->p_name)
3896 {
3897 char sql[64];
3898 PGresult *res;
3899
3900 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3901
3902 /*
3903 * We don't use a PG_TRY block here, so be careful not to throw error
3904 * without releasing the PGresult.
3905 */
3906 res = pgfdw_exec_query(fmstate->conn, sql);
3907 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3908 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3909 PQclear(res);
3910 fmstate->p_name = NULL;
3911 }
3912
3913 /* Release remote connection */
3914 ReleaseConnection(fmstate->conn);
3915 fmstate->conn = NULL;
3916 }
3917
3918 /*
3919 * build_remote_returning
3920 * Build a RETURNING targetlist of a remote query for performing an
3921 * UPDATE/DELETE .. RETURNING on a join directly
3922 */
3923 static List *
3924 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3925 {
3926 bool have_wholerow = false;
3927 List *tlist = NIL;
3928 List *vars;
3929 ListCell *lc;
3930
3931 Assert(returningList);
3932
3933 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3934
3935 /*
3936 * If there's a whole-row reference to the target relation, then we'll
3937 * need all the columns of the relation.
3938 */
3939 foreach(lc, vars)
3940 {
3941 Var *var = (Var *) lfirst(lc);
3942
3943 if (IsA(var, Var) &&
3944 var->varno == rtindex &&
3945 var->varattno == InvalidAttrNumber)
3946 {
3947 have_wholerow = true;
3948 break;
3949 }
3950 }
3951
3952 if (have_wholerow)
3953 {
3954 TupleDesc tupdesc = RelationGetDescr(rel);
3955 int i;
3956
3957 for (i = 1; i <= tupdesc->natts; i++)
3958 {
3959 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3960 Var *var;
3961
3962 /* Ignore dropped attributes. */
3963 if (attr->attisdropped)
3964 continue;
3965
3966 var = makeVar(rtindex,
3967 i,
3968 attr->atttypid,
3969 attr->atttypmod,
3970 attr->attcollation,
3971 0);
3972
3973 tlist = lappend(tlist,
3974 makeTargetEntry((Expr *) var,
3975 list_length(tlist) + 1,
3976 NULL,
3977 false));
3978 }
3979 }
3980
3981 /* Now add any remaining columns to tlist. */
3982 foreach(lc, vars)
3983 {
3984 Var *var = (Var *) lfirst(lc);
3985
3986 /*
3987 * No need for whole-row references to the target relation. We don't
3988 * need system columns other than ctid and oid either, since those are
3989 * set locally.
3990 */
3991 if (IsA(var, Var) &&
3992 var->varno == rtindex &&
3993 var->varattno <= InvalidAttrNumber &&
3994 var->varattno != SelfItemPointerAttributeNumber)
3995 continue; /* don't need it */
3996
3997 if (tlist_member((Expr *) var, tlist))
3998 continue; /* already got it */
3999
4000 tlist = lappend(tlist,
4001 makeTargetEntry((Expr *) var,
4002 list_length(tlist) + 1,
4003 NULL,
4004 false));
4005 }
4006
4007 list_free(vars);
4008
4009 return tlist;
4010 }
4011
4012 /*
4013 * rebuild_fdw_scan_tlist
4014 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4015 * tlist
4016 *
4017 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4018 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4019 * have contained resjunk columns such as 'ctid' of the target relation and
4020 * 'wholerow' of non-target relations, but the tlist might not contain them,
4021 * for example. So, adjust the tlist so it contains all the columns specified
4022 * in the fdw_scan_tlist; else setrefs.c will get confused.
4023 */
4024 static void
4025 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
4026 {
4027 List *new_tlist = tlist;
4028 List *old_tlist = fscan->fdw_scan_tlist;
4029 ListCell *lc;
4030
4031 foreach(lc, old_tlist)
4032 {
4033 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4034
4035 if (tlist_member(tle->expr, new_tlist))
4036 continue; /* already got it */
4037
4038 new_tlist = lappend(new_tlist,
4039 makeTargetEntry(tle->expr,
4040 list_length(new_tlist) + 1,
4041 NULL,
4042 false));
4043 }
4044 fscan->fdw_scan_tlist = new_tlist;
4045 }
4046
4047 /*
4048 * Execute a direct UPDATE/DELETE statement.
4049 */
4050 static void
4051 execute_dml_stmt(ForeignScanState *node)
4052 {
4053 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4054 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4055 int numParams = dmstate->numParams;
4056 const char **values = dmstate->param_values;
4057
4058 /*
4059 * Construct array of query parameter values in text format.
4060 */
4061 if (numParams > 0)
4062 process_query_params(econtext,
4063 dmstate->param_flinfo,
4064 dmstate->param_exprs,
4065 values);
4066
4067 /*
4068 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4069 * to infer types for all parameters. Since we explicitly cast every
4070 * parameter (see deparse.c), the "inference" is trivial and will produce
4071 * the desired result. This allows us to avoid assuming that the remote
4072 * server has the same OIDs we do for the parameters' types.
4073 */
4074 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4075 NULL, values, NULL, NULL, 0))
4076 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4077
4078 /*
4079 * Get the result, and check for success.
4080 *
4081 * We don't use a PG_TRY block here, so be careful not to throw error
4082 * without releasing the PGresult.
4083 */
4084 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4085 if (PQresultStatus(dmstate->result) !=
4086 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4087 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4088 dmstate->query);
4089
4090 /* Get the number of rows affected. */
4091 if (dmstate->has_returning)
4092 dmstate->num_tuples = PQntuples(dmstate->result);
4093 else
4094 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4095 }
4096
4097 /*
4098 * Get the result of a RETURNING clause.
4099 */
4100 static TupleTableSlot *
4101 get_returning_data(ForeignScanState *node)
4102 {
4103 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4104 EState *estate = node->ss.ps.state;
4105 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
4106 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4107 TupleTableSlot *resultSlot;
4108
4109 Assert(resultRelInfo->ri_projectReturning);
4110
4111 /* If we didn't get any tuples, must be end of data. */
4112 if (dmstate->next_tuple >= dmstate->num_tuples)
4113 return ExecClearTuple(slot);
4114
4115 /* Increment the command es_processed count if necessary. */
4116 if (dmstate->set_processed)
4117 estate->es_processed += 1;
4118
4119 /*
4120 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4121 * tuple. (has_returning is false when the local query is of the form
4122 * "UPDATE/DELETE .. RETURNING 1" for example.)
4123 */
4124 if (!dmstate->has_returning)
4125 {
4126 ExecStoreAllNullTuple(slot);
4127 resultSlot = slot;
4128 }
4129 else
4130 {
4131 /*
4132 * On error, be sure to release the PGresult on the way out. Callers
4133 * do not have PG_TRY blocks to ensure this happens.
4134 */
4135 PG_TRY();
4136 {
4137 HeapTuple newtup;
4138
4139 newtup = make_tuple_from_result_row(dmstate->result,
4140 dmstate->next_tuple,
4141 dmstate->rel,
4142 dmstate->attinmeta,
4143 dmstate->retrieved_attrs,
4144 node,
4145 dmstate->temp_cxt);
4146 ExecStoreHeapTuple(newtup, slot, false);
4147 }
4148 PG_CATCH();
4149 {
4150 if (dmstate->result)
4151 PQclear(dmstate->result);
4152 PG_RE_THROW();
4153 }
4154 PG_END_TRY();
4155
4156 /* Get the updated/deleted tuple. */
4157 if (dmstate->rel)
4158 resultSlot = slot;
4159 else
4160 resultSlot = apply_returning_filter(dmstate, slot, estate);
4161 }
4162 dmstate->next_tuple++;
4163
4164 /* Make slot available for evaluation of the local query RETURNING list. */
4165 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4166 resultSlot;
4167
4168 return slot;
4169 }
4170
4171 /*
4172 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4173 */
4174 static void
4175 init_returning_filter(PgFdwDirectModifyState *dmstate,
4176 List *fdw_scan_tlist,
4177 Index rtindex)
4178 {
4179 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4180 ListCell *lc;
4181 int i;
4182
4183 /*
4184 * Calculate the mapping between the fdw_scan_tlist's entries and the
4185 * result tuple's attributes.
4186 *
4187 * The "map" is an array of indexes of the result tuple's attributes in
4188 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4189 * tuple. We store zero for any attributes that don't have the
4190 * corresponding entries in that list, marking that a NULL is needed in
4191 * the result tuple.
4192 *
4193 * Also get the indexes of the entries for ctid and oid if any.
4194 */
4195 dmstate->attnoMap = (AttrNumber *)
4196 palloc0(resultTupType->natts * sizeof(AttrNumber));
4197
4198 dmstate->ctidAttno = dmstate->oidAttno = 0;
4199
4200 i = 1;
4201 dmstate->hasSystemCols = false;
4202 foreach(lc, fdw_scan_tlist)
4203 {
4204 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4205 Var *var = (Var *) tle->expr;
4206
4207 Assert(IsA(var, Var));
4208
4209 /*
4210 * If the Var is a column of the target relation to be retrieved from
4211 * the foreign server, get the index of the entry.
4212 */
4213 if (var->varno == rtindex &&
4214 list_member_int(dmstate->retrieved_attrs, i))
4215 {
4216 int attrno = var->varattno;
4217
4218 if (attrno < 0)
4219 {
4220 /*
4221 * We don't retrieve system columns other than ctid and oid.
4222 */
4223 if (attrno == SelfItemPointerAttributeNumber)
4224 dmstate->ctidAttno = i;
4225 else
4226 Assert(false);
4227 dmstate->hasSystemCols = true;
4228 }
4229 else
4230 {
4231 /*
4232 * We don't retrieve whole-row references to the target
4233 * relation either.
4234 */
4235 Assert(attrno > 0);
4236
4237 dmstate->attnoMap[attrno - 1] = i;
4238 }
4239 }
4240 i++;
4241 }
4242 }
4243
4244 /*
4245 * Extract and return an updated/deleted tuple from a scan tuple.
4246 */
4247 static TupleTableSlot *
4248 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4249 TupleTableSlot *slot,
4250 EState *estate)
4251 {
4252 ResultRelInfo *relInfo = estate->es_result_relation_info;
4253 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4254 TupleTableSlot *resultSlot;
4255 Datum *values;
4256 bool *isnull;
4257 Datum *old_values;
4258 bool *old_isnull;
4259 int i;
4260
4261 /*
4262 * Use the return tuple slot as a place to store the result tuple.
4263 */
4264 resultSlot = ExecGetReturningSlot(estate, relInfo);
4265
4266 /*
4267 * Extract all the values of the scan tuple.
4268 */
4269 slot_getallattrs(slot);
4270 old_values = slot->tts_values;
4271 old_isnull = slot->tts_isnull;
4272
4273 /*
4274 * Prepare to build the result tuple.
4275 */
4276 ExecClearTuple(resultSlot);
4277 values = resultSlot->tts_values;
4278 isnull = resultSlot->tts_isnull;
4279
4280 /*
4281 * Transpose data into proper fields of the result tuple.
4282 */
4283 for (i = 0; i < resultTupType->natts; i++)
4284 {
4285 int j = dmstate->attnoMap[i];
4286
4287 if (j == 0)
4288 {
4289 values[i] = (Datum) 0;
4290 isnull[i] = true;
4291 }
4292 else
4293 {
4294 values[i] = old_values[j - 1];
4295 isnull[i] = old_isnull[j - 1];
4296 }
4297 }
4298
4299 /*
4300 * Build the virtual tuple.
4301 */
4302 ExecStoreVirtualTuple(resultSlot);
4303
4304 /*
4305 * If we have any system columns to return, materialize a heap tuple in
4306 * the slot from column values set above and install system columns in
4307 * that tuple.
4308 */
4309 if (dmstate->hasSystemCols)
4310 {
4311 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4312
4313 /* ctid */
4314 if (dmstate->ctidAttno)
4315 {
4316 ItemPointer ctid = NULL;
4317
4318 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4319 resultTup->t_self = *ctid;
4320 }
4321
4322 /*
4323 * And remaining columns
4324 *
4325 * Note: since we currently don't allow the target relation to appear
4326 * on the nullable side of an outer join, any system columns wouldn't
4327 * go to NULL.
4328 *
4329 * Note: no need to care about tableoid here because it will be
4330 * initialized in ExecProcessReturning().
4331 */
4332 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4333 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4334 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4335 }
4336
4337 /*
4338 * And return the result tuple.
4339 */
4340 return resultSlot;
4341 }
4342
4343 /*
4344 * Prepare for processing of parameters used in remote query.
4345 */
4346 static void
4347 prepare_query_params(PlanState *node,
4348 List *fdw_exprs,
4349 int numParams,
4350 FmgrInfo **param_flinfo,
4351 List **param_exprs,
4352 const char ***param_values)
4353 {
4354 int i;
4355 ListCell *lc;
4356
4357 Assert(numParams > 0);
4358
4359 /* Prepare for output conversion of parameters used in remote query. */
4360 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4361
4362 i = 0;
4363 foreach(lc, fdw_exprs)
4364 {
4365 Node *param_expr = (Node *) lfirst(lc);
4366 Oid typefnoid;
4367 bool isvarlena;
4368
4369 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4370 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4371 i++;
4372 }
4373
4374 /*
4375 * Prepare remote-parameter expressions for evaluation. (Note: in
4376 * practice, we expect that all these expressions will be just Params, so
4377 * we could possibly do something more efficient than using the full
4378 * expression-eval machinery for this. But probably there would be little
4379 * benefit, and it'd require postgres_fdw to know more than is desirable
4380 * about Param evaluation.)
4381 */
4382 *param_exprs = ExecInitExprList(fdw_exprs, node);
4383
4384 /* Allocate buffer for text form of query parameters. */
4385 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4386 }
4387
4388 /*
4389 * Construct array of query parameter values in text format.
4390 */
4391 static void
4392 process_query_params(ExprContext *econtext,
4393 FmgrInfo *param_flinfo,
4394 List *param_exprs,
4395 const char **param_values)
4396 {
4397 int nestlevel;
4398 int i;
4399 ListCell *lc;
4400
4401 nestlevel = set_transmission_modes();
4402
4403 i = 0;
4404 foreach(lc, param_exprs)
4405 {
4406 ExprState *expr_state = (ExprState *) lfirst(lc);
4407 Datum expr_value;
4408 bool isNull;
4409
4410 /* Evaluate the parameter expression */
4411 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4412
4413 /*
4414 * Get string representation of each parameter value by invoking
4415 * type-specific output function, unless the value is null.
4416 */
4417 if (isNull)
4418 param_values[i] = NULL;
4419 else
4420 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
4421
4422 i++;
4423 }
4424
4425 reset_transmission_modes(nestlevel);
4426 }
4427
4428 /*
4429 * postgresAnalyzeForeignTable
4430 * Test whether analyzing this foreign table is supported
4431 */
4432 static bool
4433 postgresAnalyzeForeignTable(Relation relation,
4434 AcquireSampleRowsFunc *func,
4435 BlockNumber *totalpages)
4436 {
4437 ForeignTable *table;
4438 UserMapping *user;
4439 PGconn *conn;
4440 StringInfoData sql;
4441 PGresult *volatile res = NULL;
4442
4443 /* Return the row-analysis function pointer */
4444 *func = postgresAcquireSampleRowsFunc;
4445
4446 /*
4447 * Now we have to get the number of pages. It's annoying that the ANALYZE
4448 * API requires us to return that now, because it forces some duplication
4449 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4450 * it's probably not worth redefining that API at this point.
4451 */
4452
4453 /*
4454 * Get the connection to use. We do the remote access as the table's
4455 * owner, even if the ANALYZE was started by some other user.
4456 */
4457 table = GetForeignTable(RelationGetRelid(relation));
4458 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4459 conn = GetConnection(user, false);
4460
4461 /*
4462 * Construct command to get page count for relation.
4463 */
4464 initStringInfo(&sql);
4465 deparseAnalyzeSizeSql(&sql, relation);
4466
4467 /* In what follows, do not risk leaking any PGresults. */
4468 PG_TRY();
4469 {
4470 res = pgfdw_exec_query(conn, sql.data);
4471 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4472 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4473
4474 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4475 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4476 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4477 }
4478 PG_FINALLY();
4479 {
4480 if (res)
4481 PQclear(res);
4482 }
4483 PG_END_TRY();
4484
4485 ReleaseConnection(conn);
4486
4487 return true;
4488 }
4489
4490 /*
4491 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4492 *
4493 * We fetch the whole table from the remote side and pick out some sample rows.
4494 *
4495 * Selected rows are returned in the caller-allocated array rows[],
4496 * which must have at least targrows entries.
4497 * The actual number of rows selected is returned as the function result.
4498 * We also count the total number of rows in the table and return it into
4499 * *totalrows. Note that *totaldeadrows is always set to 0.
4500 *
4501 * Note that the returned list of rows is not always in order by physical
4502 * position in the table. Therefore, correlation estimates derived later
4503 * may be meaningless, but it's OK because we don't use the estimates
4504 * currently (the planner only pays attention to correlation for indexscans).
4505 */
4506 static int
4507 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4508 HeapTuple *rows, int targrows,
4509 double *totalrows,
4510 double *totaldeadrows)
4511 {
4512 PgFdwAnalyzeState astate;
4513 ForeignTable *table;
4514 ForeignServer *server;
4515 UserMapping *user;
4516 PGconn *conn;
4517 unsigned int cursor_number;
4518 StringInfoData sql;
4519 PGresult *volatile res = NULL;
4520
4521 /* Initialize workspace state */
4522 astate.rel = relation;
4523 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4524
4525 astate.rows = rows;
4526 astate.targrows = targrows;
4527 astate.numrows = 0;
4528 astate.samplerows = 0;
4529 astate.rowstoskip = -1; /* -1 means not set yet */
4530 reservoir_init_selection_state(&astate.rstate, targrows);
4531
4532 /* Remember ANALYZE context, and create a per-tuple temp context */
4533 astate.anl_cxt = CurrentMemoryContext;
4534 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4535 "postgres_fdw temporary data",
4536 ALLOCSET_SMALL_SIZES);
4537
4538 /*
4539 * Get the connection to use. We do the remote access as the table's
4540 * owner, even if the ANALYZE was started by some other user.
4541 */
4542 table = GetForeignTable(RelationGetRelid(relation));
4543 server = GetForeignServer(table->serverid);
4544 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4545 conn = GetConnection(user, false);
4546
4547 /*
4548 * Construct cursor that retrieves whole rows from remote.
4549 */
4550 cursor_number = GetCursorNumber(conn);
4551 initStringInfo(&sql);
4552 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4553 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4554
4555 /* In what follows, do not risk leaking any PGresults. */
4556 PG_TRY();
4557 {
4558 char fetch_sql[64];
4559 int fetch_size;
4560 ListCell *lc;
4561
4562 res = pgfdw_exec_query(conn, sql.data);
4563 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4564 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4565 PQclear(res);
4566 res = NULL;
4567
4568 /*
4569 * Determine the fetch size. The default is arbitrary, but shouldn't
4570 * be enormous.
4571 */
4572 fetch_size = 100;
4573 foreach(lc, server->options)
4574 {
4575 DefElem *def = (DefElem *) lfirst(lc);
4576
4577 if (strcmp(def->defname, "fetch_size") == 0)
4578 {
4579 fetch_size = strtol(defGetString(def), NULL, 10);
4580 break;
4581 }
4582 }
4583 foreach(lc, table->options)
4584 {
4585 DefElem *def = (DefElem *) lfirst(lc);
4586
4587 if (strcmp(def->defname, "fetch_size") == 0)
4588 {
4589 fetch_size = strtol(defGetString(def), NULL, 10);
4590 break;
4591 }
4592 }
4593
4594 /* Construct command to fetch rows from remote. */
4595 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4596 fetch_size, cursor_number);
4597
4598 /* Retrieve and process rows a batch at a time. */
4599 for (;;)
4600 {
4601 int numrows;
4602 int i;
4603
4604 /* Allow users to cancel long query */
4605 CHECK_FOR_INTERRUPTS();
4606
4607 /*
4608 * XXX possible future improvement: if rowstoskip is large, we
4609 * could issue a MOVE rather than physically fetching the rows,
4610 * then just adjust rowstoskip and samplerows appropriately.
4611 */
4612
4613 /* Fetch some rows */
4614 res = pgfdw_exec_query(conn, fetch_sql);
4615 /* On error, report the original query, not the FETCH. */
4616 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4617 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4618
4619 /* Process whatever we got. */
4620 numrows = PQntuples(res);
4621 for (i = 0; i < numrows; i++)
4622 analyze_row_processor(res, i, &astate);
4623
4624 PQclear(res);
4625 res = NULL;
4626
4627 /* Must be EOF if we didn't get all the rows requested. */
4628 if (numrows < fetch_size)
4629 break;
4630 }
4631
4632 /* Close the cursor, just to be tidy. */
4633 close_cursor(conn, cursor_number);
4634 }
4635 PG_CATCH();
4636 {
4637 if (res)
4638 PQclear(res);
4639 PG_RE_THROW();
4640 }
4641 PG_END_TRY();
4642
4643 ReleaseConnection(conn);
4644
4645 /* We assume that we have no dead tuple. */
4646 *totaldeadrows = 0.0;
4647
4648 /* We've retrieved all living tuples from foreign server. */
4649 *totalrows = astate.samplerows;
4650
4651 /*
4652 * Emit some interesting relation info
4653 */
4654 ereport(elevel,
4655 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4656 RelationGetRelationName(relation),
4657 astate.samplerows, astate.numrows)));
4658
4659 return astate.numrows;
4660 }
4661
4662 /*
4663 * Collect sample rows from the result of query.
4664 * - Use all tuples in sample until target # of samples are collected.
4665 * - Subsequently, replace already-sampled tuples randomly.
4666 */
4667 static void
4668 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
4669 {
4670 int targrows = astate->targrows;
4671 int pos; /* array index to store tuple in */
4672 MemoryContext oldcontext;
4673
4674 /* Always increment sample row counter. */
4675 astate->samplerows += 1;
4676
4677 /*
4678 * Determine the slot where this sample row should be stored. Set pos to
4679 * negative value to indicate the row should be skipped.
4680 */
4681 if (astate->numrows < targrows)
4682 {
4683 /* First targrows rows are always included into the sample */
4684 pos = astate->numrows++;
4685 }
4686 else
4687 {
4688 /*
4689 * Now we start replacing tuples in the sample until we reach the end
4690 * of the relation. Same algorithm as in acquire_sample_rows in
4691 * analyze.c; see Jeff Vitter's paper.
4692 */
4693 if (astate->rowstoskip < 0)
4694 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4695
4696 if (astate->rowstoskip <= 0)
4697 {
4698 /* Choose a random reservoir element to replace. */
4699 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4700 Assert(pos >= 0 && pos < targrows);
4701 heap_freetuple(astate->rows[pos]);
4702 }
4703 else
4704 {
4705 /* Skip this tuple. */
4706 pos = -1;
4707 }
4708
4709 astate->rowstoskip -= 1;
4710 }
4711
4712 if (pos >= 0)
4713 {
4714 /*
4715 * Create sample tuple from current result row, and store it in the
4716 * position determined above. The tuple has to be created in anl_cxt.
4717 */
4718 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4719
4720 astate->rows[pos] = make_tuple_from_result_row(res, row,
4721 astate->rel,
4722 astate->attinmeta,
4723 astate->retrieved_attrs,
4724 NULL,
4725 astate->temp_cxt);
4726
4727 MemoryContextSwitchTo(oldcontext);
4728 }
4729 }
4730
4731 /*
4732 * Import a foreign schema
4733 */
4734 static List *
4735 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
4736 {
4737 List *commands = NIL;
4738 bool import_collate = true;
4739 bool import_default = false;
4740 bool import_generated = true;
4741 bool import_not_null = true;
4742 ForeignServer *server;
4743 UserMapping *mapping;
4744 PGconn *conn;
4745 StringInfoData buf;
4746 PGresult *volatile res = NULL;
4747 int numrows,
4748 i;
4749 ListCell *lc;
4750
4751 /* Parse statement options */
4752 foreach(lc, stmt->options)
4753 {
4754 DefElem *def = (DefElem *) lfirst(lc);
4755
4756 if (strcmp(def->defname, "import_collate") == 0)
4757 import_collate = defGetBoolean(def);
4758 else if (strcmp(def->defname, "import_default") == 0)
4759 import_default = defGetBoolean(def);
4760 else if (strcmp(def->defname, "import_generated") == 0)
4761 import_generated = defGetBoolean(def);
4762 else if (strcmp(def->defname, "import_not_null") == 0)
4763 import_not_null = defGetBoolean(def);
4764 else
4765 ereport(ERROR,
4766 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4767 errmsg("invalid option \"%s\"", def->defname)));
4768 }
4769
4770 /*
4771 * Get connection to the foreign server. Connection manager will
4772 * establish new connection if necessary.
4773 */
4774 server = GetForeignServer(serverOid);
4775 mapping = GetUserMapping(GetUserId(), server->serverid);
4776 conn = GetConnection(mapping, false);
4777
4778 /* Don't attempt to import collation if remote server hasn't got it */
4779 if (PQserverVersion(conn) < 90100)
4780 import_collate = false;
4781
4782 /* Create workspace for strings */
4783 initStringInfo(&buf);
4784
4785 /* In what follows, do not risk leaking any PGresults. */
4786 PG_TRY();
4787 {
4788 /* Check that the schema really exists */
4789 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4790 deparseStringLiteral(&buf, stmt->remote_schema);
4791
4792 res = pgfdw_exec_query(conn, buf.data);
4793 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4794 pgfdw_report_error(ERROR, res, conn, false, buf.data);
4795
4796 if (PQntuples(res) != 1)
4797 ereport(ERROR,
4798 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4799 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4800 stmt->remote_schema, server->servername)));
4801
4802 PQclear(res);
4803 res = NULL;
4804 resetStringInfo(&buf);
4805
4806 /*
4807 * Fetch all table data from this schema, possibly restricted by
4808 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4809 * to EXCEPT/LIMIT TO here, because the core code will filter the
4810 * statements we return according to those lists anyway. But it
4811 * should save a few cycles to not process excluded tables in the
4812 * first place.)
4813 *
4814 * Ignore table data for partitions and only include the definitions
4815 * of the root partitioned tables to allow access to the complete
4816 * remote data set locally in the schema imported.
4817 *
4818 * Note: because we run the connection with search_path restricted to
4819 * pg_catalog, the format_type() and pg_get_expr() outputs will always
4820 * include a schema name for types/functions in other schemas, which
4821 * is what we want.
4822 */
4823 appendStringInfoString(&buf,
4824 "SELECT relname, "
4825 " attname, "
4826 " format_type(atttypid, atttypmod), "
4827 " attnotnull, ");
4828
4829 /* Generated columns are supported since Postgres 12 */
4830 if (PQserverVersion(conn) >= 120000)
4831 appendStringInfoString(&buf,
4832 " attgenerated, "
4833 " pg_get_expr(adbin, adrelid), ");
4834 else
4835 appendStringInfoString(&buf,
4836 " NULL, "
4837 " pg_get_expr(adbin, adrelid), ");
4838
4839 if (import_collate)
4840 appendStringInfoString(&buf,
4841 " collname, "
4842 " collnsp.nspname "
4843 "FROM pg_class c "
4844 " JOIN pg_namespace n ON "
4845 " relnamespace = n.oid "
4846 " LEFT JOIN pg_attribute a ON "
4847 " attrelid = c.oid AND attnum > 0 "
4848 " AND NOT attisdropped "
4849 " LEFT JOIN pg_attrdef ad ON "
4850 " adrelid = c.oid AND adnum = attnum "
4851 " LEFT JOIN pg_collation coll ON "
4852 " coll.oid = attcollation "
4853 " LEFT JOIN pg_namespace collnsp ON "
4854 " collnsp.oid = collnamespace ");
4855 else
4856 appendStringInfoString(&buf,
4857 " NULL, NULL "
4858 "FROM pg_class c "
4859 " JOIN pg_namespace n ON "
4860 " relnamespace = n.oid "
4861 " LEFT JOIN pg_attribute a ON "
4862 " attrelid = c.oid AND attnum > 0 "
4863 " AND NOT attisdropped "
4864 " LEFT JOIN pg_attrdef ad ON "
4865 " adrelid = c.oid AND adnum = attnum ");
4866
4867 appendStringInfoString(&buf,
4868 "WHERE c.relkind IN ("
4869 CppAsString2(RELKIND_RELATION) ","
4870 CppAsString2(RELKIND_VIEW) ","
4871 CppAsString2(RELKIND_FOREIGN_TABLE) ","
4872 CppAsString2(RELKIND_MATVIEW) ","
4873 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4874 " AND n.nspname = ");
4875 deparseStringLiteral(&buf, stmt->remote_schema);
4876
4877 /* Partitions are supported since Postgres 10 */
4878 if (PQserverVersion(conn) >= 100000)
4879 appendStringInfoString(&buf, " AND NOT c.relispartition ");
4880
4881 /* Apply restrictions for LIMIT TO and EXCEPT */
4882 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4883 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4884 {
4885 bool first_item = true;
4886
4887 appendStringInfoString(&buf, " AND c.relname ");
4888 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4889 appendStringInfoString(&buf, "NOT ");
4890 appendStringInfoString(&buf, "IN (");
4891
4892 /* Append list of table names within IN clause */
4893 foreach(lc, stmt->table_list)
4894 {
4895 RangeVar *rv = (RangeVar *) lfirst(lc);
4896
4897 if (first_item)
4898 first_item = false;
4899 else
4900 appendStringInfoString(&buf, ", ");
4901 deparseStringLiteral(&buf, rv->relname);
4902 }
4903 appendStringInfoChar(&buf, ')');
4904 }
4905
4906 /* Append ORDER BY at the end of query to ensure output ordering */
4907 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4908
4909 /* Fetch the data */
4910 res = pgfdw_exec_query(conn, buf.data);
4911 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4912 pgfdw_report_error(ERROR, res, conn, false, buf.data);
4913
4914 /* Process results */
4915 numrows = PQntuples(res);
4916 /* note: incrementation of i happens in inner loop's while() test */
4917 for (i = 0; i < numrows;)
4918 {
4919 char *tablename = PQgetvalue(res, i, 0);
4920 bool first_item = true;
4921
4922 resetStringInfo(&buf);
4923 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4924 quote_identifier(tablename));
4925
4926 /* Scan all rows for this table */
4927 do
4928 {
4929 char *attname;
4930 char *typename;
4931 char *attnotnull;
4932 char *attgenerated;
4933 char *attdefault;
4934 char *collname;
4935 char *collnamespace;
4936
4937 /* If table has no columns, we'll see nulls here */
4938 if (PQgetisnull(res, i, 1))
4939 continue;
4940
4941 attname = PQgetvalue(res, i, 1);
4942 typename = PQgetvalue(res, i, 2);
4943 attnotnull = PQgetvalue(res, i, 3);
4944 attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL :
4945 PQgetvalue(res, i, 4);
4946 attdefault = PQgetisnull(res, i, 5) ? (char *) NULL :
4947 PQgetvalue(res, i, 5);
4948 collname = PQgetisnull(res, i, 6) ? (char *) NULL :
4949 PQgetvalue(res, i, 6);
4950 collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
4951 PQgetvalue(res, i, 7);
4952
4953 if (first_item)
4954 first_item = false;
4955 else
4956 appendStringInfoString(&buf, ",\n");
4957
4958 /* Print column name and type */
4959 appendStringInfo(&buf, " %s %s",
4960 quote_identifier(attname),
4961 typename);
4962
4963 /*
4964 * Add column_name option so that renaming the foreign table's
4965 * column doesn't break the association to the underlying
4966 * column.
4967 */
4968 appendStringInfoString(&buf, " OPTIONS (column_name ");
4969 deparseStringLiteral(&buf, attname);
4970 appendStringInfoChar(&buf, ')');
4971
4972 /* Add COLLATE if needed */
4973 if (import_collate && collname != NULL && collnamespace != NULL)
4974 appendStringInfo(&buf, " COLLATE %s.%s",
4975 quote_identifier(collnamespace),
4976 quote_identifier(collname));
4977
4978 /* Add DEFAULT if needed */
4979 if (import_default && attdefault != NULL &&
4980 (!attgenerated || !attgenerated[0]))
4981 appendStringInfo(&buf, " DEFAULT %s", attdefault);
4982
4983 /* Add GENERATED if needed */
4984 if (import_generated && attgenerated != NULL &&
4985 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
4986 {
4987 Assert(attdefault != NULL);
4988 appendStringInfo(&buf,
4989 " GENERATED ALWAYS AS (%s) STORED",
4990 attdefault);
4991 }
4992
4993 /* Add NOT NULL if needed */
4994 if (import_not_null && attnotnull[0] == 't')
4995 appendStringInfoString(&buf, " NOT NULL");
4996 }
4997 while (++i < numrows &&
4998 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4999
5000 /*
5001 * Add server name and table-level options. We specify remote
5002 * schema and table name as options (the latter to ensure that
5003 * renaming the foreign table doesn't break the association).
5004 */
5005 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5006 quote_identifier(server->servername));
5007
5008 appendStringInfoString(&buf, "schema_name ");
5009 deparseStringLiteral(&buf, stmt->remote_schema);
5010 appendStringInfoString(&buf, ", table_name ");
5011 deparseStringLiteral(&buf, tablename);
5012
5013 appendStringInfoString(&buf, ");");
5014
5015 commands = lappend(commands, pstrdup(buf.data));
5016 }
5017 }
5018 PG_FINALLY();
5019 {
5020 if (res)
5021 PQclear(res);
5022 }
5023 PG_END_TRY();
5024
5025 ReleaseConnection(conn);
5026
5027 return commands;
5028 }
5029
5030 /*
5031 * Assess whether the join between inner and outer relations can be pushed down
5032 * to the foreign server. As a side effect, save information we obtain in this
5033 * function to PgFdwRelationInfo passed in.
5034 */
5035 static bool
5036 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
5037 RelOptInfo *outerrel, RelOptInfo *innerrel,
5038 JoinPathExtraData *extra)
5039 {
5040 PgFdwRelationInfo *fpinfo;
5041 PgFdwRelationInfo *fpinfo_o;
5042 PgFdwRelationInfo *fpinfo_i;
5043 ListCell *lc;
5044 List *joinclauses;
5045
5046 /*
5047 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
5048 * Constructing queries representing SEMI and ANTI joins is hard, hence
5049 * not considered right now.
5050 */
5051 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5052 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5053 return false;
5054
5055 /*
5056 * If either of the joining relations is marked as unsafe to pushdown, the
5057 * join can not be pushed down.
5058 */
5059 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5060 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5061 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5062 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5063 !fpinfo_i || !fpinfo_i->pushdown_safe)
5064 return false;
5065
5066 /*
5067 * If joining relations have local conditions, those conditions are
5068 * required to be applied before joining the relations. Hence the join can
5069 * not be pushed down.
5070 */
5071 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5072 return false;
5073
5074 /*
5075 * Merge FDW options. We might be tempted to do this after we have deemed
5076 * the foreign join to be OK. But we must do this beforehand so that we
5077 * know which quals can be evaluated on the foreign server, which might
5078 * depend on shippable_extensions.
5079 */
5080 fpinfo->server = fpinfo_o->server;
5081 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5082
5083 /*
5084 * Separate restrict list into join quals and pushed-down (other) quals.
5085 *
5086 * Join quals belonging to an outer join must all be shippable, else we
5087 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5088 *
5089 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5090 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5091 * either locally or remotely; the same is true for pushed-down conditions
5092 * at an outer join.
5093 *
5094 * Note we might return failure after having already scribbled on
5095 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5096 * won't consult those lists again if we deem the join unshippable.
5097 */
5098 joinclauses = NIL;
5099 foreach(lc, extra->restrictlist)
5100 {
5101 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5102 bool is_remote_clause = is_foreign_expr(root, joinrel,
5103 rinfo->clause);
5104
5105 if (IS_OUTER_JOIN(jointype) &&
5106 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5107 {
5108 if (!is_remote_clause)
5109 return false;
5110 joinclauses = lappend(joinclauses, rinfo);
5111 }
5112 else
5113 {
5114 if (is_remote_clause)
5115 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5116 else
5117 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5118 }
5119 }
5120
5121 /*
5122 * deparseExplicitTargetList() isn't smart enough to handle anything other
5123 * than a Var. In particular, if there's some PlaceHolderVar that would
5124 * need to be evaluated within this join tree (because there's an upper
5125 * reference to a quantity that may go to NULL as a result of an outer
5126 * join), then we can't try to push the join down because we'll fail when
5127 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5128 * needs to be evaluated *at the top* of this join tree is OK, because we
5129 * can do that locally after fetching the results from the remote side.
5130 */
5131 foreach(lc, root->placeholder_list)
5132 {
5133 PlaceHolderInfo *phinfo = lfirst(lc);
5134 Relids relids;
5135
5136 /* PlaceHolderInfo refers to parent relids, not child relids. */
5137 relids = IS_OTHER_REL(joinrel) ?
5138 joinrel->top_parent_relids : joinrel->relids;
5139
5140 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5141 bms_nonempty_difference(relids, phinfo->ph_eval_at))
5142 return false;
5143 }
5144
5145 /* Save the join clauses, for later use. */
5146 fpinfo->joinclauses = joinclauses;
5147
5148 fpinfo->outerrel = outerrel;
5149 fpinfo->innerrel = innerrel;
5150 fpinfo->jointype = jointype;
5151
5152 /*
5153 * By default, both the input relations are not required to be deparsed as
5154 * subqueries, but there might be some relations covered by the input
5155 * relations that are required to be deparsed as subqueries, so save the
5156 * relids of those relations for later use by the deparser.
5157 */
5158 fpinfo->make_outerrel_subquery = false;
5159 fpinfo->make_innerrel_subquery = false;
5160 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5161 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5162 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5163 fpinfo_i->lower_subquery_rels);
5164
5165 /*
5166 * Pull the other remote conditions from the joining relations into join
5167 * clauses or other remote clauses (remote_conds) of this relation
5168 * wherever possible. This avoids building subqueries at every join step.
5169 *
5170 * For an inner join, clauses from both the relations are added to the
5171 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5172 * the outer side are added to remote_conds since those can be evaluated
5173 * after the join is evaluated. The clauses from inner side are added to
5174 * the joinclauses, since they need to be evaluated while constructing the
5175 * join.
5176 *
5177 * For a FULL OUTER JOIN, the other clauses from either relation can not
5178 * be added to the joinclauses or remote_conds, since each relation acts
5179 * as an outer relation for the other.
5180 *
5181 * The joining sides can not have local conditions, thus no need to test
5182 * shippability of the clauses being pulled up.
5183 */
5184 switch (jointype)
5185 {
5186 case JOIN_INNER:
5187 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5188 fpinfo_i->remote_conds);
5189 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5190 fpinfo_o->remote_conds);
5191 break;
5192
5193 case JOIN_LEFT:
5194 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5195 fpinfo_i->remote_conds);
5196 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5197 fpinfo_o->remote_conds);
5198 break;
5199
5200 case JOIN_RIGHT:
5201 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5202 fpinfo_o->remote_conds);
5203 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5204 fpinfo_i->remote_conds);
5205 break;
5206
5207 case JOIN_FULL:
5208
5209 /*
5210 * In this case, if any of the input relations has conditions, we
5211 * need to deparse that relation as a subquery so that the
5212 * conditions can be evaluated before the join. Remember it in
5213 * the fpinfo of this relation so that the deparser can take
5214 * appropriate action. Also, save the relids of base relations
5215 * covered by that relation for later use by the deparser.
5216 */
5217 if (fpinfo_o->remote_conds)
5218 {
5219 fpinfo->make_outerrel_subquery = true;
5220 fpinfo->lower_subquery_rels =
5221 bms_add_members(fpinfo->lower_subquery_rels,
5222 outerrel->relids);
5223 }
5224 if (fpinfo_i->remote_conds)
5225 {
5226 fpinfo->make_innerrel_subquery = true;
5227 fpinfo->lower_subquery_rels =
5228 bms_add_members(fpinfo->lower_subquery_rels,
5229 innerrel->relids);
5230 }
5231 break;
5232
5233 default:
5234 /* Should not happen, we have just checked this above */
5235 elog(ERROR, "unsupported join type %d", jointype);
5236 }
5237
5238 /*
5239 * For an inner join, all restrictions can be treated alike. Treating the
5240 * pushed down conditions as join conditions allows a top level full outer
5241 * join to be deparsed without requiring subqueries.
5242 */
5243 if (jointype == JOIN_INNER)
5244 {
5245 Assert(!fpinfo->joinclauses);
5246 fpinfo->joinclauses = fpinfo->remote_conds;
5247 fpinfo->remote_conds = NIL;
5248 }
5249
5250 /* Mark that this join can be pushed down safely */
5251 fpinfo->pushdown_safe = true;
5252
5253 /* Get user mapping */
5254 if (fpinfo->use_remote_estimate)
5255 {
5256 if (fpinfo_o->use_remote_estimate)
5257 fpinfo->user = fpinfo_o->user;
5258 else
5259 fpinfo->user = fpinfo_i->user;
5260 }
5261 else
5262 fpinfo->user = NULL;
5263
5264 /*
5265 * Set # of retrieved rows and cached relation costs to some negative
5266 * value, so that we can detect when they are set to some sensible values,
5267 * during one (usually the first) of the calls to estimate_path_cost_size.
5268 */
5269 fpinfo->retrieved_rows = -1;
5270 fpinfo->rel_startup_cost = -1;
5271 fpinfo->rel_total_cost = -1;
5272
5273 /*
5274 * Set the string describing this join relation to be used in EXPLAIN
5275 * output of corresponding ForeignScan. Note that the decoration we add
5276 * to the base relation names mustn't include any digits, or it'll confuse
5277 * postgresExplainForeignScan.
5278 */
5279 fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
5280 fpinfo_o->relation_name,
5281 get_jointype_name(fpinfo->jointype),
5282 fpinfo_i->relation_name);
5283
5284 /*
5285 * Set the relation index. This is defined as the position of this
5286 * joinrel in the join_rel_list list plus the length of the rtable list.
5287 * Note that since this joinrel is at the end of the join_rel_list list
5288 * when we are called, we can get the position by list_length.
5289 */
5290 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
5291 fpinfo->relation_index =
5292 list_length(root->parse->rtable) + list_length(root->join_rel_list);
5293
5294 return true;
5295 }
5296
5297 static void
5298 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5299 Path *epq_path)
5300 {
5301 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5302 ListCell *lc;
5303
5304 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5305
5306 /* Create one path for each set of pathkeys we found above. */
5307 foreach(lc, useful_pathkeys_list)
5308 {
5309 double rows;
5310 int width;
5311 Cost startup_cost;
5312 Cost total_cost;
5313 List *useful_pathkeys = lfirst(lc);
5314 Path *sorted_epq_path;
5315
5316 estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5317 &rows, &width, &startup_cost, &total_cost);
5318
5319 /*
5320 * The EPQ path must be at least as well sorted as the path itself, in
5321 * case it gets used as input to a mergejoin.
5322 */
5323 sorted_epq_path = epq_path;
5324 if (sorted_epq_path != NULL &&
5325 !pathkeys_contained_in(useful_pathkeys,
5326 sorted_epq_path->pathkeys))
5327 sorted_epq_path = (Path *)
5328 create_sort_path(root,
5329 rel,
5330 sorted_epq_path,
5331 useful_pathkeys,
5332 -1.0);
5333
5334 if (IS_SIMPLE_REL(rel))
5335 add_path(rel, (Path *)
5336 create_foreignscan_path(root, rel,
5337 NULL,
5338 rows,
5339 startup_cost,
5340 total_cost,
5341 useful_pathkeys,
5342 rel->lateral_relids,
5343 sorted_epq_path,
5344 NIL));
5345 else
5346 add_path(rel, (Path *)
5347 create_foreign_join_path(root, rel,
5348 NULL,
5349 rows,
5350 startup_cost,
5351 total_cost,
5352 useful_pathkeys,
5353 rel->lateral_relids,
5354 sorted_epq_path,
5355 NIL));
5356 }
5357 }
5358
5359 /*
5360 * Parse options from foreign server and apply them to fpinfo.
5361 *
5362 * New options might also require tweaking merge_fdw_options().
5363 */
5364 static void
5365 apply_server_options(PgFdwRelationInfo *fpinfo)
5366 {
5367 ListCell *lc;
5368
5369 foreach(lc, fpinfo->server->options)
5370 {
5371 DefElem *def = (DefElem *) lfirst(lc);
5372
5373 if (strcmp(def->defname, "use_remote_estimate") == 0)
5374 fpinfo->use_remote_estimate = defGetBoolean(def);
5375 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5376 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5377 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5378 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5379 else if (strcmp(def->defname, "extensions") == 0)
5380 fpinfo->shippable_extensions =
5381 ExtractExtensionList(defGetString(def), false);
5382 else if (strcmp(def->defname, "fetch_size") == 0)
5383 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5384 }
5385 }
5386
5387 /*
5388 * Parse options from foreign table and apply them to fpinfo.
5389 *
5390 * New options might also require tweaking merge_fdw_options().
5391 */
5392 static void
5393 apply_table_options(PgFdwRelationInfo *fpinfo)
5394 {
5395 ListCell *lc;
5396
5397 foreach(lc, fpinfo->table->options)
5398 {
5399 DefElem *def = (DefElem *) lfirst(lc);
5400
5401 if (strcmp(def->defname, "use_remote_estimate") == 0)
5402 fpinfo->use_remote_estimate = defGetBoolean(def);
5403 else if (strcmp(def->defname, "fetch_size") == 0)
5404 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5405 }
5406 }
5407
5408 /*
5409 * Merge FDW options from input relations into a new set of options for a join
5410 * or an upper rel.
5411 *
5412 * For a join relation, FDW-specific information about the inner and outer
5413 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5414 * fpinfo_o provides the information for the input relation; fpinfo_i is
5415 * expected to NULL.
5416 */
5417 static void
5418 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5419 const PgFdwRelationInfo *fpinfo_o,
5420 const PgFdwRelationInfo *fpinfo_i)
5421 {
5422 /* We must always have fpinfo_o. */
5423 Assert(fpinfo_o);
5424
5425 /* fpinfo_i may be NULL, but if present the servers must both match. */
5426 Assert(!fpinfo_i ||
5427 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5428
5429 /*
5430 * Copy the server specific FDW options. (For a join, both relations come
5431 * from the same server, so the server options should have the same value
5432 * for both relations.)
5433 */
5434 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5435 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5436 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5437 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5438 fpinfo->fetch_size = fpinfo_o->fetch_size;
5439
5440 /* Merge the table level options from either side of the join. */
5441 if (fpinfo_i)
5442 {
5443 /*
5444 * We'll prefer to use remote estimates for this join if any table
5445 * from either side of the join is using remote estimates. This is
5446 * most likely going to be preferred since they're already willing to
5447 * pay the price of a round trip to get the remote EXPLAIN. In any
5448 * case it's not entirely clear how we might otherwise handle this
5449 * best.
5450 */
5451 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5452 fpinfo_i->use_remote_estimate;
5453
5454 /*
5455 * Set fetch size to maximum of the joining sides, since we are
5456 * expecting the rows returned by the join to be proportional to the
5457 * relation sizes.
5458 */
5459 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5460 }
5461 }
5462
5463 /*
5464 * postgresGetForeignJoinPaths
5465 * Add possible ForeignPath to joinrel, if join is safe to push down.
5466 */
5467 static void
5468 postgresGetForeignJoinPaths(PlannerInfo *root,
5469 RelOptInfo *joinrel,
5470 RelOptInfo *outerrel,
5471 RelOptInfo *innerrel,
5472 JoinType jointype,
5473 JoinPathExtraData *extra)
5474 {
5475 PgFdwRelationInfo *fpinfo;
5476 ForeignPath *joinpath;
5477 double rows;
5478 int width;
5479 Cost startup_cost;
5480 Cost total_cost;
5481 Path *epq_path; /* Path to create plan to be executed when
5482 * EvalPlanQual gets triggered. */
5483
5484 /*
5485 * Skip if this join combination has been considered already.
5486 */
5487 if (joinrel->fdw_private)
5488 return;
5489
5490 /*
5491 * This code does not work for joins with lateral references, since those
5492 * must have parameterized paths, which we don't generate yet.
5493 */
5494 if (!bms_is_empty(joinrel->lateral_relids))
5495 return;
5496
5497 /*
5498 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5499 * that the join relation is already considered, so that we won't waste
5500 * time in judging safety of join pushdown and adding the same paths again
5501 * if found safe. Once we know that this join can be pushed down, we fill
5502 * the entry.
5503 */
5504 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5505 fpinfo->pushdown_safe = false;
5506 joinrel->fdw_private = fpinfo;
5507 /* attrs_used is only for base relations. */
5508 fpinfo->attrs_used = NULL;
5509
5510 /*
5511 * If there is a possibility that EvalPlanQual will be executed, we need
5512 * to be able to reconstruct the row using scans of the base relations.
5513 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5514 * the path list of the joinrel, if one exists. We must be careful to
5515 * call it before adding any ForeignPath, since the ForeignPath might
5516 * dominate the only suitable local path available. We also do it before
5517 * calling foreign_join_ok(), since that function updates fpinfo and marks
5518 * it as pushable if the join is found to be pushable.
5519 */
5520 if (root->parse->commandType == CMD_DELETE ||
5521 root->parse->commandType == CMD_UPDATE ||
5522 root->rowMarks)
5523 {
5524 epq_path = GetExistingLocalJoinPath(joinrel);
5525 if (!epq_path)
5526 {
5527 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5528 return;
5529 }
5530 }
5531 else
5532 epq_path = NULL;
5533
5534 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5535 {
5536 /* Free path required for EPQ if we copied one; we don't need it now */
5537 if (epq_path)
5538 pfree(epq_path);
5539 return;
5540 }
5541
5542 /*
5543 * Compute the selectivity and cost of the local_conds, so we don't have
5544 * to do it over again for each path. The best we can do for these
5545 * conditions is to estimate selectivity on the basis of local statistics.
5546 * The local conditions are applied after the join has been computed on
5547 * the remote side like quals in WHERE clause, so pass jointype as
5548 * JOIN_INNER.
5549 */
5550 fpinfo->local_conds_sel = clauselist_selectivity(root,
5551 fpinfo->local_conds,
5552 0,
5553 JOIN_INNER,
5554 NULL);
5555 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5556
5557 /*
5558 * If we are going to estimate costs locally, estimate the join clause
5559 * selectivity here while we have special join info.
5560 */
5561 if (!fpinfo->use_remote_estimate)
5562 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5563 0, fpinfo->jointype,
5564 extra->sjinfo);
5565
5566 /* Estimate costs for bare join relation */
5567 estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
5568 &rows, &width, &startup_cost, &total_cost);
5569 /* Now update this information in the joinrel */
5570 joinrel->rows = rows;
5571 joinrel->reltarget->width = width;
5572 fpinfo->rows = rows;
5573 fpinfo->width = width;
5574 fpinfo->startup_cost = startup_cost;
5575 fpinfo->total_cost = total_cost;
5576
5577 /*
5578 * Create a new join path and add it to the joinrel which represents a
5579 * join between foreign tables.
5580 */
5581 joinpath = create_foreign_join_path(root,
5582 joinrel,
5583 NULL, /* default pathtarget */
5584 rows,
5585 startup_cost,
5586 total_cost,
5587 NIL, /* no pathkeys */
5588 joinrel->lateral_relids,
5589 epq_path,
5590 NIL); /* no fdw_private */
5591
5592 /* Add generated path into joinrel by add_path(). */
5593 add_path(joinrel, (Path *) joinpath);
5594
5595 /* Consider pathkeys for the join relation */
5596 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5597
5598 /* XXX Consider parameterized paths for the join relation */
5599 }
5600
5601 /*
5602 * Assess whether the aggregation, grouping and having operations can be pushed
5603 * down to the foreign server. As a side effect, save information we obtain in
5604 * this function to PgFdwRelationInfo of the input relation.
5605 */
5606 static bool
5607 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
5608 Node *havingQual)
5609 {
5610 Query *query = root->parse;
5611 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5612 PathTarget *grouping_target = grouped_rel->reltarget;
5613 PgFdwRelationInfo *ofpinfo;
5614 ListCell *lc;
5615 int i;
5616 List *tlist = NIL;
5617
5618 /* We currently don't support pushing Grouping Sets. */
5619 if (query->groupingSets)
5620 return false;
5621
5622 /* Get the fpinfo of the underlying scan relation. */
5623 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5624
5625 /*
5626 * If underlying scan relation has any local conditions, those conditions
5627 * are required to be applied before performing aggregation. Hence the
5628 * aggregate cannot be pushed down.
5629 */
5630 if (ofpinfo->local_conds)
5631 return false;
5632
5633 /*
5634 * Examine grouping expressions, as well as other expressions we'd need to
5635 * compute, and check whether they are safe to push down to the foreign
5636 * server. All GROUP BY expressions will be part of the grouping target
5637 * and thus there is no need to search for them separately. Add grouping
5638 * expressions into target list which will be passed to foreign server.
5639 *
5640 * A tricky fine point is that we must not put any expression into the
5641 * target list that is just a foreign param (that is, something that
5642 * deparse.c would conclude has to be sent to the foreign server). If we
5643 * do, the expression will also appear in the fdw_exprs list of the plan
5644 * node, and setrefs.c will get confused and decide that the fdw_exprs
5645 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5646 * a broken plan. Somewhat oddly, it's OK if the expression contains such
5647 * a node, as long as it's not at top level; then no match is possible.
5648 */
5649 i = 0;
5650 foreach(lc, grouping_target->exprs)
5651 {
5652 Expr *expr = (Expr *) lfirst(lc);
5653 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5654 ListCell *l;
5655
5656 /* Check whether this expression is part of GROUP BY clause */
5657 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5658 {
5659 TargetEntry *tle;
5660
5661 /*
5662 * If any GROUP BY expression is not shippable, then we cannot
5663 * push down aggregation to the foreign server.
5664 */
5665 if (!is_foreign_expr(root, grouped_rel, expr))
5666 return false;
5667
5668 /*
5669 * If it would be a foreign param, we can't put it into the tlist,
5670 * so we have to fail.
5671 */
5672 if (is_foreign_param(root, grouped_rel, expr))
5673 return false;
5674
5675 /*
5676 * Pushable, so add to tlist. We need to create a TLE for this
5677 * expression and apply the sortgroupref to it. We cannot use
5678 * add_to_flat_tlist() here because that avoids making duplicate
5679 * entries in the tlist. If there are duplicate entries with
5680 * distinct sortgrouprefs, we have to duplicate that situation in
5681 * the output tlist.
5682 */
5683 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5684 tle->ressortgroupref = sgref;
5685 tlist = lappend(tlist, tle);
5686 }
5687 else
5688 {
5689 /*
5690 * Non-grouping expression we need to compute. Can we ship it
5691 * as-is to the foreign server?
5692 */
5693 if (is_foreign_expr(root, grouped_rel, expr) &&
5694 !is_foreign_param(root, grouped_rel, expr))
5695 {
5696 /* Yes, so add to tlist as-is; OK to suppress duplicates */
5697 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5698 }
5699 else
5700 {
5701 /* Not pushable as a whole; extract its Vars and aggregates */
5702 List *aggvars;
5703
5704 aggvars = pull_var_clause((Node *) expr,
5705 PVC_INCLUDE_AGGREGATES);
5706
5707 /*
5708 * If any aggregate expression is not shippable, then we
5709 * cannot push down aggregation to the foreign server. (We
5710 * don't have to check is_foreign_param, since that certainly
5711 * won't return true for any such expression.)
5712 */
5713 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5714 return false;
5715
5716 /*
5717 * Add aggregates, if any, into the targetlist. Plain Vars
5718 * outside an aggregate can be ignored, because they should be
5719 * either same as some GROUP BY column or part of some GROUP
5720 * BY expression. In either case, they are already part of
5721 * the targetlist and thus no need to add them again. In fact
5722 * including plain Vars in the tlist when they do not match a
5723 * GROUP BY column would cause the foreign server to complain
5724 * that the shipped query is invalid.
5725 */
5726 foreach(l, aggvars)
5727 {
5728 Expr *expr = (Expr *) lfirst(l);
5729
5730 if (IsA(expr, Aggref))
5731 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5732 }
5733 }
5734 }
5735
5736 i++;
5737 }
5738
5739 /*
5740 * Classify the pushable and non-pushable HAVING clauses and save them in
5741 * remote_conds and local_conds of the grouped rel's fpinfo.
5742 */
5743 if (havingQual)
5744 {
5745 ListCell *lc;
5746
5747 foreach(lc, (List *) havingQual)
5748 {
5749 Expr *expr = (Expr *) lfirst(lc);
5750 RestrictInfo *rinfo;
5751
5752 /*
5753 * Currently, the core code doesn't wrap havingQuals in
5754 * RestrictInfos, so we must make our own.
5755 */
5756 Assert(!IsA(expr, RestrictInfo));
5757 rinfo = make_restrictinfo(root,
5758 expr,
5759 true,
5760 false,
5761 false,
5762 root->qual_security_level,
5763 grouped_rel->relids,
5764 NULL,
5765 NULL);
5766 if (is_foreign_expr(root, grouped_rel, expr))
5767 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5768 else
5769 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5770 }
5771 }
5772
5773 /*
5774 * If there are any local conditions, pull Vars and aggregates from it and
5775 * check whether they are safe to pushdown or not.
5776 */
5777 if (fpinfo->local_conds)
5778 {
5779 List *aggvars = NIL;
5780 ListCell *lc;
5781
5782 foreach(lc, fpinfo->local_conds)
5783 {
5784 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5785
5786 aggvars = list_concat(aggvars,
5787 pull_var_clause((Node *) rinfo->clause,
5788 PVC_INCLUDE_AGGREGATES));
5789 }
5790
5791 foreach(lc, aggvars)
5792 {
5793 Expr *expr = (Expr *) lfirst(lc);
5794
5795 /*
5796 * If aggregates within local conditions are not safe to push
5797 * down, then we cannot push down the query. Vars are already
5798 * part of GROUP BY clause which are checked above, so no need to
5799 * access them again here. Again, we need not check
5800 * is_foreign_param for a foreign aggregate.
5801 */
5802 if (IsA(expr, Aggref))
5803 {
5804 if (!is_foreign_expr(root, grouped_rel, expr))
5805 return false;
5806
5807 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5808 }
5809 }
5810 }
5811
5812 /* Store generated targetlist */
5813 fpinfo->grouped_tlist = tlist;
5814
5815 /* Safe to pushdown */
5816 fpinfo->pushdown_safe = true;
5817
5818 /*
5819 * Set # of retrieved rows and cached relation costs to some negative
5820 * value, so that we can detect when they are set to some sensible values,
5821 * during one (usually the first) of the calls to estimate_path_cost_size.
5822 */
5823 fpinfo->retrieved_rows = -1;
5824 fpinfo->rel_startup_cost = -1;
5825 fpinfo->rel_total_cost = -1;
5826
5827 /*
5828 * Set the string describing this grouped relation to be used in EXPLAIN
5829 * output of corresponding ForeignScan. Note that the decoration we add
5830 * to the base relation name mustn't include any digits, or it'll confuse
5831 * postgresExplainForeignScan.
5832 */
5833 fpinfo->relation_name = psprintf("Aggregate on (%s)",
5834 ofpinfo->relation_name);
5835
5836 return true;
5837 }
5838
5839 /*
5840 * postgresGetForeignUpperPaths
5841 * Add paths for post-join operations like aggregation, grouping etc. if
5842 * corresponding operations are safe to push down.
5843 */
5844 static void
5845 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
5846 RelOptInfo *input_rel, RelOptInfo *output_rel,
5847 void *extra)
5848 {
5849 PgFdwRelationInfo *fpinfo;
5850
5851 /*
5852 * If input rel is not safe to pushdown, then simply return as we cannot
5853 * perform any post-join operations on the foreign server.
5854 */
5855 if (!input_rel->fdw_private ||
5856 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5857 return;
5858
5859 /* Ignore stages we don't support; and skip any duplicate calls. */
5860 if ((stage != UPPERREL_GROUP_AGG &&
5861 stage != UPPERREL_ORDERED &&
5862 stage != UPPERREL_FINAL) ||
5863 output_rel->fdw_private)
5864 return;
5865
5866 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5867 fpinfo->pushdown_safe = false;
5868 fpinfo->stage = stage;
5869 output_rel->fdw_private = fpinfo;
5870
5871 switch (stage)
5872 {
5873 case UPPERREL_GROUP_AGG:
5874 add_foreign_grouping_paths(root, input_rel, output_rel,
5875 (GroupPathExtraData *) extra);
5876 break;
5877 case UPPERREL_ORDERED:
5878 add_foreign_ordered_paths(root, input_rel, output_rel);
5879 break;
5880 case UPPERREL_FINAL:
5881 add_foreign_final_paths(root, input_rel, output_rel,
5882 (FinalPathExtraData *) extra);
5883 break;
5884 default:
5885 elog(ERROR, "unexpected upper relation: %d", (int) stage);
5886 break;
5887 }
5888 }
5889
5890 /*
5891 * add_foreign_grouping_paths
5892 * Add foreign path for grouping and/or aggregation.
5893 *
5894 * Given input_rel represents the underlying scan. The paths are added to the
5895 * given grouped_rel.
5896 */
5897 static void
5898 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
5899 RelOptInfo *grouped_rel,
5900 GroupPathExtraData *extra)
5901 {
5902 Query *parse = root->parse;
5903 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5904 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5905 ForeignPath *grouppath;
5906 double rows;
5907 int width;
5908 Cost startup_cost;
5909 Cost total_cost;
5910
5911 /* Nothing to be done, if there is no grouping or aggregation required. */
5912 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5913 !root->hasHavingQual)
5914 return;
5915
5916 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
5917 extra->patype == PARTITIONWISE_AGGREGATE_FULL);
5918
5919 /* save the input_rel as outerrel in fpinfo */
5920 fpinfo->outerrel = input_rel;
5921
5922 /*
5923 * Copy foreign table, foreign server, user mapping, FDW options etc.
5924 * details from the input relation's fpinfo.
5925 */
5926 fpinfo->table = ifpinfo->table;
5927 fpinfo->server = ifpinfo->server;
5928 fpinfo->user = ifpinfo->user;
5929 merge_fdw_options(fpinfo, ifpinfo, NULL);
5930
5931 /*
5932 * Assess if it is safe to push down aggregation and grouping.
5933 *
5934 * Use HAVING qual from extra. In case of child partition, it will have
5935 * translated Vars.
5936 */
5937 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5938 return;
5939
5940 /*
5941 * Compute the selectivity and cost of the local_conds, so we don't have
5942 * to do it over again for each path. (Currently we create just a single
5943 * path here, but in future it would be possible that we build more paths
5944 * such as pre-sorted paths as in postgresGetForeignPaths and
5945 * postgresGetForeignJoinPaths.) The best we can do for these conditions
5946 * is to estimate selectivity on the basis of local statistics.
5947 */
5948 fpinfo->local_conds_sel = clauselist_selectivity(root,
5949 fpinfo->local_conds,
5950 0,
5951 JOIN_INNER,
5952 NULL);
5953
5954 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5955
5956 /* Estimate the cost of push down */
5957 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
5958 &rows, &width, &startup_cost, &total_cost);
5959
5960 /* Now update this information in the fpinfo */
5961 fpinfo->rows = rows;
5962 fpinfo->width = width;
5963 fpinfo->startup_cost = startup_cost;
5964 fpinfo->total_cost = total_cost;
5965
5966 /* Create and add foreign path to the grouping relation. */
5967 grouppath = create_foreign_upper_path(root,
5968 grouped_rel,
5969 grouped_rel->reltarget,
5970 rows,
5971 startup_cost,
5972 total_cost,
5973 NIL, /* no pathkeys */
5974 NULL,
5975 NIL); /* no fdw_private */
5976
5977 /* Add generated path into grouped_rel by add_path(). */
5978 add_path(grouped_rel, (Path *) grouppath);
5979 }
5980
5981 /*
5982 * add_foreign_ordered_paths
5983 * Add foreign paths for performing the final sort remotely.
5984 *
5985 * Given input_rel contains the source-data Paths. The paths are added to the
5986 * given ordered_rel.
5987 */
5988 static void
5989 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
5990 RelOptInfo *ordered_rel)
5991 {
5992 Query *parse = root->parse;
5993 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5994 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
5995 PgFdwPathExtraData *fpextra;
5996 double rows;
5997 int width;
5998 Cost startup_cost;
5999 Cost total_cost;
6000 List *fdw_private;
6001 ForeignPath *ordered_path;
6002 ListCell *lc;
6003
6004 /* Shouldn't get here unless the query has ORDER BY */
6005 Assert(parse->sortClause);
6006
6007 /* We don't support cases where there are any SRFs in the targetlist */
6008 if (parse->hasTargetSRFs)
6009 return;
6010
6011 /* Save the input_rel as outerrel in fpinfo */
6012 fpinfo->outerrel = input_rel;
6013
6014 /*
6015 * Copy foreign table, foreign server, user mapping, FDW options etc.
6016 * details from the input relation's fpinfo.
6017 */
6018 fpinfo->table = ifpinfo->table;
6019 fpinfo->server = ifpinfo->server;
6020 fpinfo->user = ifpinfo->user;
6021 merge_fdw_options(fpinfo, ifpinfo, NULL);
6022
6023 /*
6024 * If the input_rel is a base or join relation, we would already have
6025 * considered pushing down the final sort to the remote server when
6026 * creating pre-sorted foreign paths for that relation, because the
6027 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6028 * standard_qp_callback()).
6029 */
6030 if (input_rel->reloptkind == RELOPT_BASEREL ||
6031 input_rel->reloptkind == RELOPT_JOINREL)
6032 {
6033 Assert(root->query_pathkeys == root->sort_pathkeys);
6034
6035 /* Safe to push down if the query_pathkeys is safe to push down */
6036 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
6037
6038 return;
6039 }
6040
6041 /* The input_rel should be a grouping relation */
6042 Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
6043 ifpinfo->stage == UPPERREL_GROUP_AGG);
6044
6045 /*
6046 * We try to create a path below by extending a simple foreign path for
6047 * the underlying grouping relation to perform the final sort remotely,
6048 * which is stored into the fdw_private list of the resulting path.
6049 */
6050
6051 /* Assess if it is safe to push down the final sort */
6052 foreach(lc, root->sort_pathkeys)
6053 {
6054 PathKey *pathkey = (PathKey *) lfirst(lc);
6055 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6056 Expr *sort_expr;
6057
6058 /*
6059 * is_foreign_expr would detect volatile expressions as well, but
6060 * checking ec_has_volatile here saves some cycles.
6061 */
6062 if (pathkey_ec->ec_has_volatile)
6063 return;
6064
6065 /* Get the sort expression for the pathkey_ec */
6066 sort_expr = find_em_expr_for_input_target(root,
6067 pathkey_ec,
6068 input_rel->reltarget);
6069
6070 /* If it's unsafe to remote, we cannot push down the final sort */
6071 if (!is_foreign_expr(root, input_rel, sort_expr))
6072 return;
6073 }
6074
6075 /* Safe to push down */
6076 fpinfo->pushdown_safe = true;
6077
6078 /* Construct PgFdwPathExtraData */
6079 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6080 fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6081 fpextra->has_final_sort = true;
6082
6083 /* Estimate the costs of performing the final sort remotely */
6084 estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6085 &rows, &width, &startup_cost, &total_cost);
6086
6087 /*
6088 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6089 * Items in the list must match order in enum FdwPathPrivateIndex.
6090 */
6091 fdw_private = list_make2(makeInteger(true), makeInteger(false));
6092
6093 /* Create foreign ordering path */
6094 ordered_path = create_foreign_upper_path(root,
6095 input_rel,
6096 root->upper_targets[UPPERREL_ORDERED],
6097 rows,
6098 startup_cost,
6099 total_cost,
6100 root->sort_pathkeys,
6101 NULL, /* no extra plan */
6102 fdw_private);
6103
6104 /* and add it to the ordered_rel */
6105 add_path(ordered_rel, (Path *) ordered_path);
6106 }
6107
6108 /*
6109 * add_foreign_final_paths
6110 * Add foreign paths for performing the final processing remotely.
6111 *
6112 * Given input_rel contains the source-data Paths. The paths are added to the
6113 * given final_rel.
6114 */
6115 static void
6116 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6117 RelOptInfo *final_rel,
6118 FinalPathExtraData *extra)
6119 {
6120 Query *parse = root->parse;
6121 PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6122 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6123 bool has_final_sort = false;
6124 List *pathkeys = NIL;
6125 PgFdwPathExtraData *fpextra;
6126 bool save_use_remote_estimate = false;
6127 double rows;
6128 int width;
6129 Cost startup_cost;
6130 Cost total_cost;
6131 List *fdw_private;
6132 ForeignPath *final_path;
6133
6134 /*
6135 * Currently, we only support this for SELECT commands
6136 */
6137 if (parse->commandType != CMD_SELECT)
6138 return;
6139
6140 /*
6141 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6142 * to add a LIMIT node
6143 */
6144 if (!parse->rowMarks && !extra->limit_needed)
6145 return;
6146
6147 /* We don't support cases where there are any SRFs in the targetlist */
6148 if (parse->hasTargetSRFs)
6149 return;
6150
6151 /* Save the input_rel as outerrel in fpinfo */
6152 fpinfo->outerrel = input_rel;
6153
6154 /*
6155 * Copy foreign table, foreign server, user mapping, FDW options etc.
6156 * details from the input relation's fpinfo.
6157 */
6158 fpinfo->table = ifpinfo->table;
6159 fpinfo->server = ifpinfo->server;
6160 fpinfo->user = ifpinfo->user;
6161 merge_fdw_options(fpinfo, ifpinfo, NULL);
6162
6163 /*
6164 * If there is no need to add a LIMIT node, there might be a ForeignPath
6165 * in the input_rel's pathlist that implements all behavior of the query.
6166 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6167 * (if any) before we get here.
6168 */
6169 if (!extra->limit_needed)
6170 {
6171 ListCell *lc;
6172
6173 Assert(parse->rowMarks);
6174
6175 /*
6176 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6177 * so the input_rel should be a base, join, or ordered relation; and
6178 * if it's an ordered relation, its input relation should be a base or
6179 * join relation.
6180 */
6181 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6182 input_rel->reloptkind == RELOPT_JOINREL ||
6183 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6184 ifpinfo->stage == UPPERREL_ORDERED &&
6185 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
6186 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
6187
6188 foreach(lc, input_rel->pathlist)
6189 {
6190 Path *path = (Path *) lfirst(lc);
6191
6192 /*
6193 * apply_scanjoin_target_to_paths() uses create_projection_path()
6194 * to adjust each of its input paths if needed, whereas
6195 * create_ordered_paths() uses apply_projection_to_path() to do
6196 * that. So the former might have put a ProjectionPath on top of
6197 * the ForeignPath; look through ProjectionPath and see if the
6198 * path underneath it is ForeignPath.
6199 */
6200 if (IsA(path, ForeignPath) ||
6201 (IsA(path, ProjectionPath) &&
6202 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
6203 {
6204 /*
6205 * Create foreign final path; this gets rid of a
6206 * no-longer-needed outer plan (if any), which makes the
6207 * EXPLAIN output look cleaner
6208 */
6209 final_path = create_foreign_upper_path(root,
6210 path->parent,
6211 path->pathtarget,
6212 path->rows,
6213 path->startup_cost,
6214 path->total_cost,
6215 path->pathkeys,
6216 NULL, /* no extra plan */
6217 NULL); /* no fdw_private */
6218
6219 /* and add it to the final_rel */
6220 add_path(final_rel, (Path *) final_path);
6221
6222 /* Safe to push down */
6223 fpinfo->pushdown_safe = true;
6224
6225 return;
6226 }
6227 }
6228
6229 /*
6230 * If we get here it means no ForeignPaths; since we would already
6231 * have considered pushing down all operations for the query to the
6232 * remote server, give up on it.
6233 */
6234 return;
6235 }
6236
6237 Assert(extra->limit_needed);
6238
6239 /*
6240 * If the input_rel is an ordered relation, replace the input_rel with its
6241 * input relation
6242 */
6243 if (input_rel->reloptkind == RELOPT_UPPER_REL &&
6244 ifpinfo->stage == UPPERREL_ORDERED)
6245 {
6246 input_rel = ifpinfo->outerrel;
6247 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6248 has_final_sort = true;
6249 pathkeys = root->sort_pathkeys;
6250 }
6251
6252 /* The input_rel should be a base, join, or grouping relation */
6253 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6254 input_rel->reloptkind == RELOPT_JOINREL ||
6255 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6256 ifpinfo->stage == UPPERREL_GROUP_AGG));
6257
6258 /*
6259 * We try to create a path below by extending a simple foreign path for
6260 * the underlying base, join, or grouping relation to perform the final
6261 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
6262 * stored into the fdw_private list of the resulting path. (We
6263 * re-estimate the costs of sorting the underlying relation, if
6264 * has_final_sort.)
6265 */
6266
6267 /*
6268 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
6269 * server
6270 */
6271
6272 /*
6273 * If the underlying relation has any local conditions, the LIMIT/OFFSET
6274 * cannot be pushed down.
6275 */
6276 if (ifpinfo->local_conds)
6277 return;
6278
6279 /*
6280 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
6281 * not safe to remote.
6282 */
6283 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
6284 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
6285 return;
6286
6287 /* Safe to push down */
6288 fpinfo->pushdown_safe = true;
6289
6290 /* Construct PgFdwPathExtraData */
6291 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6292 fpextra->target = root->upper_targets[UPPERREL_FINAL];
6293 fpextra->has_final_sort = has_final_sort;
6294 fpextra->has_limit = extra->limit_needed;
6295 fpextra->limit_tuples = extra->limit_tuples;
6296 fpextra->count_est = extra->count_est;
6297 fpextra->offset_est = extra->offset_est;
6298
6299 /*
6300 * Estimate the costs of performing the final sort and the LIMIT
6301 * restriction remotely. If has_final_sort is false, we wouldn't need to
6302 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
6303 * roughly estimated using the costs we already have for the underlying
6304 * relation, in the same way as when use_remote_estimate is false. Since
6305 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
6306 * false in that case.
6307 */
6308 if (!fpextra->has_final_sort)
6309 {
6310 save_use_remote_estimate = ifpinfo->use_remote_estimate;
6311 ifpinfo->use_remote_estimate = false;
6312 }
6313 estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
6314 &rows, &width, &startup_cost, &total_cost);
6315 if (!fpextra->has_final_sort)
6316 ifpinfo->use_remote_estimate = save_use_remote_estimate;
6317
6318 /*
6319 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6320 * Items in the list must match order in enum FdwPathPrivateIndex.
6321 */
6322 fdw_private = list_make2(makeInteger(has_final_sort),
6323 makeInteger(extra->limit_needed));
6324
6325 /*
6326 * Create foreign final path; this gets rid of a no-longer-needed outer
6327 * plan (if any), which makes the EXPLAIN output look cleaner
6328 */
6329 final_path = create_foreign_upper_path(root,
6330 input_rel,
6331 root->upper_targets[UPPERREL_FINAL],
6332 rows,
6333 startup_cost,
6334 total_cost,
6335 pathkeys,
6336 NULL, /* no extra plan */
6337 fdw_private);
6338
6339 /* and add it to the final_rel */
6340 add_path(final_rel, (Path *) final_path);
6341 }
6342
6343 /*
6344 * Create a tuple from the specified row of the PGresult.
6345 *
6346 * rel is the local representation of the foreign table, attinmeta is
6347 * conversion data for the rel's tupdesc, and retrieved_attrs is an
6348 * integer list of the table column numbers present in the PGresult.
6349 * fsstate is the ForeignScan plan node's execution state.
6350 * temp_context is a working context that can be reset after each tuple.
6351 *
6352 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
6353 * if we're processing a remote join, while fsstate is NULL in a non-query
6354 * context such as ANALYZE, or if we're processing a non-scan query node.
6355 */
6356 static HeapTuple
6357 make_tuple_from_result_row(PGresult *res,
6358 int row,
6359 Relation rel,
6360 AttInMetadata *attinmeta,
6361 List *retrieved_attrs,
6362 ForeignScanState *fsstate,
6363 MemoryContext temp_context)
6364 {
6365 HeapTuple tuple;
6366 TupleDesc tupdesc;
6367 Datum *values;
6368 bool *nulls;
6369 ItemPointer ctid = NULL;
6370 ConversionLocation errpos;
6371 ErrorContextCallback errcallback;
6372 MemoryContext oldcontext;
6373 ListCell *lc;
6374 int j;
6375
6376 Assert(row < PQntuples(res));
6377
6378 /*
6379 * Do the following work in a temp context that we reset after each tuple.
6380 * This cleans up not only the data we have direct access to, but any
6381 * cruft the I/O functions might leak.
6382 */
6383 oldcontext = MemoryContextSwitchTo(temp_context);
6384
6385 /*
6386 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
6387 * provided, otherwise look to the scan node's ScanTupleSlot.
6388 */
6389 if (rel)
6390 tupdesc = RelationGetDescr(rel);
6391 else
6392 {
6393 Assert(fsstate);
6394 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
6395 }
6396
6397 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
6398 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
6399 /* Initialize to nulls for any columns not present in result */
6400 memset(nulls, true, tupdesc->natts * sizeof(bool));
6401
6402 /*
6403 * Set up and install callback to report where conversion error occurs.
6404 */
6405 errpos.cur_attno = 0;
6406 errpos.rel = rel;
6407 errpos.fsstate = fsstate;
6408 errcallback.callback = conversion_error_callback;
6409 errcallback.arg = (void *) &errpos;
6410 errcallback.previous = error_context_stack;
6411 error_context_stack = &errcallback;
6412
6413 /*
6414 * i indexes columns in the relation, j indexes columns in the PGresult.
6415 */
6416 j = 0;
6417 foreach(lc, retrieved_attrs)
6418 {
6419 int i = lfirst_int(lc);
6420 char *valstr;
6421
6422 /* fetch next column's textual value */
6423 if (PQgetisnull(res, row, j))
6424 valstr = NULL;
6425 else
6426 valstr = PQgetvalue(res, row, j);
6427
6428 /*
6429 * convert value to internal representation
6430 *
6431 * Note: we ignore system columns other than ctid and oid in result
6432 */
6433 errpos.cur_attno = i;
6434 if (i > 0)
6435 {
6436 /* ordinary column */
6437 Assert(i <= tupdesc->natts);
6438 nulls[i - 1] = (valstr == NULL);
6439 /* Apply the input function even to nulls, to support domains */
6440 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
6441 valstr,
6442 attinmeta->attioparams[i - 1],
6443 attinmeta->atttypmods[i - 1]);
6444 }
6445 else if (i == SelfItemPointerAttributeNumber)
6446 {
6447 /* ctid */
6448 if (valstr != NULL)
6449 {
6450 Datum datum;
6451
6452 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
6453 ctid = (ItemPointer) DatumGetPointer(datum);
6454 }
6455 }
6456 errpos.cur_attno = 0;
6457
6458 j++;
6459 }
6460
6461 /* Uninstall error context callback. */
6462 error_context_stack = errcallback.previous;
6463
6464 /*
6465 * Check we got the expected number of columns. Note: j == 0 and
6466 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
6467 */
6468 if (j > 0 && j != PQnfields(res))
6469 elog(ERROR, "remote query result does not match the foreign table");
6470
6471 /*
6472 * Build the result tuple in caller's memory context.
6473 */
6474 MemoryContextSwitchTo(oldcontext);
6475
6476 tuple = heap_form_tuple(tupdesc, values, nulls);
6477
6478 /*
6479 * If we have a CTID to return, install it in both t_self and t_ctid.
6480 * t_self is the normal place, but if the tuple is converted to a
6481 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
6482 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
6483 */
6484 if (ctid)
6485 tuple->t_self = tuple->t_data->t_ctid = *ctid;
6486
6487 /*
6488 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
6489 * heap_form_tuple. heap_form_tuple actually creates the tuple with
6490 * DatumTupleFields, not HeapTupleFields, but the executor expects
6491 * HeapTupleFields and will happily extract system columns on that
6492 * assumption. If we don't do this then, for example, the tuple length
6493 * ends up in the xmin field, which isn't what we want.
6494 */
6495 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
6496 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
6497 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
6498
6499 /* Clean up */
6500 MemoryContextReset(temp_context);
6501
6502 return tuple;
6503 }
6504
6505 /*
6506 * Callback function which is called when error occurs during column value
6507 * conversion. Print names of column and relation.
6508 *
6509 * Note that this function mustn't do any catalog lookups, since we are in
6510 * an already-failed transaction. Fortunately, we can get the needed info
6511 * from the relation or the query's rangetable instead.
6512 */
6513 static void
6514 conversion_error_callback(void *arg)
6515 {
6516 ConversionLocation *errpos = (ConversionLocation *) arg;
6517 Relation rel = errpos->rel;
6518 ForeignScanState *fsstate = errpos->fsstate;
6519 const char *attname = NULL;
6520 const char *relname = NULL;
6521 bool is_wholerow = false;
6522
6523 /*
6524 * If we're in a scan node, always use aliases from the rangetable, for
6525 * consistency between the simple-relation and remote-join cases. Look at
6526 * the relation's tupdesc only if we're not in a scan node.
6527 */
6528 if (fsstate)
6529 {
6530 /* ForeignScan case */
6531 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
6532 int varno = 0;
6533 AttrNumber colno = 0;
6534
6535 if (fsplan->scan.scanrelid > 0)
6536 {
6537 /* error occurred in a scan against a foreign table */
6538 varno = fsplan->scan.scanrelid;
6539 colno = errpos->cur_attno;
6540 }
6541 else
6542 {
6543 /* error occurred in a scan against a foreign join */
6544 TargetEntry *tle;
6545
6546 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
6547 errpos->cur_attno - 1);
6548
6549 /*
6550 * Target list can have Vars and expressions. For Vars, we can
6551 * get some information, however for expressions we can't. Thus
6552 * for expressions, just show generic context message.
6553 */
6554 if (IsA(tle->expr, Var))
6555 {
6556 Var *var = (Var *) tle->expr;
6557
6558 varno = var->varno;
6559 colno = var->varattno;
6560 }
6561 }
6562
6563 if (varno > 0)
6564 {
6565 EState *estate = fsstate->ss.ps.state;
6566 RangeTblEntry *rte = exec_rt_fetch(varno, estate);
6567
6568 relname = rte->eref->aliasname;
6569
6570 if (colno == 0)
6571 is_wholerow = true;
6572 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
6573 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
6574 else if (colno == SelfItemPointerAttributeNumber)
6575 attname = "ctid";
6576 }
6577 }
6578 else if (rel)
6579 {
6580 /* Non-ForeignScan case (we should always have a rel here) */
6581 TupleDesc tupdesc = RelationGetDescr(rel);
6582
6583 relname = RelationGetRelationName(rel);
6584 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
6585 {
6586 Form_pg_attribute attr = TupleDescAttr(tupdesc,
6587 errpos->cur_attno - 1);
6588
6589 attname = NameStr(attr->attname);
6590 }
6591 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
6592 attname = "ctid";
6593 }
6594
6595 if (relname && is_wholerow)
6596 errcontext("whole-row reference to foreign table \"%s\"", relname);
6597 else if (relname && attname)
6598 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
6599 else
6600 errcontext("processing expression at position %d in select list",
6601 errpos->cur_attno);
6602 }
6603
6604 /*
6605 * Find an equivalence class member expression to be computed as a sort column
6606 * in the given target.
6607 */
6608 Expr *
6609 find_em_expr_for_input_target(PlannerInfo *root,
6610 EquivalenceClass *ec,
6611 PathTarget *target)
6612 {
6613 ListCell *lc1;
6614 int i;
6615
6616 i = 0;
6617 foreach(lc1, target->exprs)
6618 {
6619 Expr *expr = (Expr *) lfirst(lc1);
6620 Index sgref = get_pathtarget_sortgroupref(target, i);
6621 ListCell *lc2;
6622
6623 /* Ignore non-sort expressions */
6624 if (sgref == 0 ||
6625 get_sortgroupref_clause_noerr(sgref,
6626 root->parse->sortClause) == NULL)
6627 {
6628 i++;
6629 continue;
6630 }
6631
6632 /* We ignore binary-compatible relabeling on both ends */
6633 while (expr && IsA(expr, RelabelType))
6634 expr = ((RelabelType *) expr)->arg;
6635
6636 /* Locate an EquivalenceClass member matching this expr, if any */
6637 foreach(lc2, ec->ec_members)
6638 {
6639 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
6640 Expr *em_expr;
6641
6642 /* Don't match constants */
6643 if (em->em_is_const)
6644 continue;
6645
6646 /* Ignore child members */
6647 if (em->em_is_child)
6648 continue;
6649
6650 /* Match if same expression (after stripping relabel) */
6651 em_expr = em->em_expr;
6652 while (em_expr && IsA(em_expr, RelabelType))
6653 em_expr = ((RelabelType *) em_expr)->arg;
6654
6655 if (equal(em_expr, expr))
6656 return em->em_expr;
6657 }
6658
6659 i++;
6660 }
6661
6662 elog(ERROR, "could not find pathkey item to sort");
6663 return NULL; /* keep compiler quiet */
6664 }
6665