1 /*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "postgres_fdw.h"
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "foreign/fdwapi.h"
25 #include "funcapi.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "nodes/nodeFuncs.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/optimizer.h"
32 #include "optimizer/pathnode.h"
33 #include "optimizer/paths.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "optimizer/tlist.h"
37 #include "parser/parsetree.h"
38 #include "utils/builtins.h"
39 #include "utils/float.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/rel.h"
44 #include "utils/sampling.h"
45 #include "utils/selfuncs.h"
46
/*
 * Source-code-compatibility hack for the pull_varnos() API change: any
 * nine-argument make_restrictinfo() call that follows this definition is
 * rewritten by the preprocessor into a call to make_restrictinfo_new().
 */
#define make_restrictinfo(a,b,c,d,e,f,g,h,i) make_restrictinfo_new(a,b,c,d,e,f,g,h,i)

PG_MODULE_MAGIC;

/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST 100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST 0.01

/* If no remote estimates are available, assume a sort costs 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
60
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * The list items are addressed by the values of enum FdwScanPrivateIndex, so
 * an item can be fetched with list_nth().  For example, to get the SELECT
 * statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * The enumerator order therefore defines the list layout and must not be
 * changed independently of the code that builds the list.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing the join, i.e. the names of the relations being
	 * joined and the join types.  Only present when the scan is a join.
	 */
	FdwScanPrivateRelations
};
83
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store,
 * in this order:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Boolean flag showing if the remote query has a RETURNING clause
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* has-returning flag (as an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
105
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store,
 * in this order:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command's es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
126
/*
 * Execution state of a foreign scan using postgres_fdw.
 *
 * The fields are grouped by purpose: scan target description, values
 * extracted from fdw_private, remote-query/cursor state, the buffer of
 * fetched tuples, batch bookkeeping, and working memory contexts.
 */
typedef struct PgFdwScanState
{
	Relation	rel;			/* relcache entry for the foreign table; NULL
								 * for a foreign join scan */
	TupleDesc	tupdesc;		/* tuple descriptor of scan */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of SELECT command */
	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the scan */
	unsigned int cursor_number; /* quasi-unique ID for my cursor */
	bool		cursor_exists;	/* have we created the cursor? */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
	int			num_tuples;		/* # of tuples in array */
	int			next_tuple;		/* index of next one to return */

	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
	int			fetch_ct_2;		/* Min(# of fetches done, 2), i.e. a counter
								 * that saturates at 2 */
	bool		eof_reached;	/* true if last fetch reached EOF */

	/* working memory contexts */
	MemoryContext batch_cxt;	/* context holding current batch of tuples */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	int			fetch_size;		/* number of tuples per fetch */
} PgFdwScanState;
165
/*
 * Execution state of a foreign insert/update/delete operation.
 */
typedef struct PgFdwModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the modify operation */
	char	   *p_name;			/* name of prepared statement, if created */

	/* extracted fdw_private data */
	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
	List	   *target_attrs;	/* list of target attribute numbers */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */

	/* info about parameters for prepared statement */
	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
	int			p_nums;			/* number of parameters to transmit */
	FmgrInfo   *p_flinfo;		/* output conversion functions for them */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	/* for update row movement if subplan result rel */
	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
											 * created */
} PgFdwModifyState;
196
/*
 * Execution state of a foreign scan that modifies a foreign table directly.
 *
 * Unlike PgFdwScanState, the remote result is kept as a PGresult and rows
 * are handed out from it by index (num_tuples / next_tuple).
 */
typedef struct PgFdwDirectModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of UPDATE/DELETE command */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
	bool		set_processed;	/* do we set the command es_processed? */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the update */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	PGresult   *result;			/* result for query */
	int			num_tuples;		/* # of result tuples */
	int			next_tuple;		/* index of next one to return */
	Relation	resultRel;		/* relcache entry for the target relation */
	AttrNumber *attnoMap;		/* array of attnums of input user columns */
	AttrNumber	ctidAttno;		/* attnum of input ctid column */
	AttrNumber	oidAttno;		/* attnum of input oid column */
	bool		hasSystemCols;	/* are there system columns of resultRel? */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwDirectModifyState;
231
/*
 * Workspace for analyzing a foreign table: the sample being accumulated,
 * the sampling state, and the memory contexts used while collecting it.
 */
typedef struct PgFdwAnalyzeState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
	List	   *retrieved_attrs;	/* attr numbers retrieved by query */

	/* collected sample rows */
	HeapTuple  *rows;			/* array of size targrows */
	int			targrows;		/* target # of sample rows */
	int			numrows;		/* # of sample rows collected */

	/* for random sampling */
	double		samplerows;		/* # of rows fetched */
	double		rowstoskip;		/* # of rows to skip before next sample */
	ReservoirStateData rstate;	/* state for reservoir sampling */

	/* working memory contexts */
	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwAnalyzeState;
255
/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store, in this order:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as an integer Value node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as an integer Value node) */
	FdwPathPrivateHasLimit
};
270
/*
 * Struct for extra information passed to estimate_path_cost_size().
 *
 * NOTE(review): the per-field notes below are inferred from the field names
 * and from the FdwPathPrivateIndex flags; the consumers are not visible in
 * this part of the file, so confirm against estimate_path_cost_size().
 */
typedef struct
{
	PathTarget *target;			/* presumably the path's output target */
	bool		has_final_sort; /* presumably: remote query has final sort */
	bool		has_limit;		/* presumably: remote query has LIMIT */
	double		limit_tuples;	/* presumably a row-count bound -- confirm */
	int64		count_est;		/* presumably LIMIT count estimate -- confirm */
	int64		offset_est;		/* presumably OFFSET estimate -- confirm */
} PgFdwPathExtraData;
281
/*
 * Identify the attribute where data conversion fails.
 *
 * Each member may be unset (0 or NULL) when it does not apply, as noted on
 * the individual fields.
 */
typedef struct ConversionLocation
{
	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
	Relation	rel;			/* foreign table being processed, or NULL */
	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
} ConversionLocation;
291
/*
 * Callback argument for ec_member_matches_foreign (passed through its
 * "void *arg" parameter).
 */
typedef struct
{
	Expr	   *current;		/* current expr, or NULL if not yet found */
	List	   *already_used;	/* expressions already dealt with */
} ec_member_foreign_arg;
298
/*
 * SQL functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);

/*
 * FDW callback routines
 *
 * Forward declarations of the file-local callbacks that
 * postgres_fdw_handler() installs into the FdwRoutine struct.
 */

/* Scanning foreign tables */
static void postgresGetForeignRelSize(PlannerInfo *root,
									  RelOptInfo *baserel,
									  Oid foreigntableid);
static void postgresGetForeignPaths(PlannerInfo *root,
									RelOptInfo *baserel,
									Oid foreigntableid);
static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
										   RelOptInfo *foreignrel,
										   Oid foreigntableid,
										   ForeignPath *best_path,
										   List *tlist,
										   List *scan_clauses,
										   Plan *outer_plan);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);

/* Updating foreign tables */
static void postgresAddForeignUpdateTargets(Query *parsetree,
											RangeTblEntry *target_rte,
											Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
									   ModifyTable *plan,
									   Index resultRelation,
									   int subplan_index);
static void postgresBeginForeignModify(ModifyTableState *mtstate,
									   ResultRelInfo *resultRelInfo,
									   List *fdw_private,
									   int subplan_index,
									   int eflags);
static TupleTableSlot *postgresExecForeignInsert(EState *estate,
												 ResultRelInfo *resultRelInfo,
												 TupleTableSlot *slot,
												 TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
												 ResultRelInfo *resultRelInfo,
												 TupleTableSlot *slot,
												 TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignDelete(EState *estate,
												 ResultRelInfo *resultRelInfo,
												 TupleTableSlot *slot,
												 TupleTableSlot *planSlot);
static void postgresEndForeignModify(EState *estate,
									 ResultRelInfo *resultRelInfo);
static void postgresBeginForeignInsert(ModifyTableState *mtstate,
									   ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
									 ResultRelInfo *resultRelInfo);
static int	postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
									 ModifyTable *plan,
									 Index resultRelation,
									 int subplan_index);
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
static void postgresEndDirectModify(ForeignScanState *node);

/* EXPLAIN support */
static void postgresExplainForeignScan(ForeignScanState *node,
									   ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
										 ResultRelInfo *rinfo,
										 List *fdw_private,
										 int subplan_index,
										 ExplainState *es);
static void postgresExplainDirectModify(ForeignScanState *node,
										ExplainState *es);

/* ANALYZE support */
static bool postgresAnalyzeForeignTable(Relation relation,
										AcquireSampleRowsFunc *func,
										BlockNumber *totalpages);

/* IMPORT FOREIGN SCHEMA support */
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
										 Oid serverOid);

/* Join push-down */
static void postgresGetForeignJoinPaths(PlannerInfo *root,
										RelOptInfo *joinrel,
										RelOptInfo *outerrel,
										RelOptInfo *innerrel,
										JoinType jointype,
										JoinPathExtraData *extra);

/* EvalPlanQual rechecks */
static bool postgresRecheckForeignScan(ForeignScanState *node,
									   TupleTableSlot *slot);

/* Upper relation push-down */
static void postgresGetForeignUpperPaths(PlannerInfo *root,
										 UpperRelationKind stage,
										 RelOptInfo *input_rel,
										 RelOptInfo *output_rel,
										 void *extra);
389
/*
 * Helper functions
 *
 * Forward declarations of file-local (static) helpers, so that they can be
 * referenced before their definitions appear.
 */
static void estimate_path_cost_size(PlannerInfo *root,
									RelOptInfo *foreignrel,
									List *param_join_conds,
									List *pathkeys,
									PgFdwPathExtraData *fpextra,
									double *p_rows, int *p_width,
									Cost *p_startup_cost, Cost *p_total_cost);
static void get_remote_estimate(const char *sql,
								PGconn *conn,
								double *rows,
								int *width,
								Cost *startup_cost,
								Cost *total_cost);
static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
											  List *pathkeys,
											  double retrieved_rows,
											  double width,
											  double limit_tuples,
											  Cost *p_startup_cost,
											  Cost *p_run_cost);
/* "arg" is expected to point to an ec_member_foreign_arg */
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
									  EquivalenceClass *ec, EquivalenceMember *em,
									  void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
											   RangeTblEntry *rte,
											   ResultRelInfo *resultRelInfo,
											   CmdType operation,
											   Plan *subplan,
											   char *query,
											   List *target_attrs,
											   bool has_returning,
											   List *retrieved_attrs);
static TupleTableSlot *execute_foreign_modify(EState *estate,
											  ResultRelInfo *resultRelInfo,
											  CmdType operation,
											  TupleTableSlot *slot,
											  TupleTableSlot *planSlot);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
											 ItemPointer tupleid,
											 TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
								   TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
									List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);
static void init_returning_filter(PgFdwDirectModifyState *dmstate,
								  List *fdw_scan_tlist,
								  Index rtindex);
static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
											  TupleTableSlot *slot,
											  EState *estate);
static void prepare_query_params(PlanState *node,
								 List *fdw_exprs,
								 int numParams,
								 FmgrInfo **param_flinfo,
								 List **param_exprs,
								 const char ***param_values);
static void process_query_params(ExprContext *econtext,
								 FmgrInfo *param_flinfo,
								 List *param_exprs,
								 const char **param_values);
static int	postgresAcquireSampleRowsFunc(Relation relation, int elevel,
										  HeapTuple *rows, int targrows,
										  double *totalrows,
										  double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
								  PgFdwAnalyzeState *astate);
static HeapTuple make_tuple_from_result_row(PGresult *res,
											int row,
											Relation rel,
											AttInMetadata *attinmeta,
											List *retrieved_attrs,
											ForeignScanState *fsstate,
											MemoryContext temp_context);
/* "arg" presumably points to a ConversionLocation -- confirm at definition */
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
							JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
							JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
								Node *havingQual);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
											  RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
											Path *epq_path);
static void add_foreign_grouping_paths(PlannerInfo *root,
									   RelOptInfo *input_rel,
									   RelOptInfo *grouped_rel,
									   GroupPathExtraData *extra);
static void add_foreign_ordered_paths(PlannerInfo *root,
									  RelOptInfo *input_rel,
									  RelOptInfo *ordered_rel);
static void add_foreign_final_paths(PlannerInfo *root,
									RelOptInfo *input_rel,
									RelOptInfo *final_rel,
									FinalPathExtraData *extra);
static void apply_server_options(PgFdwRelationInfo *fpinfo);
static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
							  const PgFdwRelationInfo *fpinfo_o,
							  const PgFdwRelationInfo *fpinfo_i);
501
502
503 /*
504 * Foreign-data wrapper handler function: return a struct with pointers
505 * to my callback routines.
506 */
507 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)508 postgres_fdw_handler(PG_FUNCTION_ARGS)
509 {
510 FdwRoutine *routine = makeNode(FdwRoutine);
511
512 /* Functions for scanning foreign tables */
513 routine->GetForeignRelSize = postgresGetForeignRelSize;
514 routine->GetForeignPaths = postgresGetForeignPaths;
515 routine->GetForeignPlan = postgresGetForeignPlan;
516 routine->BeginForeignScan = postgresBeginForeignScan;
517 routine->IterateForeignScan = postgresIterateForeignScan;
518 routine->ReScanForeignScan = postgresReScanForeignScan;
519 routine->EndForeignScan = postgresEndForeignScan;
520
521 /* Functions for updating foreign tables */
522 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
523 routine->PlanForeignModify = postgresPlanForeignModify;
524 routine->BeginForeignModify = postgresBeginForeignModify;
525 routine->ExecForeignInsert = postgresExecForeignInsert;
526 routine->ExecForeignUpdate = postgresExecForeignUpdate;
527 routine->ExecForeignDelete = postgresExecForeignDelete;
528 routine->EndForeignModify = postgresEndForeignModify;
529 routine->BeginForeignInsert = postgresBeginForeignInsert;
530 routine->EndForeignInsert = postgresEndForeignInsert;
531 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
532 routine->PlanDirectModify = postgresPlanDirectModify;
533 routine->BeginDirectModify = postgresBeginDirectModify;
534 routine->IterateDirectModify = postgresIterateDirectModify;
535 routine->EndDirectModify = postgresEndDirectModify;
536
537 /* Function for EvalPlanQual rechecks */
538 routine->RecheckForeignScan = postgresRecheckForeignScan;
539 /* Support functions for EXPLAIN */
540 routine->ExplainForeignScan = postgresExplainForeignScan;
541 routine->ExplainForeignModify = postgresExplainForeignModify;
542 routine->ExplainDirectModify = postgresExplainDirectModify;
543
544 /* Support functions for ANALYZE */
545 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
546
547 /* Support functions for IMPORT FOREIGN SCHEMA */
548 routine->ImportForeignSchema = postgresImportForeignSchema;
549
550 /* Support functions for join push-down */
551 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
552
553 /* Support functions for upper relation push-down */
554 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
555
556 PG_RETURN_POINTER(routine);
557 }
558
559 /*
560 * postgresGetForeignRelSize
561 * Estimate # of rows and width of the result of the scan
562 *
563 * We should consider the effect of all baserestrictinfo clauses here, but
564 * not any join clauses.
565 */
566 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)567 postgresGetForeignRelSize(PlannerInfo *root,
568 RelOptInfo *baserel,
569 Oid foreigntableid)
570 {
571 PgFdwRelationInfo *fpinfo;
572 ListCell *lc;
573 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
574 const char *namespace;
575 const char *relname;
576 const char *refname;
577
578 /*
579 * We use PgFdwRelationInfo to pass various information to subsequent
580 * functions.
581 */
582 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
583 baserel->fdw_private = (void *) fpinfo;
584
585 /* Base foreign tables need to be pushed down always. */
586 fpinfo->pushdown_safe = true;
587
588 /* Look up foreign-table catalog info. */
589 fpinfo->table = GetForeignTable(foreigntableid);
590 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
591
592 /*
593 * Extract user-settable option values. Note that per-table settings of
594 * use_remote_estimate and fetch_size override per-server settings of
595 * them, respectively.
596 */
597 fpinfo->use_remote_estimate = false;
598 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
599 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
600 fpinfo->shippable_extensions = NIL;
601 fpinfo->fetch_size = 100;
602
603 apply_server_options(fpinfo);
604 apply_table_options(fpinfo);
605
606 /*
607 * If the table or the server is configured to use remote estimates,
608 * identify which user to do remote access as during planning. This
609 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
610 * permissions, the query would have failed at runtime anyway.
611 */
612 if (fpinfo->use_remote_estimate)
613 {
614 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
615
616 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
617 }
618 else
619 fpinfo->user = NULL;
620
621 /*
622 * Identify which baserestrictinfo clauses can be sent to the remote
623 * server and which can't.
624 */
625 classifyConditions(root, baserel, baserel->baserestrictinfo,
626 &fpinfo->remote_conds, &fpinfo->local_conds);
627
628 /*
629 * Identify which attributes will need to be retrieved from the remote
630 * server. These include all attrs needed for joins or final output, plus
631 * all attrs used in the local_conds. (Note: if we end up using a
632 * parameterized scan, it's possible that some of the join clauses will be
633 * sent to the remote and thus we wouldn't really need to retrieve the
634 * columns used in them. Doesn't seem worth detecting that case though.)
635 */
636 fpinfo->attrs_used = NULL;
637 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
638 &fpinfo->attrs_used);
639 foreach(lc, fpinfo->local_conds)
640 {
641 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
642
643 pull_varattnos((Node *) rinfo->clause, baserel->relid,
644 &fpinfo->attrs_used);
645 }
646
647 /*
648 * Compute the selectivity and cost of the local_conds, so we don't have
649 * to do it over again for each path. The best we can do for these
650 * conditions is to estimate selectivity on the basis of local statistics.
651 */
652 fpinfo->local_conds_sel = clauselist_selectivity(root,
653 fpinfo->local_conds,
654 baserel->relid,
655 JOIN_INNER,
656 NULL);
657
658 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
659
660 /*
661 * Set # of retrieved rows and cached relation costs to some negative
662 * value, so that we can detect when they are set to some sensible values,
663 * during one (usually the first) of the calls to estimate_path_cost_size.
664 */
665 fpinfo->retrieved_rows = -1;
666 fpinfo->rel_startup_cost = -1;
667 fpinfo->rel_total_cost = -1;
668
669 /*
670 * If the table or the server is configured to use remote estimates,
671 * connect to the foreign server and execute EXPLAIN to estimate the
672 * number of rows selected by the restriction clauses, as well as the
673 * average row width. Otherwise, estimate using whatever statistics we
674 * have locally, in a way similar to ordinary tables.
675 */
676 if (fpinfo->use_remote_estimate)
677 {
678 /*
679 * Get cost/size estimates with help of remote server. Save the
680 * values in fpinfo so we don't need to do it again to generate the
681 * basic foreign path.
682 */
683 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
684 &fpinfo->rows, &fpinfo->width,
685 &fpinfo->startup_cost, &fpinfo->total_cost);
686
687 /* Report estimated baserel size to planner. */
688 baserel->rows = fpinfo->rows;
689 baserel->reltarget->width = fpinfo->width;
690 }
691 else
692 {
693 /*
694 * If the foreign table has never been ANALYZEd, it will have relpages
695 * and reltuples equal to zero, which most likely has nothing to do
696 * with reality. We can't do a whole lot about that if we're not
697 * allowed to consult the remote server, but we can use a hack similar
698 * to plancat.c's treatment of empty relations: use a minimum size
699 * estimate of 10 pages, and divide by the column-datatype-based width
700 * estimate to get the corresponding number of tuples.
701 */
702 if (baserel->pages == 0 && baserel->tuples == 0)
703 {
704 baserel->pages = 10;
705 baserel->tuples =
706 (10 * BLCKSZ) / (baserel->reltarget->width +
707 MAXALIGN(SizeofHeapTupleHeader));
708 }
709
710 /* Estimate baserel size as best we can with local statistics. */
711 set_baserel_size_estimates(root, baserel);
712
713 /* Fill in basically-bogus cost estimates for use later. */
714 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
715 &fpinfo->rows, &fpinfo->width,
716 &fpinfo->startup_cost, &fpinfo->total_cost);
717 }
718
719 /*
720 * Set the name of relation in fpinfo, while we are constructing it here.
721 * It will be used to build the string describing the join relation in
722 * EXPLAIN output. We can't know whether VERBOSE option is specified or
723 * not, so always schema-qualify the foreign table name.
724 */
725 fpinfo->relation_name = makeStringInfo();
726 namespace = get_namespace_name(get_rel_namespace(foreigntableid));
727 relname = get_rel_name(foreigntableid);
728 refname = rte->eref->aliasname;
729 appendStringInfo(fpinfo->relation_name, "%s.%s",
730 quote_identifier(namespace),
731 quote_identifier(relname));
732 if (*refname && strcmp(refname, relname) != 0)
733 appendStringInfo(fpinfo->relation_name, " %s",
734 quote_identifier(rte->eref->aliasname));
735
736 /* No outer and inner relations. */
737 fpinfo->make_outerrel_subquery = false;
738 fpinfo->make_innerrel_subquery = false;
739 fpinfo->lower_subquery_rels = NULL;
740 /* Set the relation index. */
741 fpinfo->relation_index = baserel->relid;
742 }
743
744 /*
745 * get_useful_ecs_for_relation
746 * Determine which EquivalenceClasses might be involved in useful
747 * orderings of this relation.
748 *
749 * This function is in some respects a mirror image of the core function
750 * pathkeys_useful_for_merging: for a regular table, we know what indexes
751 * we have and want to test whether any of them are useful. For a foreign
752 * table, we don't know what indexes are present on the remote side but
753 * want to speculate about which ones we'd like to use if they existed.
754 *
755 * This function returns a list of potentially-useful equivalence classes,
756 * but it does not guarantee that an EquivalenceMember exists which contains
757 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
758 * ft1.x + t1.x = 0, this function will say that the equivalence class
759 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
760 * t1 is local (or on a different server), it will turn out that no useful
761 * ORDER BY clause can be generated. It's not our job to figure that out
762 * here; we're only interested in identifying relevant ECs.
763 */
764 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)765 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
766 {
767 List *useful_eclass_list = NIL;
768 ListCell *lc;
769 Relids relids;
770
771 /*
772 * First, consider whether any active EC is potentially useful for a merge
773 * join against this relation.
774 */
775 if (rel->has_eclass_joins)
776 {
777 foreach(lc, root->eq_classes)
778 {
779 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
780
781 if (eclass_useful_for_merging(root, cur_ec, rel))
782 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
783 }
784 }
785
786 /*
787 * Next, consider whether there are any non-EC derivable join clauses that
788 * are merge-joinable. If the joininfo list is empty, we can exit
789 * quickly.
790 */
791 if (rel->joininfo == NIL)
792 return useful_eclass_list;
793
794 /* If this is a child rel, we must use the topmost parent rel to search. */
795 if (IS_OTHER_REL(rel))
796 {
797 Assert(!bms_is_empty(rel->top_parent_relids));
798 relids = rel->top_parent_relids;
799 }
800 else
801 relids = rel->relids;
802
803 /* Check each join clause in turn. */
804 foreach(lc, rel->joininfo)
805 {
806 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
807
808 /* Consider only mergejoinable clauses */
809 if (restrictinfo->mergeopfamilies == NIL)
810 continue;
811
812 /* Make sure we've got canonical ECs. */
813 update_mergeclause_eclasses(root, restrictinfo);
814
815 /*
816 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
817 * that left_ec and right_ec will be initialized, per comments in
818 * distribute_qual_to_rels.
819 *
820 * We want to identify which side of this merge-joinable clause
821 * contains columns from the relation produced by this RelOptInfo. We
822 * test for overlap, not containment, because there could be extra
823 * relations on either side. For example, suppose we've got something
824 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
825 * A.y = D.y. The input rel might be the joinrel between A and B, and
826 * we'll consider the join clause A.y = D.y. relids contains a
827 * relation not involved in the join class (B) and the equivalence
828 * class for the left-hand side of the clause contains a relation not
829 * involved in the input rel (C). Despite the fact that we have only
830 * overlap and not containment in either direction, A.y is potentially
831 * useful as a sort column.
832 *
833 * Note that it's even possible that relids overlaps neither side of
834 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
835 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
836 * but overlaps neither side of B. In that case, we just skip this
837 * join clause, since it doesn't suggest a useful sort order for this
838 * relation.
839 */
840 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
841 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
842 restrictinfo->right_ec);
843 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
844 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
845 restrictinfo->left_ec);
846 }
847
848 return useful_eclass_list;
849 }
850
851 /*
852 * get_useful_pathkeys_for_relation
853 * Determine which orderings of a relation might be useful.
854 *
855 * Getting data in sorted order can be useful either because the requested
856 * order matches the final output ordering for the overall query we're
857 * planning, or because it enables an efficient merge join. Here, we try
858 * to figure out which pathkeys to consider.
859 */
860 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)861 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
862 {
863 List *useful_pathkeys_list = NIL;
864 List *useful_eclass_list;
865 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
866 EquivalenceClass *query_ec = NULL;
867 ListCell *lc;
868
869 /*
870 * Pushing the query_pathkeys to the remote server is always worth
871 * considering, because it might let us avoid a local sort.
872 */
873 fpinfo->qp_is_pushdown_safe = false;
874 if (root->query_pathkeys)
875 {
876 bool query_pathkeys_ok = true;
877
878 foreach(lc, root->query_pathkeys)
879 {
880 PathKey *pathkey = (PathKey *) lfirst(lc);
881 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
882 Expr *em_expr;
883
884 /*
885 * The planner and executor don't have any clever strategy for
886 * taking data sorted by a prefix of the query's pathkeys and
887 * getting it to be sorted by all of those pathkeys. We'll just
888 * end up resorting the entire data set. So, unless we can push
889 * down all of the query pathkeys, forget it.
890 *
891 * is_foreign_expr would detect volatile expressions as well, but
892 * checking ec_has_volatile here saves some cycles.
893 */
894 if (pathkey_ec->ec_has_volatile ||
895 !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
896 !is_foreign_expr(root, rel, em_expr))
897 {
898 query_pathkeys_ok = false;
899 break;
900 }
901 }
902
903 if (query_pathkeys_ok)
904 {
905 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
906 fpinfo->qp_is_pushdown_safe = true;
907 }
908 }
909
910 /*
911 * Even if we're not using remote estimates, having the remote side do the
912 * sort generally won't be any worse than doing it locally, and it might
913 * be much better if the remote side can generate data in the right order
914 * without needing a sort at all. However, what we're going to do next is
915 * try to generate pathkeys that seem promising for possible merge joins,
916 * and that's more speculative. A wrong choice might hurt quite a bit, so
917 * bail out if we can't use remote estimates.
918 */
919 if (!fpinfo->use_remote_estimate)
920 return useful_pathkeys_list;
921
922 /* Get the list of interesting EquivalenceClasses. */
923 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
924
925 /* Extract unique EC for query, if any, so we don't consider it again. */
926 if (list_length(root->query_pathkeys) == 1)
927 {
928 PathKey *query_pathkey = linitial(root->query_pathkeys);
929
930 query_ec = query_pathkey->pk_eclass;
931 }
932
933 /*
934 * As a heuristic, the only pathkeys we consider here are those of length
935 * one. It's surely possible to consider more, but since each one we
936 * choose to consider will generate a round-trip to the remote side, we
937 * need to be a bit cautious here. It would sure be nice to have a local
938 * cache of information about remote index definitions...
939 */
940 foreach(lc, useful_eclass_list)
941 {
942 EquivalenceClass *cur_ec = lfirst(lc);
943 Expr *em_expr;
944 PathKey *pathkey;
945
946 /* If redundant with what we did above, skip it. */
947 if (cur_ec == query_ec)
948 continue;
949
950 /* If no pushable expression for this rel, skip it. */
951 em_expr = find_em_expr_for_rel(cur_ec, rel);
952 if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
953 continue;
954
955 /* Looks like we can generate a pathkey, so let's do it. */
956 pathkey = make_canonical_pathkey(root, cur_ec,
957 linitial_oid(cur_ec->ec_opfamilies),
958 BTLessStrategyNumber,
959 false);
960 useful_pathkeys_list = lappend(useful_pathkeys_list,
961 list_make1(pathkey));
962 }
963
964 return useful_pathkeys_list;
965 }
966
967 /*
968 * postgresGetForeignPaths
969 * Create possible scan paths for a scan on the foreign table
970 */
971 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)972 postgresGetForeignPaths(PlannerInfo *root,
973 RelOptInfo *baserel,
974 Oid foreigntableid)
975 {
976 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
977 ForeignPath *path;
978 List *ppi_list;
979 ListCell *lc;
980
981 /*
982 * Create simplest ForeignScan path node and add it to baserel. This path
983 * corresponds to SeqScan path of regular tables (though depending on what
984 * baserestrict conditions we were able to send to remote, there might
985 * actually be an indexscan happening there). We already did all the work
986 * to estimate cost and size of this path.
987 *
988 * Although this path uses no join clauses, it could still have required
989 * parameterization due to LATERAL refs in its tlist.
990 */
991 path = create_foreignscan_path(root, baserel,
992 NULL, /* default pathtarget */
993 fpinfo->rows,
994 fpinfo->startup_cost,
995 fpinfo->total_cost,
996 NIL, /* no pathkeys */
997 baserel->lateral_relids,
998 NULL, /* no extra plan */
999 NIL); /* no fdw_private list */
1000 add_path(baserel, (Path *) path);
1001
1002 /* Add paths with pathkeys */
1003 add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1004
1005 /*
1006 * If we're not using remote estimates, stop here. We have no way to
1007 * estimate whether any join clauses would be worth sending across, so
1008 * don't bother building parameterized paths.
1009 */
1010 if (!fpinfo->use_remote_estimate)
1011 return;
1012
1013 /*
1014 * Thumb through all join clauses for the rel to identify which outer
1015 * relations could supply one or more safe-to-send-to-remote join clauses.
1016 * We'll build a parameterized path for each such outer relation.
1017 *
1018 * It's convenient to manage this by representing each candidate outer
1019 * relation by the ParamPathInfo node for it. We can then use the
1020 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1021 * interesting join clauses for that rel. This takes care of the
1022 * possibility that there are multiple safe join clauses for such a rel,
1023 * and also ensures that we account for unsafe join clauses that we'll
1024 * still have to enforce locally (since the parameterized-path machinery
1025 * insists that we handle all movable clauses).
1026 */
1027 ppi_list = NIL;
1028 foreach(lc, baserel->joininfo)
1029 {
1030 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1031 Relids required_outer;
1032 ParamPathInfo *param_info;
1033
1034 /* Check if clause can be moved to this rel */
1035 if (!join_clause_is_movable_to(rinfo, baserel))
1036 continue;
1037
1038 /* See if it is safe to send to remote */
1039 if (!is_foreign_expr(root, baserel, rinfo->clause))
1040 continue;
1041
1042 /* Calculate required outer rels for the resulting path */
1043 required_outer = bms_union(rinfo->clause_relids,
1044 baserel->lateral_relids);
1045 /* We do not want the foreign rel itself listed in required_outer */
1046 required_outer = bms_del_member(required_outer, baserel->relid);
1047
1048 /*
1049 * required_outer probably can't be empty here, but if it were, we
1050 * couldn't make a parameterized path.
1051 */
1052 if (bms_is_empty(required_outer))
1053 continue;
1054
1055 /* Get the ParamPathInfo */
1056 param_info = get_baserel_parampathinfo(root, baserel,
1057 required_outer);
1058 Assert(param_info != NULL);
1059
1060 /*
1061 * Add it to list unless we already have it. Testing pointer equality
1062 * is OK since get_baserel_parampathinfo won't make duplicates.
1063 */
1064 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1065 }
1066
1067 /*
1068 * The above scan examined only "generic" join clauses, not those that
1069 * were absorbed into EquivalenceClauses. See if we can make anything out
1070 * of EquivalenceClauses.
1071 */
1072 if (baserel->has_eclass_joins)
1073 {
1074 /*
1075 * We repeatedly scan the eclass list looking for column references
1076 * (or expressions) belonging to the foreign rel. Each time we find
1077 * one, we generate a list of equivalence joinclauses for it, and then
1078 * see if any are safe to send to the remote. Repeat till there are
1079 * no more candidate EC members.
1080 */
1081 ec_member_foreign_arg arg;
1082
1083 arg.already_used = NIL;
1084 for (;;)
1085 {
1086 List *clauses;
1087
1088 /* Make clauses, skipping any that join to lateral_referencers */
1089 arg.current = NULL;
1090 clauses = generate_implied_equalities_for_column(root,
1091 baserel,
1092 ec_member_matches_foreign,
1093 (void *) &arg,
1094 baserel->lateral_referencers);
1095
1096 /* Done if there are no more expressions in the foreign rel */
1097 if (arg.current == NULL)
1098 {
1099 Assert(clauses == NIL);
1100 break;
1101 }
1102
1103 /* Scan the extracted join clauses */
1104 foreach(lc, clauses)
1105 {
1106 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1107 Relids required_outer;
1108 ParamPathInfo *param_info;
1109
1110 /* Check if clause can be moved to this rel */
1111 if (!join_clause_is_movable_to(rinfo, baserel))
1112 continue;
1113
1114 /* See if it is safe to send to remote */
1115 if (!is_foreign_expr(root, baserel, rinfo->clause))
1116 continue;
1117
1118 /* Calculate required outer rels for the resulting path */
1119 required_outer = bms_union(rinfo->clause_relids,
1120 baserel->lateral_relids);
1121 required_outer = bms_del_member(required_outer, baserel->relid);
1122 if (bms_is_empty(required_outer))
1123 continue;
1124
1125 /* Get the ParamPathInfo */
1126 param_info = get_baserel_parampathinfo(root, baserel,
1127 required_outer);
1128 Assert(param_info != NULL);
1129
1130 /* Add it to list unless we already have it */
1131 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1132 }
1133
1134 /* Try again, now ignoring the expression we found this time */
1135 arg.already_used = lappend(arg.already_used, arg.current);
1136 }
1137 }
1138
1139 /*
1140 * Now build a path for each useful outer relation.
1141 */
1142 foreach(lc, ppi_list)
1143 {
1144 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1145 double rows;
1146 int width;
1147 Cost startup_cost;
1148 Cost total_cost;
1149
1150 /* Get a cost estimate from the remote */
1151 estimate_path_cost_size(root, baserel,
1152 param_info->ppi_clauses, NIL, NULL,
1153 &rows, &width,
1154 &startup_cost, &total_cost);
1155
1156 /*
1157 * ppi_rows currently won't get looked at by anything, but still we
1158 * may as well ensure that it matches our idea of the rowcount.
1159 */
1160 param_info->ppi_rows = rows;
1161
1162 /* Make the path */
1163 path = create_foreignscan_path(root, baserel,
1164 NULL, /* default pathtarget */
1165 rows,
1166 startup_cost,
1167 total_cost,
1168 NIL, /* no pathkeys */
1169 param_info->ppi_req_outer,
1170 NULL,
1171 NIL); /* no fdw_private list */
1172 add_path(baserel, (Path *) path);
1173 }
1174 }
1175
1176 /*
1177 * postgresGetForeignPlan
1178 * Create ForeignScan plan node which implements selected best path
1179 */
1180 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1181 postgresGetForeignPlan(PlannerInfo *root,
1182 RelOptInfo *foreignrel,
1183 Oid foreigntableid,
1184 ForeignPath *best_path,
1185 List *tlist,
1186 List *scan_clauses,
1187 Plan *outer_plan)
1188 {
1189 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1190 Index scan_relid;
1191 List *fdw_private;
1192 List *remote_exprs = NIL;
1193 List *local_exprs = NIL;
1194 List *params_list = NIL;
1195 List *fdw_scan_tlist = NIL;
1196 List *fdw_recheck_quals = NIL;
1197 List *retrieved_attrs;
1198 StringInfoData sql;
1199 bool has_final_sort = false;
1200 bool has_limit = false;
1201 ListCell *lc;
1202
1203 /*
1204 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1205 */
1206 if (best_path->fdw_private)
1207 {
1208 has_final_sort = intVal(list_nth(best_path->fdw_private,
1209 FdwPathPrivateHasFinalSort));
1210 has_limit = intVal(list_nth(best_path->fdw_private,
1211 FdwPathPrivateHasLimit));
1212 }
1213
1214 if (IS_SIMPLE_REL(foreignrel))
1215 {
1216 /*
1217 * For base relations, set scan_relid as the relid of the relation.
1218 */
1219 scan_relid = foreignrel->relid;
1220
1221 /*
1222 * In a base-relation scan, we must apply the given scan_clauses.
1223 *
1224 * Separate the scan_clauses into those that can be executed remotely
1225 * and those that can't. baserestrictinfo clauses that were
1226 * previously determined to be safe or unsafe by classifyConditions
1227 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1228 * else in the scan_clauses list will be a join clause, which we have
1229 * to check for remote-safety.
1230 *
1231 * Note: the join clauses we see here should be the exact same ones
1232 * previously examined by postgresGetForeignPaths. Possibly it'd be
1233 * worth passing forward the classification work done then, rather
1234 * than repeating it here.
1235 *
1236 * This code must match "extract_actual_clauses(scan_clauses, false)"
1237 * except for the additional decision about remote versus local
1238 * execution.
1239 */
1240 foreach(lc, scan_clauses)
1241 {
1242 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1243
1244 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1245 if (rinfo->pseudoconstant)
1246 continue;
1247
1248 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1249 remote_exprs = lappend(remote_exprs, rinfo->clause);
1250 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1251 local_exprs = lappend(local_exprs, rinfo->clause);
1252 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1253 remote_exprs = lappend(remote_exprs, rinfo->clause);
1254 else
1255 local_exprs = lappend(local_exprs, rinfo->clause);
1256 }
1257
1258 /*
1259 * For a base-relation scan, we have to support EPQ recheck, which
1260 * should recheck all the remote quals.
1261 */
1262 fdw_recheck_quals = remote_exprs;
1263 }
1264 else
1265 {
1266 /*
1267 * Join relation or upper relation - set scan_relid to 0.
1268 */
1269 scan_relid = 0;
1270
1271 /*
1272 * For a join rel, baserestrictinfo is NIL and we are not considering
1273 * parameterization right now, so there should be no scan_clauses for
1274 * a joinrel or an upper rel either.
1275 */
1276 Assert(!scan_clauses);
1277
1278 /*
1279 * Instead we get the conditions to apply from the fdw_private
1280 * structure.
1281 */
1282 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1283 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1284
1285 /*
1286 * We leave fdw_recheck_quals empty in this case, since we never need
1287 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1288 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1289 * If we're planning an upperrel (ie, remote grouping or aggregation)
1290 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1291 * allowed, and indeed we *can't* put the remote clauses into
1292 * fdw_recheck_quals because the unaggregated Vars won't be available
1293 * locally.
1294 */
1295
1296 /* Build the list of columns to be fetched from the foreign server. */
1297 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1298
1299 /*
1300 * Ensure that the outer plan produces a tuple whose descriptor
1301 * matches our scan tuple slot. Also, remove the local conditions
1302 * from outer plan's quals, lest they be evaluated twice, once by the
1303 * local plan and once by the scan.
1304 */
1305 if (outer_plan)
1306 {
1307 ListCell *lc;
1308
1309 /*
1310 * Right now, we only consider grouping and aggregation beyond
1311 * joins. Queries involving aggregates or grouping do not require
1312 * EPQ mechanism, hence should not have an outer plan here.
1313 */
1314 Assert(!IS_UPPER_REL(foreignrel));
1315
1316 /*
1317 * First, update the plan's qual list if possible. In some cases
1318 * the quals might be enforced below the topmost plan level, in
1319 * which case we'll fail to remove them; it's not worth working
1320 * harder than this.
1321 */
1322 foreach(lc, local_exprs)
1323 {
1324 Node *qual = lfirst(lc);
1325
1326 outer_plan->qual = list_delete(outer_plan->qual, qual);
1327
1328 /*
1329 * For an inner join the local conditions of foreign scan plan
1330 * can be part of the joinquals as well. (They might also be
1331 * in the mergequals or hashquals, but we can't touch those
1332 * without breaking the plan.)
1333 */
1334 if (IsA(outer_plan, NestLoop) ||
1335 IsA(outer_plan, MergeJoin) ||
1336 IsA(outer_plan, HashJoin))
1337 {
1338 Join *join_plan = (Join *) outer_plan;
1339
1340 if (join_plan->jointype == JOIN_INNER)
1341 join_plan->joinqual = list_delete(join_plan->joinqual,
1342 qual);
1343 }
1344 }
1345
1346 /*
1347 * Now fix the subplan's tlist --- this might result in inserting
1348 * a Result node atop the plan tree.
1349 */
1350 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1351 best_path->path.parallel_safe);
1352 }
1353 }
1354
1355 /*
1356 * Build the query string to be sent for execution, and identify
1357 * expressions to be sent as parameters.
1358 */
1359 initStringInfo(&sql);
1360 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1361 remote_exprs, best_path->path.pathkeys,
1362 has_final_sort, has_limit, false,
1363 &retrieved_attrs, ¶ms_list);
1364
1365 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1366 fpinfo->final_remote_exprs = remote_exprs;
1367
1368 /*
1369 * Build the fdw_private list that will be available to the executor.
1370 * Items in the list must match order in enum FdwScanPrivateIndex.
1371 */
1372 fdw_private = list_make3(makeString(sql.data),
1373 retrieved_attrs,
1374 makeInteger(fpinfo->fetch_size));
1375 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1376 fdw_private = lappend(fdw_private,
1377 makeString(fpinfo->relation_name->data));
1378
1379 /*
1380 * Create the ForeignScan node for the given relation.
1381 *
1382 * Note that the remote parameter expressions are stored in the fdw_exprs
1383 * field of the finished plan node; we can't keep them in private state
1384 * because then they wouldn't be subject to later planner processing.
1385 */
1386 return make_foreignscan(tlist,
1387 local_exprs,
1388 scan_relid,
1389 params_list,
1390 fdw_private,
1391 fdw_scan_tlist,
1392 fdw_recheck_quals,
1393 outer_plan);
1394 }
1395
1396 /*
1397 * postgresBeginForeignScan
1398 * Initiate an executor scan of a foreign PostgreSQL table.
1399 */
1400 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1401 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1402 {
1403 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1404 EState *estate = node->ss.ps.state;
1405 PgFdwScanState *fsstate;
1406 RangeTblEntry *rte;
1407 Oid userid;
1408 ForeignTable *table;
1409 UserMapping *user;
1410 int rtindex;
1411 int numParams;
1412
1413 /*
1414 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1415 */
1416 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1417 return;
1418
1419 /*
1420 * We'll save private state in node->fdw_state.
1421 */
1422 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1423 node->fdw_state = (void *) fsstate;
1424
1425 /*
1426 * Identify which user to do the remote access as. This should match what
1427 * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1428 * lowest-numbered member RTE as a representative; we would get the same
1429 * result from any.
1430 */
1431 if (fsplan->scan.scanrelid > 0)
1432 rtindex = fsplan->scan.scanrelid;
1433 else
1434 rtindex = bms_next_member(fsplan->fs_relids, -1);
1435 rte = exec_rt_fetch(rtindex, estate);
1436 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1437
1438 /* Get info about foreign table. */
1439 table = GetForeignTable(rte->relid);
1440 user = GetUserMapping(userid, table->serverid);
1441
1442 /*
1443 * Get connection to the foreign server. Connection manager will
1444 * establish new connection if necessary.
1445 */
1446 fsstate->conn = GetConnection(user, false);
1447
1448 /* Assign a unique ID for my cursor */
1449 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1450 fsstate->cursor_exists = false;
1451
1452 /* Get private info created by planner functions. */
1453 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1454 FdwScanPrivateSelectSql));
1455 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1456 FdwScanPrivateRetrievedAttrs);
1457 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1458 FdwScanPrivateFetchSize));
1459
1460 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1461 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1462 "postgres_fdw tuple data",
1463 ALLOCSET_DEFAULT_SIZES);
1464 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1465 "postgres_fdw temporary data",
1466 ALLOCSET_SMALL_SIZES);
1467
1468 /*
1469 * Get info we'll need for converting data fetched from the foreign server
1470 * into local representation and error reporting during that process.
1471 */
1472 if (fsplan->scan.scanrelid > 0)
1473 {
1474 fsstate->rel = node->ss.ss_currentRelation;
1475 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1476 }
1477 else
1478 {
1479 fsstate->rel = NULL;
1480 fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1481 }
1482
1483 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1484
1485 /*
1486 * Prepare for processing of parameters used in remote query, if any.
1487 */
1488 numParams = list_length(fsplan->fdw_exprs);
1489 fsstate->numParams = numParams;
1490 if (numParams > 0)
1491 prepare_query_params((PlanState *) node,
1492 fsplan->fdw_exprs,
1493 numParams,
1494 &fsstate->param_flinfo,
1495 &fsstate->param_exprs,
1496 &fsstate->param_values);
1497 }
1498
1499 /*
1500 * postgresIterateForeignScan
1501 * Retrieve next row from the result set, or clear tuple slot to indicate
1502 * EOF.
1503 */
1504 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1505 postgresIterateForeignScan(ForeignScanState *node)
1506 {
1507 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1508 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1509
1510 /*
1511 * If this is the first call after Begin or ReScan, we need to create the
1512 * cursor on the remote side.
1513 */
1514 if (!fsstate->cursor_exists)
1515 create_cursor(node);
1516
1517 /*
1518 * Get some more tuples, if we've run out.
1519 */
1520 if (fsstate->next_tuple >= fsstate->num_tuples)
1521 {
1522 /* No point in another fetch if we already detected EOF, though. */
1523 if (!fsstate->eof_reached)
1524 fetch_more_data(node);
1525 /* If we didn't get any tuples, must be end of data. */
1526 if (fsstate->next_tuple >= fsstate->num_tuples)
1527 return ExecClearTuple(slot);
1528 }
1529
1530 /*
1531 * Return the next tuple.
1532 */
1533 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1534 slot,
1535 false);
1536
1537 return slot;
1538 }
1539
1540 /*
1541 * postgresReScanForeignScan
1542 * Restart the scan.
1543 */
1544 static void
postgresReScanForeignScan(ForeignScanState * node)1545 postgresReScanForeignScan(ForeignScanState *node)
1546 {
1547 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1548 char sql[64];
1549 PGresult *res;
1550
1551 /* If we haven't created the cursor yet, nothing to do. */
1552 if (!fsstate->cursor_exists)
1553 return;
1554
1555 /*
1556 * If any internal parameters affecting this node have changed, we'd
1557 * better destroy and recreate the cursor. Otherwise, rewinding it should
1558 * be good enough. If we've only fetched zero or one batch, we needn't
1559 * even rewind the cursor, just rescan what we have.
1560 */
1561 if (node->ss.ps.chgParam != NULL)
1562 {
1563 fsstate->cursor_exists = false;
1564 snprintf(sql, sizeof(sql), "CLOSE c%u",
1565 fsstate->cursor_number);
1566 }
1567 else if (fsstate->fetch_ct_2 > 1)
1568 {
1569 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1570 fsstate->cursor_number);
1571 }
1572 else
1573 {
1574 /* Easy: just rescan what we already have in memory, if anything */
1575 fsstate->next_tuple = 0;
1576 return;
1577 }
1578
1579 /*
1580 * We don't use a PG_TRY block here, so be careful not to throw error
1581 * without releasing the PGresult.
1582 */
1583 res = pgfdw_exec_query(fsstate->conn, sql);
1584 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1585 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1586 PQclear(res);
1587
1588 /* Now force a fresh FETCH. */
1589 fsstate->tuples = NULL;
1590 fsstate->num_tuples = 0;
1591 fsstate->next_tuple = 0;
1592 fsstate->fetch_ct_2 = 0;
1593 fsstate->eof_reached = false;
1594 }
1595
1596 /*
1597 * postgresEndForeignScan
1598 * Finish scanning foreign table and dispose objects used for this scan
1599 */
1600 static void
postgresEndForeignScan(ForeignScanState * node)1601 postgresEndForeignScan(ForeignScanState *node)
1602 {
1603 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1604
1605 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1606 if (fsstate == NULL)
1607 return;
1608
1609 /* Close the cursor if open, to prevent accumulation of cursors */
1610 if (fsstate->cursor_exists)
1611 close_cursor(fsstate->conn, fsstate->cursor_number);
1612
1613 /* Release remote connection */
1614 ReleaseConnection(fsstate->conn);
1615 fsstate->conn = NULL;
1616
1617 /* MemoryContexts will be deleted automatically. */
1618 }
1619
1620 /*
1621 * postgresAddForeignUpdateTargets
1622 * Add resjunk column(s) needed for update/delete on a foreign table
1623 */
1624 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1625 postgresAddForeignUpdateTargets(Query *parsetree,
1626 RangeTblEntry *target_rte,
1627 Relation target_relation)
1628 {
1629 Var *var;
1630 const char *attrname;
1631 TargetEntry *tle;
1632
1633 /*
1634 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1635 */
1636
1637 /* Make a Var representing the desired value */
1638 var = makeVar(parsetree->resultRelation,
1639 SelfItemPointerAttributeNumber,
1640 TIDOID,
1641 -1,
1642 InvalidOid,
1643 0);
1644
1645 /* Wrap it in a resjunk TLE with the right name ... */
1646 attrname = "ctid";
1647
1648 tle = makeTargetEntry((Expr *) var,
1649 list_length(parsetree->targetList) + 1,
1650 pstrdup(attrname),
1651 true);
1652
1653 /* ... and add it to the query's targetlist */
1654 parsetree->targetList = lappend(parsetree->targetList, tle);
1655 }
1656
1657 /*
1658 * postgresPlanForeignModify
1659 * Plan an insert/update/delete operation on a foreign table
1660 */
1661 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1662 postgresPlanForeignModify(PlannerInfo *root,
1663 ModifyTable *plan,
1664 Index resultRelation,
1665 int subplan_index)
1666 {
1667 CmdType operation = plan->operation;
1668 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1669 Relation rel;
1670 StringInfoData sql;
1671 List *targetAttrs = NIL;
1672 List *withCheckOptionList = NIL;
1673 List *returningList = NIL;
1674 List *retrieved_attrs = NIL;
1675 bool doNothing = false;
1676
1677 initStringInfo(&sql);
1678
1679 /*
1680 * Core code already has some lock on each rel being planned, so we can
1681 * use NoLock here.
1682 */
1683 rel = table_open(rte->relid, NoLock);
1684
1685 /*
1686 * In an INSERT, we transmit all columns that are defined in the foreign
1687 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1688 * foreign table, we transmit all columns like INSERT; else we transmit
1689 * only columns that were explicitly targets of the UPDATE, so as to avoid
1690 * unnecessary data transmission. (We can't do that for INSERT since we
1691 * would miss sending default values for columns not listed in the source
1692 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1693 * those triggers might change values for non-target columns, in which
1694 * case we would miss sending changed values for those columns.)
1695 */
1696 if (operation == CMD_INSERT ||
1697 (operation == CMD_UPDATE &&
1698 rel->trigdesc &&
1699 rel->trigdesc->trig_update_before_row))
1700 {
1701 TupleDesc tupdesc = RelationGetDescr(rel);
1702 int attnum;
1703
1704 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1705 {
1706 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1707
1708 if (!attr->attisdropped)
1709 targetAttrs = lappend_int(targetAttrs, attnum);
1710 }
1711 }
1712 else if (operation == CMD_UPDATE)
1713 {
1714 int col;
1715 Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1716
1717 col = -1;
1718 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1719 {
1720 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1721 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1722
1723 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1724 elog(ERROR, "system-column update is not supported");
1725 targetAttrs = lappend_int(targetAttrs, attno);
1726 }
1727 }
1728
1729 /*
1730 * Extract the relevant WITH CHECK OPTION list if any.
1731 */
1732 if (plan->withCheckOptionLists)
1733 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1734 subplan_index);
1735
1736 /*
1737 * Extract the relevant RETURNING list if any.
1738 */
1739 if (plan->returningLists)
1740 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1741
1742 /*
1743 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1744 * should have already been rejected in the optimizer, as presently there
1745 * is no way to recognize an arbiter index on a foreign table. Only DO
1746 * NOTHING is supported without an inference specification.
1747 */
1748 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1749 doNothing = true;
1750 else if (plan->onConflictAction != ONCONFLICT_NONE)
1751 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1752 (int) plan->onConflictAction);
1753
1754 /*
1755 * Construct the SQL command string.
1756 */
1757 switch (operation)
1758 {
1759 case CMD_INSERT:
1760 deparseInsertSql(&sql, rte, resultRelation, rel,
1761 targetAttrs, doNothing,
1762 withCheckOptionList, returningList,
1763 &retrieved_attrs);
1764 break;
1765 case CMD_UPDATE:
1766 deparseUpdateSql(&sql, rte, resultRelation, rel,
1767 targetAttrs,
1768 withCheckOptionList, returningList,
1769 &retrieved_attrs);
1770 break;
1771 case CMD_DELETE:
1772 deparseDeleteSql(&sql, rte, resultRelation, rel,
1773 returningList,
1774 &retrieved_attrs);
1775 break;
1776 default:
1777 elog(ERROR, "unexpected operation: %d", (int) operation);
1778 break;
1779 }
1780
1781 table_close(rel, NoLock);
1782
1783 /*
1784 * Build the fdw_private list that will be available to the executor.
1785 * Items in the list must match enum FdwModifyPrivateIndex, above.
1786 */
1787 return list_make4(makeString(sql.data),
1788 targetAttrs,
1789 makeInteger((retrieved_attrs != NIL)),
1790 retrieved_attrs);
1791 }
1792
1793 /*
1794 * postgresBeginForeignModify
1795 * Begin an insert/update/delete operation on a foreign table
1796 */
1797 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1798 postgresBeginForeignModify(ModifyTableState *mtstate,
1799 ResultRelInfo *resultRelInfo,
1800 List *fdw_private,
1801 int subplan_index,
1802 int eflags)
1803 {
1804 PgFdwModifyState *fmstate;
1805 char *query;
1806 List *target_attrs;
1807 bool has_returning;
1808 List *retrieved_attrs;
1809 RangeTblEntry *rte;
1810
1811 /*
1812 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1813 * stays NULL.
1814 */
1815 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1816 return;
1817
1818 /* Deconstruct fdw_private data. */
1819 query = strVal(list_nth(fdw_private,
1820 FdwModifyPrivateUpdateSql));
1821 target_attrs = (List *) list_nth(fdw_private,
1822 FdwModifyPrivateTargetAttnums);
1823 has_returning = intVal(list_nth(fdw_private,
1824 FdwModifyPrivateHasReturning));
1825 retrieved_attrs = (List *) list_nth(fdw_private,
1826 FdwModifyPrivateRetrievedAttrs);
1827
1828 /* Find RTE. */
1829 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1830 mtstate->ps.state);
1831
1832 /* Construct an execution state. */
1833 fmstate = create_foreign_modify(mtstate->ps.state,
1834 rte,
1835 resultRelInfo,
1836 mtstate->operation,
1837 mtstate->mt_plans[subplan_index]->plan,
1838 query,
1839 target_attrs,
1840 has_returning,
1841 retrieved_attrs);
1842
1843 resultRelInfo->ri_FdwState = fmstate;
1844 }
1845
1846 /*
1847 * postgresExecForeignInsert
1848 * Insert one row into a foreign table
1849 */
1850 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1851 postgresExecForeignInsert(EState *estate,
1852 ResultRelInfo *resultRelInfo,
1853 TupleTableSlot *slot,
1854 TupleTableSlot *planSlot)
1855 {
1856 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1857 TupleTableSlot *rslot;
1858
1859 /*
1860 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1861 * postgresBeginForeignInsert())
1862 */
1863 if (fmstate->aux_fmstate)
1864 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1865 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1866 slot, planSlot);
1867 /* Revert that change */
1868 if (fmstate->aux_fmstate)
1869 resultRelInfo->ri_FdwState = fmstate;
1870
1871 return rslot;
1872 }
1873
1874 /*
1875 * postgresExecForeignUpdate
1876 * Update one row in a foreign table
1877 */
1878 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1879 postgresExecForeignUpdate(EState *estate,
1880 ResultRelInfo *resultRelInfo,
1881 TupleTableSlot *slot,
1882 TupleTableSlot *planSlot)
1883 {
1884 return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
1885 slot, planSlot);
1886 }
1887
1888 /*
1889 * postgresExecForeignDelete
1890 * Delete one row from a foreign table
1891 */
1892 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1893 postgresExecForeignDelete(EState *estate,
1894 ResultRelInfo *resultRelInfo,
1895 TupleTableSlot *slot,
1896 TupleTableSlot *planSlot)
1897 {
1898 return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
1899 slot, planSlot);
1900 }
1901
1902 /*
1903 * postgresEndForeignModify
1904 * Finish an insert/update/delete operation on a foreign table
1905 */
1906 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1907 postgresEndForeignModify(EState *estate,
1908 ResultRelInfo *resultRelInfo)
1909 {
1910 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1911
1912 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1913 if (fmstate == NULL)
1914 return;
1915
1916 /* Destroy the execution state */
1917 finish_foreign_modify(fmstate);
1918 }
1919
1920 /*
1921 * postgresBeginForeignInsert
1922 * Begin an insert operation on a foreign table
1923 */
1924 static void
postgresBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)1925 postgresBeginForeignInsert(ModifyTableState *mtstate,
1926 ResultRelInfo *resultRelInfo)
1927 {
1928 PgFdwModifyState *fmstate;
1929 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1930 EState *estate = mtstate->ps.state;
1931 Index resultRelation;
1932 Relation rel = resultRelInfo->ri_RelationDesc;
1933 RangeTblEntry *rte;
1934 TupleDesc tupdesc = RelationGetDescr(rel);
1935 int attnum;
1936 StringInfoData sql;
1937 List *targetAttrs = NIL;
1938 List *retrieved_attrs = NIL;
1939 bool doNothing = false;
1940
1941 /*
1942 * If the foreign table we are about to insert routed rows into is also an
1943 * UPDATE subplan result rel that will be updated later, proceeding with
1944 * the INSERT will result in the later UPDATE incorrectly modifying those
1945 * routed rows, so prevent the INSERT --- it would be nice if we could
1946 * handle this case; but for now, throw an error for safety.
1947 */
1948 if (plan && plan->operation == CMD_UPDATE &&
1949 (resultRelInfo->ri_usesFdwDirectModify ||
1950 resultRelInfo->ri_FdwState) &&
1951 resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
1952 ereport(ERROR,
1953 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1954 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
1955 RelationGetRelationName(rel))));
1956
1957 initStringInfo(&sql);
1958
1959 /* We transmit all columns that are defined in the foreign table. */
1960 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1961 {
1962 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1963
1964 if (!attr->attisdropped)
1965 targetAttrs = lappend_int(targetAttrs, attnum);
1966 }
1967
1968 /* Check if we add the ON CONFLICT clause to the remote query. */
1969 if (plan)
1970 {
1971 OnConflictAction onConflictAction = plan->onConflictAction;
1972
1973 /* We only support DO NOTHING without an inference specification. */
1974 if (onConflictAction == ONCONFLICT_NOTHING)
1975 doNothing = true;
1976 else if (onConflictAction != ONCONFLICT_NONE)
1977 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1978 (int) onConflictAction);
1979 }
1980
1981 /*
1982 * If the foreign table is a partition that doesn't have a corresponding
1983 * RTE entry, we need to create a new RTE
1984 * describing the foreign table for use by deparseInsertSql and
1985 * create_foreign_modify() below, after first copying the parent's RTE and
1986 * modifying some fields to describe the foreign partition to work on.
1987 * However, if this is invoked by UPDATE, the existing RTE may already
1988 * correspond to this partition if it is one of the UPDATE subplan target
1989 * rels; in that case, we can just use the existing RTE as-is.
1990 */
1991 if (resultRelInfo->ri_RangeTableIndex == 0)
1992 {
1993 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
1994
1995 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
1996 rte = copyObject(rte);
1997 rte->relid = RelationGetRelid(rel);
1998 rte->relkind = RELKIND_FOREIGN_TABLE;
1999
2000 /*
2001 * For UPDATE, we must use the RT index of the first subplan target
2002 * rel's RTE, because the core code would have built expressions for
2003 * the partition, such as RETURNING, using that RT index as varno of
2004 * Vars contained in those expressions.
2005 */
2006 if (plan && plan->operation == CMD_UPDATE &&
2007 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2008 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2009 else
2010 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2011 }
2012 else
2013 {
2014 resultRelation = resultRelInfo->ri_RangeTableIndex;
2015 rte = exec_rt_fetch(resultRelation, estate);
2016 }
2017
2018 /* Construct the SQL command string. */
2019 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2020 resultRelInfo->ri_WithCheckOptions,
2021 resultRelInfo->ri_returningList,
2022 &retrieved_attrs);
2023
2024 /* Construct an execution state. */
2025 fmstate = create_foreign_modify(mtstate->ps.state,
2026 rte,
2027 resultRelInfo,
2028 CMD_INSERT,
2029 NULL,
2030 sql.data,
2031 targetAttrs,
2032 retrieved_attrs != NIL,
2033 retrieved_attrs);
2034
2035 /*
2036 * If the given resultRelInfo already has PgFdwModifyState set, it means
2037 * the foreign table is an UPDATE subplan result rel; in which case, store
2038 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2039 */
2040 if (resultRelInfo->ri_FdwState)
2041 {
2042 Assert(plan && plan->operation == CMD_UPDATE);
2043 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2044 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2045 }
2046 else
2047 resultRelInfo->ri_FdwState = fmstate;
2048 }
2049
2050 /*
2051 * postgresEndForeignInsert
2052 * Finish an insert operation on a foreign table
2053 */
2054 static void
postgresEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2055 postgresEndForeignInsert(EState *estate,
2056 ResultRelInfo *resultRelInfo)
2057 {
2058 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2059
2060 Assert(fmstate != NULL);
2061
2062 /*
2063 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2064 * postgresBeginForeignInsert())
2065 */
2066 if (fmstate->aux_fmstate)
2067 fmstate = fmstate->aux_fmstate;
2068
2069 /* Destroy the execution state */
2070 finish_foreign_modify(fmstate);
2071 }
2072
2073 /*
2074 * postgresIsForeignRelUpdatable
2075 * Determine whether a foreign table supports INSERT, UPDATE and/or
2076 * DELETE.
2077 */
2078 static int
postgresIsForeignRelUpdatable(Relation rel)2079 postgresIsForeignRelUpdatable(Relation rel)
2080 {
2081 bool updatable;
2082 ForeignTable *table;
2083 ForeignServer *server;
2084 ListCell *lc;
2085
2086 /*
2087 * By default, all postgres_fdw foreign tables are assumed updatable. This
2088 * can be overridden by a per-server setting, which in turn can be
2089 * overridden by a per-table setting.
2090 */
2091 updatable = true;
2092
2093 table = GetForeignTable(RelationGetRelid(rel));
2094 server = GetForeignServer(table->serverid);
2095
2096 foreach(lc, server->options)
2097 {
2098 DefElem *def = (DefElem *) lfirst(lc);
2099
2100 if (strcmp(def->defname, "updatable") == 0)
2101 updatable = defGetBoolean(def);
2102 }
2103 foreach(lc, table->options)
2104 {
2105 DefElem *def = (DefElem *) lfirst(lc);
2106
2107 if (strcmp(def->defname, "updatable") == 0)
2108 updatable = defGetBoolean(def);
2109 }
2110
2111 /*
2112 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2113 */
2114 return updatable ?
2115 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2116 }
2117
2118 /*
2119 * postgresRecheckForeignScan
2120 * Execute a local join execution plan for a foreign join
2121 */
2122 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2123 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2124 {
2125 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2126 PlanState *outerPlan = outerPlanState(node);
2127 TupleTableSlot *result;
2128
2129 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2130 if (scanrelid > 0)
2131 return true;
2132
2133 Assert(outerPlan != NULL);
2134
2135 /* Execute a local join execution plan */
2136 result = ExecProcNode(outerPlan);
2137 if (TupIsNull(result))
2138 return false;
2139
2140 /* Store result in the given slot */
2141 ExecCopySlot(slot, result);
2142
2143 return true;
2144 }
2145
2146 /*
2147 * postgresPlanDirectModify
2148 * Consider a direct foreign table modification
2149 *
2150 * Decide whether it is safe to modify a foreign table directly, and if so,
2151 * rewrite subplan accordingly.
2152 */
2153 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2154 postgresPlanDirectModify(PlannerInfo *root,
2155 ModifyTable *plan,
2156 Index resultRelation,
2157 int subplan_index)
2158 {
2159 CmdType operation = plan->operation;
2160 Plan *subplan;
2161 RelOptInfo *foreignrel;
2162 RangeTblEntry *rte;
2163 PgFdwRelationInfo *fpinfo;
2164 Relation rel;
2165 StringInfoData sql;
2166 ForeignScan *fscan;
2167 List *targetAttrs = NIL;
2168 List *remote_exprs;
2169 List *params_list = NIL;
2170 List *returningList = NIL;
2171 List *retrieved_attrs = NIL;
2172
2173 /*
2174 * Decide whether it is safe to modify a foreign table directly.
2175 */
2176
2177 /*
2178 * The table modification must be an UPDATE or DELETE.
2179 */
2180 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2181 return false;
2182
2183 /*
2184 * It's unsafe to modify a foreign table directly if there are any local
2185 * joins needed.
2186 */
2187 subplan = (Plan *) list_nth(plan->plans, subplan_index);
2188 if (!IsA(subplan, ForeignScan))
2189 return false;
2190 fscan = (ForeignScan *) subplan;
2191
2192 /*
2193 * It's unsafe to modify a foreign table directly if there are any quals
2194 * that should be evaluated locally.
2195 */
2196 if (subplan->qual != NIL)
2197 return false;
2198
2199 /* Safe to fetch data about the target foreign rel */
2200 if (fscan->scan.scanrelid == 0)
2201 {
2202 foreignrel = find_join_rel(root, fscan->fs_relids);
2203 /* We should have a rel for this foreign join. */
2204 Assert(foreignrel);
2205 }
2206 else
2207 foreignrel = root->simple_rel_array[resultRelation];
2208 rte = root->simple_rte_array[resultRelation];
2209 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2210
2211 /*
2212 * It's unsafe to update a foreign table directly, if any expressions to
2213 * assign to the target columns are unsafe to evaluate remotely.
2214 */
2215 if (operation == CMD_UPDATE)
2216 {
2217 int col;
2218
2219 /*
2220 * We transmit only columns that were explicitly targets of the
2221 * UPDATE, so as to avoid unnecessary data transmission.
2222 */
2223 col = -1;
2224 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2225 {
2226 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2227 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
2228 TargetEntry *tle;
2229
2230 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2231 elog(ERROR, "system-column update is not supported");
2232
2233 tle = get_tle_by_resno(subplan->targetlist, attno);
2234
2235 if (!tle)
2236 elog(ERROR, "attribute number %d not found in subplan targetlist",
2237 attno);
2238
2239 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2240 return false;
2241
2242 targetAttrs = lappend_int(targetAttrs, attno);
2243 }
2244 }
2245
2246 /*
2247 * Ok, rewrite subplan so as to modify the foreign table directly.
2248 */
2249 initStringInfo(&sql);
2250
2251 /*
2252 * Core code already has some lock on each rel being planned, so we can
2253 * use NoLock here.
2254 */
2255 rel = table_open(rte->relid, NoLock);
2256
2257 /*
2258 * Recall the qual clauses that must be evaluated remotely. (These are
2259 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2260 * doesn't care.)
2261 */
2262 remote_exprs = fpinfo->final_remote_exprs;
2263
2264 /*
2265 * Extract the relevant RETURNING list if any.
2266 */
2267 if (plan->returningLists)
2268 {
2269 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2270
2271 /*
2272 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2273 * we fetch from the foreign server any Vars specified in RETURNING
2274 * that refer not only to the target relation but to non-target
2275 * relations. So we'll deparse them into the RETURNING clause of the
2276 * remote query; use a targetlist consisting of them instead, which
2277 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2278 * node below.
2279 */
2280 if (fscan->scan.scanrelid == 0)
2281 returningList = build_remote_returning(resultRelation, rel,
2282 returningList);
2283 }
2284
2285 /*
2286 * Construct the SQL command string.
2287 */
2288 switch (operation)
2289 {
2290 case CMD_UPDATE:
2291 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2292 foreignrel,
2293 ((Plan *) fscan)->targetlist,
2294 targetAttrs,
2295 remote_exprs, ¶ms_list,
2296 returningList, &retrieved_attrs);
2297 break;
2298 case CMD_DELETE:
2299 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2300 foreignrel,
2301 remote_exprs, ¶ms_list,
2302 returningList, &retrieved_attrs);
2303 break;
2304 default:
2305 elog(ERROR, "unexpected operation: %d", (int) operation);
2306 break;
2307 }
2308
2309 /*
2310 * Update the operation info.
2311 */
2312 fscan->operation = operation;
2313
2314 /*
2315 * Update the fdw_exprs list that will be available to the executor.
2316 */
2317 fscan->fdw_exprs = params_list;
2318
2319 /*
2320 * Update the fdw_private list that will be available to the executor.
2321 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2322 */
2323 fscan->fdw_private = list_make4(makeString(sql.data),
2324 makeInteger((retrieved_attrs != NIL)),
2325 retrieved_attrs,
2326 makeInteger(plan->canSetTag));
2327
2328 /*
2329 * Update the foreign-join-related fields.
2330 */
2331 if (fscan->scan.scanrelid == 0)
2332 {
2333 /* No need for the outer subplan. */
2334 fscan->scan.plan.lefttree = NULL;
2335
2336 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2337 if (returningList)
2338 rebuild_fdw_scan_tlist(fscan, returningList);
2339 }
2340
2341 table_close(rel, NoLock);
2342 return true;
2343 }
2344
2345 /*
2346 * postgresBeginDirectModify
2347 * Prepare a direct foreign table modification
2348 */
2349 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2350 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2351 {
2352 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2353 EState *estate = node->ss.ps.state;
2354 PgFdwDirectModifyState *dmstate;
2355 Index rtindex;
2356 RangeTblEntry *rte;
2357 Oid userid;
2358 ForeignTable *table;
2359 UserMapping *user;
2360 int numParams;
2361
2362 /*
2363 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2364 */
2365 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2366 return;
2367
2368 /*
2369 * We'll save private state in node->fdw_state.
2370 */
2371 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2372 node->fdw_state = (void *) dmstate;
2373
2374 /*
2375 * Identify which user to do the remote access as. This should match what
2376 * ExecCheckRTEPerms() does.
2377 */
2378 rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2379 rte = exec_rt_fetch(rtindex, estate);
2380 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2381
2382 /* Get info about foreign table. */
2383 if (fsplan->scan.scanrelid == 0)
2384 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2385 else
2386 dmstate->rel = node->ss.ss_currentRelation;
2387 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2388 user = GetUserMapping(userid, table->serverid);
2389
2390 /*
2391 * Get connection to the foreign server. Connection manager will
2392 * establish new connection if necessary.
2393 */
2394 dmstate->conn = GetConnection(user, false);
2395
2396 /* Update the foreign-join-related fields. */
2397 if (fsplan->scan.scanrelid == 0)
2398 {
2399 /* Save info about foreign table. */
2400 dmstate->resultRel = dmstate->rel;
2401
2402 /*
2403 * Set dmstate->rel to NULL to teach get_returning_data() and
2404 * make_tuple_from_result_row() that columns fetched from the remote
2405 * server are described by fdw_scan_tlist of the foreign-scan plan
2406 * node, not the tuple descriptor for the target relation.
2407 */
2408 dmstate->rel = NULL;
2409 }
2410
2411 /* Initialize state variable */
2412 dmstate->num_tuples = -1; /* -1 means not set yet */
2413
2414 /* Get private info created by planner functions. */
2415 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2416 FdwDirectModifyPrivateUpdateSql));
2417 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2418 FdwDirectModifyPrivateHasReturning));
2419 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2420 FdwDirectModifyPrivateRetrievedAttrs);
2421 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2422 FdwDirectModifyPrivateSetProcessed));
2423
2424 /* Create context for per-tuple temp workspace. */
2425 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2426 "postgres_fdw temporary data",
2427 ALLOCSET_SMALL_SIZES);
2428
2429 /* Prepare for input conversion of RETURNING results. */
2430 if (dmstate->has_returning)
2431 {
2432 TupleDesc tupdesc;
2433
2434 if (fsplan->scan.scanrelid == 0)
2435 tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2436 else
2437 tupdesc = RelationGetDescr(dmstate->rel);
2438
2439 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2440
2441 /*
2442 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2443 * initialize a filter to extract an updated/deleted tuple from a scan
2444 * tuple.
2445 */
2446 if (fsplan->scan.scanrelid == 0)
2447 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2448 }
2449
2450 /*
2451 * Prepare for processing of parameters used in remote query, if any.
2452 */
2453 numParams = list_length(fsplan->fdw_exprs);
2454 dmstate->numParams = numParams;
2455 if (numParams > 0)
2456 prepare_query_params((PlanState *) node,
2457 fsplan->fdw_exprs,
2458 numParams,
2459 &dmstate->param_flinfo,
2460 &dmstate->param_exprs,
2461 &dmstate->param_values);
2462 }
2463
2464 /*
2465 * postgresIterateDirectModify
2466 * Execute a direct foreign table modification
2467 */
2468 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2469 postgresIterateDirectModify(ForeignScanState *node)
2470 {
2471 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2472 EState *estate = node->ss.ps.state;
2473 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2474
2475 /*
2476 * If this is the first call after Begin, execute the statement.
2477 */
2478 if (dmstate->num_tuples == -1)
2479 execute_dml_stmt(node);
2480
2481 /*
2482 * If the local query doesn't specify RETURNING, just clear tuple slot.
2483 */
2484 if (!resultRelInfo->ri_projectReturning)
2485 {
2486 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2487 Instrumentation *instr = node->ss.ps.instrument;
2488
2489 Assert(!dmstate->has_returning);
2490
2491 /* Increment the command es_processed count if necessary. */
2492 if (dmstate->set_processed)
2493 estate->es_processed += dmstate->num_tuples;
2494
2495 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2496 if (instr)
2497 instr->tuplecount += dmstate->num_tuples;
2498
2499 return ExecClearTuple(slot);
2500 }
2501
2502 /*
2503 * Get the next RETURNING tuple.
2504 */
2505 return get_returning_data(node);
2506 }
2507
2508 /*
2509 * postgresEndDirectModify
2510 * Finish a direct foreign table modification
2511 */
2512 static void
postgresEndDirectModify(ForeignScanState * node)2513 postgresEndDirectModify(ForeignScanState *node)
2514 {
2515 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2516
2517 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2518 if (dmstate == NULL)
2519 return;
2520
2521 /* Release PGresult */
2522 if (dmstate->result)
2523 PQclear(dmstate->result);
2524
2525 /* Release remote connection */
2526 ReleaseConnection(dmstate->conn);
2527 dmstate->conn = NULL;
2528
2529 /* MemoryContext will be deleted automatically. */
2530 }
2531
2532 /*
2533 * postgresExplainForeignScan
2534 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2535 */
2536 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2537 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2538 {
2539 List *fdw_private;
2540 char *sql;
2541 char *relations;
2542
2543 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2544
2545 /*
2546 * Add names of relation handled by the foreign scan when the scan is a
2547 * join
2548 */
2549 if (list_length(fdw_private) > FdwScanPrivateRelations)
2550 {
2551 relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2552 ExplainPropertyText("Relations", relations, es);
2553 }
2554
2555 /*
2556 * Add remote query, when VERBOSE option is specified.
2557 */
2558 if (es->verbose)
2559 {
2560 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2561 ExplainPropertyText("Remote SQL", sql, es);
2562 }
2563 }
2564
2565 /*
2566 * postgresExplainForeignModify
2567 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2568 */
2569 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2570 postgresExplainForeignModify(ModifyTableState *mtstate,
2571 ResultRelInfo *rinfo,
2572 List *fdw_private,
2573 int subplan_index,
2574 ExplainState *es)
2575 {
2576 if (es->verbose)
2577 {
2578 char *sql = strVal(list_nth(fdw_private,
2579 FdwModifyPrivateUpdateSql));
2580
2581 ExplainPropertyText("Remote SQL", sql, es);
2582 }
2583 }
2584
2585 /*
2586 * postgresExplainDirectModify
2587 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2588 * foreign table directly
2589 */
2590 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2591 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2592 {
2593 List *fdw_private;
2594 char *sql;
2595
2596 if (es->verbose)
2597 {
2598 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2599 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2600 ExplainPropertyText("Remote SQL", sql, es);
2601 }
2602 }
2603
2604
2605 /*
2606 * estimate_path_cost_size
2607 * Get cost and size estimates for a foreign scan on given foreign relation
2608 * either a base relation or a join between foreign relations or an upper
2609 * relation containing foreign relations.
2610 *
2611 * param_join_conds are the parameterization clauses with outer relations.
2612 * pathkeys specify the expected sort order if any for given path being costed.
2613 * fpextra specifies additional post-scan/join-processing steps such as the
2614 * final sort and the LIMIT restriction.
2615 *
2616 * The function returns the cost and size estimates in p_rows, p_width,
2617 * p_startup_cost and p_total_cost variables.
2618 */
2619 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,PgFdwPathExtraData * fpextra,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)2620 estimate_path_cost_size(PlannerInfo *root,
2621 RelOptInfo *foreignrel,
2622 List *param_join_conds,
2623 List *pathkeys,
2624 PgFdwPathExtraData *fpextra,
2625 double *p_rows, int *p_width,
2626 Cost *p_startup_cost, Cost *p_total_cost)
2627 {
2628 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2629 double rows;
2630 double retrieved_rows;
2631 int width;
2632 Cost startup_cost;
2633 Cost total_cost;
2634
2635 /* Make sure the core code has set up the relation's reltarget */
2636 Assert(foreignrel->reltarget);
2637
2638 /*
2639 * If the table or the server is configured to use remote estimates,
2640 * connect to the foreign server and execute EXPLAIN to estimate the
2641 * number of rows selected by the restriction+join clauses. Otherwise,
2642 * estimate rows using whatever statistics we have locally, in a way
2643 * similar to ordinary tables.
2644 */
2645 if (fpinfo->use_remote_estimate)
2646 {
2647 List *remote_param_join_conds;
2648 List *local_param_join_conds;
2649 StringInfoData sql;
2650 PGconn *conn;
2651 Selectivity local_sel;
2652 QualCost local_cost;
2653 List *fdw_scan_tlist = NIL;
2654 List *remote_conds;
2655
2656 /* Required only to be passed to deparseSelectStmtForRel */
2657 List *retrieved_attrs;
2658
2659 /*
2660 * param_join_conds might contain both clauses that are safe to send
2661 * across, and clauses that aren't.
2662 */
2663 classifyConditions(root, foreignrel, param_join_conds,
2664 &remote_param_join_conds, &local_param_join_conds);
2665
2666 /* Build the list of columns to be fetched from the foreign server. */
2667 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2668 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2669 else
2670 fdw_scan_tlist = NIL;
2671
2672 /*
2673 * The complete list of remote conditions includes everything from
2674 * baserestrictinfo plus any extra join_conds relevant to this
2675 * particular path.
2676 */
2677 remote_conds = list_concat(list_copy(remote_param_join_conds),
2678 fpinfo->remote_conds);
2679
2680 /*
2681 * Construct EXPLAIN query including the desired SELECT, FROM, and
2682 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2683 * values, so don't request params_list.
2684 */
2685 initStringInfo(&sql);
2686 appendStringInfoString(&sql, "EXPLAIN ");
2687 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2688 remote_conds, pathkeys,
2689 fpextra ? fpextra->has_final_sort : false,
2690 fpextra ? fpextra->has_limit : false,
2691 false, &retrieved_attrs, NULL);
2692
2693 /* Get the remote estimate */
2694 conn = GetConnection(fpinfo->user, false);
2695 get_remote_estimate(sql.data, conn, &rows, &width,
2696 &startup_cost, &total_cost);
2697 ReleaseConnection(conn);
2698
2699 retrieved_rows = rows;
2700
2701 /* Factor in the selectivity of the locally-checked quals */
2702 local_sel = clauselist_selectivity(root,
2703 local_param_join_conds,
2704 foreignrel->relid,
2705 JOIN_INNER,
2706 NULL);
2707 local_sel *= fpinfo->local_conds_sel;
2708
2709 rows = clamp_row_est(rows * local_sel);
2710
2711 /* Add in the eval cost of the locally-checked quals */
2712 startup_cost += fpinfo->local_conds_cost.startup;
2713 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2714 cost_qual_eval(&local_cost, local_param_join_conds, root);
2715 startup_cost += local_cost.startup;
2716 total_cost += local_cost.per_tuple * retrieved_rows;
2717
2718 /*
2719 * Add in tlist eval cost for each output row. In case of an
2720 * aggregate, some of the tlist expressions such as grouping
2721 * expressions will be evaluated remotely, so adjust the costs.
2722 */
2723 startup_cost += foreignrel->reltarget->cost.startup;
2724 total_cost += foreignrel->reltarget->cost.startup;
2725 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
2726 if (IS_UPPER_REL(foreignrel))
2727 {
2728 QualCost tlist_cost;
2729
2730 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
2731 startup_cost -= tlist_cost.startup;
2732 total_cost -= tlist_cost.startup;
2733 total_cost -= tlist_cost.per_tuple * rows;
2734 }
2735 }
2736 else
2737 {
2738 Cost run_cost = 0;
2739
2740 /*
2741 * We don't support join conditions in this mode (hence, no
2742 * parameterized paths can be made).
2743 */
2744 Assert(param_join_conds == NIL);
2745
2746 /*
2747 * We will come here again and again with different set of pathkeys or
2748 * additional post-scan/join-processing steps that caller wants to
2749 * cost. We don't need to calculate the cost/size estimates for the
2750 * underlying scan, join, or grouping each time. Instead, use those
2751 * estimates if we have cached them already.
2752 */
2753 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
2754 {
2755 Assert(fpinfo->retrieved_rows >= 1);
2756
2757 rows = fpinfo->rows;
2758 retrieved_rows = fpinfo->retrieved_rows;
2759 width = fpinfo->width;
2760 startup_cost = fpinfo->rel_startup_cost;
2761 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2762
2763 /*
2764 * If we estimate the costs of a foreign scan or a foreign join
2765 * with additional post-scan/join-processing steps, the scan or
2766 * join costs obtained from the cache wouldn't yet contain the
2767 * eval costs for the final scan/join target, which would've been
2768 * updated by apply_scanjoin_target_to_paths(); add the eval costs
2769 * now.
2770 */
2771 if (fpextra && !IS_UPPER_REL(foreignrel))
2772 {
2773 /* Shouldn't get here unless we have LIMIT */
2774 Assert(fpextra->has_limit);
2775 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
2776 foreignrel->reloptkind == RELOPT_JOINREL);
2777 startup_cost += foreignrel->reltarget->cost.startup;
2778 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2779 }
2780 }
2781 else if (IS_JOIN_REL(foreignrel))
2782 {
2783 PgFdwRelationInfo *fpinfo_i;
2784 PgFdwRelationInfo *fpinfo_o;
2785 QualCost join_cost;
2786 QualCost remote_conds_cost;
2787 double nrows;
2788
2789 /* Use rows/width estimates made by the core code. */
2790 rows = foreignrel->rows;
2791 width = foreignrel->reltarget->width;
2792
2793 /* For join we expect inner and outer relations set */
2794 Assert(fpinfo->innerrel && fpinfo->outerrel);
2795
2796 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2797 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2798
2799 /* Estimate of number of rows in cross product */
2800 nrows = fpinfo_i->rows * fpinfo_o->rows;
2801
2802 /*
2803 * Back into an estimate of the number of retrieved rows. Just in
2804 * case this is nuts, clamp to at most nrow.
2805 */
2806 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2807 retrieved_rows = Min(retrieved_rows, nrows);
2808
2809 /*
2810 * The cost of foreign join is estimated as cost of generating
2811 * rows for the joining relations + cost for applying quals on the
2812 * rows.
2813 */
2814
2815 /*
2816 * Calculate the cost of clauses pushed down to the foreign server
2817 */
2818 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2819 /* Calculate the cost of applying join clauses */
2820 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2821
2822 /*
2823 * Startup cost includes startup cost of joining relations and the
2824 * startup cost for join and other clauses. We do not include the
2825 * startup cost specific to join strategy (e.g. setting up hash
2826 * tables) since we do not know what strategy the foreign server
2827 * is going to use.
2828 */
2829 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2830 startup_cost += join_cost.startup;
2831 startup_cost += remote_conds_cost.startup;
2832 startup_cost += fpinfo->local_conds_cost.startup;
2833
2834 /*
2835 * Run time cost includes:
2836 *
2837 * 1. Run time cost (total_cost - startup_cost) of relations being
2838 * joined
2839 *
2840 * 2. Run time cost of applying join clauses on the cross product
2841 * of the joining relations.
2842 *
2843 * 3. Run time cost of applying pushed down other clauses on the
2844 * result of join
2845 *
2846 * 4. Run time cost of applying nonpushable other clauses locally
2847 * on the result fetched from the foreign server.
2848 */
2849 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2850 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2851 run_cost += nrows * join_cost.per_tuple;
2852 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2853 run_cost += nrows * remote_conds_cost.per_tuple;
2854 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2855
2856 /* Add in tlist eval cost for each output row */
2857 startup_cost += foreignrel->reltarget->cost.startup;
2858 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2859 }
2860 else if (IS_UPPER_REL(foreignrel))
2861 {
2862 RelOptInfo *outerrel = fpinfo->outerrel;
2863 PgFdwRelationInfo *ofpinfo;
2864 AggClauseCosts aggcosts;
2865 double input_rows;
2866 int numGroupCols;
2867 double numGroups = 1;
2868
2869 /* The upper relation should have its outer relation set */
2870 Assert(outerrel);
2871 /* and that outer relation should have its reltarget set */
2872 Assert(outerrel->reltarget);
2873
2874 /*
2875 * This cost model is mixture of costing done for sorted and
2876 * hashed aggregates in cost_agg(). We are not sure which
2877 * strategy will be considered at remote side, thus for
2878 * simplicity, we put all startup related costs in startup_cost
2879 * and all finalization and run cost are added in total_cost.
2880 */
2881
2882 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
2883
2884 /* Get rows from input rel */
2885 input_rows = ofpinfo->rows;
2886
2887 /* Collect statistics about aggregates for estimating costs. */
2888 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2889 if (root->parse->hasAggs)
2890 {
2891 get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2892 AGGSPLIT_SIMPLE, &aggcosts);
2893
2894 /*
2895 * The cost of aggregates in the HAVING qual will be the same
2896 * for each child as it is for the parent, so there's no need
2897 * to use a translated version of havingQual.
2898 */
2899 get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2900 AGGSPLIT_SIMPLE, &aggcosts);
2901 }
2902
2903 /* Get number of grouping columns and possible number of groups */
2904 numGroupCols = list_length(root->parse->groupClause);
2905 numGroups = estimate_num_groups(root,
2906 get_sortgrouplist_exprs(root->parse->groupClause,
2907 fpinfo->grouped_tlist),
2908 input_rows, NULL);
2909
2910 /*
2911 * Get the retrieved_rows and rows estimates. If there are HAVING
2912 * quals, account for their selectivity.
2913 */
2914 if (root->parse->havingQual)
2915 {
2916 /* Factor in the selectivity of the remotely-checked quals */
2917 retrieved_rows =
2918 clamp_row_est(numGroups *
2919 clauselist_selectivity(root,
2920 fpinfo->remote_conds,
2921 0,
2922 JOIN_INNER,
2923 NULL));
2924 /* Factor in the selectivity of the locally-checked quals */
2925 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
2926 }
2927 else
2928 {
2929 rows = retrieved_rows = numGroups;
2930 }
2931
2932 /* Use width estimate made by the core code. */
2933 width = foreignrel->reltarget->width;
2934
2935 /*-----
2936 * Startup cost includes:
2937 * 1. Startup cost for underneath input relation, adjusted for
2938 * tlist replacement by apply_scanjoin_target_to_paths()
2939 * 2. Cost of performing aggregation, per cost_agg()
2940 *-----
2941 */
2942 startup_cost = ofpinfo->rel_startup_cost;
2943 startup_cost += outerrel->reltarget->cost.startup;
2944 startup_cost += aggcosts.transCost.startup;
2945 startup_cost += aggcosts.transCost.per_tuple * input_rows;
2946 startup_cost += aggcosts.finalCost.startup;
2947 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2948
2949 /*-----
2950 * Run time cost includes:
2951 * 1. Run time cost of underneath input relation, adjusted for
2952 * tlist replacement by apply_scanjoin_target_to_paths()
2953 * 2. Run time cost of performing aggregation, per cost_agg()
2954 *-----
2955 */
2956 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2957 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
2958 run_cost += aggcosts.finalCost.per_tuple * numGroups;
2959 run_cost += cpu_tuple_cost * numGroups;
2960
2961 /* Account for the eval cost of HAVING quals, if any */
2962 if (root->parse->havingQual)
2963 {
2964 QualCost remote_cost;
2965
2966 /* Add in the eval cost of the remotely-checked quals */
2967 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
2968 startup_cost += remote_cost.startup;
2969 run_cost += remote_cost.per_tuple * numGroups;
2970 /* Add in the eval cost of the locally-checked quals */
2971 startup_cost += fpinfo->local_conds_cost.startup;
2972 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2973 }
2974
2975 /* Add in tlist eval cost for each output row */
2976 startup_cost += foreignrel->reltarget->cost.startup;
2977 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2978 }
2979 else
2980 {
2981 Cost cpu_per_tuple;
2982
2983 /* Use rows/width estimates made by set_baserel_size_estimates. */
2984 rows = foreignrel->rows;
2985 width = foreignrel->reltarget->width;
2986
2987 /*
2988 * Back into an estimate of the number of retrieved rows. Just in
2989 * case this is nuts, clamp to at most foreignrel->tuples.
2990 */
2991 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2992 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2993
2994 /*
2995 * Cost as though this were a seqscan, which is pessimistic. We
2996 * effectively imagine the local_conds are being evaluated
2997 * remotely, too.
2998 */
2999 startup_cost = 0;
3000 run_cost = 0;
3001 run_cost += seq_page_cost * foreignrel->pages;
3002
3003 startup_cost += foreignrel->baserestrictcost.startup;
3004 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3005 run_cost += cpu_per_tuple * foreignrel->tuples;
3006
3007 /* Add in tlist eval cost for each output row */
3008 startup_cost += foreignrel->reltarget->cost.startup;
3009 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3010 }
3011
3012 /*
3013 * Without remote estimates, we have no real way to estimate the cost
3014 * of generating sorted output. It could be free if the query plan
3015 * the remote side would have chosen generates properly-sorted output
3016 * anyway, but in most cases it will cost something. Estimate a value
3017 * high enough that we won't pick the sorted path when the ordering
3018 * isn't locally useful, but low enough that we'll err on the side of
3019 * pushing down the ORDER BY clause when it's useful to do so.
3020 */
3021 if (pathkeys != NIL)
3022 {
3023 if (IS_UPPER_REL(foreignrel))
3024 {
3025 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3026 fpinfo->stage == UPPERREL_GROUP_AGG);
3027 adjust_foreign_grouping_path_cost(root, pathkeys,
3028 retrieved_rows, width,
3029 fpextra->limit_tuples,
3030 &startup_cost, &run_cost);
3031 }
3032 else
3033 {
3034 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3035 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3036 }
3037 }
3038
3039 total_cost = startup_cost + run_cost;
3040
3041 /* Adjust the cost estimates if we have LIMIT */
3042 if (fpextra && fpextra->has_limit)
3043 {
3044 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3045 fpextra->offset_est, fpextra->count_est);
3046 retrieved_rows = rows;
3047 }
3048 }
3049
3050 /*
3051 * If this includes the final sort step, the given target, which will be
3052 * applied to the resulting path, might have different expressions from
3053 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3054 * eval costs.
3055 */
3056 if (fpextra && fpextra->has_final_sort &&
3057 fpextra->target != foreignrel->reltarget)
3058 {
3059 QualCost oldcost = foreignrel->reltarget->cost;
3060 QualCost newcost = fpextra->target->cost;
3061
3062 startup_cost += newcost.startup - oldcost.startup;
3063 total_cost += newcost.startup - oldcost.startup;
3064 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3065 }
3066
3067 /*
3068 * Cache the retrieved rows and cost estimates for scans, joins, or
3069 * groupings without any parameterization, pathkeys, or additional
3070 * post-scan/join-processing steps, before adding the costs for
3071 * transferring data from the foreign server. These estimates are useful
3072 * for costing remote joins involving this relation or costing other
3073 * remote operations on this relation such as remote sorts and remote
3074 * LIMIT restrictions, when the costs can not be obtained from the foreign
3075 * server. This function will be called at least once for every foreign
3076 * relation without any parameterization, pathkeys, or additional
3077 * post-scan/join-processing steps.
3078 */
3079 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3080 {
3081 fpinfo->retrieved_rows = retrieved_rows;
3082 fpinfo->rel_startup_cost = startup_cost;
3083 fpinfo->rel_total_cost = total_cost;
3084 }
3085
3086 /*
3087 * Add some additional cost factors to account for connection overhead
3088 * (fdw_startup_cost), transferring data across the network
3089 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3090 * (cpu_tuple_cost per retrieved row).
3091 */
3092 startup_cost += fpinfo->fdw_startup_cost;
3093 total_cost += fpinfo->fdw_startup_cost;
3094 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3095 total_cost += cpu_tuple_cost * retrieved_rows;
3096
3097 /*
3098 * If we have LIMIT, we should prefer performing the restriction remotely
3099 * rather than locally, as the former avoids extra row fetches from the
3100 * remote that the latter might cause. But since the core code doesn't
3101 * account for such fetches when estimating the costs of the local
3102 * restriction (see create_limit_path()), there would be no difference
3103 * between the costs of the local restriction and the costs of the remote
3104 * restriction estimated above if we don't use remote estimates (except
3105 * for the case where the foreignrel is a grouping relation, the given
3106 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3107 * accounted for in costing the remote restriction). Tweak the costs of
3108 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3109 * one.
3110 */
3111 if (!fpinfo->use_remote_estimate &&
3112 fpextra && fpextra->has_limit &&
3113 fpextra->limit_tuples > 0 &&
3114 fpextra->limit_tuples < fpinfo->rows)
3115 {
3116 Assert(fpinfo->rows > 0);
3117 total_cost -= (total_cost - startup_cost) * 0.05 *
3118 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3119 }
3120
3121 /* Return results. */
3122 *p_rows = rows;
3123 *p_width = width;
3124 *p_startup_cost = startup_cost;
3125 *p_total_cost = total_cost;
3126 }
3127
3128 /*
3129 * Estimate costs of executing a SQL statement remotely.
3130 * The given "sql" must be an EXPLAIN command.
3131 */
3132 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)3133 get_remote_estimate(const char *sql, PGconn *conn,
3134 double *rows, int *width,
3135 Cost *startup_cost, Cost *total_cost)
3136 {
3137 PGresult *volatile res = NULL;
3138
3139 /* PGresult must be released before leaving this function. */
3140 PG_TRY();
3141 {
3142 char *line;
3143 char *p;
3144 int n;
3145
3146 /*
3147 * Execute EXPLAIN remotely.
3148 */
3149 res = pgfdw_exec_query(conn, sql);
3150 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3151 pgfdw_report_error(ERROR, res, conn, false, sql);
3152
3153 /*
3154 * Extract cost numbers for topmost plan node. Note we search for a
3155 * left paren from the end of the line to avoid being confused by
3156 * other uses of parentheses.
3157 */
3158 line = PQgetvalue(res, 0, 0);
3159 p = strrchr(line, '(');
3160 if (p == NULL)
3161 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3162 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3163 startup_cost, total_cost, rows, width);
3164 if (n != 4)
3165 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3166
3167 PQclear(res);
3168 res = NULL;
3169 }
3170 PG_CATCH();
3171 {
3172 if (res)
3173 PQclear(res);
3174 PG_RE_THROW();
3175 }
3176 PG_END_TRY();
3177 }
3178
3179 /*
3180 * Adjust the cost estimates of a foreign grouping path to include the cost of
3181 * generating properly-sorted output.
3182 */
3183 static void
adjust_foreign_grouping_path_cost(PlannerInfo * root,List * pathkeys,double retrieved_rows,double width,double limit_tuples,Cost * p_startup_cost,Cost * p_run_cost)3184 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3185 List *pathkeys,
3186 double retrieved_rows,
3187 double width,
3188 double limit_tuples,
3189 Cost *p_startup_cost,
3190 Cost *p_run_cost)
3191 {
3192 /*
3193 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3194 * side is unlikely to generate properly-sorted output, so it would need
3195 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3196 * if the GROUP BY clause is sort-able but isn't a superset of the given
3197 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3198 * costs by applying the same heuristic as for the scan or join case.
3199 */
3200 if (!grouping_is_sortable(root->parse->groupClause) ||
3201 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3202 {
3203 Path sort_path; /* dummy for result of cost_sort */
3204
3205 cost_sort(&sort_path,
3206 root,
3207 pathkeys,
3208 *p_startup_cost + *p_run_cost,
3209 retrieved_rows,
3210 width,
3211 0.0,
3212 work_mem,
3213 limit_tuples);
3214
3215 *p_startup_cost = sort_path.startup_cost;
3216 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3217 }
3218 else
3219 {
3220 /*
3221 * The default extra cost seems too large for foreign-grouping cases;
3222 * add 1/4th of that default.
3223 */
3224 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3225 - 1.0) * 0.25;
3226
3227 *p_startup_cost *= sort_multiplier;
3228 *p_run_cost *= sort_multiplier;
3229 }
3230 }
3231
3232 /*
3233 * Detect whether we want to process an EquivalenceClass member.
3234 *
3235 * This is a callback for use by generate_implied_equalities_for_column.
3236 */
3237 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)3238 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3239 EquivalenceClass *ec, EquivalenceMember *em,
3240 void *arg)
3241 {
3242 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3243 Expr *expr = em->em_expr;
3244
3245 /*
3246 * If we've identified what we're processing in the current scan, we only
3247 * want to match that expression.
3248 */
3249 if (state->current != NULL)
3250 return equal(expr, state->current);
3251
3252 /*
3253 * Otherwise, ignore anything we've already processed.
3254 */
3255 if (list_member(state->already_used, expr))
3256 return false;
3257
3258 /* This is the new target to process. */
3259 state->current = expr;
3260 return true;
3261 }
3262
3263 /*
3264 * Create cursor for node's query with current parameter values.
3265 */
3266 static void
create_cursor(ForeignScanState * node)3267 create_cursor(ForeignScanState *node)
3268 {
3269 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3270 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3271 int numParams = fsstate->numParams;
3272 const char **values = fsstate->param_values;
3273 PGconn *conn = fsstate->conn;
3274 StringInfoData buf;
3275 PGresult *res;
3276
3277 /*
3278 * Construct array of query parameter values in text format. We do the
3279 * conversions in the short-lived per-tuple context, so as not to cause a
3280 * memory leak over repeated scans.
3281 */
3282 if (numParams > 0)
3283 {
3284 MemoryContext oldcontext;
3285
3286 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3287
3288 process_query_params(econtext,
3289 fsstate->param_flinfo,
3290 fsstate->param_exprs,
3291 values);
3292
3293 MemoryContextSwitchTo(oldcontext);
3294 }
3295
3296 /* Construct the DECLARE CURSOR command */
3297 initStringInfo(&buf);
3298 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3299 fsstate->cursor_number, fsstate->query);
3300
3301 /*
3302 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3303 * to infer types for all parameters. Since we explicitly cast every
3304 * parameter (see deparse.c), the "inference" is trivial and will produce
3305 * the desired result. This allows us to avoid assuming that the remote
3306 * server has the same OIDs we do for the parameters' types.
3307 */
3308 if (!PQsendQueryParams(conn, buf.data, numParams,
3309 NULL, values, NULL, NULL, 0))
3310 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3311
3312 /*
3313 * Get the result, and check for success.
3314 *
3315 * We don't use a PG_TRY block here, so be careful not to throw error
3316 * without releasing the PGresult.
3317 */
3318 res = pgfdw_get_result(conn, buf.data);
3319 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3320 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3321 PQclear(res);
3322
3323 /* Mark the cursor as created, and show no tuples have been retrieved */
3324 fsstate->cursor_exists = true;
3325 fsstate->tuples = NULL;
3326 fsstate->num_tuples = 0;
3327 fsstate->next_tuple = 0;
3328 fsstate->fetch_ct_2 = 0;
3329 fsstate->eof_reached = false;
3330
3331 /* Clean up */
3332 pfree(buf.data);
3333 }
3334
3335 /*
3336 * Fetch some more rows from the node's cursor.
3337 */
3338 static void
fetch_more_data(ForeignScanState * node)3339 fetch_more_data(ForeignScanState *node)
3340 {
3341 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3342 PGresult *volatile res = NULL;
3343 MemoryContext oldcontext;
3344
3345 /*
3346 * We'll store the tuples in the batch_cxt. First, flush the previous
3347 * batch.
3348 */
3349 fsstate->tuples = NULL;
3350 MemoryContextReset(fsstate->batch_cxt);
3351 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3352
3353 /* PGresult must be released before leaving this function. */
3354 PG_TRY();
3355 {
3356 PGconn *conn = fsstate->conn;
3357 char sql[64];
3358 int numrows;
3359 int i;
3360
3361 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3362 fsstate->fetch_size, fsstate->cursor_number);
3363
3364 res = pgfdw_exec_query(conn, sql);
3365 /* On error, report the original query, not the FETCH. */
3366 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3367 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3368
3369 /* Convert the data into HeapTuples */
3370 numrows = PQntuples(res);
3371 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3372 fsstate->num_tuples = numrows;
3373 fsstate->next_tuple = 0;
3374
3375 for (i = 0; i < numrows; i++)
3376 {
3377 Assert(IsA(node->ss.ps.plan, ForeignScan));
3378
3379 fsstate->tuples[i] =
3380 make_tuple_from_result_row(res, i,
3381 fsstate->rel,
3382 fsstate->attinmeta,
3383 fsstate->retrieved_attrs,
3384 node,
3385 fsstate->temp_cxt);
3386 }
3387
3388 /* Update fetch_ct_2 */
3389 if (fsstate->fetch_ct_2 < 2)
3390 fsstate->fetch_ct_2++;
3391
3392 /* Must be EOF if we didn't get as many tuples as we asked for. */
3393 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3394
3395 PQclear(res);
3396 res = NULL;
3397 }
3398 PG_CATCH();
3399 {
3400 if (res)
3401 PQclear(res);
3402 PG_RE_THROW();
3403 }
3404 PG_END_TRY();
3405
3406 MemoryContextSwitchTo(oldcontext);
3407 }
3408
3409 /*
3410 * Force assorted GUC parameters to settings that ensure that we'll output
3411 * data values in a form that is unambiguous to the remote server.
3412 *
3413 * This is rather expensive and annoying to do once per row, but there's
3414 * little choice if we want to be sure values are transmitted accurately;
3415 * we can't leave the settings in place between rows for fear of affecting
3416 * user-visible computations.
3417 *
3418 * We use the equivalent of a function SET option to allow the settings to
3419 * persist only until the caller calls reset_transmission_modes(). If an
3420 * error is thrown in between, guc.c will take care of undoing the settings.
3421 *
3422 * The return value is the nestlevel that must be passed to
3423 * reset_transmission_modes() to undo things.
3424 */
3425 int
set_transmission_modes(void)3426 set_transmission_modes(void)
3427 {
3428 int nestlevel = NewGUCNestLevel();
3429
3430 /*
3431 * The values set here should match what pg_dump does. See also
3432 * configure_remote_session in connection.c.
3433 */
3434 if (DateStyle != USE_ISO_DATES)
3435 (void) set_config_option("datestyle", "ISO",
3436 PGC_USERSET, PGC_S_SESSION,
3437 GUC_ACTION_SAVE, true, 0, false);
3438 if (IntervalStyle != INTSTYLE_POSTGRES)
3439 (void) set_config_option("intervalstyle", "postgres",
3440 PGC_USERSET, PGC_S_SESSION,
3441 GUC_ACTION_SAVE, true, 0, false);
3442 if (extra_float_digits < 3)
3443 (void) set_config_option("extra_float_digits", "3",
3444 PGC_USERSET, PGC_S_SESSION,
3445 GUC_ACTION_SAVE, true, 0, false);
3446
3447 return nestlevel;
3448 }
3449
/*
 * reset_transmission_modes
 *		Revert the GUC changes made by set_transmission_modes().
 *
 * "nestlevel" is the value that set_transmission_modes() returned.
 */
void
reset_transmission_modes(int nestlevel)
{
	AtEOXact_GUC(true, nestlevel);
}
3458
3459 /*
3460 * Utility routine to close a cursor.
3461 */
3462 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3463 close_cursor(PGconn *conn, unsigned int cursor_number)
3464 {
3465 char sql[64];
3466 PGresult *res;
3467
3468 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3469
3470 /*
3471 * We don't use a PG_TRY block here, so be careful not to throw error
3472 * without releasing the PGresult.
3473 */
3474 res = pgfdw_exec_query(conn, sql);
3475 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3476 pgfdw_report_error(ERROR, res, conn, true, sql);
3477 PQclear(res);
3478 }
3479
3480 /*
3481 * create_foreign_modify
3482 * Construct an execution state of a foreign insert/update/delete
3483 * operation
3484 */
3485 static PgFdwModifyState *
create_foreign_modify(EState * estate,RangeTblEntry * rte,ResultRelInfo * resultRelInfo,CmdType operation,Plan * subplan,char * query,List * target_attrs,bool has_returning,List * retrieved_attrs)3486 create_foreign_modify(EState *estate,
3487 RangeTblEntry *rte,
3488 ResultRelInfo *resultRelInfo,
3489 CmdType operation,
3490 Plan *subplan,
3491 char *query,
3492 List *target_attrs,
3493 bool has_returning,
3494 List *retrieved_attrs)
3495 {
3496 PgFdwModifyState *fmstate;
3497 Relation rel = resultRelInfo->ri_RelationDesc;
3498 TupleDesc tupdesc = RelationGetDescr(rel);
3499 Oid userid;
3500 ForeignTable *table;
3501 UserMapping *user;
3502 AttrNumber n_params;
3503 Oid typefnoid;
3504 bool isvarlena;
3505 ListCell *lc;
3506
3507 /* Begin constructing PgFdwModifyState. */
3508 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3509 fmstate->rel = rel;
3510
3511 /*
3512 * Identify which user to do the remote access as. This should match what
3513 * ExecCheckRTEPerms() does.
3514 */
3515 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3516
3517 /* Get info about foreign table. */
3518 table = GetForeignTable(RelationGetRelid(rel));
3519 user = GetUserMapping(userid, table->serverid);
3520
3521 /* Open connection; report that we'll create a prepared statement. */
3522 fmstate->conn = GetConnection(user, true);
3523 fmstate->p_name = NULL; /* prepared statement not made yet */
3524
3525 /* Set up remote query information. */
3526 fmstate->query = query;
3527 fmstate->target_attrs = target_attrs;
3528 fmstate->has_returning = has_returning;
3529 fmstate->retrieved_attrs = retrieved_attrs;
3530
3531 /* Create context for per-tuple temp workspace. */
3532 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3533 "postgres_fdw temporary data",
3534 ALLOCSET_SMALL_SIZES);
3535
3536 /* Prepare for input conversion of RETURNING results. */
3537 if (fmstate->has_returning)
3538 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3539
3540 /* Prepare for output conversion of parameters used in prepared stmt. */
3541 n_params = list_length(fmstate->target_attrs) + 1;
3542 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3543 fmstate->p_nums = 0;
3544
3545 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3546 {
3547 Assert(subplan != NULL);
3548
3549 /* Find the ctid resjunk column in the subplan's result */
3550 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3551 "ctid");
3552 if (!AttributeNumberIsValid(fmstate->ctidAttno))
3553 elog(ERROR, "could not find junk ctid column");
3554
3555 /* First transmittable parameter will be ctid */
3556 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3557 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3558 fmstate->p_nums++;
3559 }
3560
3561 if (operation == CMD_INSERT || operation == CMD_UPDATE)
3562 {
3563 /* Set up for remaining transmittable parameters */
3564 foreach(lc, fmstate->target_attrs)
3565 {
3566 int attnum = lfirst_int(lc);
3567 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3568
3569 Assert(!attr->attisdropped);
3570
3571 /* Ignore generated columns; they are set to DEFAULT */
3572 if (attr->attgenerated)
3573 continue;
3574 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3575 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3576 fmstate->p_nums++;
3577 }
3578 }
3579
3580 Assert(fmstate->p_nums <= n_params);
3581
3582 /* Initialize auxiliary state */
3583 fmstate->aux_fmstate = NULL;
3584
3585 return fmstate;
3586 }
3587
3588 /*
3589 * execute_foreign_modify
3590 * Perform foreign-table modification as required, and fetch RETURNING
3591 * result if any. (This is the shared guts of postgresExecForeignInsert,
3592 * postgresExecForeignUpdate, and postgresExecForeignDelete.)
3593 */
3594 static TupleTableSlot *
execute_foreign_modify(EState * estate,ResultRelInfo * resultRelInfo,CmdType operation,TupleTableSlot * slot,TupleTableSlot * planSlot)3595 execute_foreign_modify(EState *estate,
3596 ResultRelInfo *resultRelInfo,
3597 CmdType operation,
3598 TupleTableSlot *slot,
3599 TupleTableSlot *planSlot)
3600 {
3601 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
3602 ItemPointer ctid = NULL;
3603 const char **p_values;
3604 PGresult *res;
3605 int n_rows;
3606
3607 /* The operation should be INSERT, UPDATE, or DELETE */
3608 Assert(operation == CMD_INSERT ||
3609 operation == CMD_UPDATE ||
3610 operation == CMD_DELETE);
3611
3612 /* Set up the prepared statement on the remote server, if we didn't yet */
3613 if (!fmstate->p_name)
3614 prepare_foreign_modify(fmstate);
3615
3616 /*
3617 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
3618 */
3619 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3620 {
3621 Datum datum;
3622 bool isNull;
3623
3624 datum = ExecGetJunkAttribute(planSlot,
3625 fmstate->ctidAttno,
3626 &isNull);
3627 /* shouldn't ever get a null result... */
3628 if (isNull)
3629 elog(ERROR, "ctid is NULL");
3630 ctid = (ItemPointer) DatumGetPointer(datum);
3631 }
3632
3633 /* Convert parameters needed by prepared statement to text form */
3634 p_values = convert_prep_stmt_params(fmstate, ctid, slot);
3635
3636 /*
3637 * Execute the prepared statement.
3638 */
3639 if (!PQsendQueryPrepared(fmstate->conn,
3640 fmstate->p_name,
3641 fmstate->p_nums,
3642 p_values,
3643 NULL,
3644 NULL,
3645 0))
3646 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3647
3648 /*
3649 * Get the result, and check for success.
3650 *
3651 * We don't use a PG_TRY block here, so be careful not to throw error
3652 * without releasing the PGresult.
3653 */
3654 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3655 if (PQresultStatus(res) !=
3656 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3657 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3658
3659 /* Check number of rows affected, and fetch RETURNING tuple if any */
3660 if (fmstate->has_returning)
3661 {
3662 n_rows = PQntuples(res);
3663 if (n_rows > 0)
3664 store_returning_result(fmstate, slot, res);
3665 }
3666 else
3667 n_rows = atoi(PQcmdTuples(res));
3668
3669 /* And clean up */
3670 PQclear(res);
3671
3672 MemoryContextReset(fmstate->temp_cxt);
3673
3674 /*
3675 * Return NULL if nothing was inserted/updated/deleted on the remote end
3676 */
3677 return (n_rows > 0) ? slot : NULL;
3678 }
3679
3680 /*
3681 * prepare_foreign_modify
3682 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3683 */
3684 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3685 prepare_foreign_modify(PgFdwModifyState *fmstate)
3686 {
3687 char prep_name[NAMEDATALEN];
3688 char *p_name;
3689 PGresult *res;
3690
3691 /* Construct name we'll use for the prepared statement. */
3692 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3693 GetPrepStmtNumber(fmstate->conn));
3694 p_name = pstrdup(prep_name);
3695
3696 /*
3697 * We intentionally do not specify parameter types here, but leave the
3698 * remote server to derive them by default. This avoids possible problems
3699 * with the remote server using different type OIDs than we do. All of
3700 * the prepared statements we use in this module are simple enough that
3701 * the remote server will make the right choices.
3702 */
3703 if (!PQsendPrepare(fmstate->conn,
3704 p_name,
3705 fmstate->query,
3706 0,
3707 NULL))
3708 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3709
3710 /*
3711 * Get the result, and check for success.
3712 *
3713 * We don't use a PG_TRY block here, so be careful not to throw error
3714 * without releasing the PGresult.
3715 */
3716 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3717 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3718 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3719 PQclear(res);
3720
3721 /* This action shows that the prepare has been done. */
3722 fmstate->p_name = p_name;
3723 }
3724
3725 /*
3726 * convert_prep_stmt_params
3727 * Create array of text strings representing parameter values
3728 *
3729 * tupleid is ctid to send, or NULL if none
3730 * slot is slot to get remaining parameters from, or NULL if none
3731 *
3732 * Data is constructed in temp_cxt; caller should reset that after use.
3733 */
3734 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3735 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3736 ItemPointer tupleid,
3737 TupleTableSlot *slot)
3738 {
3739 const char **p_values;
3740 int pindex = 0;
3741 MemoryContext oldcontext;
3742
3743 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3744
3745 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3746
3747 /* 1st parameter should be ctid, if it's in use */
3748 if (tupleid != NULL)
3749 {
3750 /* don't need set_transmission_modes for TID output */
3751 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3752 PointerGetDatum(tupleid));
3753 pindex++;
3754 }
3755
3756 /* get following parameters from slot */
3757 if (slot != NULL && fmstate->target_attrs != NIL)
3758 {
3759 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
3760 int nestlevel;
3761 ListCell *lc;
3762
3763 nestlevel = set_transmission_modes();
3764
3765 foreach(lc, fmstate->target_attrs)
3766 {
3767 int attnum = lfirst_int(lc);
3768 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3769 Datum value;
3770 bool isnull;
3771
3772 /* Ignore generated columns; they are set to DEFAULT */
3773 if (attr->attgenerated)
3774 continue;
3775 value = slot_getattr(slot, attnum, &isnull);
3776 if (isnull)
3777 p_values[pindex] = NULL;
3778 else
3779 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3780 value);
3781 pindex++;
3782 }
3783
3784 reset_transmission_modes(nestlevel);
3785 }
3786
3787 Assert(pindex == fmstate->p_nums);
3788
3789 MemoryContextSwitchTo(oldcontext);
3790
3791 return p_values;
3792 }
3793
3794 /*
3795 * store_returning_result
3796 * Store the result of a RETURNING clause
3797 *
3798 * On error, be sure to release the PGresult on the way out. Callers do not
3799 * have PG_TRY blocks to ensure this happens.
3800 */
3801 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3802 store_returning_result(PgFdwModifyState *fmstate,
3803 TupleTableSlot *slot, PGresult *res)
3804 {
3805 PG_TRY();
3806 {
3807 HeapTuple newtup;
3808
3809 newtup = make_tuple_from_result_row(res, 0,
3810 fmstate->rel,
3811 fmstate->attinmeta,
3812 fmstate->retrieved_attrs,
3813 NULL,
3814 fmstate->temp_cxt);
3815
3816 /*
3817 * The returning slot will not necessarily be suitable to store
3818 * heaptuples directly, so allow for conversion.
3819 */
3820 ExecForceStoreHeapTuple(newtup, slot, true);
3821 }
3822 PG_CATCH();
3823 {
3824 if (res)
3825 PQclear(res);
3826 PG_RE_THROW();
3827 }
3828 PG_END_TRY();
3829 }
3830
3831 /*
3832 * finish_foreign_modify
3833 * Release resources for a foreign insert/update/delete operation
3834 */
3835 static void
finish_foreign_modify(PgFdwModifyState * fmstate)3836 finish_foreign_modify(PgFdwModifyState *fmstate)
3837 {
3838 Assert(fmstate != NULL);
3839
3840 /* If we created a prepared statement, destroy it */
3841 if (fmstate->p_name)
3842 {
3843 char sql[64];
3844 PGresult *res;
3845
3846 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3847
3848 /*
3849 * We don't use a PG_TRY block here, so be careful not to throw error
3850 * without releasing the PGresult.
3851 */
3852 res = pgfdw_exec_query(fmstate->conn, sql);
3853 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3854 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3855 PQclear(res);
3856 fmstate->p_name = NULL;
3857 }
3858
3859 /* Release remote connection */
3860 ReleaseConnection(fmstate->conn);
3861 fmstate->conn = NULL;
3862 }
3863
3864 /*
3865 * build_remote_returning
3866 * Build a RETURNING targetlist of a remote query for performing an
3867 * UPDATE/DELETE .. RETURNING on a join directly
3868 */
3869 static List *
build_remote_returning(Index rtindex,Relation rel,List * returningList)3870 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3871 {
3872 bool have_wholerow = false;
3873 List *tlist = NIL;
3874 List *vars;
3875 ListCell *lc;
3876
3877 Assert(returningList);
3878
3879 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3880
3881 /*
3882 * If there's a whole-row reference to the target relation, then we'll
3883 * need all the columns of the relation.
3884 */
3885 foreach(lc, vars)
3886 {
3887 Var *var = (Var *) lfirst(lc);
3888
3889 if (IsA(var, Var) &&
3890 var->varno == rtindex &&
3891 var->varattno == InvalidAttrNumber)
3892 {
3893 have_wholerow = true;
3894 break;
3895 }
3896 }
3897
3898 if (have_wholerow)
3899 {
3900 TupleDesc tupdesc = RelationGetDescr(rel);
3901 int i;
3902
3903 for (i = 1; i <= tupdesc->natts; i++)
3904 {
3905 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3906 Var *var;
3907
3908 /* Ignore dropped attributes. */
3909 if (attr->attisdropped)
3910 continue;
3911
3912 var = makeVar(rtindex,
3913 i,
3914 attr->atttypid,
3915 attr->atttypmod,
3916 attr->attcollation,
3917 0);
3918
3919 tlist = lappend(tlist,
3920 makeTargetEntry((Expr *) var,
3921 list_length(tlist) + 1,
3922 NULL,
3923 false));
3924 }
3925 }
3926
3927 /* Now add any remaining columns to tlist. */
3928 foreach(lc, vars)
3929 {
3930 Var *var = (Var *) lfirst(lc);
3931
3932 /*
3933 * No need for whole-row references to the target relation. We don't
3934 * need system columns other than ctid and oid either, since those are
3935 * set locally.
3936 */
3937 if (IsA(var, Var) &&
3938 var->varno == rtindex &&
3939 var->varattno <= InvalidAttrNumber &&
3940 var->varattno != SelfItemPointerAttributeNumber)
3941 continue; /* don't need it */
3942
3943 if (tlist_member((Expr *) var, tlist))
3944 continue; /* already got it */
3945
3946 tlist = lappend(tlist,
3947 makeTargetEntry((Expr *) var,
3948 list_length(tlist) + 1,
3949 NULL,
3950 false));
3951 }
3952
3953 list_free(vars);
3954
3955 return tlist;
3956 }
3957
3958 /*
3959 * rebuild_fdw_scan_tlist
3960 * Build new fdw_scan_tlist of given foreign-scan plan node from given
3961 * tlist
3962 *
3963 * There might be columns that the fdw_scan_tlist of the given foreign-scan
3964 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
3965 * have contained resjunk columns such as 'ctid' of the target relation and
3966 * 'wholerow' of non-target relations, but the tlist might not contain them,
3967 * for example. So, adjust the tlist so it contains all the columns specified
3968 * in the fdw_scan_tlist; else setrefs.c will get confused.
3969 */
3970 static void
rebuild_fdw_scan_tlist(ForeignScan * fscan,List * tlist)3971 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
3972 {
3973 List *new_tlist = tlist;
3974 List *old_tlist = fscan->fdw_scan_tlist;
3975 ListCell *lc;
3976
3977 foreach(lc, old_tlist)
3978 {
3979 TargetEntry *tle = (TargetEntry *) lfirst(lc);
3980
3981 if (tlist_member(tle->expr, new_tlist))
3982 continue; /* already got it */
3983
3984 new_tlist = lappend(new_tlist,
3985 makeTargetEntry(tle->expr,
3986 list_length(new_tlist) + 1,
3987 NULL,
3988 false));
3989 }
3990 fscan->fdw_scan_tlist = new_tlist;
3991 }
3992
3993 /*
3994 * Execute a direct UPDATE/DELETE statement.
3995 */
3996 static void
execute_dml_stmt(ForeignScanState * node)3997 execute_dml_stmt(ForeignScanState *node)
3998 {
3999 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4000 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4001 int numParams = dmstate->numParams;
4002 const char **values = dmstate->param_values;
4003
4004 /*
4005 * Construct array of query parameter values in text format.
4006 */
4007 if (numParams > 0)
4008 process_query_params(econtext,
4009 dmstate->param_flinfo,
4010 dmstate->param_exprs,
4011 values);
4012
4013 /*
4014 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4015 * to infer types for all parameters. Since we explicitly cast every
4016 * parameter (see deparse.c), the "inference" is trivial and will produce
4017 * the desired result. This allows us to avoid assuming that the remote
4018 * server has the same OIDs we do for the parameters' types.
4019 */
4020 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4021 NULL, values, NULL, NULL, 0))
4022 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4023
4024 /*
4025 * Get the result, and check for success.
4026 *
4027 * We don't use a PG_TRY block here, so be careful not to throw error
4028 * without releasing the PGresult.
4029 */
4030 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4031 if (PQresultStatus(dmstate->result) !=
4032 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4033 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4034 dmstate->query);
4035
4036 /* Get the number of rows affected. */
4037 if (dmstate->has_returning)
4038 dmstate->num_tuples = PQntuples(dmstate->result);
4039 else
4040 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4041 }
4042
4043 /*
4044 * Get the result of a RETURNING clause.
4045 */
4046 static TupleTableSlot *
get_returning_data(ForeignScanState * node)4047 get_returning_data(ForeignScanState *node)
4048 {
4049 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4050 EState *estate = node->ss.ps.state;
4051 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
4052 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4053 TupleTableSlot *resultSlot;
4054
4055 Assert(resultRelInfo->ri_projectReturning);
4056
4057 /* If we didn't get any tuples, must be end of data. */
4058 if (dmstate->next_tuple >= dmstate->num_tuples)
4059 return ExecClearTuple(slot);
4060
4061 /* Increment the command es_processed count if necessary. */
4062 if (dmstate->set_processed)
4063 estate->es_processed += 1;
4064
4065 /*
4066 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4067 * tuple. (has_returning is false when the local query is of the form
4068 * "UPDATE/DELETE .. RETURNING 1" for example.)
4069 */
4070 if (!dmstate->has_returning)
4071 {
4072 ExecStoreAllNullTuple(slot);
4073 resultSlot = slot;
4074 }
4075 else
4076 {
4077 /*
4078 * On error, be sure to release the PGresult on the way out. Callers
4079 * do not have PG_TRY blocks to ensure this happens.
4080 */
4081 PG_TRY();
4082 {
4083 HeapTuple newtup;
4084
4085 newtup = make_tuple_from_result_row(dmstate->result,
4086 dmstate->next_tuple,
4087 dmstate->rel,
4088 dmstate->attinmeta,
4089 dmstate->retrieved_attrs,
4090 node,
4091 dmstate->temp_cxt);
4092 ExecStoreHeapTuple(newtup, slot, false);
4093 }
4094 PG_CATCH();
4095 {
4096 if (dmstate->result)
4097 PQclear(dmstate->result);
4098 PG_RE_THROW();
4099 }
4100 PG_END_TRY();
4101
4102 /* Get the updated/deleted tuple. */
4103 if (dmstate->rel)
4104 resultSlot = slot;
4105 else
4106 resultSlot = apply_returning_filter(dmstate, slot, estate);
4107 }
4108 dmstate->next_tuple++;
4109
4110 /* Make slot available for evaluation of the local query RETURNING list. */
4111 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4112 resultSlot;
4113
4114 return slot;
4115 }
4116
4117 /*
4118 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4119 */
4120 static void
init_returning_filter(PgFdwDirectModifyState * dmstate,List * fdw_scan_tlist,Index rtindex)4121 init_returning_filter(PgFdwDirectModifyState *dmstate,
4122 List *fdw_scan_tlist,
4123 Index rtindex)
4124 {
4125 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4126 ListCell *lc;
4127 int i;
4128
4129 /*
4130 * Calculate the mapping between the fdw_scan_tlist's entries and the
4131 * result tuple's attributes.
4132 *
4133 * The "map" is an array of indexes of the result tuple's attributes in
4134 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4135 * tuple. We store zero for any attributes that don't have the
4136 * corresponding entries in that list, marking that a NULL is needed in
4137 * the result tuple.
4138 *
4139 * Also get the indexes of the entries for ctid and oid if any.
4140 */
4141 dmstate->attnoMap = (AttrNumber *)
4142 palloc0(resultTupType->natts * sizeof(AttrNumber));
4143
4144 dmstate->ctidAttno = dmstate->oidAttno = 0;
4145
4146 i = 1;
4147 dmstate->hasSystemCols = false;
4148 foreach(lc, fdw_scan_tlist)
4149 {
4150 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4151 Var *var = (Var *) tle->expr;
4152
4153 Assert(IsA(var, Var));
4154
4155 /*
4156 * If the Var is a column of the target relation to be retrieved from
4157 * the foreign server, get the index of the entry.
4158 */
4159 if (var->varno == rtindex &&
4160 list_member_int(dmstate->retrieved_attrs, i))
4161 {
4162 int attrno = var->varattno;
4163
4164 if (attrno < 0)
4165 {
4166 /*
4167 * We don't retrieve system columns other than ctid and oid.
4168 */
4169 if (attrno == SelfItemPointerAttributeNumber)
4170 dmstate->ctidAttno = i;
4171 else
4172 Assert(false);
4173 dmstate->hasSystemCols = true;
4174 }
4175 else
4176 {
4177 /*
4178 * We don't retrieve whole-row references to the target
4179 * relation either.
4180 */
4181 Assert(attrno > 0);
4182
4183 dmstate->attnoMap[attrno - 1] = i;
4184 }
4185 }
4186 i++;
4187 }
4188 }
4189
4190 /*
4191 * Extract and return an updated/deleted tuple from a scan tuple.
4192 */
4193 static TupleTableSlot *
apply_returning_filter(PgFdwDirectModifyState * dmstate,TupleTableSlot * slot,EState * estate)4194 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4195 TupleTableSlot *slot,
4196 EState *estate)
4197 {
4198 ResultRelInfo *relInfo = estate->es_result_relation_info;
4199 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4200 TupleTableSlot *resultSlot;
4201 Datum *values;
4202 bool *isnull;
4203 Datum *old_values;
4204 bool *old_isnull;
4205 int i;
4206
4207 /*
4208 * Use the return tuple slot as a place to store the result tuple.
4209 */
4210 resultSlot = ExecGetReturningSlot(estate, relInfo);
4211
4212 /*
4213 * Extract all the values of the scan tuple.
4214 */
4215 slot_getallattrs(slot);
4216 old_values = slot->tts_values;
4217 old_isnull = slot->tts_isnull;
4218
4219 /*
4220 * Prepare to build the result tuple.
4221 */
4222 ExecClearTuple(resultSlot);
4223 values = resultSlot->tts_values;
4224 isnull = resultSlot->tts_isnull;
4225
4226 /*
4227 * Transpose data into proper fields of the result tuple.
4228 */
4229 for (i = 0; i < resultTupType->natts; i++)
4230 {
4231 int j = dmstate->attnoMap[i];
4232
4233 if (j == 0)
4234 {
4235 values[i] = (Datum) 0;
4236 isnull[i] = true;
4237 }
4238 else
4239 {
4240 values[i] = old_values[j - 1];
4241 isnull[i] = old_isnull[j - 1];
4242 }
4243 }
4244
4245 /*
4246 * Build the virtual tuple.
4247 */
4248 ExecStoreVirtualTuple(resultSlot);
4249
4250 /*
4251 * If we have any system columns to return, materialize a heap tuple in
4252 * the slot from column values set above and install system columns in
4253 * that tuple.
4254 */
4255 if (dmstate->hasSystemCols)
4256 {
4257 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4258
4259 /* ctid */
4260 if (dmstate->ctidAttno)
4261 {
4262 ItemPointer ctid = NULL;
4263
4264 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4265 resultTup->t_self = *ctid;
4266 }
4267
4268 /*
4269 * And remaining columns
4270 *
4271 * Note: since we currently don't allow the target relation to appear
4272 * on the nullable side of an outer join, any system columns wouldn't
4273 * go to NULL.
4274 *
4275 * Note: no need to care about tableoid here because it will be
4276 * initialized in ExecProcessReturning().
4277 */
4278 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4279 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4280 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4281 }
4282
4283 /*
4284 * And return the result tuple.
4285 */
4286 return resultSlot;
4287 }
4288
4289 /*
4290 * Prepare for processing of parameters used in remote query.
4291 */
4292 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)4293 prepare_query_params(PlanState *node,
4294 List *fdw_exprs,
4295 int numParams,
4296 FmgrInfo **param_flinfo,
4297 List **param_exprs,
4298 const char ***param_values)
4299 {
4300 int i;
4301 ListCell *lc;
4302
4303 Assert(numParams > 0);
4304
4305 /* Prepare for output conversion of parameters used in remote query. */
4306 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4307
4308 i = 0;
4309 foreach(lc, fdw_exprs)
4310 {
4311 Node *param_expr = (Node *) lfirst(lc);
4312 Oid typefnoid;
4313 bool isvarlena;
4314
4315 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4316 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4317 i++;
4318 }
4319
4320 /*
4321 * Prepare remote-parameter expressions for evaluation. (Note: in
4322 * practice, we expect that all these expressions will be just Params, so
4323 * we could possibly do something more efficient than using the full
4324 * expression-eval machinery for this. But probably there would be little
4325 * benefit, and it'd require postgres_fdw to know more than is desirable
4326 * about Param evaluation.)
4327 */
4328 *param_exprs = ExecInitExprList(fdw_exprs, node);
4329
4330 /* Allocate buffer for text form of query parameters. */
4331 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4332 }
4333
4334 /*
4335 * Construct array of query parameter values in text format.
4336 */
4337 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)4338 process_query_params(ExprContext *econtext,
4339 FmgrInfo *param_flinfo,
4340 List *param_exprs,
4341 const char **param_values)
4342 {
4343 int nestlevel;
4344 int i;
4345 ListCell *lc;
4346
4347 nestlevel = set_transmission_modes();
4348
4349 i = 0;
4350 foreach(lc, param_exprs)
4351 {
4352 ExprState *expr_state = (ExprState *) lfirst(lc);
4353 Datum expr_value;
4354 bool isNull;
4355
4356 /* Evaluate the parameter expression */
4357 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4358
4359 /*
4360 * Get string representation of each parameter value by invoking
4361 * type-specific output function, unless the value is null.
4362 */
4363 if (isNull)
4364 param_values[i] = NULL;
4365 else
4366 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
4367
4368 i++;
4369 }
4370
4371 reset_transmission_modes(nestlevel);
4372 }
4373
4374 /*
4375 * postgresAnalyzeForeignTable
4376 * Test whether analyzing this foreign table is supported
4377 */
4378 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)4379 postgresAnalyzeForeignTable(Relation relation,
4380 AcquireSampleRowsFunc *func,
4381 BlockNumber *totalpages)
4382 {
4383 ForeignTable *table;
4384 UserMapping *user;
4385 PGconn *conn;
4386 StringInfoData sql;
4387 PGresult *volatile res = NULL;
4388
4389 /* Return the row-analysis function pointer */
4390 *func = postgresAcquireSampleRowsFunc;
4391
4392 /*
4393 * Now we have to get the number of pages. It's annoying that the ANALYZE
4394 * API requires us to return that now, because it forces some duplication
4395 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4396 * it's probably not worth redefining that API at this point.
4397 */
4398
4399 /*
4400 * Get the connection to use. We do the remote access as the table's
4401 * owner, even if the ANALYZE was started by some other user.
4402 */
4403 table = GetForeignTable(RelationGetRelid(relation));
4404 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4405 conn = GetConnection(user, false);
4406
4407 /*
4408 * Construct command to get page count for relation.
4409 */
4410 initStringInfo(&sql);
4411 deparseAnalyzeSizeSql(&sql, relation);
4412
4413 /* In what follows, do not risk leaking any PGresults. */
4414 PG_TRY();
4415 {
4416 res = pgfdw_exec_query(conn, sql.data);
4417 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4418 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4419
4420 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4421 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4422 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4423
4424 PQclear(res);
4425 res = NULL;
4426 }
4427 PG_CATCH();
4428 {
4429 if (res)
4430 PQclear(res);
4431 PG_RE_THROW();
4432 }
4433 PG_END_TRY();
4434
4435 ReleaseConnection(conn);
4436
4437 return true;
4438 }
4439
4440 /*
4441 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4442 *
4443 * We fetch the whole table from the remote side and pick out some sample rows.
4444 *
4445 * Selected rows are returned in the caller-allocated array rows[],
4446 * which must have at least targrows entries.
4447 * The actual number of rows selected is returned as the function result.
4448 * We also count the total number of rows in the table and return it into
4449 * *totalrows. Note that *totaldeadrows is always set to 0.
4450 *
4451 * Note that the returned list of rows is not always in order by physical
4452 * position in the table. Therefore, correlation estimates derived later
4453 * may be meaningless, but it's OK because we don't use the estimates
4454 * currently (the planner only pays attention to correlation for indexscans).
4455 */
4456 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)4457 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4458 HeapTuple *rows, int targrows,
4459 double *totalrows,
4460 double *totaldeadrows)
4461 {
4462 PgFdwAnalyzeState astate;
4463 ForeignTable *table;
4464 ForeignServer *server;
4465 UserMapping *user;
4466 PGconn *conn;
4467 unsigned int cursor_number;
4468 StringInfoData sql;
4469 PGresult *volatile res = NULL;
4470
4471 /* Initialize workspace state */
4472 astate.rel = relation;
4473 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4474
4475 astate.rows = rows;
4476 astate.targrows = targrows;
4477 astate.numrows = 0;
4478 astate.samplerows = 0;
4479 astate.rowstoskip = -1; /* -1 means not set yet */
4480 reservoir_init_selection_state(&astate.rstate, targrows);
4481
4482 /* Remember ANALYZE context, and create a per-tuple temp context */
4483 astate.anl_cxt = CurrentMemoryContext;
4484 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4485 "postgres_fdw temporary data",
4486 ALLOCSET_SMALL_SIZES);
4487
4488 /*
4489 * Get the connection to use. We do the remote access as the table's
4490 * owner, even if the ANALYZE was started by some other user.
4491 */
4492 table = GetForeignTable(RelationGetRelid(relation));
4493 server = GetForeignServer(table->serverid);
4494 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4495 conn = GetConnection(user, false);
4496
4497 /*
4498 * Construct cursor that retrieves whole rows from remote.
4499 */
4500 cursor_number = GetCursorNumber(conn);
4501 initStringInfo(&sql);
4502 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4503 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4504
4505 /* In what follows, do not risk leaking any PGresults. */
4506 PG_TRY();
4507 {
4508 res = pgfdw_exec_query(conn, sql.data);
4509 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4510 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4511 PQclear(res);
4512 res = NULL;
4513
4514 /* Retrieve and process rows a batch at a time. */
4515 for (;;)
4516 {
4517 char fetch_sql[64];
4518 int fetch_size;
4519 int numrows;
4520 int i;
4521 ListCell *lc;
4522
4523 /* Allow users to cancel long query */
4524 CHECK_FOR_INTERRUPTS();
4525
4526 /*
4527 * XXX possible future improvement: if rowstoskip is large, we
4528 * could issue a MOVE rather than physically fetching the rows,
4529 * then just adjust rowstoskip and samplerows appropriately.
4530 */
4531
4532 /* The fetch size is arbitrary, but shouldn't be enormous. */
4533 fetch_size = 100;
4534 foreach(lc, server->options)
4535 {
4536 DefElem *def = (DefElem *) lfirst(lc);
4537
4538 if (strcmp(def->defname, "fetch_size") == 0)
4539 {
4540 fetch_size = strtol(defGetString(def), NULL, 10);
4541 break;
4542 }
4543 }
4544 foreach(lc, table->options)
4545 {
4546 DefElem *def = (DefElem *) lfirst(lc);
4547
4548 if (strcmp(def->defname, "fetch_size") == 0)
4549 {
4550 fetch_size = strtol(defGetString(def), NULL, 10);
4551 break;
4552 }
4553 }
4554
4555 /* Fetch some rows */
4556 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4557 fetch_size, cursor_number);
4558
4559 res = pgfdw_exec_query(conn, fetch_sql);
4560 /* On error, report the original query, not the FETCH. */
4561 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4562 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4563
4564 /* Process whatever we got. */
4565 numrows = PQntuples(res);
4566 for (i = 0; i < numrows; i++)
4567 analyze_row_processor(res, i, &astate);
4568
4569 PQclear(res);
4570 res = NULL;
4571
4572 /* Must be EOF if we didn't get all the rows requested. */
4573 if (numrows < fetch_size)
4574 break;
4575 }
4576
4577 /* Close the cursor, just to be tidy. */
4578 close_cursor(conn, cursor_number);
4579 }
4580 PG_CATCH();
4581 {
4582 if (res)
4583 PQclear(res);
4584 PG_RE_THROW();
4585 }
4586 PG_END_TRY();
4587
4588 ReleaseConnection(conn);
4589
4590 /* We assume that we have no dead tuple. */
4591 *totaldeadrows = 0.0;
4592
4593 /* We've retrieved all living tuples from foreign server. */
4594 *totalrows = astate.samplerows;
4595
4596 /*
4597 * Emit some interesting relation info
4598 */
4599 ereport(elevel,
4600 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4601 RelationGetRelationName(relation),
4602 astate.samplerows, astate.numrows)));
4603
4604 return astate.numrows;
4605 }
4606
4607 /*
4608 * Collect sample rows from the result of query.
4609 * - Use all tuples in sample until target # of samples are collected.
4610 * - Subsequently, replace already-sampled tuples randomly.
4611 */
4612 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)4613 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
4614 {
4615 int targrows = astate->targrows;
4616 int pos; /* array index to store tuple in */
4617 MemoryContext oldcontext;
4618
4619 /* Always increment sample row counter. */
4620 astate->samplerows += 1;
4621
4622 /*
4623 * Determine the slot where this sample row should be stored. Set pos to
4624 * negative value to indicate the row should be skipped.
4625 */
4626 if (astate->numrows < targrows)
4627 {
4628 /* First targrows rows are always included into the sample */
4629 pos = astate->numrows++;
4630 }
4631 else
4632 {
4633 /*
4634 * Now we start replacing tuples in the sample until we reach the end
4635 * of the relation. Same algorithm as in acquire_sample_rows in
4636 * analyze.c; see Jeff Vitter's paper.
4637 */
4638 if (astate->rowstoskip < 0)
4639 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4640
4641 if (astate->rowstoskip <= 0)
4642 {
4643 /* Choose a random reservoir element to replace. */
4644 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4645 Assert(pos >= 0 && pos < targrows);
4646 heap_freetuple(astate->rows[pos]);
4647 }
4648 else
4649 {
4650 /* Skip this tuple. */
4651 pos = -1;
4652 }
4653
4654 astate->rowstoskip -= 1;
4655 }
4656
4657 if (pos >= 0)
4658 {
4659 /*
4660 * Create sample tuple from current result row, and store it in the
4661 * position determined above. The tuple has to be created in anl_cxt.
4662 */
4663 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4664
4665 astate->rows[pos] = make_tuple_from_result_row(res, row,
4666 astate->rel,
4667 astate->attinmeta,
4668 astate->retrieved_attrs,
4669 NULL,
4670 astate->temp_cxt);
4671
4672 MemoryContextSwitchTo(oldcontext);
4673 }
4674 }
4675
4676 /*
4677 * Import a foreign schema
4678 */
4679 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)4680 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
4681 {
4682 List *commands = NIL;
4683 bool import_collate = true;
4684 bool import_default = false;
4685 bool import_generated = true;
4686 bool import_not_null = true;
4687 ForeignServer *server;
4688 UserMapping *mapping;
4689 PGconn *conn;
4690 StringInfoData buf;
4691 PGresult *volatile res = NULL;
4692 int numrows,
4693 i;
4694 ListCell *lc;
4695
4696 /* Parse statement options */
4697 foreach(lc, stmt->options)
4698 {
4699 DefElem *def = (DefElem *) lfirst(lc);
4700
4701 if (strcmp(def->defname, "import_collate") == 0)
4702 import_collate = defGetBoolean(def);
4703 else if (strcmp(def->defname, "import_default") == 0)
4704 import_default = defGetBoolean(def);
4705 else if (strcmp(def->defname, "import_generated") == 0)
4706 import_generated = defGetBoolean(def);
4707 else if (strcmp(def->defname, "import_not_null") == 0)
4708 import_not_null = defGetBoolean(def);
4709 else
4710 ereport(ERROR,
4711 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4712 errmsg("invalid option \"%s\"", def->defname)));
4713 }
4714
4715 /*
4716 * Get connection to the foreign server. Connection manager will
4717 * establish new connection if necessary.
4718 */
4719 server = GetForeignServer(serverOid);
4720 mapping = GetUserMapping(GetUserId(), server->serverid);
4721 conn = GetConnection(mapping, false);
4722
4723 /* Don't attempt to import collation if remote server hasn't got it */
4724 if (PQserverVersion(conn) < 90100)
4725 import_collate = false;
4726
4727 /* Create workspace for strings */
4728 initStringInfo(&buf);
4729
4730 /* In what follows, do not risk leaking any PGresults. */
4731 PG_TRY();
4732 {
4733 /* Check that the schema really exists */
4734 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4735 deparseStringLiteral(&buf, stmt->remote_schema);
4736
4737 res = pgfdw_exec_query(conn, buf.data);
4738 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4739 pgfdw_report_error(ERROR, res, conn, false, buf.data);
4740
4741 if (PQntuples(res) != 1)
4742 ereport(ERROR,
4743 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4744 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4745 stmt->remote_schema, server->servername)));
4746
4747 PQclear(res);
4748 res = NULL;
4749 resetStringInfo(&buf);
4750
4751 /*
4752 * Fetch all table data from this schema, possibly restricted by
4753 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4754 * to EXCEPT/LIMIT TO here, because the core code will filter the
4755 * statements we return according to those lists anyway. But it
4756 * should save a few cycles to not process excluded tables in the
4757 * first place.)
4758 *
4759 * Ignore table data for partitions and only include the definitions
4760 * of the root partitioned tables to allow access to the complete
4761 * remote data set locally in the schema imported.
4762 *
4763 * Note: because we run the connection with search_path restricted to
4764 * pg_catalog, the format_type() and pg_get_expr() outputs will always
4765 * include a schema name for types/functions in other schemas, which
4766 * is what we want.
4767 */
4768 appendStringInfoString(&buf,
4769 "SELECT relname, "
4770 " attname, "
4771 " format_type(atttypid, atttypmod), "
4772 " attnotnull, ");
4773
4774 /* Generated columns are supported since Postgres 12 */
4775 if (PQserverVersion(conn) >= 120000)
4776 appendStringInfoString(&buf,
4777 " attgenerated, "
4778 " pg_get_expr(adbin, adrelid), ");
4779 else
4780 appendStringInfoString(&buf,
4781 " NULL, "
4782 " pg_get_expr(adbin, adrelid), ");
4783
4784 if (import_collate)
4785 appendStringInfoString(&buf,
4786 " collname, "
4787 " collnsp.nspname "
4788 "FROM pg_class c "
4789 " JOIN pg_namespace n ON "
4790 " relnamespace = n.oid "
4791 " LEFT JOIN pg_attribute a ON "
4792 " attrelid = c.oid AND attnum > 0 "
4793 " AND NOT attisdropped "
4794 " LEFT JOIN pg_attrdef ad ON "
4795 " adrelid = c.oid AND adnum = attnum "
4796 " LEFT JOIN pg_collation coll ON "
4797 " coll.oid = attcollation "
4798 " LEFT JOIN pg_namespace collnsp ON "
4799 " collnsp.oid = collnamespace ");
4800 else
4801 appendStringInfoString(&buf,
4802 " NULL, NULL "
4803 "FROM pg_class c "
4804 " JOIN pg_namespace n ON "
4805 " relnamespace = n.oid "
4806 " LEFT JOIN pg_attribute a ON "
4807 " attrelid = c.oid AND attnum > 0 "
4808 " AND NOT attisdropped "
4809 " LEFT JOIN pg_attrdef ad ON "
4810 " adrelid = c.oid AND adnum = attnum ");
4811
4812 appendStringInfoString(&buf,
4813 "WHERE c.relkind IN ("
4814 CppAsString2(RELKIND_RELATION) ","
4815 CppAsString2(RELKIND_VIEW) ","
4816 CppAsString2(RELKIND_FOREIGN_TABLE) ","
4817 CppAsString2(RELKIND_MATVIEW) ","
4818 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4819 " AND n.nspname = ");
4820 deparseStringLiteral(&buf, stmt->remote_schema);
4821
4822 /* Partitions are supported since Postgres 10 */
4823 if (PQserverVersion(conn) >= 100000)
4824 appendStringInfoString(&buf, " AND NOT c.relispartition ");
4825
4826 /* Apply restrictions for LIMIT TO and EXCEPT */
4827 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4828 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4829 {
4830 bool first_item = true;
4831
4832 appendStringInfoString(&buf, " AND c.relname ");
4833 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4834 appendStringInfoString(&buf, "NOT ");
4835 appendStringInfoString(&buf, "IN (");
4836
4837 /* Append list of table names within IN clause */
4838 foreach(lc, stmt->table_list)
4839 {
4840 RangeVar *rv = (RangeVar *) lfirst(lc);
4841
4842 if (first_item)
4843 first_item = false;
4844 else
4845 appendStringInfoString(&buf, ", ");
4846 deparseStringLiteral(&buf, rv->relname);
4847 }
4848 appendStringInfoChar(&buf, ')');
4849 }
4850
4851 /* Append ORDER BY at the end of query to ensure output ordering */
4852 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4853
4854 /* Fetch the data */
4855 res = pgfdw_exec_query(conn, buf.data);
4856 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4857 pgfdw_report_error(ERROR, res, conn, false, buf.data);
4858
4859 /* Process results */
4860 numrows = PQntuples(res);
4861 /* note: incrementation of i happens in inner loop's while() test */
4862 for (i = 0; i < numrows;)
4863 {
4864 char *tablename = PQgetvalue(res, i, 0);
4865 bool first_item = true;
4866
4867 resetStringInfo(&buf);
4868 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4869 quote_identifier(tablename));
4870
4871 /* Scan all rows for this table */
4872 do
4873 {
4874 char *attname;
4875 char *typename;
4876 char *attnotnull;
4877 char *attgenerated;
4878 char *attdefault;
4879 char *collname;
4880 char *collnamespace;
4881
4882 /* If table has no columns, we'll see nulls here */
4883 if (PQgetisnull(res, i, 1))
4884 continue;
4885
4886 attname = PQgetvalue(res, i, 1);
4887 typename = PQgetvalue(res, i, 2);
4888 attnotnull = PQgetvalue(res, i, 3);
4889 attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL :
4890 PQgetvalue(res, i, 4);
4891 attdefault = PQgetisnull(res, i, 5) ? (char *) NULL :
4892 PQgetvalue(res, i, 5);
4893 collname = PQgetisnull(res, i, 6) ? (char *) NULL :
4894 PQgetvalue(res, i, 6);
4895 collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
4896 PQgetvalue(res, i, 7);
4897
4898 if (first_item)
4899 first_item = false;
4900 else
4901 appendStringInfoString(&buf, ",\n");
4902
4903 /* Print column name and type */
4904 appendStringInfo(&buf, " %s %s",
4905 quote_identifier(attname),
4906 typename);
4907
4908 /*
4909 * Add column_name option so that renaming the foreign table's
4910 * column doesn't break the association to the underlying
4911 * column.
4912 */
4913 appendStringInfoString(&buf, " OPTIONS (column_name ");
4914 deparseStringLiteral(&buf, attname);
4915 appendStringInfoChar(&buf, ')');
4916
4917 /* Add COLLATE if needed */
4918 if (import_collate && collname != NULL && collnamespace != NULL)
4919 appendStringInfo(&buf, " COLLATE %s.%s",
4920 quote_identifier(collnamespace),
4921 quote_identifier(collname));
4922
4923 /* Add DEFAULT if needed */
4924 if (import_default && attdefault != NULL &&
4925 (!attgenerated || !attgenerated[0]))
4926 appendStringInfo(&buf, " DEFAULT %s", attdefault);
4927
4928 /* Add GENERATED if needed */
4929 if (import_generated && attgenerated != NULL &&
4930 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
4931 {
4932 Assert(attdefault != NULL);
4933 appendStringInfo(&buf,
4934 " GENERATED ALWAYS AS (%s) STORED",
4935 attdefault);
4936 }
4937
4938 /* Add NOT NULL if needed */
4939 if (import_not_null && attnotnull[0] == 't')
4940 appendStringInfoString(&buf, " NOT NULL");
4941 }
4942 while (++i < numrows &&
4943 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4944
4945 /*
4946 * Add server name and table-level options. We specify remote
4947 * schema and table name as options (the latter to ensure that
4948 * renaming the foreign table doesn't break the association).
4949 */
4950 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4951 quote_identifier(server->servername));
4952
4953 appendStringInfoString(&buf, "schema_name ");
4954 deparseStringLiteral(&buf, stmt->remote_schema);
4955 appendStringInfoString(&buf, ", table_name ");
4956 deparseStringLiteral(&buf, tablename);
4957
4958 appendStringInfoString(&buf, ");");
4959
4960 commands = lappend(commands, pstrdup(buf.data));
4961 }
4962
4963 /* Clean up */
4964 PQclear(res);
4965 res = NULL;
4966 }
4967 PG_CATCH();
4968 {
4969 if (res)
4970 PQclear(res);
4971 PG_RE_THROW();
4972 }
4973 PG_END_TRY();
4974
4975 ReleaseConnection(conn);
4976
4977 return commands;
4978 }
4979
4980 /*
4981 * Assess whether the join between inner and outer relations can be pushed down
4982 * to the foreign server. As a side effect, save information we obtain in this
4983 * function to PgFdwRelationInfo passed in.
4984 */
4985 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)4986 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
4987 RelOptInfo *outerrel, RelOptInfo *innerrel,
4988 JoinPathExtraData *extra)
4989 {
4990 PgFdwRelationInfo *fpinfo;
4991 PgFdwRelationInfo *fpinfo_o;
4992 PgFdwRelationInfo *fpinfo_i;
4993 ListCell *lc;
4994 List *joinclauses;
4995
4996 /*
4997 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4998 * Constructing queries representing SEMI and ANTI joins is hard, hence
4999 * not considered right now.
5000 */
5001 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5002 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5003 return false;
5004
5005 /*
5006 * If either of the joining relations is marked as unsafe to pushdown, the
5007 * join can not be pushed down.
5008 */
5009 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5010 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5011 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5012 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5013 !fpinfo_i || !fpinfo_i->pushdown_safe)
5014 return false;
5015
5016 /*
5017 * If joining relations have local conditions, those conditions are
5018 * required to be applied before joining the relations. Hence the join can
5019 * not be pushed down.
5020 */
5021 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5022 return false;
5023
5024 /*
5025 * Merge FDW options. We might be tempted to do this after we have deemed
5026 * the foreign join to be OK. But we must do this beforehand so that we
5027 * know which quals can be evaluated on the foreign server, which might
5028 * depend on shippable_extensions.
5029 */
5030 fpinfo->server = fpinfo_o->server;
5031 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5032
5033 /*
5034 * Separate restrict list into join quals and pushed-down (other) quals.
5035 *
5036 * Join quals belonging to an outer join must all be shippable, else we
5037 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5038 *
5039 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5040 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5041 * either locally or remotely; the same is true for pushed-down conditions
5042 * at an outer join.
5043 *
5044 * Note we might return failure after having already scribbled on
5045 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5046 * won't consult those lists again if we deem the join unshippable.
5047 */
5048 joinclauses = NIL;
5049 foreach(lc, extra->restrictlist)
5050 {
5051 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5052 bool is_remote_clause = is_foreign_expr(root, joinrel,
5053 rinfo->clause);
5054
5055 if (IS_OUTER_JOIN(jointype) &&
5056 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5057 {
5058 if (!is_remote_clause)
5059 return false;
5060 joinclauses = lappend(joinclauses, rinfo);
5061 }
5062 else
5063 {
5064 if (is_remote_clause)
5065 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5066 else
5067 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5068 }
5069 }
5070
5071 /*
5072 * deparseExplicitTargetList() isn't smart enough to handle anything other
5073 * than a Var. In particular, if there's some PlaceHolderVar that would
5074 * need to be evaluated within this join tree (because there's an upper
5075 * reference to a quantity that may go to NULL as a result of an outer
5076 * join), then we can't try to push the join down because we'll fail when
5077 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5078 * needs to be evaluated *at the top* of this join tree is OK, because we
5079 * can do that locally after fetching the results from the remote side.
5080 */
5081 foreach(lc, root->placeholder_list)
5082 {
5083 PlaceHolderInfo *phinfo = lfirst(lc);
5084 Relids relids;
5085
5086 /* PlaceHolderInfo refers to parent relids, not child relids. */
5087 relids = IS_OTHER_REL(joinrel) ?
5088 joinrel->top_parent_relids : joinrel->relids;
5089
5090 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5091 bms_nonempty_difference(relids, phinfo->ph_eval_at))
5092 return false;
5093 }
5094
5095 /* Save the join clauses, for later use. */
5096 fpinfo->joinclauses = joinclauses;
5097
5098 fpinfo->outerrel = outerrel;
5099 fpinfo->innerrel = innerrel;
5100 fpinfo->jointype = jointype;
5101
5102 /*
5103 * By default, both the input relations are not required to be deparsed as
5104 * subqueries, but there might be some relations covered by the input
5105 * relations that are required to be deparsed as subqueries, so save the
5106 * relids of those relations for later use by the deparser.
5107 */
5108 fpinfo->make_outerrel_subquery = false;
5109 fpinfo->make_innerrel_subquery = false;
5110 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5111 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5112 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5113 fpinfo_i->lower_subquery_rels);
5114
5115 /*
5116 * Pull the other remote conditions from the joining relations into join
5117 * clauses or other remote clauses (remote_conds) of this relation
5118 * wherever possible. This avoids building subqueries at every join step.
5119 *
5120 * For an inner join, clauses from both the relations are added to the
5121 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5122 * the outer side are added to remote_conds since those can be evaluated
5123 * after the join is evaluated. The clauses from inner side are added to
5124 * the joinclauses, since they need to be evaluated while constructing the
5125 * join.
5126 *
5127 * For a FULL OUTER JOIN, the other clauses from either relation can not
5128 * be added to the joinclauses or remote_conds, since each relation acts
5129 * as an outer relation for the other.
5130 *
5131 * The joining sides can not have local conditions, thus no need to test
5132 * shippability of the clauses being pulled up.
5133 */
5134 switch (jointype)
5135 {
5136 case JOIN_INNER:
5137 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5138 list_copy(fpinfo_i->remote_conds));
5139 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5140 list_copy(fpinfo_o->remote_conds));
5141 break;
5142
5143 case JOIN_LEFT:
5144 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5145 list_copy(fpinfo_i->remote_conds));
5146 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5147 list_copy(fpinfo_o->remote_conds));
5148 break;
5149
5150 case JOIN_RIGHT:
5151 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5152 list_copy(fpinfo_o->remote_conds));
5153 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5154 list_copy(fpinfo_i->remote_conds));
5155 break;
5156
5157 case JOIN_FULL:
5158
5159 /*
5160 * In this case, if any of the input relations has conditions, we
5161 * need to deparse that relation as a subquery so that the
5162 * conditions can be evaluated before the join. Remember it in
5163 * the fpinfo of this relation so that the deparser can take
5164 * appropriate action. Also, save the relids of base relations
5165 * covered by that relation for later use by the deparser.
5166 */
5167 if (fpinfo_o->remote_conds)
5168 {
5169 fpinfo->make_outerrel_subquery = true;
5170 fpinfo->lower_subquery_rels =
5171 bms_add_members(fpinfo->lower_subquery_rels,
5172 outerrel->relids);
5173 }
5174 if (fpinfo_i->remote_conds)
5175 {
5176 fpinfo->make_innerrel_subquery = true;
5177 fpinfo->lower_subquery_rels =
5178 bms_add_members(fpinfo->lower_subquery_rels,
5179 innerrel->relids);
5180 }
5181 break;
5182
5183 default:
5184 /* Should not happen, we have just checked this above */
5185 elog(ERROR, "unsupported join type %d", jointype);
5186 }
5187
5188 /*
5189 * For an inner join, all restrictions can be treated alike. Treating the
5190 * pushed down conditions as join conditions allows a top level full outer
5191 * join to be deparsed without requiring subqueries.
5192 */
5193 if (jointype == JOIN_INNER)
5194 {
5195 Assert(!fpinfo->joinclauses);
5196 fpinfo->joinclauses = fpinfo->remote_conds;
5197 fpinfo->remote_conds = NIL;
5198 }
5199
5200 /* Mark that this join can be pushed down safely */
5201 fpinfo->pushdown_safe = true;
5202
5203 /* Get user mapping */
5204 if (fpinfo->use_remote_estimate)
5205 {
5206 if (fpinfo_o->use_remote_estimate)
5207 fpinfo->user = fpinfo_o->user;
5208 else
5209 fpinfo->user = fpinfo_i->user;
5210 }
5211 else
5212 fpinfo->user = NULL;
5213
5214 /*
5215 * Set # of retrieved rows and cached relation costs to some negative
5216 * value, so that we can detect when they are set to some sensible values,
5217 * during one (usually the first) of the calls to estimate_path_cost_size.
5218 */
5219 fpinfo->retrieved_rows = -1;
5220 fpinfo->rel_startup_cost = -1;
5221 fpinfo->rel_total_cost = -1;
5222
5223 /*
5224 * Set the string describing this join relation to be used in EXPLAIN
5225 * output of corresponding ForeignScan.
5226 */
5227 fpinfo->relation_name = makeStringInfo();
5228 appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
5229 fpinfo_o->relation_name->data,
5230 get_jointype_name(fpinfo->jointype),
5231 fpinfo_i->relation_name->data);
5232
5233 /*
5234 * Set the relation index. This is defined as the position of this
5235 * joinrel in the join_rel_list list plus the length of the rtable list.
5236 * Note that since this joinrel is at the end of the join_rel_list list
5237 * when we are called, we can get the position by list_length.
5238 */
5239 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
5240 fpinfo->relation_index =
5241 list_length(root->parse->rtable) + list_length(root->join_rel_list);
5242
5243 return true;
5244 }
5245
5246 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)5247 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5248 Path *epq_path)
5249 {
5250 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5251 ListCell *lc;
5252
5253 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5254
5255 /* Create one path for each set of pathkeys we found above. */
5256 foreach(lc, useful_pathkeys_list)
5257 {
5258 double rows;
5259 int width;
5260 Cost startup_cost;
5261 Cost total_cost;
5262 List *useful_pathkeys = lfirst(lc);
5263 Path *sorted_epq_path;
5264
5265 estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5266 &rows, &width, &startup_cost, &total_cost);
5267
5268 /*
5269 * The EPQ path must be at least as well sorted as the path itself, in
5270 * case it gets used as input to a mergejoin.
5271 */
5272 sorted_epq_path = epq_path;
5273 if (sorted_epq_path != NULL &&
5274 !pathkeys_contained_in(useful_pathkeys,
5275 sorted_epq_path->pathkeys))
5276 sorted_epq_path = (Path *)
5277 create_sort_path(root,
5278 rel,
5279 sorted_epq_path,
5280 useful_pathkeys,
5281 -1.0);
5282
5283 if (IS_SIMPLE_REL(rel))
5284 add_path(rel, (Path *)
5285 create_foreignscan_path(root, rel,
5286 NULL,
5287 rows,
5288 startup_cost,
5289 total_cost,
5290 useful_pathkeys,
5291 rel->lateral_relids,
5292 sorted_epq_path,
5293 NIL));
5294 else
5295 add_path(rel, (Path *)
5296 create_foreign_join_path(root, rel,
5297 NULL,
5298 rows,
5299 startup_cost,
5300 total_cost,
5301 useful_pathkeys,
5302 rel->lateral_relids,
5303 sorted_epq_path,
5304 NIL));
5305 }
5306 }
5307
5308 /*
5309 * Parse options from foreign server and apply them to fpinfo.
5310 *
5311 * New options might also require tweaking merge_fdw_options().
5312 */
5313 static void
apply_server_options(PgFdwRelationInfo * fpinfo)5314 apply_server_options(PgFdwRelationInfo *fpinfo)
5315 {
5316 ListCell *lc;
5317
5318 foreach(lc, fpinfo->server->options)
5319 {
5320 DefElem *def = (DefElem *) lfirst(lc);
5321
5322 if (strcmp(def->defname, "use_remote_estimate") == 0)
5323 fpinfo->use_remote_estimate = defGetBoolean(def);
5324 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5325 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5326 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5327 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5328 else if (strcmp(def->defname, "extensions") == 0)
5329 fpinfo->shippable_extensions =
5330 ExtractExtensionList(defGetString(def), false);
5331 else if (strcmp(def->defname, "fetch_size") == 0)
5332 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5333 }
5334 }
5335
5336 /*
5337 * Parse options from foreign table and apply them to fpinfo.
5338 *
5339 * New options might also require tweaking merge_fdw_options().
5340 */
5341 static void
apply_table_options(PgFdwRelationInfo * fpinfo)5342 apply_table_options(PgFdwRelationInfo *fpinfo)
5343 {
5344 ListCell *lc;
5345
5346 foreach(lc, fpinfo->table->options)
5347 {
5348 DefElem *def = (DefElem *) lfirst(lc);
5349
5350 if (strcmp(def->defname, "use_remote_estimate") == 0)
5351 fpinfo->use_remote_estimate = defGetBoolean(def);
5352 else if (strcmp(def->defname, "fetch_size") == 0)
5353 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5354 }
5355 }
5356
5357 /*
5358 * Merge FDW options from input relations into a new set of options for a join
5359 * or an upper rel.
5360 *
5361 * For a join relation, FDW-specific information about the inner and outer
5362 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5363 * fpinfo_o provides the information for the input relation; fpinfo_i is
5364 * expected to NULL.
5365 */
5366 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)5367 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5368 const PgFdwRelationInfo *fpinfo_o,
5369 const PgFdwRelationInfo *fpinfo_i)
5370 {
5371 /* We must always have fpinfo_o. */
5372 Assert(fpinfo_o);
5373
5374 /* fpinfo_i may be NULL, but if present the servers must both match. */
5375 Assert(!fpinfo_i ||
5376 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5377
5378 /*
5379 * Copy the server specific FDW options. (For a join, both relations come
5380 * from the same server, so the server options should have the same value
5381 * for both relations.)
5382 */
5383 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5384 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5385 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5386 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5387 fpinfo->fetch_size = fpinfo_o->fetch_size;
5388
5389 /* Merge the table level options from either side of the join. */
5390 if (fpinfo_i)
5391 {
5392 /*
5393 * We'll prefer to use remote estimates for this join if any table
5394 * from either side of the join is using remote estimates. This is
5395 * most likely going to be preferred since they're already willing to
5396 * pay the price of a round trip to get the remote EXPLAIN. In any
5397 * case it's not entirely clear how we might otherwise handle this
5398 * best.
5399 */
5400 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5401 fpinfo_i->use_remote_estimate;
5402
5403 /*
5404 * Set fetch size to maximum of the joining sides, since we are
5405 * expecting the rows returned by the join to be proportional to the
5406 * relation sizes.
5407 */
5408 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5409 }
5410 }
5411
5412 /*
5413 * postgresGetForeignJoinPaths
5414 * Add possible ForeignPath to joinrel, if join is safe to push down.
5415 */
5416 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)5417 postgresGetForeignJoinPaths(PlannerInfo *root,
5418 RelOptInfo *joinrel,
5419 RelOptInfo *outerrel,
5420 RelOptInfo *innerrel,
5421 JoinType jointype,
5422 JoinPathExtraData *extra)
5423 {
5424 PgFdwRelationInfo *fpinfo;
5425 ForeignPath *joinpath;
5426 double rows;
5427 int width;
5428 Cost startup_cost;
5429 Cost total_cost;
5430 Path *epq_path; /* Path to create plan to be executed when
5431 * EvalPlanQual gets triggered. */
5432
5433 /*
5434 * Skip if this join combination has been considered already.
5435 */
5436 if (joinrel->fdw_private)
5437 return;
5438
5439 /*
5440 * This code does not work for joins with lateral references, since those
5441 * must have parameterized paths, which we don't generate yet.
5442 */
5443 if (!bms_is_empty(joinrel->lateral_relids))
5444 return;
5445
5446 /*
5447 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5448 * that the join relation is already considered, so that we won't waste
5449 * time in judging safety of join pushdown and adding the same paths again
5450 * if found safe. Once we know that this join can be pushed down, we fill
5451 * the entry.
5452 */
5453 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5454 fpinfo->pushdown_safe = false;
5455 joinrel->fdw_private = fpinfo;
5456 /* attrs_used is only for base relations. */
5457 fpinfo->attrs_used = NULL;
5458
5459 /*
5460 * If there is a possibility that EvalPlanQual will be executed, we need
5461 * to be able to reconstruct the row using scans of the base relations.
5462 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5463 * the path list of the joinrel, if one exists. We must be careful to
5464 * call it before adding any ForeignPath, since the ForeignPath might
5465 * dominate the only suitable local path available. We also do it before
5466 * calling foreign_join_ok(), since that function updates fpinfo and marks
5467 * it as pushable if the join is found to be pushable.
5468 */
5469 if (root->parse->commandType == CMD_DELETE ||
5470 root->parse->commandType == CMD_UPDATE ||
5471 root->rowMarks)
5472 {
5473 epq_path = GetExistingLocalJoinPath(joinrel);
5474 if (!epq_path)
5475 {
5476 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5477 return;
5478 }
5479 }
5480 else
5481 epq_path = NULL;
5482
5483 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5484 {
5485 /* Free path required for EPQ if we copied one; we don't need it now */
5486 if (epq_path)
5487 pfree(epq_path);
5488 return;
5489 }
5490
5491 /*
5492 * Compute the selectivity and cost of the local_conds, so we don't have
5493 * to do it over again for each path. The best we can do for these
5494 * conditions is to estimate selectivity on the basis of local statistics.
5495 * The local conditions are applied after the join has been computed on
5496 * the remote side like quals in WHERE clause, so pass jointype as
5497 * JOIN_INNER.
5498 */
5499 fpinfo->local_conds_sel = clauselist_selectivity(root,
5500 fpinfo->local_conds,
5501 0,
5502 JOIN_INNER,
5503 NULL);
5504 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5505
5506 /*
5507 * If we are going to estimate costs locally, estimate the join clause
5508 * selectivity here while we have special join info.
5509 */
5510 if (!fpinfo->use_remote_estimate)
5511 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5512 0, fpinfo->jointype,
5513 extra->sjinfo);
5514
5515 /* Estimate costs for bare join relation */
5516 estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
5517 &rows, &width, &startup_cost, &total_cost);
5518 /* Now update this information in the joinrel */
5519 joinrel->rows = rows;
5520 joinrel->reltarget->width = width;
5521 fpinfo->rows = rows;
5522 fpinfo->width = width;
5523 fpinfo->startup_cost = startup_cost;
5524 fpinfo->total_cost = total_cost;
5525
5526 /*
5527 * Create a new join path and add it to the joinrel which represents a
5528 * join between foreign tables.
5529 */
5530 joinpath = create_foreign_join_path(root,
5531 joinrel,
5532 NULL, /* default pathtarget */
5533 rows,
5534 startup_cost,
5535 total_cost,
5536 NIL, /* no pathkeys */
5537 joinrel->lateral_relids,
5538 epq_path,
5539 NIL); /* no fdw_private */
5540
5541 /* Add generated path into joinrel by add_path(). */
5542 add_path(joinrel, (Path *) joinpath);
5543
5544 /* Consider pathkeys for the join relation */
5545 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5546
5547 /* XXX Consider parameterized paths for the join relation */
5548 }
5549
5550 /*
5551 * Assess whether the aggregation, grouping and having operations can be pushed
5552 * down to the foreign server. As a side effect, save information we obtain in
5553 * this function to PgFdwRelationInfo of the input relation.
5554 */
5555 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel,Node * havingQual)5556 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
5557 Node *havingQual)
5558 {
5559 Query *query = root->parse;
5560 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5561 PathTarget *grouping_target = grouped_rel->reltarget;
5562 PgFdwRelationInfo *ofpinfo;
5563 ListCell *lc;
5564 int i;
5565 List *tlist = NIL;
5566
5567 /* We currently don't support pushing Grouping Sets. */
5568 if (query->groupingSets)
5569 return false;
5570
5571 /* Get the fpinfo of the underlying scan relation. */
5572 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5573
5574 /*
5575 * If underlying scan relation has any local conditions, those conditions
5576 * are required to be applied before performing aggregation. Hence the
5577 * aggregate cannot be pushed down.
5578 */
5579 if (ofpinfo->local_conds)
5580 return false;
5581
5582 /*
5583 * Examine grouping expressions, as well as other expressions we'd need to
5584 * compute, and check whether they are safe to push down to the foreign
5585 * server. All GROUP BY expressions will be part of the grouping target
5586 * and thus there is no need to search for them separately. Add grouping
5587 * expressions into target list which will be passed to foreign server.
5588 *
5589 * A tricky fine point is that we must not put any expression into the
5590 * target list that is just a foreign param (that is, something that
5591 * deparse.c would conclude has to be sent to the foreign server). If we
5592 * do, the expression will also appear in the fdw_exprs list of the plan
5593 * node, and setrefs.c will get confused and decide that the fdw_exprs
5594 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5595 * a broken plan. Somewhat oddly, it's OK if the expression contains such
5596 * a node, as long as it's not at top level; then no match is possible.
5597 */
5598 i = 0;
5599 foreach(lc, grouping_target->exprs)
5600 {
5601 Expr *expr = (Expr *) lfirst(lc);
5602 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5603 ListCell *l;
5604
5605 /* Check whether this expression is part of GROUP BY clause */
5606 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5607 {
5608 TargetEntry *tle;
5609
5610 /*
5611 * If any GROUP BY expression is not shippable, then we cannot
5612 * push down aggregation to the foreign server.
5613 */
5614 if (!is_foreign_expr(root, grouped_rel, expr))
5615 return false;
5616
5617 /*
5618 * If it would be a foreign param, we can't put it into the tlist,
5619 * so we have to fail.
5620 */
5621 if (is_foreign_param(root, grouped_rel, expr))
5622 return false;
5623
5624 /*
5625 * Pushable, so add to tlist. We need to create a TLE for this
5626 * expression and apply the sortgroupref to it. We cannot use
5627 * add_to_flat_tlist() here because that avoids making duplicate
5628 * entries in the tlist. If there are duplicate entries with
5629 * distinct sortgrouprefs, we have to duplicate that situation in
5630 * the output tlist.
5631 */
5632 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5633 tle->ressortgroupref = sgref;
5634 tlist = lappend(tlist, tle);
5635 }
5636 else
5637 {
5638 /*
5639 * Non-grouping expression we need to compute. Can we ship it
5640 * as-is to the foreign server?
5641 */
5642 if (is_foreign_expr(root, grouped_rel, expr) &&
5643 !is_foreign_param(root, grouped_rel, expr))
5644 {
5645 /* Yes, so add to tlist as-is; OK to suppress duplicates */
5646 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5647 }
5648 else
5649 {
5650 /* Not pushable as a whole; extract its Vars and aggregates */
5651 List *aggvars;
5652
5653 aggvars = pull_var_clause((Node *) expr,
5654 PVC_INCLUDE_AGGREGATES);
5655
5656 /*
5657 * If any aggregate expression is not shippable, then we
5658 * cannot push down aggregation to the foreign server. (We
5659 * don't have to check is_foreign_param, since that certainly
5660 * won't return true for any such expression.)
5661 */
5662 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5663 return false;
5664
5665 /*
5666 * Add aggregates, if any, into the targetlist. Plain Vars
5667 * outside an aggregate can be ignored, because they should be
5668 * either same as some GROUP BY column or part of some GROUP
5669 * BY expression. In either case, they are already part of
5670 * the targetlist and thus no need to add them again. In fact
5671 * including plain Vars in the tlist when they do not match a
5672 * GROUP BY column would cause the foreign server to complain
5673 * that the shipped query is invalid.
5674 */
5675 foreach(l, aggvars)
5676 {
5677 Expr *expr = (Expr *) lfirst(l);
5678
5679 if (IsA(expr, Aggref))
5680 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5681 }
5682 }
5683 }
5684
5685 i++;
5686 }
5687
5688 /*
5689 * Classify the pushable and non-pushable HAVING clauses and save them in
5690 * remote_conds and local_conds of the grouped rel's fpinfo.
5691 */
5692 if (havingQual)
5693 {
5694 ListCell *lc;
5695
5696 foreach(lc, (List *) havingQual)
5697 {
5698 Expr *expr = (Expr *) lfirst(lc);
5699 RestrictInfo *rinfo;
5700
5701 /*
5702 * Currently, the core code doesn't wrap havingQuals in
5703 * RestrictInfos, so we must make our own.
5704 */
5705 Assert(!IsA(expr, RestrictInfo));
5706 rinfo = make_restrictinfo(root,
5707 expr,
5708 true,
5709 false,
5710 false,
5711 root->qual_security_level,
5712 grouped_rel->relids,
5713 NULL,
5714 NULL);
5715 if (is_foreign_expr(root, grouped_rel, expr))
5716 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5717 else
5718 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5719 }
5720 }
5721
5722 /*
5723 * If there are any local conditions, pull Vars and aggregates from it and
5724 * check whether they are safe to pushdown or not.
5725 */
5726 if (fpinfo->local_conds)
5727 {
5728 List *aggvars = NIL;
5729 ListCell *lc;
5730
5731 foreach(lc, fpinfo->local_conds)
5732 {
5733 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5734
5735 aggvars = list_concat(aggvars,
5736 pull_var_clause((Node *) rinfo->clause,
5737 PVC_INCLUDE_AGGREGATES));
5738 }
5739
5740 foreach(lc, aggvars)
5741 {
5742 Expr *expr = (Expr *) lfirst(lc);
5743
5744 /*
5745 * If aggregates within local conditions are not safe to push
5746 * down, then we cannot push down the query. Vars are already
5747 * part of GROUP BY clause which are checked above, so no need to
5748 * access them again here. Again, we need not check
5749 * is_foreign_param for a foreign aggregate.
5750 */
5751 if (IsA(expr, Aggref))
5752 {
5753 if (!is_foreign_expr(root, grouped_rel, expr))
5754 return false;
5755
5756 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5757 }
5758 }
5759 }
5760
5761 /* Store generated targetlist */
5762 fpinfo->grouped_tlist = tlist;
5763
5764 /* Safe to pushdown */
5765 fpinfo->pushdown_safe = true;
5766
5767 /*
5768 * Set # of retrieved rows and cached relation costs to some negative
5769 * value, so that we can detect when they are set to some sensible values,
5770 * during one (usually the first) of the calls to estimate_path_cost_size.
5771 */
5772 fpinfo->retrieved_rows = -1;
5773 fpinfo->rel_startup_cost = -1;
5774 fpinfo->rel_total_cost = -1;
5775
5776 /*
5777 * Set the string describing this grouped relation to be used in EXPLAIN
5778 * output of corresponding ForeignScan.
5779 */
5780 fpinfo->relation_name = makeStringInfo();
5781 appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5782 ofpinfo->relation_name->data);
5783
5784 return true;
5785 }
5786
5787 /*
5788 * postgresGetForeignUpperPaths
5789 * Add paths for post-join operations like aggregation, grouping etc. if
5790 * corresponding operations are safe to push down.
5791 */
5792 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel,void * extra)5793 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
5794 RelOptInfo *input_rel, RelOptInfo *output_rel,
5795 void *extra)
5796 {
5797 PgFdwRelationInfo *fpinfo;
5798
5799 /*
5800 * If input rel is not safe to pushdown, then simply return as we cannot
5801 * perform any post-join operations on the foreign server.
5802 */
5803 if (!input_rel->fdw_private ||
5804 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5805 return;
5806
5807 /* Ignore stages we don't support; and skip any duplicate calls. */
5808 if ((stage != UPPERREL_GROUP_AGG &&
5809 stage != UPPERREL_ORDERED &&
5810 stage != UPPERREL_FINAL) ||
5811 output_rel->fdw_private)
5812 return;
5813
5814 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5815 fpinfo->pushdown_safe = false;
5816 fpinfo->stage = stage;
5817 output_rel->fdw_private = fpinfo;
5818
5819 switch (stage)
5820 {
5821 case UPPERREL_GROUP_AGG:
5822 add_foreign_grouping_paths(root, input_rel, output_rel,
5823 (GroupPathExtraData *) extra);
5824 break;
5825 case UPPERREL_ORDERED:
5826 add_foreign_ordered_paths(root, input_rel, output_rel);
5827 break;
5828 case UPPERREL_FINAL:
5829 add_foreign_final_paths(root, input_rel, output_rel,
5830 (FinalPathExtraData *) extra);
5831 break;
5832 default:
5833 elog(ERROR, "unexpected upper relation: %d", (int) stage);
5834 break;
5835 }
5836 }
5837
5838 /*
5839 * add_foreign_grouping_paths
5840 * Add foreign path for grouping and/or aggregation.
5841 *
5842 * Given input_rel represents the underlying scan. The paths are added to the
5843 * given grouped_rel.
5844 */
5845 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel,GroupPathExtraData * extra)5846 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
5847 RelOptInfo *grouped_rel,
5848 GroupPathExtraData *extra)
5849 {
5850 Query *parse = root->parse;
5851 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5852 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5853 ForeignPath *grouppath;
5854 double rows;
5855 int width;
5856 Cost startup_cost;
5857 Cost total_cost;
5858
5859 /* Nothing to be done, if there is no grouping or aggregation required. */
5860 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5861 !root->hasHavingQual)
5862 return;
5863
5864 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
5865 extra->patype == PARTITIONWISE_AGGREGATE_FULL);
5866
5867 /* save the input_rel as outerrel in fpinfo */
5868 fpinfo->outerrel = input_rel;
5869
5870 /*
5871 * Copy foreign table, foreign server, user mapping, FDW options etc.
5872 * details from the input relation's fpinfo.
5873 */
5874 fpinfo->table = ifpinfo->table;
5875 fpinfo->server = ifpinfo->server;
5876 fpinfo->user = ifpinfo->user;
5877 merge_fdw_options(fpinfo, ifpinfo, NULL);
5878
5879 /*
5880 * Assess if it is safe to push down aggregation and grouping.
5881 *
5882 * Use HAVING qual from extra. In case of child partition, it will have
5883 * translated Vars.
5884 */
5885 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5886 return;
5887
5888 /*
5889 * Compute the selectivity and cost of the local_conds, so we don't have
5890 * to do it over again for each path. (Currently we create just a single
5891 * path here, but in future it would be possible that we build more paths
5892 * such as pre-sorted paths as in postgresGetForeignPaths and
5893 * postgresGetForeignJoinPaths.) The best we can do for these conditions
5894 * is to estimate selectivity on the basis of local statistics.
5895 */
5896 fpinfo->local_conds_sel = clauselist_selectivity(root,
5897 fpinfo->local_conds,
5898 0,
5899 JOIN_INNER,
5900 NULL);
5901
5902 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5903
5904 /* Estimate the cost of push down */
5905 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
5906 &rows, &width, &startup_cost, &total_cost);
5907
5908 /* Now update this information in the fpinfo */
5909 fpinfo->rows = rows;
5910 fpinfo->width = width;
5911 fpinfo->startup_cost = startup_cost;
5912 fpinfo->total_cost = total_cost;
5913
5914 /* Create and add foreign path to the grouping relation. */
5915 grouppath = create_foreign_upper_path(root,
5916 grouped_rel,
5917 grouped_rel->reltarget,
5918 rows,
5919 startup_cost,
5920 total_cost,
5921 NIL, /* no pathkeys */
5922 NULL,
5923 NIL); /* no fdw_private */
5924
5925 /* Add generated path into grouped_rel by add_path(). */
5926 add_path(grouped_rel, (Path *) grouppath);
5927 }
5928
5929 /*
5930 * add_foreign_ordered_paths
5931 * Add foreign paths for performing the final sort remotely.
5932 *
5933 * Given input_rel contains the source-data Paths. The paths are added to the
5934 * given ordered_rel.
5935 */
5936 static void
add_foreign_ordered_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * ordered_rel)5937 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
5938 RelOptInfo *ordered_rel)
5939 {
5940 Query *parse = root->parse;
5941 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5942 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
5943 PgFdwPathExtraData *fpextra;
5944 double rows;
5945 int width;
5946 Cost startup_cost;
5947 Cost total_cost;
5948 List *fdw_private;
5949 ForeignPath *ordered_path;
5950 ListCell *lc;
5951
5952 /* Shouldn't get here unless the query has ORDER BY */
5953 Assert(parse->sortClause);
5954
5955 /* We don't support cases where there are any SRFs in the targetlist */
5956 if (parse->hasTargetSRFs)
5957 return;
5958
5959 /* Save the input_rel as outerrel in fpinfo */
5960 fpinfo->outerrel = input_rel;
5961
5962 /*
5963 * Copy foreign table, foreign server, user mapping, FDW options etc.
5964 * details from the input relation's fpinfo.
5965 */
5966 fpinfo->table = ifpinfo->table;
5967 fpinfo->server = ifpinfo->server;
5968 fpinfo->user = ifpinfo->user;
5969 merge_fdw_options(fpinfo, ifpinfo, NULL);
5970
5971 /*
5972 * If the input_rel is a base or join relation, we would already have
5973 * considered pushing down the final sort to the remote server when
5974 * creating pre-sorted foreign paths for that relation, because the
5975 * query_pathkeys is set to the root->sort_pathkeys in that case (see
5976 * standard_qp_callback()).
5977 */
5978 if (input_rel->reloptkind == RELOPT_BASEREL ||
5979 input_rel->reloptkind == RELOPT_JOINREL)
5980 {
5981 Assert(root->query_pathkeys == root->sort_pathkeys);
5982
5983 /* Safe to push down if the query_pathkeys is safe to push down */
5984 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
5985
5986 return;
5987 }
5988
5989 /* The input_rel should be a grouping relation */
5990 Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
5991 ifpinfo->stage == UPPERREL_GROUP_AGG);
5992
5993 /*
5994 * We try to create a path below by extending a simple foreign path for
5995 * the underlying grouping relation to perform the final sort remotely,
5996 * which is stored into the fdw_private list of the resulting path.
5997 */
5998
5999 /* Assess if it is safe to push down the final sort */
6000 foreach(lc, root->sort_pathkeys)
6001 {
6002 PathKey *pathkey = (PathKey *) lfirst(lc);
6003 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6004 Expr *sort_expr;
6005
6006 /*
6007 * is_foreign_expr would detect volatile expressions as well, but
6008 * checking ec_has_volatile here saves some cycles.
6009 */
6010 if (pathkey_ec->ec_has_volatile)
6011 return;
6012
6013 /* Get the sort expression for the pathkey_ec */
6014 sort_expr = find_em_expr_for_input_target(root,
6015 pathkey_ec,
6016 input_rel->reltarget);
6017
6018 /* If it's unsafe to remote, we cannot push down the final sort */
6019 if (!is_foreign_expr(root, input_rel, sort_expr))
6020 return;
6021 }
6022
6023 /* Safe to push down */
6024 fpinfo->pushdown_safe = true;
6025
6026 /* Construct PgFdwPathExtraData */
6027 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6028 fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6029 fpextra->has_final_sort = true;
6030
6031 /* Estimate the costs of performing the final sort remotely */
6032 estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6033 &rows, &width, &startup_cost, &total_cost);
6034
6035 /*
6036 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6037 * Items in the list must match order in enum FdwPathPrivateIndex.
6038 */
6039 fdw_private = list_make2(makeInteger(true), makeInteger(false));
6040
6041 /* Create foreign ordering path */
6042 ordered_path = create_foreign_upper_path(root,
6043 input_rel,
6044 root->upper_targets[UPPERREL_ORDERED],
6045 rows,
6046 startup_cost,
6047 total_cost,
6048 root->sort_pathkeys,
6049 NULL, /* no extra plan */
6050 fdw_private);
6051
6052 /* and add it to the ordered_rel */
6053 add_path(ordered_rel, (Path *) ordered_path);
6054 }
6055
6056 /*
6057 * add_foreign_final_paths
6058 * Add foreign paths for performing the final processing remotely.
6059 *
6060 * Given input_rel contains the source-data Paths. The paths are added to the
6061 * given final_rel.
6062 */
6063 static void
add_foreign_final_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * final_rel,FinalPathExtraData * extra)6064 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6065 RelOptInfo *final_rel,
6066 FinalPathExtraData *extra)
6067 {
6068 Query *parse = root->parse;
6069 PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6070 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6071 bool has_final_sort = false;
6072 List *pathkeys = NIL;
6073 PgFdwPathExtraData *fpextra;
6074 bool save_use_remote_estimate = false;
6075 double rows;
6076 int width;
6077 Cost startup_cost;
6078 Cost total_cost;
6079 List *fdw_private;
6080 ForeignPath *final_path;
6081
6082 /*
6083 * Currently, we only support this for SELECT commands
6084 */
6085 if (parse->commandType != CMD_SELECT)
6086 return;
6087
6088 /*
6089 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6090 * to add a LIMIT node
6091 */
6092 if (!parse->rowMarks && !extra->limit_needed)
6093 return;
6094
6095 /* We don't support cases where there are any SRFs in the targetlist */
6096 if (parse->hasTargetSRFs)
6097 return;
6098
6099 /* Save the input_rel as outerrel in fpinfo */
6100 fpinfo->outerrel = input_rel;
6101
6102 /*
6103 * Copy foreign table, foreign server, user mapping, FDW options etc.
6104 * details from the input relation's fpinfo.
6105 */
6106 fpinfo->table = ifpinfo->table;
6107 fpinfo->server = ifpinfo->server;
6108 fpinfo->user = ifpinfo->user;
6109 merge_fdw_options(fpinfo, ifpinfo, NULL);
6110
6111 /*
6112 * If there is no need to add a LIMIT node, there might be a ForeignPath
6113 * in the input_rel's pathlist that implements all behavior of the query.
6114 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6115 * (if any) before we get here.
6116 */
6117 if (!extra->limit_needed)
6118 {
6119 ListCell *lc;
6120
6121 Assert(parse->rowMarks);
6122
6123 /*
6124 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6125 * so the input_rel should be a base, join, or ordered relation; and
6126 * if it's an ordered relation, its input relation should be a base or
6127 * join relation.
6128 */
6129 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6130 input_rel->reloptkind == RELOPT_JOINREL ||
6131 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6132 ifpinfo->stage == UPPERREL_ORDERED &&
6133 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
6134 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
6135
6136 foreach(lc, input_rel->pathlist)
6137 {
6138 Path *path = (Path *) lfirst(lc);
6139
6140 /*
6141 * apply_scanjoin_target_to_paths() uses create_projection_path()
6142 * to adjust each of its input paths if needed, whereas
6143 * create_ordered_paths() uses apply_projection_to_path() to do
6144 * that. So the former might have put a ProjectionPath on top of
6145 * the ForeignPath; look through ProjectionPath and see if the
6146 * path underneath it is ForeignPath.
6147 */
6148 if (IsA(path, ForeignPath) ||
6149 (IsA(path, ProjectionPath) &&
6150 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
6151 {
6152 /*
6153 * Create foreign final path; this gets rid of a
6154 * no-longer-needed outer plan (if any), which makes the
6155 * EXPLAIN output look cleaner
6156 */
6157 final_path = create_foreign_upper_path(root,
6158 path->parent,
6159 path->pathtarget,
6160 path->rows,
6161 path->startup_cost,
6162 path->total_cost,
6163 path->pathkeys,
6164 NULL, /* no extra plan */
6165 NULL); /* no fdw_private */
6166
6167 /* and add it to the final_rel */
6168 add_path(final_rel, (Path *) final_path);
6169
6170 /* Safe to push down */
6171 fpinfo->pushdown_safe = true;
6172
6173 return;
6174 }
6175 }
6176
6177 /*
6178 * If we get here it means no ForeignPaths; since we would already
6179 * have considered pushing down all operations for the query to the
6180 * remote server, give up on it.
6181 */
6182 return;
6183 }
6184
6185 Assert(extra->limit_needed);
6186
6187 /*
6188 * If the input_rel is an ordered relation, replace the input_rel with its
6189 * input relation
6190 */
6191 if (input_rel->reloptkind == RELOPT_UPPER_REL &&
6192 ifpinfo->stage == UPPERREL_ORDERED)
6193 {
6194 input_rel = ifpinfo->outerrel;
6195 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6196 has_final_sort = true;
6197 pathkeys = root->sort_pathkeys;
6198 }
6199
6200 /* The input_rel should be a base, join, or grouping relation */
6201 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6202 input_rel->reloptkind == RELOPT_JOINREL ||
6203 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6204 ifpinfo->stage == UPPERREL_GROUP_AGG));
6205
6206 /*
6207 * We try to create a path below by extending a simple foreign path for
6208 * the underlying base, join, or grouping relation to perform the final
6209 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
6210 * stored into the fdw_private list of the resulting path. (We
6211 * re-estimate the costs of sorting the underlying relation, if
6212 * has_final_sort.)
6213 */
6214
6215 /*
6216 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
6217 * server
6218 */
6219
6220 /*
6221 * If the underlying relation has any local conditions, the LIMIT/OFFSET
6222 * cannot be pushed down.
6223 */
6224 if (ifpinfo->local_conds)
6225 return;
6226
6227 /*
6228 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
6229 * not safe to remote.
6230 */
6231 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
6232 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
6233 return;
6234
6235 /* Safe to push down */
6236 fpinfo->pushdown_safe = true;
6237
6238 /* Construct PgFdwPathExtraData */
6239 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6240 fpextra->target = root->upper_targets[UPPERREL_FINAL];
6241 fpextra->has_final_sort = has_final_sort;
6242 fpextra->has_limit = extra->limit_needed;
6243 fpextra->limit_tuples = extra->limit_tuples;
6244 fpextra->count_est = extra->count_est;
6245 fpextra->offset_est = extra->offset_est;
6246
6247 /*
6248 * Estimate the costs of performing the final sort and the LIMIT
6249 * restriction remotely. If has_final_sort is false, we wouldn't need to
6250 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
6251 * roughly estimated using the costs we already have for the underlying
6252 * relation, in the same way as when use_remote_estimate is false. Since
6253 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
6254 * false in that case.
6255 */
6256 if (!fpextra->has_final_sort)
6257 {
6258 save_use_remote_estimate = ifpinfo->use_remote_estimate;
6259 ifpinfo->use_remote_estimate = false;
6260 }
6261 estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
6262 &rows, &width, &startup_cost, &total_cost);
6263 if (!fpextra->has_final_sort)
6264 ifpinfo->use_remote_estimate = save_use_remote_estimate;
6265
6266 /*
6267 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6268 * Items in the list must match order in enum FdwPathPrivateIndex.
6269 */
6270 fdw_private = list_make2(makeInteger(has_final_sort),
6271 makeInteger(extra->limit_needed));
6272
6273 /*
6274 * Create foreign final path; this gets rid of a no-longer-needed outer
6275 * plan (if any), which makes the EXPLAIN output look cleaner
6276 */
6277 final_path = create_foreign_upper_path(root,
6278 input_rel,
6279 root->upper_targets[UPPERREL_FINAL],
6280 rows,
6281 startup_cost,
6282 total_cost,
6283 pathkeys,
6284 NULL, /* no extra plan */
6285 fdw_private);
6286
6287 /* and add it to the final_rel */
6288 add_path(final_rel, (Path *) final_path);
6289 }
6290
6291 /*
6292 * Create a tuple from the specified row of the PGresult.
6293 *
6294 * rel is the local representation of the foreign table, attinmeta is
6295 * conversion data for the rel's tupdesc, and retrieved_attrs is an
6296 * integer list of the table column numbers present in the PGresult.
6297 * fsstate is the ForeignScan plan node's execution state.
6298 * temp_context is a working context that can be reset after each tuple.
6299 *
6300 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
6301 * if we're processing a remote join, while fsstate is NULL in a non-query
6302 * context such as ANALYZE, or if we're processing a non-scan query node.
6303 */
6304 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)6305 make_tuple_from_result_row(PGresult *res,
6306 int row,
6307 Relation rel,
6308 AttInMetadata *attinmeta,
6309 List *retrieved_attrs,
6310 ForeignScanState *fsstate,
6311 MemoryContext temp_context)
6312 {
6313 HeapTuple tuple;
6314 TupleDesc tupdesc;
6315 Datum *values;
6316 bool *nulls;
6317 ItemPointer ctid = NULL;
6318 ConversionLocation errpos;
6319 ErrorContextCallback errcallback;
6320 MemoryContext oldcontext;
6321 ListCell *lc;
6322 int j;
6323
6324 Assert(row < PQntuples(res));
6325
6326 /*
6327 * Do the following work in a temp context that we reset after each tuple.
6328 * This cleans up not only the data we have direct access to, but any
6329 * cruft the I/O functions might leak.
6330 */
6331 oldcontext = MemoryContextSwitchTo(temp_context);
6332
6333 /*
6334 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
6335 * provided, otherwise look to the scan node's ScanTupleSlot.
6336 */
6337 if (rel)
6338 tupdesc = RelationGetDescr(rel);
6339 else
6340 {
6341 Assert(fsstate);
6342 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
6343 }
6344
6345 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
6346 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
6347 /* Initialize to nulls for any columns not present in result */
6348 memset(nulls, true, tupdesc->natts * sizeof(bool));
6349
6350 /*
6351 * Set up and install callback to report where conversion error occurs.
6352 */
6353 errpos.cur_attno = 0;
6354 errpos.rel = rel;
6355 errpos.fsstate = fsstate;
6356 errcallback.callback = conversion_error_callback;
6357 errcallback.arg = (void *) &errpos;
6358 errcallback.previous = error_context_stack;
6359 error_context_stack = &errcallback;
6360
6361 /*
6362 * i indexes columns in the relation, j indexes columns in the PGresult.
6363 */
6364 j = 0;
6365 foreach(lc, retrieved_attrs)
6366 {
6367 int i = lfirst_int(lc);
6368 char *valstr;
6369
6370 /* fetch next column's textual value */
6371 if (PQgetisnull(res, row, j))
6372 valstr = NULL;
6373 else
6374 valstr = PQgetvalue(res, row, j);
6375
6376 /*
6377 * convert value to internal representation
6378 *
6379 * Note: we ignore system columns other than ctid and oid in result
6380 */
6381 errpos.cur_attno = i;
6382 if (i > 0)
6383 {
6384 /* ordinary column */
6385 Assert(i <= tupdesc->natts);
6386 nulls[i - 1] = (valstr == NULL);
6387 /* Apply the input function even to nulls, to support domains */
6388 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
6389 valstr,
6390 attinmeta->attioparams[i - 1],
6391 attinmeta->atttypmods[i - 1]);
6392 }
6393 else if (i == SelfItemPointerAttributeNumber)
6394 {
6395 /* ctid */
6396 if (valstr != NULL)
6397 {
6398 Datum datum;
6399
6400 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
6401 ctid = (ItemPointer) DatumGetPointer(datum);
6402 }
6403 }
6404 errpos.cur_attno = 0;
6405
6406 j++;
6407 }
6408
6409 /* Uninstall error context callback. */
6410 error_context_stack = errcallback.previous;
6411
6412 /*
6413 * Check we got the expected number of columns. Note: j == 0 and
6414 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
6415 */
6416 if (j > 0 && j != PQnfields(res))
6417 elog(ERROR, "remote query result does not match the foreign table");
6418
6419 /*
6420 * Build the result tuple in caller's memory context.
6421 */
6422 MemoryContextSwitchTo(oldcontext);
6423
6424 tuple = heap_form_tuple(tupdesc, values, nulls);
6425
6426 /*
6427 * If we have a CTID to return, install it in both t_self and t_ctid.
6428 * t_self is the normal place, but if the tuple is converted to a
6429 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
6430 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
6431 */
6432 if (ctid)
6433 tuple->t_self = tuple->t_data->t_ctid = *ctid;
6434
6435 /*
6436 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
6437 * heap_form_tuple. heap_form_tuple actually creates the tuple with
6438 * DatumTupleFields, not HeapTupleFields, but the executor expects
6439 * HeapTupleFields and will happily extract system columns on that
6440 * assumption. If we don't do this then, for example, the tuple length
6441 * ends up in the xmin field, which isn't what we want.
6442 */
6443 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
6444 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
6445 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
6446
6447 /* Clean up */
6448 MemoryContextReset(temp_context);
6449
6450 return tuple;
6451 }
6452
6453 /*
6454 * Callback function which is called when error occurs during column value
6455 * conversion. Print names of column and relation.
6456 *
6457 * Note that this function mustn't do any catalog lookups, since we are in
6458 * an already-failed transaction. Fortunately, we can get the needed info
6459 * from the relation or the query's rangetable instead.
6460 */
6461 static void
conversion_error_callback(void * arg)6462 conversion_error_callback(void *arg)
6463 {
6464 ConversionLocation *errpos = (ConversionLocation *) arg;
6465 Relation rel = errpos->rel;
6466 ForeignScanState *fsstate = errpos->fsstate;
6467 const char *attname = NULL;
6468 const char *relname = NULL;
6469 bool is_wholerow = false;
6470
6471 /*
6472 * If we're in a scan node, always use aliases from the rangetable, for
6473 * consistency between the simple-relation and remote-join cases. Look at
6474 * the relation's tupdesc only if we're not in a scan node.
6475 */
6476 if (fsstate)
6477 {
6478 /* ForeignScan case */
6479 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
6480 int varno = 0;
6481 AttrNumber colno = 0;
6482
6483 if (fsplan->scan.scanrelid > 0)
6484 {
6485 /* error occurred in a scan against a foreign table */
6486 varno = fsplan->scan.scanrelid;
6487 colno = errpos->cur_attno;
6488 }
6489 else
6490 {
6491 /* error occurred in a scan against a foreign join */
6492 TargetEntry *tle;
6493
6494 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
6495 errpos->cur_attno - 1);
6496
6497 /*
6498 * Target list can have Vars and expressions. For Vars, we can
6499 * get some information, however for expressions we can't. Thus
6500 * for expressions, just show generic context message.
6501 */
6502 if (IsA(tle->expr, Var))
6503 {
6504 Var *var = (Var *) tle->expr;
6505
6506 varno = var->varno;
6507 colno = var->varattno;
6508 }
6509 }
6510
6511 if (varno > 0)
6512 {
6513 EState *estate = fsstate->ss.ps.state;
6514 RangeTblEntry *rte = exec_rt_fetch(varno, estate);
6515
6516 relname = rte->eref->aliasname;
6517
6518 if (colno == 0)
6519 is_wholerow = true;
6520 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
6521 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
6522 else if (colno == SelfItemPointerAttributeNumber)
6523 attname = "ctid";
6524 }
6525 }
6526 else if (rel)
6527 {
6528 /* Non-ForeignScan case (we should always have a rel here) */
6529 TupleDesc tupdesc = RelationGetDescr(rel);
6530
6531 relname = RelationGetRelationName(rel);
6532 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
6533 {
6534 Form_pg_attribute attr = TupleDescAttr(tupdesc,
6535 errpos->cur_attno - 1);
6536
6537 attname = NameStr(attr->attname);
6538 }
6539 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
6540 attname = "ctid";
6541 }
6542
6543 if (relname && is_wholerow)
6544 errcontext("whole-row reference to foreign table \"%s\"", relname);
6545 else if (relname && attname)
6546 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
6547 else
6548 errcontext("processing expression at position %d in select list",
6549 errpos->cur_attno);
6550 }
6551
6552 /*
6553 * Find an equivalence class member expression, all of whose Vars, come from
6554 * the indicated relation.
6555 */
6556 Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)6557 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
6558 {
6559 ListCell *lc_em;
6560
6561 foreach(lc_em, ec->ec_members)
6562 {
6563 EquivalenceMember *em = lfirst(lc_em);
6564
6565 if (bms_is_subset(em->em_relids, rel->relids) &&
6566 !bms_is_empty(em->em_relids))
6567 {
6568 /*
6569 * If there is more than one equivalence member whose Vars are
6570 * taken entirely from this relation, we'll be content to choose
6571 * any one of those.
6572 */
6573 return em->em_expr;
6574 }
6575 }
6576
6577 /* We didn't find any suitable equivalence class expression */
6578 return NULL;
6579 }
6580
6581 /*
6582 * Find an equivalence class member expression to be computed as a sort column
6583 * in the given target.
6584 */
6585 Expr *
find_em_expr_for_input_target(PlannerInfo * root,EquivalenceClass * ec,PathTarget * target)6586 find_em_expr_for_input_target(PlannerInfo *root,
6587 EquivalenceClass *ec,
6588 PathTarget *target)
6589 {
6590 ListCell *lc1;
6591 int i;
6592
6593 i = 0;
6594 foreach(lc1, target->exprs)
6595 {
6596 Expr *expr = (Expr *) lfirst(lc1);
6597 Index sgref = get_pathtarget_sortgroupref(target, i);
6598 ListCell *lc2;
6599
6600 /* Ignore non-sort expressions */
6601 if (sgref == 0 ||
6602 get_sortgroupref_clause_noerr(sgref,
6603 root->parse->sortClause) == NULL)
6604 {
6605 i++;
6606 continue;
6607 }
6608
6609 /* We ignore binary-compatible relabeling on both ends */
6610 while (expr && IsA(expr, RelabelType))
6611 expr = ((RelabelType *) expr)->arg;
6612
6613 /* Locate an EquivalenceClass member matching this expr, if any */
6614 foreach(lc2, ec->ec_members)
6615 {
6616 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
6617 Expr *em_expr;
6618
6619 /* Don't match constants */
6620 if (em->em_is_const)
6621 continue;
6622
6623 /* Ignore child members */
6624 if (em->em_is_child)
6625 continue;
6626
6627 /* Match if same expression (after stripping relabel) */
6628 em_expr = em->em_expr;
6629 while (em_expr && IsA(em_expr, RelabelType))
6630 em_expr = ((RelabelType *) em_expr)->arg;
6631
6632 if (equal(em_expr, expr))
6633 return em->em_expr;
6634 }
6635
6636 i++;
6637 }
6638
6639 elog(ERROR, "could not find pathkey item to sort");
6640 return NULL; /* keep compiler quiet */
6641 }
6642